Documentation for pulsar 0.9.2.

Source code for pulsar.apps.http

'''Pulsar ships with a thread safe, fully featured, :class:`.HttpClient`
class for multiple asynchronous HTTP requests.

To get started, one builds a client::

    >>> from pulsar.apps import http
    >>> client = http.HttpClient()

and then makes requests, in a coroutine::

    response = yield client.get('http://www.bbc.co.uk')

or (only in Python 3)::

    response = yield from client.get('http://www.bbc.co.uk')

Making requests
=================
The pulsar HTTP client has no dependencies and an API similar to requests_::

    from pulsar.apps import http
    client = http.HttpClient()
    response = yield client.get('https://github.com/timeline.json')

``response`` is a :class:`HttpResponse` object which contains all the
information about the request and the result:

    >>> request = response.request
    >>> print(request.headers)
    Connection: Keep-Alive
    User-Agent: pulsar/0.8.2-beta.1
    Accept-Encoding: deflate, gzip
    Accept: */*
    >>> response.status_code
    200
    >>> print(response.headers)
    ...

The :attr:`~.ProtocolConsumer.request` attribute of :class:`HttpResponse`
is an instance of :class:`.HttpRequest`.

.. _http-cookie:

Cookie support
================

The client handles cookies by storing those received with responses.
To disable cookie storage, pass ``store_cookies=False`` during
:class:`HttpClient` initialisation.

If a response contains cookies, you can get quick access to them::

    >>> response = yield client.get(...)
    >>> type(response.cookies)
    <type 'dict'>

To send your own cookies to the server, you can use the cookies parameter::

    response = yield client.get(..., cookies={'sessionid': 'test'})
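
As a rough illustration of what storing and replaying cookies involves, the
sketch below uses only the standard library. It is not pulsar's own
implementation, and the helper names ``parse_set_cookie`` and
``cookie_header`` are hypothetical:

```python
from http.cookies import SimpleCookie


def parse_set_cookie(header_value):
    """Return a dict of cookie names to values from a Set-Cookie value."""
    jar = SimpleCookie()
    jar.load(header_value)
    return {name: morsel.value for name, morsel in jar.items()}


def cookie_header(cookies):
    """Build the value of a Cookie request header from a dict."""
    return '; '.join('%s=%s' % (k, v) for k, v in sorted(cookies.items()))


# Cookie attributes such as Path are dropped; only name/value pairs remain.
stored = parse_set_cookie('sessionid=test; Path=/; HttpOnly')
print(stored)
print(cookie_header({'sessionid': 'test', 'lang': 'en'}))
```

The first helper mirrors the kind of dictionary exposed by
``response.cookies``; the second shows the header a ``cookies`` parameter
would plausibly translate into.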


.. _http-authentication:

Authentication
======================

Authentication, either ``basic`` or ``digest``, can be added to a
client by invoking

* :meth:`HttpClient.add_basic_authentication` or
* :meth:`HttpClient.add_digest_authentication`

In either case the authentication is handled by adding additional headers
to your requests.
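
For basic authentication, the additional header is the standard
``Authorization: Basic ...`` header. A minimal standard-library sketch of how
such a value is built follows; ``basic_auth_header`` is a hypothetical helper
and not part of pulsar:

```python
from base64 import b64encode


def basic_auth_header(username, password):
    """Return the value of an ``Authorization`` header for basic auth."""
    credentials = ('%s:%s' % (username, password)).encode('utf-8')
    return 'Basic ' + b64encode(credentials).decode('ascii')


print(basic_auth_header('user', 'pass'))
```

Digest authentication is more involved, since the header depends on a
challenge sent by the server, and is not sketched here.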

TLS/SSL
=================
Supported out of the box::

    client = HttpClient()
    client.get('https://github.com/timeline.json')

You can also pass a certificate file and key, either to the
:class:`HttpClient` or to a specific request, using the ``certfile`` and
``keyfile`` parameters::

    client = HttpClient(certfile='public.crt', keyfile='public.key')
    res1 = client.get('https://github.com/timeline.json')
    res2 = client.get('https://github.com/timeline.json',
                      keyfile='another.key')
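
The ``ssl_context`` method in the source listing suggests that request-level
TLS options override the client-level defaults only when they match a known
key. The sketch below reproduces that merging rule with plain dictionaries;
``merge_tls_params`` and the file names are hypothetical:

```python
def merge_tls_params(defaults, **overrides):
    """Copy client-level TLS defaults, applying only recognised overrides."""
    params = dict(defaults)
    for name, value in overrides.items():
        if name in params:
            params[name] = value
    return params


client_defaults = {'keyfile': 'public.key', 'certfile': None,
                   'cert_reqs': None, 'ca_certs': None}
# ``timeout`` is not a TLS option, so it is ignored by the merge.
per_request = merge_tls_params(client_defaults, keyfile='another.key',
                               timeout=5)
print(per_request['keyfile'])
```

The client defaults are copied rather than mutated, so one request's
overrides do not leak into the next.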

.. _http-streaming:

Streaming
=========================

This is an event-driven client, therefore streaming support is native.

To stream data as it is received by the client, bind a handler to the
:ref:`data_processed <http-many-time-events>` event.
For example::

    def new_data(response, **kw):
        if response.status_code == 200:
            data = response.recv_body()
            # do something with this data

    response = yield client.get(..., data_processed=new_data)

The response :meth:`~.HttpResponse.recv_body` method returns the body parsed
so far and, at the same time, flushes it from the response buffer.
Check the :ref:`proxy server <tutorials-proxy-server>` example for an
application using the :class:`HttpClient` streaming capabilities.
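
To make the "return and flush" behaviour concrete, here is a small
self-contained sketch. ``FakeParser`` is a hypothetical stand-in for the HTTP
parser used by the response, not a pulsar class:

```python
class FakeParser(object):
    """Hypothetical stand-in for an HTTP parser that buffers body chunks."""

    def __init__(self):
        self._chunks = []

    def feed(self, chunk):
        # Called whenever a new piece of the body has been parsed.
        self._chunks.append(chunk)

    def recv_body(self):
        # Return everything buffered so far and empty the buffer.
        body = b''.join(self._chunks)
        self._chunks = []
        return body


parser = FakeParser()
parser.feed(b'hello ')
parser.feed(b'world')
first = parser.recv_body()
second = parser.recv_body()
print(first, second)
```

Because each call empties the buffer, a streaming handler sees every chunk
exactly once and memory use stays bounded by the size of a single read.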

.. _http-websocket:

WebSocket
==============

The HTTP client supports websocket upgrades. First you need to have a
websocket handler::

    from pulsar.apps import ws

    class Echo(ws.WS):

        def on_message(self, websocket, message):
            websocket.write(message)

The websocket response is obtained by::

    ws = yield client.get('ws://...', websocket_handler=Echo())
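
For ``ws`` and ``wss`` URLs, the ``get_headers`` method in the source listing
replaces the default headers with upgrade headers and a random key built from
16 random bytes. The sketch below reproduces that step with the standard
library only; the function name is hypothetical and the default version
``'13'`` is an assumption (the listing uses ``max(SUPPORTED_VERSIONS)``):

```python
import os
from base64 import b64encode, b64decode


def websocket_handshake_headers(version='13'):
    """Return client headers for a websocket upgrade request."""
    # 16 random bytes, base64 encoded, as in ``HttpClient.websocket_key``.
    key = b64encode(os.urandom(16)).decode('ascii')
    return [('Connection', 'Upgrade'),
            ('Upgrade', 'websocket'),
            ('Sec-WebSocket-Version', version),
            ('Sec-WebSocket-Key', key)]


headers = dict(websocket_handshake_headers())
print(headers['Upgrade'], len(b64decode(headers['Sec-WebSocket-Key'])))
```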

.. _http-redirects:

Redirects & Decompression
=============================

[TODO]

Synchronous Mode
=====================

The client can be used in :ref:`synchronous mode <tutorials-synchronous>`::

    client = HttpClient(loop=new_event_loop())

Events
==============
:ref:`Events <event-handling>` control the behaviour of the
:class:`.HttpClient` when certain conditions occur. They are useful for
handling standard HTTP events such as :ref:`redirects <http-redirects>`,
:ref:`websocket upgrades <http-websocket>`,
:ref:`streaming <http-streaming>` or anything your application
requires.

.. _http-one-time-events:

One time events
~~~~~~~~~~~~~~~~~~~

There are three :ref:`one time events <one-time-event>` associated with an
:class:`HttpResponse` object:

* ``pre_request``, fired before the request is sent to the server. Callbacks
  receive the *response* argument.
* ``on_headers``, fired when response headers are available. Callbacks
  receive the *response* argument.
* ``post_request``, fired when the response is done. Callbacks
  receive the *response* argument.

Adding event handlers can be done at client level::

    def myheader_handler(response, exc=None):
        if not exc:
            print('got headers!')

    client.bind_event('on_headers', myheader_handler)

or at request level::

    response = client.get(..., on_headers=myheader_handler)

By default, the :class:`HttpClient` has one ``pre_request`` callback for
handling `HTTP tunneling`_, three ``on_headers`` callbacks for
handling *100 Continue*, *websocket upgrade* and :ref:`cookies <http-cookie>`,
and one ``post_request`` callback for handling redirects.

.. _http-many-time-events:

Many time events
~~~~~~~~~~~~~~~~~~~

In addition to the three :ref:`one time events <http-one-time-events>`,
the HTTP client supports two additional
events which can occur several times while processing a given response:

* ``data_received`` is fired when new data has been received but not yet
  parsed
* ``data_processed`` is fired just after the data has been parsed by the
  :class:`.HttpResponse`. This is the event one should bind to when performing
  :ref:`http streaming <http-streaming>`.


Both events support handlers with a signature::

    def handler(response, data=None):
        ...

where ``response`` is the :class:`.HttpResponse` handling the request and
``data`` is the **raw** data received.
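
The difference between one time and many time events can be sketched with
two tiny dispatcher classes. These are hypothetical illustrations written for
this page and do not reflect how pulsar implements its events:

```python
class OneTimeEvent(object):
    """Hypothetical event that runs its callbacks only on the first fire."""

    def __init__(self):
        self.callbacks = []
        self.fired = False

    def bind(self, callback):
        self.callbacks.append(callback)

    def fire(self, response, **kw):
        if self.fired:
            return False
        self.fired = True
        for callback in self.callbacks:
            callback(response, **kw)
        return True


class ManyTimesEvent(OneTimeEvent):
    """Hypothetical event that runs its callbacks on every fire."""

    def fire(self, response, **kw):
        for callback in self.callbacks:
            callback(response, **kw)
        return True


log = []
on_headers = OneTimeEvent()
data_processed = ManyTimesEvent()
on_headers.bind(lambda response, **kw: log.append('headers'))
data_processed.bind(lambda response, data=None: log.append(data))

on_headers.fire('response')
on_headers.fire('response')  # ignored: the event has already fired
data_processed.fire('response', data=b'chunk-1')
data_processed.fire('response', data=b'chunk-2')
print(log)
```

The second ``on_headers.fire`` call is a no-op, whereas ``data_processed``
invokes its handler once per chunk, which is why streaming handlers bind to
the latter.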

API
==========

The main class here is the :class:`HttpClient` which is a subclass of
:class:`.AbstractClient`.


HTTP Client
~~~~~~~~~~~~~~~~~~

.. autoclass:: HttpClient
   :members:
   :member-order: bysource


HTTP Request
~~~~~~~~~~~~~~~~~~

.. autoclass:: HttpRequest
   :members:
   :member-order: bysource

HTTP Response
~~~~~~~~~~~~~~~~~~

.. autoclass:: HttpResponse
   :members:
   :member-order: bysource


.. module:: pulsar.apps.http.oauth

OAuth1
~~~~~~~~~~~~~~~~~~

.. autoclass:: OAuth1
   :members:
   :member-order: bysource

OAuth2
~~~~~~~~~~~~~~~~~~

.. autoclass:: OAuth2
   :members:
   :member-order: bysource


.. _requests: http://docs.python-requests.org/
.. _`uri scheme`: http://en.wikipedia.org/wiki/URI_scheme
.. _`HTTP tunneling`: http://en.wikipedia.org/wiki/HTTP_tunnel
'''
import os
import platform
from functools import partial
from collections import namedtuple
from base64 import b64encode
from io import StringIO, BytesIO

import pulsar
from pulsar import (AbstractClient, Pool, coroutine_return, task, Connection,
                    ProtocolConsumer, From)
from pulsar.utils.system import json
from pulsar.utils.pep import native_str, is_string, to_bytes, ispy3k
from pulsar.utils.structures import mapping_iterator
from pulsar.utils.websocket import SUPPORTED_VERSIONS
from pulsar.utils.internet import CERT_NONE, SSLContext
from pulsar.utils.multipart import parse_options_header
from pulsar.utils.httpurl import (urlparse, parse_qsl, responses,
                                  http_parser, ENCODE_URL_METHODS,
                                  encode_multipart_formdata, urlencode,
                                  Headers, get_environ_proxies,
                                  choose_boundary, urlunparse, request_host,
                                  is_succesful, HTTPError, URLError,
                                  get_hostport, cookiejar_from_dict,
                                  host_no_default_port, DEFAULT_CHARSET,
                                  JSON_CONTENT_TYPES)

try:
    from pulsar.apps import greenio
except ImportError:
    greenio = None

from .plugins import (handle_cookies, handle_100, handle_101, handle_redirect,
                      Tunneling, TooManyRedirects)

from .auth import Auth, HTTPBasicAuth, HTTPDigestAuth
from .oauth import OAuth1, OAuth2


scheme_host = namedtuple('scheme_host', 'scheme netloc')
tls_schemes = ('https', 'wss')


def guess_filename(obj):
    """Tries to guess the filename of the given object."""
    name = getattr(obj, 'name', None)
    if name and name[0] != '<' and name[-1] != '>':
        return os.path.basename(name)


class RequestBase(object):
    inp_params = None
    release_connection = True
    history = None
    full_url = None
    scheme = None

    @property
    def unverifiable(self):
        '''Unverifiable when a redirect.

        It is a redirect when :attr:`history` has past requests.
        '''
        return bool(self.history)

    @property
    def origin_req_host(self):
        if self.history:
            return self.history[0].request.origin_req_host
        else:
            return request_host(self)

    @property
    def type(self):
        return self.scheme

    def get_full_url(self):
        return self.full_url


if not ispy3k:  # pragma     nocover
    _RequestBase = RequestBase

    class RequestBase(_RequestBase):

        def is_unverifiable(self):
            return self.unverifiable

        def get_origin_req_host(self):
            return self.origin_req_host

        def get_type(self):
            return self.scheme


class HttpTunnel(RequestBase):
    first_line = None

    def __init__(self, request, scheme, host):
        self.request = request
        self.scheme = scheme
        self.host, self.port = get_hostport(scheme, host)
        self.full_url = '%s://%s:%s' % (scheme, self.host, self.port)
        self.parser = request.parser
        request.new_parser()
        self.headers = request.client.tunnel_headers.copy()

    def __repr__(self):
        return 'Tunnel %s' % self.full_url
    __str__ = __repr__

    @property
    def key(self):

        return self.request.key

    @property
    def address(self):
        return (self.host, self.port)

    @property
    def client(self):
        return self.request.client

    def encode(self):
        req = self.request
        self.headers['host'] = req.get_header('host')
        bits = req.target_address + (req.version,)
        self.first_line = 'CONNECT %s:%s %s\r\n' % bits
        return b''.join((self.first_line.encode('ascii'), bytes(self.headers)))

    def has_header(self, header_name):
        return header_name in self.headers

    def get_header(self, header_name, default=None):
        return self.headers.get(header_name, default)

    def remove_header(self, header_name):
        self.headers.pop(header_name, None)


class HttpRequest(RequestBase):
    '''An :class:`HttpClient` request for an HTTP resource.

    This class has a similar interface to :class:`urllib.request.Request`.

    :param files: optional dictionary of name, file-like-objects.
    :param allow_redirects: allow the response to follow redirects.

    .. attribute:: method

        The request method

    .. attribute:: version

        HTTP version for this request, usually ``HTTP/1.1``

    .. attribute:: history

        List of past :class:`.HttpResponse` (collected during redirects).

    .. attribute:: wait_continue

        if ``True``, the :class:`HttpRequest` includes the
        ``Expect: 100-Continue`` header.

    '''
    CONNECT = 'CONNECT'
    _proxy = None
    _ssl = None
    _tunnel = None

    def __init__(self, client, url, method, inp_params, headers=None,
                 data=None, files=None, timeout=None, history=None,
                 charset=None, encode_multipart=True, multipart_boundary=None,
                 source_address=None, allow_redirects=False, max_redirects=10,
                 decompress=True, version=None, wait_continue=False,
                 websocket_handler=None, cookies=None, **ignored):
        self.client = client
        self._data = None
        self.files = files
        self.inp_params = inp_params
        self.unredirected_headers = Headers(kind='client')
        self.timeout = timeout
        self.method = method.upper()
        self.full_url = url
        self.set_proxy(None)
        self.history = history
        self.wait_continue = wait_continue
        self.max_redirects = max_redirects
        self.allow_redirects = allow_redirects
        self.charset = charset or 'utf-8'
        self.version = version
        self.decompress = decompress
        self.encode_multipart = encode_multipart
        self.multipart_boundary = multipart_boundary
        self.websocket_handler = websocket_handler
        self.source_address = source_address
        self.new_parser()
        if self._scheme in tls_schemes:
            self._ssl = client.ssl_context(**ignored)
        self.headers = client.get_headers(self, headers)
        cookies = cookiejar_from_dict(client.cookies, cookies)
        if cookies:
            cookies.add_cookie_header(self)
        self.unredirected_headers['host'] = host_no_default_port(self._scheme,
                                                                 self._netloc)
        client.set_proxy(self)
        self.data = data

    @property
    def address(self):
        '''``(host, port)`` tuple of the HTTP resource'''
        return self._tunnel.address if self._tunnel else (self.host, self.port)

    @property
    def target_address(self):
        return (self.host, int(self.port))

    @property
    def ssl(self):
        '''Context for TLS connections.

        If this is a tunneled request and the tunnel connection is not yet
        established, it returns ``None``.
        '''
        if not self._tunnel:
            return self._ssl

    @property
    def key(self):
        return (self.scheme, self.host, self.port, self.timeout)

    @property
    def proxy(self):
        '''Proxy server for this request.'''
        return self._proxy

    @property
    def netloc(self):
        if self._proxy:
            return self._proxy.netloc
        else:
            return self._netloc

    def __repr__(self):
        return self.first_line()
    __str__ = __repr__

    @property
    def full_url(self):
        '''Full url of endpoint'''
        return urlunparse((self._scheme, self._netloc, self.path,
                           self.params, self.query, self.fragment))

    @full_url.setter
    def full_url(self, url):
        self._scheme, self._netloc, self.path, self.params,\
            self.query, self.fragment = urlparse(url)
        if not self._netloc and self.method == 'CONNECT':
            self._scheme, self._netloc, self.path, self.params,\
                self.query, self.fragment = urlparse('http://%s' % url)

    @property
    def data(self):
        '''Body of request'''
        return self._data

    @data.setter
    def data(self, data):
        self._data = self._encode_data(data)

    def first_line(self):
        if not self._proxy and self.method != self.CONNECT:
            url = urlunparse(('', '', self.path or '/', self.params,
                              self.query, self.fragment))
        else:
            url = self.full_url
        return '%s %s %s' % (self.method, url, self.version)

    def new_parser(self):
        self.parser = self.client.http_parser(kind=1,
                                              decompress=self.decompress,
                                              method=self.method)

    def set_proxy(self, scheme, *host):
        if not host and scheme is None:
            self.scheme = self._scheme
            self._set_hostport(self._scheme, self._netloc)
        else:
            le = 2 + len(host)
            if not le == 3:
                raise TypeError(
                    'set_proxy() takes exactly three arguments (%s given)'
                    % le)
            if not self._ssl:
                self.scheme = scheme
                self._set_hostport(scheme, host[0])
                self._proxy = scheme_host(scheme, host[0])
            else:
                self._tunnel = HttpTunnel(self, scheme, host[0])

    def _set_hostport(self, scheme, host):
        self._tunnel = None
        self._proxy = None
        self.host, self.port = get_hostport(scheme, host)

    def encode(self):
        '''The bytes representation of this :class:`HttpRequest`.

        Called by :class:`HttpResponse` when it needs to encode this
        :class:`HttpRequest` before sending it to the HTTP resource.
        '''
        # Call body before first_line in case the query changes.
        first_line = self.first_line()
        body = self.data
        if body and self.wait_continue:
            self.headers['expect'] = '100-continue'
            body = None
        headers = self.headers
        if self.unredirected_headers:
            headers = self.unredirected_headers.copy()
            headers.update(self.headers)
        buffer = [first_line.encode('ascii'), b'\r\n', bytes(headers)]
        if body:
            buffer.append(body)
        return b''.join(buffer)

    def add_header(self, key, value):
        self.headers[key] = value

    def has_header(self, header_name):
        '''Check ``header_name`` is in this request headers.
        '''
        return (header_name in self.headers or
                header_name in self.unredirected_headers)

    def get_header(self, header_name, default=None):
        '''Retrieve ``header_name`` from this request headers.
        '''
        return self.headers.get(
            header_name, self.unredirected_headers.get(header_name, default))

    def remove_header(self, header_name):
        '''Remove ``header_name`` from this request.
        '''
        self.headers.pop(header_name, None)
        self.unredirected_headers.pop(header_name, None)

    def add_unredirected_header(self, header_name, header_value):
        self.unredirected_headers[header_name] = header_value

    # INTERNAL ENCODING METHODS
    def _encode_data(self, data):
        body = None
        if self.method in ENCODE_URL_METHODS:
            self.files = None
            self._encode_url(data)
        elif isinstance(data, bytes):
            assert self.files is None, ('data cannot be bytes when files are '
                                        'present')
            body = data
        elif is_string(data):
            assert self.files is None, ('data cannot be string when files are '
                                        'present')
            body = to_bytes(data, self.charset)
        elif data or self.files:
            if self.files:
                body, content_type = self._encode_files(data)
            else:
                body, content_type = self._encode_params(data)
            # set files to None, Important!
            self.files = None
            self.headers['Content-Type'] = content_type
        if body:
            self.headers['content-length'] = str(len(body))
        elif 'expect' not in self.headers:
            self.headers.pop('content-length', None)
            self.headers.pop('content-type', None)
        return body

    def _encode_url(self, data):
        query = self.query
        if data:
            data = native_str(data)
            if isinstance(data, str):
                data = parse_qsl(data)
            else:
                data = mapping_iterator(data)
            query = parse_qsl(query)
            query.extend(data)
            query = urlencode(query)
        self.query = query

    def _encode_files(self, data):
        fields = []
        for field, val in mapping_iterator(data or ()):
            if (is_string(val) or isinstance(val, bytes) or
                    not hasattr(val, '__iter__')):
                val = [val]
            for v in val:
                if v is not None:
                    if not isinstance(v, bytes):
                        v = str(v)
                    fields.append((field.decode('utf-8') if
                                   isinstance(field, bytes) else field,
                                   v.encode('utf-8') if isinstance(v, str)
                                   else v))
        for (k, v) in mapping_iterator(self.files):
            # support for explicit filename
            ft = None
            if isinstance(v, (tuple, list)):
                if len(v) == 2:
                    fn, fp = v
                else:
                    fn, fp, ft = v
            else:
                fn = guess_filename(v) or k
                fp = v
            if isinstance(fp, bytes):
                fp = BytesIO(fp)
            elif is_string(fp):
                fp = StringIO(fp)
            if ft:
                new_v = (fn, fp.read(), ft)
            else:
                new_v = (fn, fp.read())
            fields.append((k, new_v))
        #
        return encode_multipart_formdata(fields, charset=self.charset)

    def _encode_params(self, data):
        content_type = self.headers.get('content-type')
        # No content type given
        if not content_type:
            if self.encode_multipart:
                return encode_multipart_formdata(
                    data, boundary=self.multipart_boundary,
                    charset=self.charset)
            else:
                content_type = 'application/x-www-form-urlencoded'
                body = urlencode(data).encode(self.charset)
        elif content_type in JSON_CONTENT_TYPES:
            body = json.dumps(data).encode(self.charset)
        else:
            raise ValueError("Don't know how to encode body for %s" %
                             content_type)
        return body, content_type


class HttpResponse(ProtocolConsumer):
    '''A :class:`.ProtocolConsumer` for the HTTP client protocol.

    Initialised by a call to the :class:`HttpClient.request` method.

    There are two events you can yield in a coroutine:

    .. attribute:: on_headers

        Fired once the response headers are received.

    .. attribute:: on_finished

        Fired once the whole request has finished.

    Public API:
    '''
    _tunnel_host = None
    _has_proxy = False
    _content = None
    _data_sent = None
    _status_code = None
    _cookies = None
    request_again = None
    ONE_TIME_EVENTS = ProtocolConsumer.ONE_TIME_EVENTS + ('on_headers',)

    @property
    def parser(self):
        request = self.request
        if request:
            return request.parser

    def __str__(self):
        return '%s' % (self.status_code or '<None>')

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__, self)

    @property
    def status_code(self):
        '''Numeric status code such as 200, 404 and so forth.

        Available once the :attr:`on_headers` has fired.'''
        return self._status_code

    @property
    def url(self):
        '''The request full url.'''
        request = self.request
        if request:
            return request.full_url

    @property
    def history(self):
        request = self.request
        if request:
            return request.history

    @property
    def headers(self):
        if not hasattr(self, '_headers'):
            if self.parser and self.parser.is_headers_complete():
                self._headers = Headers(self.parser.get_headers())
        return getattr(self, '_headers', None)

    @property
    def is_error(self):
        if self.status_code:
            return not is_succesful(self.status_code)
        else:
            return False

    @property
    def cookies(self):
        '''Dictionary of cookies set by the server or ``None``.
        '''
        return self._cookies

    def recv_body(self):
        '''Flush the response body and return it.'''
        return self.parser.recv_body()

    def get_status(self):
        code = self.status_code
        if code:
            return '%d %s' % (code, responses.get(code, 'Unknown'))

    def get_content(self):
        '''Retrieve the body without flushing'''
        b = self.parser.recv_body()
        if b or self._content is None:
            self._content = self._content + b if self._content else b
        return self._content

    def content_string(self, charset=None, errors=None):
        '''Decode content as a string.'''
        data = self.get_content()
        if data is not None:
            return data.decode(charset or 'utf-8', errors or 'strict')

    def json(self, charset=None):
        '''Decode content as a JSON object.'''
        return json.loads(self.content_string(charset))

    def decode_content(self):
        '''Return the best possible representation of the response body.
        '''
        ct = self.headers.get('content-type')
        if ct:
            ct, options = parse_options_header(ct)
            charset = options.get('charset')
            if ct in JSON_CONTENT_TYPES:
                return self.json(charset)
            elif ct.startswith('text/'):
                return self.content_string(charset)
        return self.get_content()

    def raise_for_status(self):
        '''Raises stored :class:`HTTPError` or :class:`URLError`, if occurred.
        '''
        if self.is_error:
            if self.status_code:
                raise HTTPError(self.url, self.status_code,
                                self.content_string(), self.headers, None)
            else:
                raise URLError(self.on_finished.result.error)

    def info(self):
        '''Required by python CookieJar.

        Return :attr:`headers`.'''
        return self.headers

    # #####################################################################
    # #    PROTOCOL IMPLEMENTATION
    def start_request(self):
        self.transport.write(self._request.encode())

    def data_received(self, data):
        request = self._request
        # request.parser may change (100-continue)
        # Always invoke it via request
        if request.parser.execute(data, len(data)) == len(data):
            if request.parser.is_headers_complete():
                self._status_code = request.parser.get_status_code()
                if not self.event('on_headers').fired():
                    self.fire_event('on_headers')
                if (not self.event('post_request').fired() and
                        request.parser.is_message_complete()):
                    self.finished()
        else:
            raise pulsar.ProtocolError('%s\n%s' % (self, self.headers))


class HttpClient(AbstractClient):
    '''A client for HTTP/HTTPS servers.

    It handles a pool of asynchronous connections.

    :param encode_multipart: optional flag for setting the
        :attr:`encode_multipart` attribute
    :param pool_size: set the :attr:`pool_size` attribute.
    :param store_cookies: set the :attr:`store_cookies` attribute

    .. attribute:: headers

        Default headers for this :class:`HttpClient`.

        Default: :attr:`DEFAULT_HTTP_HEADERS`.

    .. attribute:: cookies

        Default cookies for this :class:`HttpClient`.

    .. attribute:: store_cookies

        If ``True`` it remembers response cookies and sends them back to
        servers.

        Default: ``True``

    .. attribute:: timeout

        Default timeout for the connecting sockets. If 0 it is an asynchronous
        client.

    .. attribute:: encode_multipart

        Flag indicating if body data is by default encoded using the
        ``multipart/form-data`` or ``application/x-www-form-urlencoded``
        encoding.

        It can be overwritten during a :meth:`request`.

        Default: ``True``

    .. attribute:: proxy_info

        Dictionary of proxy servers for this client.

    .. attribute:: pool_size

        The size of a pool of connection for a given host.

    .. attribute:: connection_pools

        Dictionary of connection pools for different hosts

    .. attribute:: green

        If ``True`` the requests are processed using the
        :mod:`~pulsar.apps.greenio` application. When operating in this mode,
        the :class:`.HttpClient` must be used from a greenlet other than the
        main one.

    .. attribute:: DEFAULT_HTTP_HEADERS

        Default headers for this :class:`HttpClient`

    '''
    MANY_TIMES_EVENTS = ('connection_made', 'pre_request', 'on_headers',
                         'post_request', 'connection_lost')
    protocol_factory = partial(Connection, HttpResponse)
    allow_redirects = False
    max_redirects = 10
    '''Maximum number of redirects.

    It can be overwritten on :meth:`request`.'''
    connection_pool = Pool
    '''Connection :class:`.Pool` factory
    '''
    client_version = pulsar.SERVER_SOFTWARE
    '''String for the ``User-Agent`` header.'''
    version = 'HTTP/1.1'
    '''Default HTTP request version for this :class:`HttpClient`.

    It can be overwritten on :meth:`request`.'''
    DEFAULT_HTTP_HEADERS = Headers([
        ('Connection', 'Keep-Alive'),
        ('Accept', '*/*'),
        ('Accept-Encoding', 'deflate'),
        ('Accept-Encoding', 'gzip')],
        kind='client')
    DEFAULT_TUNNEL_HEADERS = Headers([
        ('Connection', 'Keep-Alive'),
        ('Proxy-Connection', 'Keep-Alive')],
        kind='client')
    request_parameters = ('encode_multipart', 'max_redirects', 'decompress',
                          'allow_redirects', 'multipart_boundary', 'version',
                          'timeout', 'websocket_handler')
    # Default hosts not affected by proxy settings. This can be overwritten
    # by specifying the "no" key in the proxy_info dictionary
    no_proxy = set(('localhost', platform.node()))

    def __init__(self, proxy_info=None, cache=None, headers=None,
                 encode_multipart=True, multipart_boundary=None,
                 keyfile=None, certfile=None, cert_reqs=CERT_NONE,
                 ca_certs=None, cookies=None, store_cookies=True,
                 max_redirects=10, decompress=True, version=None,
                 websocket_handler=None, parser=None, trust_env=True,
                 loop=None, client_version=None, timeout=None,
                 pool_size=10, green=False):
        super(HttpClient, self).__init__(loop)
        self.client_version = client_version or self.client_version
        self.connection_pools = {}
        self.pool_size = pool_size
        self.trust_env = trust_env
        self.timeout = timeout
        self.store_cookies = store_cookies
        self.max_redirects = max_redirects
        self.cookies = cookiejar_from_dict(cookies)
        self.decompress = decompress
        self.version = version or self.version
        self.green = green and greenio
        dheaders = self.DEFAULT_HTTP_HEADERS.copy()
        dheaders['user-agent'] = self.client_version
        if headers:
            dheaders.override(headers)
        self.headers = dheaders
        self.tunnel_headers = self.DEFAULT_TUNNEL_HEADERS.copy()
        self.proxy_info = dict(proxy_info or ())
        if not self.proxy_info and self.trust_env:
            self.proxy_info = get_environ_proxies()
            if 'no' not in self.proxy_info:
                self.proxy_info['no'] = ','.join(self.no_proxy)
        self.encode_multipart = encode_multipart
        self.multipart_boundary = multipart_boundary or choose_boundary()
        self.websocket_handler = websocket_handler
        self.https_defaults = {'keyfile': keyfile,
                               'certfile': certfile,
                               'cert_reqs': cert_reqs,
                               'ca_certs': ca_certs}
        self.http_parser = parser or http_parser
        # Add hooks
        self.bind_event('pre_request', Tunneling(self._loop))
        self.bind_event('on_headers', handle_101)
        self.bind_event('on_headers', handle_100)
        self.bind_event('on_headers', handle_cookies)
        self.bind_event('post_request', handle_redirect)
        # greenlet
        if self.green:
            self.request = partial(green_request, self.request)

    @property
    def websocket_key(self):
        if not hasattr(self, '_websocket_key'):
            self._websocket_key = native_str(b64encode(os.urandom(16)),
                                             DEFAULT_CHARSET)
        return self._websocket_key

    def connect(self, address):
        if isinstance(address, tuple):
            address = ':'.join(('%s' % v for v in address))
        return self.request('CONNECT', address)

    def get(self, url, **kwargs):
        '''Sends a GET request and returns a :class:`HttpResponse` object.

        :param url: url for the new :class:`HttpRequest` object.
        :param \*\*kwargs: Optional arguments for the :meth:`request` method.
        '''
        kwargs.setdefault('allow_redirects', True)
        return self.request('GET', url, **kwargs)

    def options(self, url, **kwargs):
        '''Sends an OPTIONS request and returns a :class:`HttpResponse` object.

        :param url: url for the new :class:`HttpRequest` object.
        :param \*\*kwargs: Optional arguments for the :meth:`request` method.
        '''
        kwargs.setdefault('allow_redirects', True)
        return self.request('OPTIONS', url, **kwargs)

    def head(self, url, **kwargs):
        '''Sends a HEAD request and returns a :class:`HttpResponse` object.

        :param url: url for the new :class:`HttpRequest` object.
        :param \*\*kwargs: Optional arguments for the :meth:`request` method.
        '''
        return self.request('HEAD', url, **kwargs)

    def post(self, url, **kwargs):
        '''Sends a POST request and returns a :class:`HttpResponse` object.

        :param url: url for the new :class:`HttpRequest` object.
        :param \*\*kwargs: Optional arguments for the :meth:`request` method.
        '''
        return self.request('POST', url, **kwargs)

    def put(self, url, **kwargs):
        '''Sends a PUT request and returns a :class:`HttpResponse` object.

        :param url: url for the new :class:`HttpRequest` object.
        :param \*\*kwargs: Optional arguments for the :meth:`request` method.
        '''
        return self.request('PUT', url, **kwargs)

    def patch(self, url, **kwargs):
        '''Sends a PATCH request and returns a :class:`HttpResponse` object.

        :param url: url for the new :class:`HttpRequest` object.
        :param \*\*kwargs: Optional arguments for the :meth:`request` method.
        '''
        return self.request('PATCH', url, **kwargs)

    def delete(self, url, **kwargs):
        '''Sends a DELETE request and returns a :class:`HttpResponse` object.

        :param url: url for the new :class:`HttpRequest` object.
        :param \*\*kwargs: Optional arguments for the :meth:`request` method.
        '''
        return self.request('DELETE', url, **kwargs)

    @task
    def request(self, method, url, **params):
        '''Constructs and sends a request to a remote server.

        It returns a :class:`.Future` which results in a
        :class:`HttpResponse` object.

        :param method: request method for the :class:`HttpRequest`.
        :param url: URL for the :class:`HttpRequest`.
        :parameter response: optional pre-existing :class:`HttpResponse` which
            starts a new request (for redirects, digest authentication and
            so forth).
        :param params: optional parameters for the :class:`HttpRequest`
            initialisation.

        :rtype: a :class:`.Future`
        '''
        nparams = params.copy()
        nparams.update(((name, getattr(self, name)) for name in
                        self.request_parameters if name not in params))
        request = HttpRequest(self, url, method, params, **nparams)
        pool = self.connection_pools.get(request.key)
        if pool is None:
            host, port = request.address
            pool = self.connection_pool(
                partial(self._connect, host, port, request.ssl),
                pool_size=self.pool_size, loop=self._loop)
            self.connection_pools[request.key] = pool
        conn = yield pool.connect()
        with conn:
            consumer = conn.current_consumer()
            # bind request-specific events
            consumer.bind_events(**request.inp_params)
            consumer.start(request)
            response = yield consumer.on_finished
            if response is not None:
                consumer = response
            if consumer.request_again:
                if isinstance(consumer.request_again, Exception):
                    raise consumer.request_again
                elif isinstance(consumer.request_again, ProtocolConsumer):
                    consumer = consumer.request_again
            headers = consumer.headers
            if (not headers or
                    not headers.has('connection', 'keep-alive') or
                    consumer.status_code == 101):
                conn.detach()
        if isinstance(consumer.request_again, tuple):
            method, url, params = consumer.request_again
            consumer = yield self.request(method, url, **params)
        coroutine_return(consumer)

    def close(self, async=True, timeout=5):
        '''Close all connections.

        Fire the ``finish`` :ref:`one time event <one-time-event>` once done.
        Return the :class:`.Future` fired by the ``finish`` event.
        '''
        for p in self.connection_pools.values():
            p.close()
        self.connection_pools.clear()

    def add_basic_authentication(self, username, password):
        '''Add a :class:`HTTPBasicAuth` handler to the ``pre_request`` hook.
        '''
        self.bind_event('pre_request', HTTPBasicAuth(username, password))

    def add_digest_authentication(self, username, password):
        '''Add a :class:`HTTPDigestAuth` handler to the ``pre_request`` hook.
        '''
        self.bind_event('pre_request', HTTPDigestAuth(username, password))

    # INTERNALS
    def get_headers(self, request, headers=None):
        # Returns a :class:`Header` obtained from combining
        # :attr:`headers` with *headers*. Can handle websocket requests.
        if request.scheme in ('ws', 'wss'):
            d = Headers((
                ('Connection', 'Upgrade'),
                ('Upgrade', 'websocket'),
                ('Sec-WebSocket-Version', str(max(SUPPORTED_VERSIONS))),
                ('Sec-WebSocket-Key', self.websocket_key),
                ('user-agent', self.client_version)
                ), kind='client')
        else:
            d = self.headers.copy()
        if headers:
            d.override(headers)
        return d

    def ssl_context(self, **kwargs):
        params = self.https_defaults.copy()
        for name in kwargs:
            if name in params:
                params[name] = kwargs[name]
        return SSLContext(**params)

    def set_proxy(self, request):
        if request.scheme in self.proxy_info:
            hostonly = request.host
            no_proxy = [n for n in self.proxy_info.get('no', '').split(',')
                        if n]
            if not any(map(hostonly.endswith, no_proxy)):
                url = self.proxy_info[request.scheme]
                p = urlparse(url)
                if not p.scheme:
                    raise ValueError('Could not understand proxy %s' % url)
                request.set_proxy(p.scheme, p.netloc)

    def _connect(self, host, port, ssl):
        _, connection = yield From(self._loop.create_connection(
            self.create_protocol, host, port, ssl=ssl))
        # Wait for the connection made event
        yield From(connection.event('connection_made'))
        coroutine_return(connection)


def green_request(request, method, url, **params):
    return greenio.wait(request(method, url, **params))