Documentation for pulsar 0.9.2. For development docs, go here.

Source code for pulsar.apps.wsgi.server

'''
HTTP Protocol Consumer
==============================

.. autoclass:: HttpServerResponse
   :members:
   :member-order: bysource


Testing WSGI Environ
=========================

.. autofunction:: test_wsgi_environ
'''
import sys
import time
import os
import socket
from wsgiref.handlers import format_date_time

import pulsar
from pulsar import (reraise, HttpException, ProtocolError, Future, task,
                    From, isfuture, chain_future)
from pulsar.utils.pep import is_string, native_str
from pulsar.utils.httpurl import (Headers, unquote, has_empty_content,
                                  host_and_port_default, http_parser,
                                  urlparse, iri_to_uri, DEFAULT_CHARSET)

from pulsar.utils.internet import format_address, is_tls
from pulsar.async.protocols import ProtocolConsumer

from .utils import handle_wsgi_error, wsgi_request, HOP_HEADERS


__all__ = ['HttpServerResponse', 'MAX_CHUNK_SIZE', 'test_wsgi_environ']


MAX_CHUNK_SIZE = 65536
MAX_TIME_IN_LOOP = 0.5


class FakeConnection(object):

    def __init__(self, loop=None):
        self._loop = loop


[docs]def test_wsgi_environ(path=None, method=None, headers=None, extra=None, secure=False, loop=None): '''An function to create a WSGI environment dictionary for testing. :param url: the resource in the ``PATH_INFO``. :param method: the ``REQUEST_METHOD``. :param headers: optional request headers :params secure: a secure connection? :param extra: additional dictionary of parameters to add. :return: a valid WSGI environ dictionary. ''' parser = http_parser(kind=0) method = (method or 'GET').upper() path = iri_to_uri(path or '/') request_headers = Headers(headers, kind='client') # Add Host if not available parsed = urlparse(path) if 'host' not in request_headers: if not parsed.netloc: path = '%s%s' % ('https://:443' if secure else 'http://:80', path) else: request_headers['host'] = parsed.netloc # data = '%s %s HTTP/1.1\r\n\r\n' % (method, path) data = data.encode('latin1') parser.execute(data, len(data)) # headers = Headers() stream = StreamReader(request_headers, parser) extra = extra or {} extra['pulsar.connection'] = FakeConnection(loop=loop) return wsgi_environ(stream, ('127.0.0.1', 8060), '777.777.777.777:8080', headers, https=secure, extra=extra)
class StreamReader: _expect_sent = None _waiting = None def __init__(self, headers, parser, transport=None): self.headers = headers self.parser = parser self.transport = transport self.buffer = b'' self.on_message_complete = Future() def __repr__(self): return repr(self.transport) __str__ = __repr__ def done(self): '''``True`` when the full HTTP message has been read. ''' return self.on_message_complete.done() def protocol(self): version = self.parser.get_version() return "HTTP/%s" % ".".join(('%s' % v for v in version)) def waiting_expect(self): '''``True`` when the client is waiting for 100 Continue. ''' if self._expect_sent is None: if (not self.parser.is_message_complete() and self.headers.has('expect', '100-continue')): return True self._expect_sent = '' return False def recv(self): '''Read bytes in the buffer. ''' if self.waiting_expect(): if self.parser.get_version() < (1, 1): raise HttpException(status=417) else: msg = '%s 100 Continue\r\n\r\n' % self.protocol() self._expect_sent = msg self.transport.write(msg.encode(DEFAULT_CHARSET)) return self.parser.recv_body() def read(self, maxbuf=None): '''Return bytes in the buffer. If the stream is not yet ready, return a :class:`asyncio.Future` which results in the bytes read. ''' if not self._waiting: body = self.recv() if self.done(): return self._getvalue(body, maxbuf) else: self._waiting = chain_future( self.on_message_complete, lambda r: self._getvalue(body, maxbuf)) return self._waiting else: return self._waiting def fail(self): if self.waiting_expect(): raise HttpException(status=417) # INTERNALS def _getvalue(self, body, maxbuf): if self.buffer: body = self.buffer + body body = body + self.recv() if maxbuf and len(body) > maxbuf: body, self.buffer = body[:maxbuf], body[maxbuf:] return body def wsgi_environ(stream, address, client_address, headers, server_software=None, https=False, extra=None): protocol = stream.protocol() parser = stream.parser request_headers = stream.headers raw_uri = parser.get_url() request_uri = urlparse(raw_uri) # # http://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html#sec5.2 # If Request-URI is an absoluteURI, the host is part of the Request-URI. # Any Host header field value in the request MUST be ignored if request_uri.scheme: url_scheme = request_uri.scheme host = request_uri.netloc else: url_scheme = 'https' if https else 'http' host = None # environ = {"wsgi.input": stream, "wsgi.errors": sys.stderr, "wsgi.version": (1, 0), "wsgi.run_once": False, "wsgi.multithread": False, "wsgi.multiprocess": False, "SERVER_SOFTWARE": server_software or pulsar.SERVER_SOFTWARE, "REQUEST_METHOD": native_str(parser.get_method()), "QUERY_STRING": parser.get_query_string(), "RAW_URI": raw_uri, "SERVER_PROTOCOL": protocol, "CONTENT_TYPE": ''} forward = client_address script_name = os.environ.get("SCRIPT_NAME", "") for header, value in request_headers: header = header.lower() if header in HOP_HEADERS: headers[header] = value if header == 'x-forwarded-for': forward = value elif header == "x-forwarded-protocol" and value == "ssl": url_scheme = "https" elif header == "x-forwarded-ssl" and value == "on": url_scheme = "https" elif header == "host" and not host: host = value elif header == "script_name": script_name = value elif header == "content-type": environ['CONTENT_TYPE'] = value continue elif header == "content-length": environ['CONTENT_LENGTH'] = value continue key = 'HTTP_' + header.upper().replace('-', '_') environ[key] = value environ['wsgi.url_scheme'] = url_scheme if url_scheme == 'https': environ['HTTPS'] = 'on' if is_string(forward): # we only took the last one # http://en.wikipedia.org/wiki/X-Forwarded-For if forward.find(",") >= 0: forward = forward.rsplit(",", 1)[1].strip() remote = forward.split(":") if len(remote) < 2: remote.append('80') else: remote = forward environ['REMOTE_ADDR'] = remote[0] environ['REMOTE_PORT'] = str(remote[1]) if not host and protocol == 'HTTP/1.0': host = format_address(address) if host: host = host_and_port_default(url_scheme, host) environ['SERVER_NAME'] = socket.getfqdn(host[0]) environ['SERVER_PORT'] = host[1] path_info = request_uri.path if path_info is not None: if script_name: path_info = path_info.split(script_name, 1)[1] environ['PATH_INFO'] = unquote(path_info) environ['SCRIPT_NAME'] = script_name if extra: environ.update(extra) return environ def chunk_encoding(chunk): '''Write a chunk:: chunk-size(hex) CRLF chunk-data CRLF If the size is 0, this is the last chunk, and an extra CRLF is appended. ''' head = ("%X\r\n" % len(chunk)).encode('utf-8') return head + chunk + b'\r\n' def keep_alive(headers, version): """ return True if the connection should be kept alive""" conn = set((v.lower() for v in headers.get_all('connection', ()))) if "close" in conn: return False elif 'upgrade' in conn: headers['connection'] = 'upgrade' return True elif "keep-alive" in conn: return True elif version == (1, 1): headers['connection'] = 'keep-alive' return True else: return False def keep_alive_with_status(status, headers): code = int(status.split()[0]) if code >= 400: return False return True
[docs]class HttpServerResponse(ProtocolConsumer): '''Server side WSGI :class:`.ProtocolConsumer`. .. attribute:: wsgi_callable The wsgi callable handling requests. ''' _status = None _headers_sent = None _stream = None _buffer = None SERVER_SOFTWARE = pulsar.SERVER_SOFTWARE ONE_TIME_EVENTS = ProtocolConsumer.ONE_TIME_EVENTS + ('on_headers',) def __init__(self, wsgi_callable, cfg, server_software=None, loop=None): super(HttpServerResponse, self).__init__(loop=loop) self.wsgi_callable = wsgi_callable self.cfg = cfg self.parser = http_parser(kind=0) self.headers = Headers() self.keep_alive = False self.SERVER_SOFTWARE = server_software or self.SERVER_SOFTWARE @property def headers_sent(self): '''Available once the headers have been sent to the client. These are the bytes representing the first response line and the headers ''' return self._headers_sent
[docs] def data_received(self, data): '''Implements :meth:`~.ProtocolConsumer.data_received` method. Once we have a full HTTP message, build the wsgi ``environ`` and delegate the response to the :func:`wsgi_callable` function. ''' parser = self.parser processed = parser.execute(data, len(data)) if not self._stream and parser.is_headers_complete(): headers = Headers(parser.get_headers(), kind='client') self._stream = StreamReader(headers, parser, self.transport) self._response(self.wsgi_environ()) # if parser.is_message_complete(): # # Stream has the whole body if not self._stream.on_message_complete.done(): self._stream.on_message_complete.set_result(None) if processed < len(data): if not self._buffer: self._buffer = data[processed:] self.bind_event('post_request', self._new_request) else: self._buffer += data[processed:] # elif processed < len(data): # This is a parsing error, the client must have sent # bogus data raise ProtocolError
@property def status(self): return self._status @property def upgrade(self): return self.headers.get('upgrade') @property def chunked(self): return self.headers.get('Transfer-Encoding') == 'chunked' @property def content_length(self): c = self.headers.get('Content-Length') if c: return int(c) @property def version(self): return self.parser.get_version()
[docs] def start_response(self, status, response_headers, exc_info=None): '''WSGI compliant ``start_response`` callable, see pep3333_. The application may call start_response more than once, if and only if the ``exc_info`` argument is provided. More precisely, it is a fatal error to call ``start_response`` without the ``exc_info`` argument if start_response has already been called within the current invocation of the application. :parameter status: an HTTP ``status`` string like ``200 OK`` or ``404 Not Found``. :parameter response_headers: a list of ``(header_name, header_value)`` tuples. It must be a Python list. Each header_name must be a valid HTTP header field-name (as defined by RFC 2616_, Section 4.2), without a trailing colon or other punctuation. :parameter exc_info: optional python ``sys.exc_info()`` tuple. This argument should be supplied by the application only if ``start_response`` is being called by an error handler. :return: The :meth:`write` method. ``HOP_HEADERS`` are not considered but no error is raised. .. _pep3333: http://www.python.org/dev/peps/pep-3333/ .. _2616: http://www.faqs.org/rfcs/rfc2616.html ''' if exc_info: try: if self._headers_sent: # if exc_info is provided, and the HTTP headers have # already been sent, start_response must raise an error, # and should re-raise using the exc_info tuple reraise(*exc_info) finally: # Avoid circular reference exc_info = None elif self._status: # Headers already set. Raise error raise HttpException("Response headers already set!") self._status = status if type(response_headers) is not list: raise TypeError("Headers must be a list of name/value tuples") for header, value in response_headers: if header.lower() in HOP_HEADERS: # These features are the exclusive province of this class, # this should be considered a fatal error for an application # to attempt sending them, but we don't raise an error, # just log a warning self.logger.warning('Application passing hop header "%s"', header) continue self.headers.add_header(header, value) return self.write
[docs] def write(self, data, force=False): '''The write function returned by the :meth:`start_response` method. Required by the WSGI specification. :param data: bytes to write :param force: Optional flag used internally :return: a :class:`~asyncio.Future` or the number of bytes written ''' write = super(HttpServerResponse, self).write chunks = [] if not self._headers_sent: tosend = self.get_headers() self._headers_sent = tosend.flat(self.version, self.status) self.fire_event('on_headers') chunks.append(self._headers_sent) if data: if self.chunked: while len(data) >= MAX_CHUNK_SIZE: chunk, data = data[:MAX_CHUNK_SIZE], data[MAX_CHUNK_SIZE:] chunks.append(chunk_encoding(chunk)) if data: chunks.append(chunk_encoding(data)) else: chunks.append(data) elif force and self.chunked: chunks.append(chunk_encoding(data)) if chunks: return write(b''.join(chunks))
######################################################################## # INTERNALS @task def _response(self, environ): exc_info = None response = None done = False while not done: done = True try: if exc_info is None: if 'SERVER_NAME' not in environ: raise HttpException(status=400) response = self.wsgi_callable(environ, self.start_response) else: response = handle_wsgi_error(environ, exc_info) # if isfuture(response): response = yield From(response) # if exc_info: self.start_response(response.status, response.get_headers(), exc_info) # # Do the actual writing loop = self._loop start = loop.time() for chunk in response: if isfuture(chunk): chunk = yield From(chunk) start = loop.time() result = self.write(chunk) if isfuture(result): yield From(result) start = loop.time() else: time_in_loop = loop.time() - start if time_in_loop > MAX_TIME_IN_LOOP: self.logger.debug( 'Released the event loop after %.3f seconds', time_in_loop) yield From(None) start = loop.time() # # make sure we write headers and last chunk if needed self.write(b'', True) except IOError: # client disconnected, end this connection self.finished() except Exception: if wsgi_request(environ).cache.handle_wsgi_error: self.keep_alive = False self.connection.close() self.finished() else: done = False exc_info = sys.exc_info() else: if not self.keep_alive: self.connection.close() self.finished() finally: if hasattr(response, 'close'): try: response.close() except Exception: self.logger.exception( 'Error while closing wsgi iterator')
[docs] def is_chunked(self): '''Check if the response uses chunked transfer encoding. Only use chunked responses when the client is speaking HTTP/1.1 or newer and there was no Content-Length header set. ''' if (self.version <= (1, 0) or self._status == '200 Connection established' or has_empty_content(int(self.status[:3]))): return False elif self.headers.get('Transfer-Encoding') == 'chunked': return True else: return self.content_length is None
[docs] def get_headers(self): '''Get the headers to send to the client. ''' if not self._status: # we are sending headers but the start_response was not called raise HttpException('Headers not set.') headers = self.headers # Set chunked header if needed if self.is_chunked(): headers['Transfer-Encoding'] = 'chunked' headers.pop('content-length', None) else: headers.pop('Transfer-Encoding', None) if self.keep_alive: self.keep_alive = keep_alive_with_status(self._status, headers) if not self.keep_alive: headers['connection'] = 'close' return headers
def wsgi_environ(self): # return a the WSGI environ dictionary transport = self.transport https = True if is_tls(transport.get_extra_info('socket')) else False multiprocess = (self.cfg.concurrency == 'process') environ = wsgi_environ(self._stream, transport.get_extra_info('sockname'), self.address, self.headers, self.SERVER_SOFTWARE, https=https, extra={'pulsar.connection': self.connection, 'pulsar.cfg': self.cfg, 'wsgi.multiprocess': multiprocess}) self.keep_alive = keep_alive(self.headers, self.parser.get_version()) self.headers.update([('Server', self.SERVER_SOFTWARE), ('Date', format_date_time(time.time()))]) return environ def _new_request(self, _, exc=None): connection = self._connection connection.data_received(self._buffer)