
Source code for pulsar.async.protocols

import sys

import pulsar
from pulsar.utils.internet import nice_address, format_address

from .futures import multi_async, task, Future
from .events import EventHandler
from .mixins import FlowControl, Timeout
from .access import asyncio, get_io_loop, From, ConnectionResetError


__all__ = ['ProtocolConsumer',
           'Protocol',
           'DatagramProtocol',
           'Connection',
           'Producer',
           'TcpServer',
           'DatagramServer']


class ProtocolConsumer(EventHandler):
    '''The consumer of data for a server or client :class:`.Connection`.

    It is responsible for receiving incoming data from an end point via the
    :meth:`Connection.data_received` method, decoding (parsing) and,
    possibly, writing back to the client or server via the
    :attr:`transport` attribute.

    .. note::

        For server consumers, :meth:`data_received` is the only method
        to implement. For client consumers, :meth:`start_request` should
        also be implemented.

    A :class:`ProtocolConsumer` is a subclass of :class:`.EventHandler`
    and it has two default :ref:`one time events <one-time-event>`:

    * ``pre_request`` fired when the request is received (for servers) or
      just before it is sent (for clients).
      This occurs just before the :meth:`start_request` method.
    * ``post_request`` fired when the request is done. The
      :attr:`on_finished` attribute is a shortcut for the ``post_request``
      :class:`.OneTime` event and therefore can be used to wait for
      the request to have received a full response (clients).

    In addition, it has two :ref:`many times events <many-times-event>`:

    * ``data_received`` fired when new data is received from the transport
      but not yet processed (before the :meth:`data_received` method is
      invoked)
    * ``data_processed`` fired just after data has been consumed (after the
      :meth:`data_received` method)

    .. note::

        A useful example on how to use the ``data_received`` event is
        the :ref:`wsgi proxy server <tutorials-proxy-server>`.
    '''
    _connection = None
    _data_received_count = 0
    ONE_TIME_EVENTS = ('pre_request', 'post_request')
    MANY_TIMES_EVENTS = ('data_received', 'data_processed')

    @property
    def connection(self):
        '''The :class:`Connection` of this consumer.'''
        return self._connection

    @property
    def request(self):
        '''The request.

        Used for clients only and available only after the
        :meth:`start` method is invoked.
        '''
        return getattr(self, '_request', None)

    @property
    def transport(self):
        '''The :class:`Transport` of this consumer.'''
        if self._connection:
            return self._connection.transport

    @property
    def address(self):
        if self._connection:
            return self._connection.address

    @property
    def producer(self):
        '''The :class:`Producer` of this consumer.'''
        if self._connection:
            return self._connection.producer

    @property
    def on_finished(self):
        '''The ``post_request`` one time event.
        '''
        return self.event('post_request')

    def connection_made(self, connection):
        '''Called by a :class:`Connection` when it starts using this
        consumer.

        By default it does nothing.
        '''

    def data_received(self, data):
        '''Called when some data is received.

        **This method must be implemented by subclasses** for both server
        and client consumers.

        The argument is a bytes object.
        '''

    def start_request(self):
        '''Starts a new request.

        Invoked by the :meth:`start` method to kick start the request with
        the remote server. For a server :class:`ProtocolConsumer` this
        method is not invoked at all.

        **For clients this method should be implemented** and it is a
        critical method where errors caused by stale socket connections can
        arise. **This method should not be called directly.** Use
        :meth:`start` instead. Typically one writes some data from the
        :attr:`request` into the transport. Something like this::

            self.transport.write(self.request.encode())
        '''
        raise NotImplementedError

    def start(self, request=None):
        '''Starts processing the request for this protocol consumer.

        There is no need to override this method,
        implement :meth:`start_request` instead.
        If either :attr:`connection` or :attr:`transport` is missing, a
        :class:`RuntimeError` occurs.

        For server side consumers, this method simply fires the
        ``pre_request`` event.
        '''
        if hasattr(self, '_request'):
            raise RuntimeError('Consumer already started')
        conn = self._connection
        if not conn:
            raise RuntimeError('Cannot start new request. No connection.')
        if not conn._transport:
            raise RuntimeError('%s has no transport.' % conn)
        conn._processed += 1
        if conn._producer:
            p = getattr(conn._producer, '_requests_processed', 0)
            conn._producer._requests_processed = p + 1
        self.bind_event('post_request', self._finished)
        self._request = request
        self.fire_event('pre_request')
        if self._request is not None:
            try:
                self.start_request()
            except Exception as exc:
                self.finished(exc=exc)

    def connection_lost(self, exc):
        '''Called by the :attr:`connection` when the transport is closed.

        By default it calls the :meth:`finished` method. It can be
        overwritten to handle the potential exception ``exc``.
        '''
        # TODO: decide how to handle connection_lost when no exception occurs
        # Set the first positional parameter to None so that if the
        # connection was dropped without exception it returns None
        # rather than the protocol consumer
        return self.finished(None)

    def finished(self, *arg, **kw):
        '''Fire the ``post_request`` event if it wasn't already fired.
        '''
        if not self.event('post_request').fired():
            return self.fire_event('post_request', *arg, **kw)

    def write(self, data):
        '''Delegate writing to the underlying :class:`.Connection`.

        Return an empty tuple or a :class:`~asyncio.Future`.
        '''
        c = self._connection
        if c:
            return c.write(data)
        else:
            raise RuntimeError('No connection')

    def _data_received(self, data):
        # Called by Connection, it updates the counters and invokes
        # the high level data_received method which must be implemented
        # by subclasses
        if not hasattr(self, '_request'):
            self.start()
        self._data_received_count = self._data_received_count + 1
        self.fire_event('data_received', data=data)
        result = self.data_received(data)
        self.fire_event('data_processed', data=data)
        return result

    def _finished(self, _, exc=None):
        c = self._connection
        if c and c._current_consumer is self:
            c._current_consumer = None

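
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: a minimal pair of
# consumers built only from the ``ProtocolConsumer`` API above. The names
# ``EchoConsumer`` and ``EchoClientConsumer`` are hypothetical and are reused
# by the usage sketches further down.
class EchoConsumer(ProtocolConsumer):
    '''Server-side consumer: only ``data_received`` needs implementing.'''

    def data_received(self, data):
        # echo the received bytes straight back over the connection
        self.write(data)


class EchoClientConsumer(ProtocolConsumer):
    '''Client-side consumer: ``start_request`` writes the outgoing request.'''

    def start_request(self):
        # assumes the request object knows how to encode itself to bytes
        self.transport.write(self.request.encode())

    def data_received(self, data):
        # treat the first chunk of data as the full response
        self.finished(data)
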
class PulsarProtocol(EventHandler, FlowControl):
    '''A mixin class for both :class:`.Protocol` and
    :class:`.DatagramProtocol`.

    A :class:`PulsarProtocol` is an :class:`.EventHandler` which has
    two :ref:`one time events <one-time-event>`:

    * ``connection_made``
    * ``connection_lost``
    '''
    ONE_TIME_EVENTS = ('connection_made', 'connection_lost')
    MANY_TIMES_EVENTS = ('data_received', 'data_processed', 'before_write',
                         'after_write')

    _transport = None
    _address = None
    _type = 'server'

    def __init__(self, loop=None, session=1, producer=None, **kw):
        super(PulsarProtocol, self).__init__(loop)
        FlowControl.__init__(self, **kw)
        self._session = session
        self._producer = producer

    def __repr__(self):
        address = self._address
        if address:
            return '%s %s session %s' % (self._type, nice_address(address),
                                         self._session)
        else:
            return '<pending> session %s' % self._session
    __str__ = __repr__

    @property
    def session(self):
        '''Connection session number.

        Passed during initialisation by the :attr:`producer`.
        Usually an integer representing the number of separate connections
        the producer has processed at the time it created this
        :class:`Protocol`.
        '''
        return self._session

    @property
    def transport(self):
        '''The :ref:`transport <asyncio-transport>` for this protocol.

        Available once :meth:`connection_made` is called.
        '''
        return self._transport

    @property
    def sock(self):
        '''The socket of :attr:`transport`.
        '''
        if self._transport:
            return self._transport.get_extra_info('socket')

    @property
    def address(self):
        '''The address of the :attr:`transport`.
        '''
        return self._address

    @property
    def producer(self):
        '''The producer of this :class:`Protocol`.
        '''
        return self._producer

    @property
    def closed(self):
        '''``True`` if the :attr:`transport` is closed.'''
        return self._transport._closing if self._transport else True

    def close(self):
        '''Close by closing the :attr:`transport`.'''
        if self._transport:
            self._transport.close()

    def abort(self):
        '''Abort by aborting the :attr:`transport`.'''
        if self._transport:
            self._transport.abort()

    def connection_made(self, transport):
        '''Sets the :attr:`transport`, fires the ``connection_made`` event
        and adds a :attr:`timeout` for idle connections.
        '''
        self._transport = transport
        addr = self._transport.get_extra_info('peername')
        if not addr:
            self._type = 'client'
            addr = self._transport.get_extra_info('sockname')
        self._address = addr
        # let everyone know we have a connection with the endpoint
        self.fire_event('connection_made')

    def connection_lost(self, exc=None):
        '''Fires the ``connection_lost`` event.
        '''
        self.fire_event('connection_lost')

    def eof_received(self):
        '''The socket was closed from the remote end.'''

    def info(self):
        info = {'connection': {'session': self._session}}
        if self._producer:
            info.update(self._producer.info())
        return info

class Protocol(PulsarProtocol, asyncio.Protocol):
    '''An :class:`asyncio.Protocol` with :ref:`events <event-handling>`.
    '''
    _data_received_count = 0

    def write(self, data):
        '''Write ``data`` into the wire.

        Returns an empty tuple or a :class:`~asyncio.Future` if this
        protocol has paused writing.
        '''
        t = self._transport
        if t:
            if t._closing:
                # Uses private variable.
                raise ConnectionResetError('Connection lost')
            if self._paused:
                # Uses private variable once again!
                # This occurs when the protocol is paused from writing
                # but another data ready callback is fired in the same
                # event-loop frame
                self.logger.debug('protocol cannot write, add data to the '
                                  'transport buffer')
                t._buffer.extend(data)
            else:
                self.fire_event('before_write')
                t.write(data)
                self.fire_event('after_write')
            return self._write_waiter or ()
        else:
            raise ConnectionResetError('No Transport')

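
# Illustrative sketch, not part of the original module: honouring flow
# control when writing a large payload through a ``Protocol``. ``write``
# returns an empty tuple while the transport buffer is under the high-water
# mark, or a waiter to yield on once the protocol has paused writing. The
# helper name and the ``chunks`` parameter are assumptions.
@task
def _write_with_backpressure(protocol, chunks):
    for chunk in chunks:
        waiter = protocol.write(chunk)
        if waiter:
            # writing is paused; wait for the transport buffer to drain
            yield From(waiter)
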
class DatagramProtocol(PulsarProtocol, asyncio.DatagramProtocol):
    '''An :class:`asyncio.DatagramProtocol` with events.'''

class Connection(Protocol, Timeout):
    '''A :class:`.FlowControl` to handle multiple TCP requests/responses.

    It is a class which acts as a bridge between a
    :ref:`transport <asyncio-transport>` and a :class:`.ProtocolConsumer`.
    It routes data arriving from the transport to the
    :meth:`current_consumer`.

    .. attribute:: _consumer_factory

        A factory of :class:`.ProtocolConsumer`.

    .. attribute:: _processed

        Number of separate requests processed.
    '''
    def __init__(self, consumer_factory=None, timeout=None, low_limit=None,
                 high_limit=None, **kw):
        super(Connection, self).__init__(**kw)
        self.bind_event('connection_lost', self._connection_lost)
        self._processed = 0
        self._current_consumer = None
        self._consumer_factory = consumer_factory
        self.timeout = timeout

    def current_consumer(self):
        '''The :class:`ProtocolConsumer` currently handling incoming data.

        This instance will receive data when this connection gets data
        from the :attr:`~Protocol.transport` via the :meth:`data_received`
        method.
        '''
        if self._current_consumer is None:
            self._build_consumer(None)
        return self._current_consumer

    def set_consumer(self, consumer):
        assert self._current_consumer is None, 'Consumer is not None'
        self._current_consumer = consumer
        consumer._connection = self
        consumer.connection_made(self)

    def data_received(self, data):
        '''Delegates handling of data to the :meth:`current_consumer`.

        Once done, sets a timeout for idle connections when
        :attr:`~Protocol.timeout` is a positive number (of seconds).
        '''
        self._data_received_count = self._data_received_count + 1
        self.fire_event('data_received', data=data)
        while data:
            consumer = self.current_consumer()
            data = consumer._data_received(data)
            if isinstance(data, Future):
                break
        self.fire_event('data_received', data=data)

    def upgrade(self, consumer_factory):
        '''Upgrade the :func:`_consumer_factory` callable.

        This method can be used when the protocol specification changes
        during a response (an example is a WebSocket request/response,
        or HTTP tunneling).

        This method adds a ``post_request`` callback to the
        :meth:`current_consumer` to build a new consumer with the new
        :func:`_consumer_factory`.

        :param consumer_factory: the new consumer factory (a callable
            accepting no parameters)
        :return: ``None``.
        '''
        self._consumer_factory = consumer_factory
        consumer = self._current_consumer
        if consumer:
            consumer.bind_event('post_request', self._build_consumer)
        else:
            self._build_consumer(None)

    def info(self):
        info = super(Connection, self).info()
        c = info['connection']
        c['request_processed'] = self._processed
        c['data_processed_count'] = self._data_received_count
        c['timeout'] = self.timeout
        return info

    def _build_consumer(self, _, exc=None):
        if not exc:
            consumer = self._producer.build_consumer(self._consumer_factory)
            self.set_consumer(consumer)

    def _connection_lost(self, _, exc=None):
        '''It performs these actions in the following order:

        * Fires the ``connection_lost``
          :ref:`one time event <one-time-event>` if not fired before,
          with ``exc`` as event data.
        * Cancels the idle timeout if set.
        * Invokes the :meth:`ProtocolConsumer.connection_lost` method on the
          :meth:`current_consumer`.
        '''
        if self._current_consumer:
            self._current_consumer.connection_lost(exc)

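
# Illustrative sketch, not part of the original module: switching protocol on
# a live connection, for example once an HTTP upgrade handshake completes.
# ``websocket_consumer_factory`` is a hypothetical callable returning a
# :class:`ProtocolConsumer` for the upgraded protocol.
def _switch_to_websocket(connection, websocket_consumer_factory):
    # after the current consumer fires ``post_request``, data arriving on
    # this connection is routed to a consumer built by the new factory
    connection.upgrade(websocket_consumer_factory)
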
class Producer(EventHandler):
    '''An abstract :class:`.EventHandler` class for all socket producers
    (clients and servers).
    '''
    protocol_factory = None
    '''A callable producing protocols.

    The signature of the protocol factory callable must be::

        protocol_factory(session, producer, **params)
    '''
    def __init__(self, loop, protocol_factory=None, name=None,
                 max_requests=None, logger=None):
        super(Producer, self).__init__(get_io_loop(loop))
        self.protocol_factory = protocol_factory or self.protocol_factory
        self._name = name or self.__class__.__name__
        self._requests_processed = 0
        self._sessions = 0
        self._max_requests = max_requests
        self._logger = logger

    @property
    def sessions(self):
        '''Total number of protocols created by the :class:`Producer`.
        '''
        return self._sessions

    @property
    def requests_processed(self):
        '''Total number of requests processed.
        '''
        return self._requests_processed

    def create_protocol(self, **kw):
        '''Create a new protocol via the :meth:`protocol_factory`.

        This method increases the count of :attr:`sessions` and builds
        the protocol, passing ``self`` as the producer.
        '''
        self._sessions = self._sessions + 1
        kw['session'] = self.sessions
        kw['producer'] = self
        kw['loop'] = self._loop
        return self.protocol_factory(**kw)

    def build_consumer(self, consumer_factory):
        '''Build a consumer for a protocol.

        This method can be used by protocols which handle several requests,
        for example the :class:`Connection` class.

        :param consumer_factory: consumer factory to use.
        '''
        consumer = consumer_factory(loop=self._loop)
        consumer.copy_many_times_events(self)
        return consumer

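
# Illustrative sketch, not part of the original module: a protocol factory
# satisfying the ``protocol_factory(session, producer, **params)`` contract.
# ``create_protocol`` passes ``session``, ``producer`` and ``loop`` as
# keyword arguments, so partially applying ``Connection`` with the
# hypothetical ``EchoConsumer`` from the sketch above is enough.
from functools import partial

echo_connection_factory = partial(Connection, consumer_factory=EchoConsumer)
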
class TcpServer(Producer):
    '''A :class:`.Producer` of server :class:`Connection` for TCP servers.

    .. attribute:: _server

        A :class:`.Server` managed by this TCP wrapper.

        Available once the :meth:`start_serving` method has returned.
    '''
    ONE_TIME_EVENTS = ('start', 'stop')
    MANY_TIMES_EVENTS = ('connection_made', 'pre_request', 'post_request',
                         'connection_lost')
    _server = None
    _started = None

    def __init__(self, protocol_factory, loop, address=None,
                 name=None, sockets=None, max_requests=None,
                 keep_alive=None, logger=None):
        super(TcpServer, self).__init__(loop, protocol_factory, name=name,
                                        max_requests=max_requests,
                                        logger=logger)
        self._params = {'address': address, 'sockets': sockets}
        self._keep_alive = max(keep_alive or 0, 0)
        self._concurrent_connections = set()

    def __repr__(self):
        address = self.address
        if address:
            return '%s %s' % (self.__class__.__name__, address)
        else:
            return self.__class__.__name__
    __str__ = __repr__

    @property
    def address(self):
        '''Socket address of this server.

        It is obtained from the first socket's ``getsockname`` method.
        '''
        if self._server is not None:
            return self._server.sockets[0].getsockname()

    @task
    def start_serving(self, backlog=100, sslcontext=None):
        '''Start serving.

        :param backlog: maximum number of queued connections
        :param sslcontext: optional SSLContext object.
        :return: a :class:`.Future` called back when the server is
            serving the socket.
        '''
        if hasattr(self, '_params'):
            address = self._params['address']
            sockets = self._params['sockets']
            del self._params
            create_server = self._loop.create_server
            try:
                if sockets:
                    server = None
                    for sock in sockets:
                        srv = yield From(create_server(self.create_protocol,
                                                       sock=sock,
                                                       backlog=backlog,
                                                       ssl=sslcontext))
                        if server:
                            server.sockets.extend(srv.sockets)
                        else:
                            server = srv
                else:
                    if isinstance(address, tuple):
                        server = yield From(create_server(
                            self.create_protocol,
                            host=address[0],
                            port=address[1],
                            backlog=backlog,
                            ssl=sslcontext))
                    else:
                        raise NotImplementedError
                self._server = server
                self._started = self._loop.time()
                for sock in server.sockets:
                    address = sock.getsockname()
                    self.logger.info('%s serving on %s', self._name,
                                     format_address(address))
                self.fire_event('start')
            except Exception as exc:
                self.fire_event('start', exc=exc)

    def stop_serving(self):
        '''Stop serving the :attr:`.Server.sockets`.
        '''
        if self._server:
            server, self._server = self._server, None
            server.close()

    @task
    def close(self):
        '''Stop serving the :attr:`.Server.sockets` and close all
        concurrent connections.
        '''
        if not self.fired_event('stop'):
            if self._server:
                server, self._server = self._server, None
                server.close()
                yield From(None)
            yield From(self._close_connections())
            self.fire_event('stop')

    def info(self):
        sockets = []
        up = int(self._loop.time() - self._started) if self._started else 0
        server = {'pulsar_version': pulsar.__version__,
                  'python_version': sys.version,
                  'uptime_in_seconds': up,
                  'sockets': sockets,
                  'max_requests': self._max_requests,
                  'keep_alive': self._keep_alive}
        clients = {'processed_clients': self._sessions,
                   'connected_clients': len(self._concurrent_connections),
                   'requests_processed': self._requests_processed}
        if self._server:
            for sock in self._server.sockets:
                sockets.append({
                    'address': format_address(sock.getsockname())})
        return {'server': server, 'clients': clients}

    def create_protocol(self):
        '''Override :meth:`Producer.create_protocol`.
        '''
        protocol = super(TcpServer, self).create_protocol(
            timeout=self._keep_alive)
        protocol.bind_event('connection_made', self._connection_made)
        protocol.bind_event('connection_lost', self._connection_lost)
        if (self._server and self._max_requests and
                self._sessions >= self._max_requests):
            self.logger.info('Reached maximum number of connections %s. '
                             'Stop serving.' % self._max_requests)
            self.close()
        return protocol

    #   INTERNALS
    def _connection_made(self, connection, exc=None):
        if not exc:
            self._concurrent_connections.add(connection)

    def _connection_lost(self, connection, exc=None):
        self._concurrent_connections.discard(connection)

    def _close_connections(self, connection=None):
        '''Close ``connection`` if specified, otherwise close all
        connections.

        Return a list of :class:`.Future` called back once the
        connection/s are closed.
        '''
        all = []
        if connection:
            all.append(connection.event('connection_lost'))
            connection.transport.close()
        else:
            connections = list(self._concurrent_connections)
            self._concurrent_connections = set()
            for connection in connections:
                all.append(connection.event('connection_lost'))
                connection.transport.close()
        if all:
            self.logger.info('%s closing %d connections', self, len(all))
            return multi_async(all)

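
# Illustrative sketch, not part of the original module: wiring a TCP echo
# server together with the hypothetical ``echo_connection_factory`` from the
# sketch above. The address, backlog and keep-alive values are assumptions.
def _serve_tcp_echo(loop):
    server = TcpServer(echo_connection_factory, loop,
                       address=('127.0.0.1', 8060), keep_alive=15)
    # start_serving returns a Future fired once the server is listening
    return server.start_serving(backlog=100)
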
class DatagramServer(Producer):
    '''A :class:`.Producer` for serving UDP sockets.

    .. attribute:: _transports

        A list of :class:`.DatagramTransport`.

        Available once the :meth:`create_endpoint` method has returned.
    '''
    _transports = None
    _started = None

    ONE_TIME_EVENTS = ('start', 'stop')
    MANY_TIMES_EVENTS = ('pre_request', 'post_request')

    def __init__(self, protocol_factory, loop=None, address=None,
                 name=None, sockets=None, max_requests=None,
                 logger=None):
        super(DatagramServer, self).__init__(loop, protocol_factory,
                                             name=name,
                                             max_requests=max_requests,
                                             logger=logger)
        self._params = {'address': address, 'sockets': sockets}

    @task
    def create_endpoint(self, **kw):
        '''Create the server endpoint.

        :return: a :class:`~asyncio.Future` called back when the server is
            serving the socket.
        '''
        if hasattr(self, '_params'):
            address = self._params['address']
            sockets = self._params['sockets']
            del self._params
            try:
                transports = []
                if sockets:
                    for transport in sockets:
                        proto = self.create_protocol()
                        transports.append(transport(self._loop, proto))
                else:
                    transport, _ = yield From(
                        self._loop.create_datagram_endpoint(
                            self.protocol_factory, local_addr=address))
                    transports.append(transport)
                self._transports = transports
                self._started = self._loop.time()
                for transport in self._transports:
                    address = transport.get_extra_info('sockname')
                    self.logger.info('%s serving on %s', self._name,
                                     format_address(address))
                self.fire_event('start')
            except Exception as exc:
                self.logger.exception('Error while starting UDP server')
                self.fire_event('start', exc=exc)
                self.fire_event('stop')

    @task
    def close(self):
        '''Stop serving the :attr:`.Server.sockets` and close all
        concurrent connections.
        '''
        if not self.fired_event('stop'):
            transports, self._transports = self._transports, None
            if transports:
                for transport in transports:
                    transport.close()
            self.fire_event('stop')

    def info(self):
        sockets = []
        up = int(self._loop.time() - self._started) if self._started else 0
        server = {'pulsar_version': pulsar.__version__,
                  'python_version': sys.version,
                  'uptime_in_seconds': up,
                  'sockets': sockets,
                  'max_requests': self._max_requests}
        clients = {'requests_processed': self._requests_processed}
        if self._transports:
            for transport in self._transports:
                sockets.append({
                    'address': format_address(transport._sock.getsockname())})
        return {'server': server, 'clients': clients}
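
# Illustrative sketch, not part of the original module: a UDP echo endpoint
# built from the ``DatagramProtocol``/``DatagramServer`` API above. The
# ``EchoUdp`` class, the address and the helper name are assumptions.
class EchoUdp(DatagramProtocol):

    def datagram_received(self, data, addr):
        # send each datagram straight back to its sender
        self.transport.sendto(data, addr)


def _serve_udp_echo(loop):
    server = DatagramServer(EchoUdp, loop, address=('127.0.0.1', 8070))
    # create_endpoint returns a Future fired once the endpoint is created
    return server.create_endpoint()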