Documentation for pulsar 0.9.2. For development docs, go here.

Source code for pulsar.apps.socket

'''Asynchronous application for serving requests
on sockets. This is the base class of :class:`.WSGIServer`.
All is needed by a :class:`.SocketServer` application is a callable
which build a :class:`.ProtocolConsumer` for each new client request
received.
This is an example of a script for an Echo server::

    import pulsar
    from pulsar.apps.socket import SocketServer

    class EchoServerProtocol(pulsar.ProtocolConsumer):
        ...

    if __name__ == '__main__':
        SocketServer(EchoServerProtocol).start()

Check the :ref:`echo server example <tutorials-writing-clients>` for detailed
implementation of the ``EchoServerProtocol`` class.

.. _socket-server-settings:

Socket Server Settings
==============================
All standard :ref:`application settings <settings>` can be applied to a
:class:`SocketServer`. In addition, the following are
specific to sockets and can be used to fine tune your application:

bind
------
To specify the address to bind the server to::

    python script.py --bind 127.0.0.1:8070

This will listen for both ipv4 and ipv6 sockets on all hosts on port 8080::

    python script.py --bind :8080

backlog
---------
To specify the maximum number of queued connections you can use the
:ref:`backlog <setting-backlog>` settings. For example::

    python script.py --backlog 1000

rarely used.

keep_alive
---------------
To control how long a server :class:`.Connection` is kept alive after the
last read from the remote client, one can use the
:ref:`keep-alive <setting-keep_alive>` setting::

    python script.py --keep-alive 10

will close client connections which have been idle for 10 seconds.

.. _socket-server-ssl:

TLS/SSL support
------------------------
Transport Layer Security (often known as Secure Sockets Layer) is handled by
the :ref:`cert-file <setting-cert_file>` and :ref:`key-file <setting-key_file>`
settings::

    python script.py --cert-file server.crt --key-file server.key


.. _socket-server-concurrency:

Concurrency
==================

When running a :class:`SocketServer` in multi-process mode (default),
the application, create a listening socket in the parent (Arbiter) process
and then spawn several process-based actors which listen on the
same shared socket.
This is how pre-forking servers operate.

When running a :class:`SocketServer` in threading mode::

    python script.py --concurrency thread

the number of :class:`.Actor` serving the application is set
to ``0`` so that the application is actually served by the
arbiter event-loop (we refer this to a single process server).
This configuration is used when debugging, testing, benchmarking or on small
load servers.

In addition, a :class:`SocketServer` in multi-process mode is only available
for:

* Posix systems.
* Windows running python 3.2 or above (python 2 on windows does not support
  the creation of sockets from file descriptors).

Check the :meth:`SocketServer.monitor_start` method for implementation details.
'''
import os
import socket
from math import log
from random import lognormvariate
from functools import partial

import pulsar
from pulsar import (asyncio, From, TcpServer, DatagramServer, Connection,
                    ImproperlyConfigured)
from pulsar.utils.internet import parse_address, SSLContext
from pulsar.utils.config import pass_through


class SocketSetting(pulsar.Setting):
    virtual = True
    app = 'socket'
    section = "Socket Servers"


class Bind(SocketSetting):
    name = "bind"
    flags = ["-b", "--bind"]
    meta = "ADDRESS"
    default = "127.0.0.1:{0}".format(pulsar.DEFAULT_PORT)
    desc = """\
        The socket to bind.

        A string of the form: ``HOST``, ``HOST:PORT``, ``unix:PATH``.
        An IP is a valid HOST.
        """


class KeepAlive(SocketSetting):
    name = "keep_alive"
    flags = ["--keep-alive"]
    validator = pulsar.validate_pos_int
    type = int
    default = 15
    desc = """\
        The number of seconds to keep an idle client connection
        open."""


class Backlog(SocketSetting):
    name = "backlog"
    flags = ["--backlog"]
    validator = pulsar.validate_pos_int
    type = int
    default = 2048
    desc = """\
        The maximum number of queued connections in a socket.

        This refers to the number of clients that can be waiting to be served.
        Exceeding this number results in the client getting an error when
        attempting to connect. It should only affect servers under significant
        load.
        Must be a positive integer. Generally set in the 64-2048 range.
        """


class KeyFile(SocketSetting):
    name = "key_file"
    flags = ["--key-file"]
    meta = "FILE"
    default = None
    desc = """\
    SSL key file
    """


class CertFile(SocketSetting):
    name = "cert_file"
    flags = ["--cert-file"]
    meta = "FILE"
    default = None
    desc = """\
    SSL certificate file
    """


class WrapTransport:

    def __init__(self, transport):
        self.extra = transport._extra
        self.sock = self.extra.pop('socket')
        self.transport = transport.__class__

    def __call__(self, loop, protocol):
        return self.transport(loop, self.sock, protocol, extra=self.extra)


