Documentation for pulsar 0.9.2. For development docs, go here.

Source code for pulsar.apps.wsgi.wrappers

'''This section introduces two classes used by pulsar
:ref:`wsgi application <apps-wsgi>` to pass a request/response state
during an HTTP request.
The :class:`WsgiRequest` is a thin wrapper around a WSGI ``environ``
dictionary.
It contains only the ``environ`` as its private data.
The :class:`WsgiResponse`, which is available in the
:class:`WsgiRequest.response` attribute, is an iterable over bytestring with
several utility methods for manipulating headers and asynchronous content.


Environ Mixin
=====================

.. autoclass:: EnvironMixin
   :members:
   :member-order: bysource


.. _app-wsgi-request:

Wsgi Request
=====================

.. autoclass:: WsgiRequest
   :members:
   :member-order: bysource


.. _wsgi-response:

Wsgi Response
=====================

.. autoclass:: WsgiResponse
   :members:
   :member-order: bysource

.. _WSGI: http://www.wsgi.org
.. _AJAX: http://en.wikipedia.org/wiki/Ajax_(programming)
.. _TLS: http://en.wikipedia.org/wiki/Transport_Layer_Security
'''
import re
from functools import reduce, partial
from io import BytesIO

from pulsar import Future, chain_future
from pulsar.utils.system import json
from pulsar.utils.multipart import parse_form_data, parse_options_header
from pulsar.utils.structures import AttributeDictionary
from pulsar.utils.httpurl import (Headers, SimpleCookie, responses,
                                  has_empty_content, ispy3k, REDIRECT_CODES,
                                  ENCODE_URL_METHODS, JSON_CONTENT_TYPES,
                                  remove_double_slash, iri_to_uri)

from .content import HtmlDocument
from .utils import (set_wsgi_request_class, set_cookie, query_dict,
                    parse_accept_header)
from .structures import ContentAccept, CharsetAccept, LanguageAccept


__all__ = ['EnvironMixin', 'WsgiResponse',
           'WsgiRequest', 'cached_property']

MAX_BUFFER_SIZE = 2**16
absolute_http_url_re = re.compile(r"^https?://", re.I)


def redirect(path, code=None, permanent=False):
    if code is None:
        code = 301 if permanent else 302

    assert code in REDIRECT_CODES, 'Invalid redirect status code.'
    return WsgiResponse(code, response_headers=[('location', path)])


def cached_property(method):
    name = method.__name__

    def _(self):
        if name not in self.cache:
            self.cache[name] = method(self)
        return self.cache[name]

    return property(_, doc=method.__doc__)


def wsgi_encoder(gen, encoding):
    for data in gen:
        if not isinstance(data, bytes):
            yield data.encode(encoding)
        else:
            yield data


[docs]class WsgiResponse(object): '''A WSGI response. Instances are callable using the standard WSGI call and, importantly, iterable:: response = WsgiResponse(200) A :class:`WsgiResponse` is an iterable over bytes to send back to the requesting client. .. attribute:: status_code Integer indicating the HTTP status, (i.e. 200) .. attribute:: response String indicating the HTTP status (i.e. 'OK') .. attribute:: status String indicating the HTTP status code and response (i.e. '200 OK') .. attribute:: content_type The content type of this response. Can be ``None``. .. attribute:: headers The :class:`.Headers` container for this response. .. attribute:: environ The dictionary of WSGI environment if passed to the constructor. .. attribute:: cookies A python :class:`SimpleCookie` container of cookies included in the request as well as cookies set during the response. ''' _started = False DEFAULT_STATUS_CODE = 200 def __init__(self, status=None, content=None, response_headers=None, content_type=None, encoding=None, environ=None, can_store_cookies=True): self.environ = environ self.status_code = status or self.DEFAULT_STATUS_CODE self.encoding = encoding self.cookies = SimpleCookie() self.headers = Headers(response_headers, kind='server') self.content = content self._can_store_cookies = can_store_cookies if content_type is not None: self.content_type = content_type @property def started(self): return self._started @property def path(self): if self.environ: return self.environ.get('PATH_INFO', '') @property def method(self): if self.environ: return self.environ.get('REQUEST_METHOD') @property def connection(self): if self.environ: return self.environ.get('pulsar.connection') @property def environ_cache(self): if self.environ: return self.environ.get('pulsar.cache') @property def content(self): return self._content @content.setter def content(self, content): if not self._started: if content is None: content = () elif ispy3k: if isinstance(content, str): if not self.encoding: # use utf-8 if not set self.encoding = 'utf-8' content = content.encode(self.encoding) else: # pragma nocover if isinstance(content, unicode): if not self.encoding: # use utf-8 if not set self.encoding = 'utf-8' content = content.encode(self.encoding) if isinstance(content, bytes): content = (content,) self._content = content else: raise RuntimeError('Cannot set content. Already iterated') def _get_content_type(self): return self.headers.get('content-type') def _set_content_type(self, typ): if typ: self.headers['content-type'] = typ else: self.headers.pop('content-type', None) content_type = property(_get_content_type, _set_content_type) @property def response(self): return responses.get(self.status_code) @property def status(self): return '%s %s' % (self.status_code, self.response) def __str__(self): return self.status def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self) @property def is_streamed(self): '''Check if the response is streamed. A streamed response is an iterable with no length information. In this case streamed means that there is no information about the number of iterations. This is usually `True` if a generator is passed to the response object. ''' try: len(self.content) except TypeError: return True return False def can_set_cookies(self): if self.status_code < 400: return self._can_store_cookies def length(self): if not self.is_streamed: return reduce(lambda x, y: x+len(y), self.content, 0) def __iter__(self): if self._started: raise RuntimeError('WsgiResponse can be iterated once only') self._started = True if self.is_streamed: return wsgi_encoder(self.content, self.encoding or 'utf-8') else: return iter(self.content)
[docs] def close(self): '''Close this response, required by WSGI ''' if self.is_streamed: if hasattr(self.content, 'close'): self.content.close()
def delete_cookie(self, key, path='/', domain=None): set_cookie(self.cookies, key, max_age=0, path=path, domain=domain, expires='Thu, 01-Jan-1970 00:00:00 GMT')
[docs] def get_headers(self): '''The list of headers for this response ''' headers = self.headers if has_empty_content(self.status_code, self.method): headers.pop('content-type', None) headers.pop('content-length', None) self._content = () else: if not self.is_streamed: cl = 0 for c in self.content: cl += len(c) if cl == 0 and self.content_type in JSON_CONTENT_TYPES: self._content = (b'{}',) cl = len(self._content[0]) headers['Content-Length'] = str(cl) ct = self.content_type # content type encoding available if self.encoding: ct = ct or 'text/plain' if 'charset=' not in ct: ct = '%s; charset=%s' % (ct, self.encoding) if ct: headers['Content-Type'] = ct if self.can_set_cookies(): for c in self.cookies.values(): headers.add_header('Set-Cookie', c.OutputString()) return list(headers)
def has_header(self, header): return header in self.headers __contains__ = has_header def __setitem__(self, header, value): self.headers[header] = value def __getitem__(self, header): return self.headers[header]
[docs]class EnvironMixin(object): '''A wrapper around a WSGI_ environ. Instances of this class have the :attr:`environ` attribute as their only private data. Every other attribute is stored in the :attr:`environ` itself at the ``pulsar.cache`` wsgi-extension key. .. attribute:: environ WSGI_ environ dictionary ''' __slots__ = ('environ',) def __init__(self, environ, name=None): self.environ = environ if 'pulsar.cache' not in environ: environ['pulsar.cache'] = AttributeDictionary() self.cache.mixins = {} if name: self.cache.mixins[name] = self @property def cache(self): '''An :ref:`attribute dictionary <attribute-dictionary>` of pulsar-specific data stored in the :attr:`environ` at the wsgi-extension key ``pulsar.cache`` ''' return self.environ['pulsar.cache'] @property def connection(self): '''The :class:`.Connection` handling the request ''' return self.environ.get('pulsar.connection') @property def _loop(self): '''Event loop if :attr:`connection` is available. ''' c = self.connection if c: return c._loop def __getattr__(self, name): mixin = self.cache.mixins.get(name) if mixin is None: raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, name)) return mixin
[docs] def get(self, key, default=None): '''Shortcut to the :attr:`environ` get method.''' return self.environ.get(key, default)
[docs]class WsgiRequest(EnvironMixin): '''An :class:`EnvironMixin` for wsgi requests.''' def __init__(self, environ, app_handler=None, urlargs=None): super(WsgiRequest, self).__init__(environ) self.cache.cfg = environ.get('pulsar.cfg', {}) if app_handler: self.cache.app_handler = app_handler self.cache.urlargs = urlargs def __repr__(self): return self.path def __str__(self): return self.__repr__() @cached_property def content_types(self): '''List of content types this client supports as a :class:`.ContentAccept` object. Obtained form the ``Accept`` request header. ''' return parse_accept_header(self.environ.get('HTTP_ACCEPT'), ContentAccept) @cached_property def charsets(self): '''List of charsets this client supports as a :class:`.CharsetAccept` object. Obtained form the ``Accept-Charset`` request header. ''' return parse_accept_header(self.environ.get('HTTP_ACCEPT_CHARSET'), CharsetAccept) @cached_property def encodings(self): """List of encodings this client supports as :class:`.Accept` object. Obtained form the ``Accept-Charset`` request header. Encodings in a HTTP term are compression encodings such as gzip. For charsets have a look at :attr:`charsets` attribute. """ return parse_accept_header(self.environ.get('HTTP_ACCEPT_ENCODING')) @cached_property def languages(self): """List of languages this client accepts as :class:`.LanguageAccept` object. Obtained form the ``Accept-Language`` request header. """ return parse_accept_header(self.environ.get('HTTP_ACCEPT_LANGUAGE'), LanguageAccept) @cached_property def cookies(self): '''Container of request cookies ''' cookies = SimpleCookie() cookie = self.environ.get('HTTP_COOKIE') if cookie: cookies.load(cookie) return cookies @property def app_handler(self): '''The WSGI application handling this request. The WSGI handler is responsible for setting this value in the same way as the :class:`.Router` does. ''' return self.cache.app_handler @property def urlargs(self): '''Dictionary of url parameters obtained when matching a :ref:`router <wsgi-router>` with this request :attr:`path`.''' return self.cache.urlargs @property def cfg(self): '''The :ref:`config container <settings>` of the server ''' return self.cache.cfg @property def ipaddress(self): '''internet protocol address of the client ''' return self.environ.get('REMOTE_ADDR') @cached_property def response(self): '''The :class:`WsgiResponse` for this client request. ''' return WsgiResponse(environ=self.environ) ####################################################################### # environ shortcuts @property def is_xhr(self): '''``True`` if this is an AJAX_ request ''' return self.environ.get('HTTP_X_REQUESTED_WITH') == 'XMLHttpRequest' @property def is_secure(self): '''``True`` if this request is via a TLS_ connection ''' return self.environ.get('HTTPS') == 'on' @property def path(self): '''Shortcut to the :attr:`~EnvironMixin.environ` ``PATH_INFO`` value. ''' return self.environ.get('PATH_INFO', '/') @property def uri(self): return self.absolute_uri() @property def method(self): '''The request method (uppercase).''' return self.environ['REQUEST_METHOD'] @cached_property def encoding(self): return self.content_type_options[1].get('charset', 'utf-8') @cached_property def content_type_options(self): content_type = self.environ.get('CONTENT_TYPE') if content_type: return parse_options_header(content_type) else: return None, {}
[docs] def data_and_files(self, data=True, files=True): '''Retrieve body data. Returns a two-elements tuple of a :class:`~.MultiValueDict` containing data from the request body, and data from uploaded files. If the body data is not ready, return a :class:`~asyncio.Future` which results in the tuple. The result is cached. ''' value = self.cache.data_and_files if not value: return self._data_and_files(data, files) elif data and files: return value elif data: return value[0] elif files: return value[1] else: return None
[docs] def body_data(self): '''A :class:`~.MultiValueDict` containing data from the request body. ''' return self.data_and_files(files=False)
def _data_and_files(self, data=True, files=True, future=None): result = {}, None chunk = None if future is None: stream = self.environ.get('wsgi.input') if self.method not in ENCODE_URL_METHODS and stream: chunk = stream.read() if isinstance(chunk, Future): return chain_future( chunk, partial(self._data_and_files, data, files)) else: chunk = future if chunk is not None: content_type, options = self.content_type_options charset = options.get('charset', 'utf-8') if content_type in JSON_CONTENT_TYPES: result = json.loads(chunk.decode(charset)), None else: self.environ['wsgi.input'] = BytesIO(chunk) result = parse_form_data(self.environ, charset) self.environ['wsgi.input'] = BytesIO(chunk) self.cache.data_and_files = result return self.data_and_files(data, files) @cached_property def url_data(self): '''A (cached) dictionary containing data from the ``QUERY_STRING`` in :attr:`~.EnvironMixin.environ`. ''' return query_dict(self.environ.get('QUERY_STRING', ''), encoding=self.encoding) @cached_property def html_document(self): '''Return a cached instance of :class:`.HtmlDocument`.''' return HtmlDocument()
[docs] def get_host(self, use_x_forwarded=True): """Returns the HTTP host using the environment or request headers.""" # We try three options, in order of decreasing preference. if use_x_forwarded and ('HTTP_X_FORWARDED_HOST' in self.environ): host = self.environ['HTTP_X_FORWARDED_HOST'] elif 'HTTP_HOST' in self.environ: host = self.environ['HTTP_HOST'] else: # Reconstruct the host using the algorithm from PEP 333. host = self.environ['SERVER_NAME'] server_port = str(self.environ['SERVER_PORT']) if server_port != ('443' if self.is_secure else '80'): host = '%s:%s' % (host, server_port) return host
[docs] def get_client_address(self, use_x_forwarded=True): '''Obtain the client IP address ''' xfor = self.environ.get('HTTP_X_FORWARDED_FOR') if use_x_forwarded and xfor: return xfor.split(',')[-1].strip() else: return self.environ['REMOTE_ADDR']
[docs] def full_path(self, *args, **query): '''Return a full path''' path = None if args: if len(args) > 1: raise TypeError("full_url() takes exactly 1 argument " "(%s given)" % len(args)) path = args[0] if not path: path = self.path elif not path.startswith('/'): path = remove_double_slash('%s/%s' % (self.path, path)) return iri_to_uri(path, query)
[docs] def absolute_uri(self, location=None, scheme=None): '''Builds an absolute URI from ``location`` and variables available in this request. If no ``location`` is specified, the relative URI is built from :meth:`full_path`. ''' if not location or not absolute_http_url_re.match(location): location = self.full_path(location) if not scheme: scheme = self.is_secure and 'https' or 'http' base = '%s://%s' % (scheme, self.get_host()) return '%s%s' % (base, location) elif not scheme: return iri_to_uri(location) else: raise ValueError('Absolute location with scheme not valid')
[docs] def redirect(self, path, **kw): '''Redirect to a different ``path`` ''' return redirect(path, **kw)
set_wsgi_request_class(WsgiRequest)