[docs]class SocketServer(pulsar.Application): '''A :class:`.Application` which serve application on a socket. It bind a socket to a given address and listen for requests. The request handler is constructed from the callable passed during initialisation. .. attribute:: address The socket address, available once the application has started. ''' name = 'socket' cfg = pulsar.Config(apps=['socket'])
[docs] def protocol_factory(self): '''Factory of :class:`.ProtocolConsumer` used by the server. By default it returns the :meth:`.Application.callable`. ''' return partial(Connection, self.cfg.callable)
[docs] def monitor_start(self, monitor): '''Create the socket listening to the ``bind`` address. If the platform does not support multiprocessing sockets set the number of workers to 0. ''' cfg = self.cfg loop = monitor._loop if (not pulsar.platform.has_multiProcessSocket or cfg.concurrency == 'thread'): cfg.set('workers', 0) if not cfg.address: raise ImproperlyConfigured('Could not open a socket. ' 'No address to bind to') ssl = None if cfg.cert_file or cfg.key_file: if cfg.cert_file and not os.path.exists(cfg.cert_file): raise ImproperlyConfigured('cert_file "%s" does not exist' % cfg.cert_file) if cfg.key_file and not os.path.exists(cfg.key_file): raise ImproperlyConfigured('key_file "%s" does not exist' % cfg.key_file) ssl = SSLContext(keyfile=cfg.key_file, certfile=cfg.cert_file) address = parse_address(self.cfg.address) # First create the sockets try: server = yield From(loop.create_server(asyncio.Protocol, *address)) except socket.error as e: raise ImproperlyConfigured(e) else: addresses = [] sockets = [] for sock in server.sockets: addresses.append(sock.getsockname()) sockets.append(sock) loop.remove_reader(sock.fileno()) monitor.sockets = sockets monitor.ssl = ssl cfg.addresses = addresses
def actorparams(self, monitor, params): params.update({'sockets': monitor.sockets, 'ssl': monitor.ssl})
[docs] def worker_start(self, worker, exc=None): '''Start the worker by invoking the :meth:`create_server` method. ''' if not exc: server = self.create_server(worker) server.bind_event('stop', lambda _, **kw: worker.stop()) worker.servers[self.name] = server
def worker_stopping(self, worker, exc=None): server = worker.servers.get(self.name) if server: return server.close() def worker_info(self, worker, info): server = worker.servers.get(self.name) if server: info['%sserver' % self.name] = server.info() return info
[docs] def server_factory(self, *args, **kw): '''Create a :class:`.TcpServer`. ''' return TcpServer(*args, **kw)
# INTERNALS
[docs] def create_server(self, worker): '''Create the Server which will listen for requests. :return: a :class:`.TcpServer`. ''' sockets = worker.sockets cfg = self.cfg max_requests = cfg.max_requests if max_requests: max_requests = int(lognormvariate(log(max_requests), 0.2)) server = self.server_factory(self.protocol_factory(), worker._loop, sockets=sockets, max_requests=max_requests, keep_alive=cfg.keep_alive, name=self.name, logger=self.logger) for event in ('connection_made', 'pre_request', 'post_request', 'connection_lost'): callback = getattr(cfg, event) if callback != pass_through: server.bind_event(event, callback) server.start_serving(cfg.backlog, sslcontext=worker.ssl) return server
[docs]class UdpSocketServer(SocketServer): '''A :class:`.SocketServer` which serves application on a UDP sockets. It binds a socket to a given address and listen for requests. The request handler is constructed from the callable passed during initialisation. .. attribute:: address The socket address, available once the application has started. ''' name = 'udpsocket' cfg = pulsar.Config(apps=['socket'])
[docs] def protocol_factory(self): '''Return the :class:`.DatagramProtocol` factory. ''' return self.cfg.callable
[docs] def monitor_start(self, monitor): '''Create the socket listening to the ``bind`` address. If the platform does not support multiprocessing sockets set the number of workers to 0. ''' cfg = self.cfg loop = monitor._loop if (not pulsar.platform.has_multiProcessSocket or cfg.concurrency == 'thread'): cfg.set('workers', 0) if not cfg.address: raise pulsar.ImproperlyConfigured('Could not open a socket. ' 'No address to bind to') address = parse_address(self.cfg.address) # First create the sockets t, _ = yield From(loop.create_datagram_endpoint( asyncio.DatagramProtocol, address)) sock = t.get_extra_info('socket') assert loop.remove_reader(sock.fileno()) monitor.sockets = [WrapTransport(t)] cfg.addresses = [sock.getsockname()]
def actorparams(self, monitor, params): params.update({'sockets': monitor.sockets})
[docs] def server_factory(self, *args, **kw): '''By default returns a new :class:`.DatagramServer`. ''' return DatagramServer(*args, **kw)
# INTERNALS
[docs] def create_server(self, worker): '''Create the Server which will listen for requests. :return: the server obtained from :meth:`server_factory`. ''' cfg = self.cfg max_requests = cfg.max_requests if max_requests: max_requests = int(lognormvariate(log(max_requests), 0.2)) server = self.server_factory(self.protocol_factory(), worker._loop, sockets=worker.sockets, max_requests=max_requests, name=self.name, logger=self.logger) server.bind_event('stop', lambda _, **kw: worker.stop()) for event in ('pre_request', 'post_request'): callback = getattr(cfg, event) if callback != pass_through: server.bind_event(event, callback) server.create_endpoint() return server