Documentation for pulsar 0.9.2. For development docs, go here.
'''An asynchronous multi-process `HTTP proxy server`_. It works for both
``http`` and ``https`` (tunneled) requests.
Managing Headers
=====================
It is possible to add middleware to manipulate the original request headers.
If the list of header middleware is empty, the proxy passes requests and
responses unmodified.
This is an implementation of a forward proxy which can be used
to retrieve any type of resource from the Internet.
To run the server::

    python manage.py

A header middleware is a callable which receives the wsgi *environ* and
the request *headers* container. By default the example uses:

.. autofunction:: x_forwarded_for

To run with a different list of headers middleware, create a new script
and do::

    from proxyserver.manage import server

    if __name__ == '__main__':
        server(headers_middleware=[...]).start()

Implementation
===========================

.. autoclass:: ProxyServerWsgiHandler
   :members:
   :member-order:

.. _`HTTP proxy server`: http://en.wikipedia.org/wiki/Proxy_server
'''
import io
import sys
import logging
from functools import partial
try:
import pulsar
except ImportError:
sys.path.append('../../')
import pulsar
from pulsar import (asyncio, HttpException, task, async, coroutine_return,
add_errback)
from pulsar.apps import wsgi, http
from pulsar.utils.httpurl import Headers
from pulsar.utils.log import LocalMixin, local_property
# Header names that WSGI stores in the environ *without* the ``HTTP_``
# prefix (CONTENT_TYPE / CONTENT_LENGTH); request_headers copies them
# explicitly.
ENVIRON_HEADERS = ('content-type', 'content-length')
# Default server software string handed to the WSGI server by ``server``.
SERVER_SOFTWARE = 'Pulsar-proxy-server/%s' % pulsar.version
# Agent string for the optional ``user_agent`` middleware (not enabled by
# default).
USER_AGENT = SERVER_SOFTWARE
logger = logging.getLogger('pulsar.proxyserver')
def x_forwarded_for(environ, headers):
    '''Header middleware adding an *x-forwarded-for* header.

    The value is the client address found in ``environ['REMOTE_ADDR']``.
    It is added with ``headers.add_header``, so an existing value is not
    overwritten. A missing ``REMOTE_ADDR`` raises :class:`KeyError`.
    '''
    client_address = environ['REMOTE_ADDR']
    headers.add_header('x-forwarded-for', client_address)
class user_agent:
    '''Header middleware forcing a fixed *user-agent* header.

    :param agent: string stored as :attr:`agent` and assigned to the
        ``user-agent`` header on every call, replacing any client value.
    '''
    def __init__(self, agent):
        self.agent = agent

    def __call__(self, environ, headers):
        '''Set ``headers['user-agent']``; *environ* is not used.'''
        forced_agent = self.agent
        headers['user-agent'] = forced_agent
class ProxyServerWsgiHandler(LocalMixin):
    '''WSGI middleware for an asynchronous proxy server.

    To perform processing on headers you can pass a list of
    ``headers_middleware``.
    A headers middleware is a callable which accepts two parameters, the wsgi
    *environ* dictionary and the *headers* container.

    Hop-by-hop request headers are not forwarded to the upstream server
    (see :meth:`request_headers`).
    '''
    def __init__(self, headers_middleware=None):
        # Any falsy value (None, empty list) means "no header middleware".
        self.headers_middleware = headers_middleware or []

    @local_property
    def http_client(self):
        '''The :class:`.HttpClient` used by this proxy middleware for
        accessing upstream resources'''
        # decompress=False presumably keeps upstream bodies as received;
        # store_cookies=False avoids sharing cookies between proxied clients.
        return http.HttpClient(decompress=False, store_cookies=False)

    @task
    def __call__(self, environ, start_response):
        '''Proxy the request described by *environ* to ``RAW_URI``.

        Coroutine (decorated with ``task``).

        :raises HttpException: with status 404 when ``RAW_URI`` is empty or
            starts with ``/`` (not a usable proxy target).
        :return: a :class:`ProxyTunnel` for ``CONNECT`` requests, a
            :class:`ProxyResponse` otherwise; it is the WSGI response
            iterable fed by the upstream request.
        '''
        uri = environ['RAW_URI']
        # Lazy %-args: the message is only formatted if DEBUG is enabled.
        logger.debug('new request for %r', uri)
        if not uri or uri.startswith('/'):  # No proper uri, raise 404
            raise HttpException(status=404)
        if environ.get('HTTP_EXPECT') != '100-continue':
            # Read the whole client body before contacting upstream.
            stream = environ.get('wsgi.input') or io.BytesIO()
            data = yield stream.read()
        else:
            # With "Expect: 100-continue" the body is only sent once the
            # upstream server answers "100 Continue"
            # (see ProxyResponse.data_processed).
            data = None
        request_headers = self.request_headers(environ)
        method = environ['REQUEST_METHOD']
        if method == 'CONNECT':
            response = ProxyTunnel(environ, start_response)
        else:
            response = ProxyResponse(environ, start_response)
        request = self.http_client.request(method, uri, data=data,
                                           headers=request_headers,
                                           version=environ['SERVER_PROTOCOL'],
                                           pre_request=response.pre_request)
        # Failures of the upstream request are reported by the response.
        add_errback(request, response.error)
        coroutine_return(response)

    def request_headers(self, environ):
        '''Fill request headers from the environ dictionary and
        modify them via the list of :attr:`headers_middleware`.

        Every ``HTTP_*`` environ key becomes a header (``HTTP_USER_AGENT``
        becomes ``USER-AGENT``), and the :data:`ENVIRON_HEADERS` (stored by
        WSGI without the ``HTTP_`` prefix) are copied when non-empty.

        Hop-by-hop headers are dropped because a proxy must not forward them
        (RFC 7230, section 6.1). This covers ``wsgi.HOP_HEADERS``, the
        de facto ``Proxy-Connection`` header, and any header named in the
        incoming ``Connection`` header. Among other things, this keeps the
        client's ``Proxy-Authorization`` credentials away from the origin
        server. The response side strips the same ``wsgi.HOP_HEADERS`` in
        :meth:`ProxyResponse.remove_hop_headers`.

        The middleware runs last, so it can still add any header.
        The returned headers will be sent to the target uri.
        '''
        hop_headers = set(wsgi.HOP_HEADERS)
        # Non-standard, but commonly sent by clients to their proxy only.
        hop_headers.add('proxy-connection')
        # Options listed in "Connection" name further hop-by-hop headers.
        for token in (environ.get('HTTP_CONNECTION') or '').split(','):
            token = token.strip().lower()
            if token:
                hop_headers.add(token)
        headers = Headers(kind='client')
        for key in environ:
            if key.startswith('HTTP_'):
                head = key[5:].replace('_', '-')
                if head.lower() not in hop_headers:
                    headers[head] = environ[key]
        for head in ENVIRON_HEADERS:
            key = head.replace('-', '_').upper()
            value = environ.get(key)
            if value:
                headers[head] = value
        for middleware in self.headers_middleware:
            middleware(environ, headers)
        return headers
############################################################################
# RESPONSE OBJECTS
class ProxyResponse(object):
'''Asynchronous wsgi response for http requests
'''
_started = False
_headers = None
_done = False
def __init__(self, environ, start_response):
self.environ = environ
self.start_response = start_response
self.queue = asyncio.Queue()
def __iter__(self):
while True:
if self._done:
try:
yield self.queue.get_nowait()
except asyncio.QueueEmpty:
break
else:
yield async(self.queue.get())
def pre_request(self, response, exc=None):
self._started = True
response.bind_event('data_processed', self.data_processed)
def error(self, exc):
if not self._started:
request = wsgi.WsgiRequest(self.environ)
content_type = request.content_types.best_match(
('text/html', 'text/plain'))
uri = self.environ['RAW_URI']
msg = 'Could not find %s' % uri
logger.info(msg=msg)
if content_type == 'text/html':
html = wsgi.HtmlDocument(title=msg)
html.body.append('<h1>%s</h1>' % msg)
data = html.render()
resp = wsgi.WsgiResponse(504, data, content_type='text/html')
elif content_type == 'text/plain':
resp = wsgi.WsgiResponse(504, msg, content_type='text/html')
else:
resp = wsgi.WsgiResponse(504, '')
self.start_response(resp.status, resp.get_headers())
self._done = True
self.queue.put_nowait(resp.content[0])
@task
def data_processed(self, response, exc=None, **kw):
'''Receive data from the requesting HTTP client.'''
status = response.get_status()
if status == '100 Continue':
stream = self.environ.get('wsgi.input') or io.BytesIO()
body = yield stream.read()
response.transport.write(body)
if response.parser.is_headers_complete():
if self._headers is None:
headers = self.remove_hop_headers(response.headers)
self._headers = Headers(headers, kind='server')
# start the response
self.start_response(status, list(self._headers))
body = response.recv_body()
if response.parser.is_message_complete():
self._done = True
self.queue.put_nowait(body)
def remove_hop_headers(self, headers):
for header, value in headers:
if header.lower() not in wsgi.HOP_HEADERS:
yield header, value
class ProxyTunnel(ProxyResponse):
    '''Asynchronous wsgi response for https (``CONNECT``) requests.

    It does not relay a parsed HTTP response. Instead, the downstream client
    connection and the upstream server connection are joined by a pair of
    :class:`StreamTunnel` consumers, each forwarding its data to the other.
    '''
    def pre_request(self, response, exc=None):
        '''Start the tunnel.

        Callback fired once a connection with the upstream server is
        established. Both connections are upgraded to
        :class:`StreamTunnel`, the upstream *response* is marked finished,
        and ``200 Connection established`` is sent back to the client.

        :return: the upstream *response*.
        '''
        assert response._request.method == 'CONNECT'
        self._started = True
        # Drop the request so that start_request is not called.
        response._request = None
        upstream_conn = response._connection
        downstream_conn = self.environ['pulsar.connection']
        # Upgrade both protocol consumers: each side writes into the other.
        downstream_conn.upgrade(partial(StreamTunnel, upstream_conn))
        upstream_conn.upgrade(partial(StreamTunnel, downstream_conn))
        response.finished()
        self.start_response('200 Connection established', [])
        # An empty chunk so that the status line and headers are sent.
        self.queue.put_nowait(b'')
        self._done = True
        return response
class StreamTunnel(pulsar.ProtocolConsumer):
    ''':class:`.ProtocolConsumer` relaying raw (encrypted) bytes between the
    downstream client and the upstream server.

    This consumer replaces the standard Http protocol consumer when a
    connection is upgraded for a ``CONNECT`` tunnel.

    .. attribute:: tunnel

        The opposite connection (downstream client or upstream server) that
        receives every byte read by this consumer.
    '''
    # Placeholders for attributes the HTTP consumer exposes; presumably read
    # by server-side machinery after the upgrade. TODO confirm.
    headers = None
    status_code = None

    def __init__(self, tunnel, loop=None):
        super(StreamTunnel, self).__init__(loop)
        self.tunnel = tunnel

    def connection_made(self, connection):
        '''Arrange for :attr:`tunnel` to be closed when *connection* is lost.'''
        connection.bind_event('connection_lost', self._close_tunnel)

    def data_received(self, data):
        '''Write *data* to :attr:`tunnel`.

        Write errors are ignored when the tunnel is already closed and
        re-raised otherwise.
        '''
        peer = self.tunnel
        try:
            return peer.write(data)
        except Exception:
            if peer.closed:
                return None
            raise

    def _close_tunnel(self, arg, exc=None):
        '''``connection_lost`` handler: schedule closing of the open tunnel.'''
        peer = self.tunnel
        if peer.closed:
            return
        self._loop.call_soon(peer.close)
def server(name='proxy-server', headers_middleware=None,
           server_software=None, **kwargs):
    '''Create the WSGI proxy server.

    :param name: name given to the ``wsgi.WSGIServer``.
    :param headers_middleware: header middleware list passed to
        :class:`ProxyServerWsgiHandler`. ``None`` selects the default
        ``[x_forwarded_for]``. To also override the user agent, pass e.g.
        ``[user_agent(USER_AGENT), x_forwarded_for]``.
    :param server_software: server software string; falls back to
        :data:`SERVER_SOFTWARE` when falsy.
    :param kwargs: extra keyword arguments forwarded to ``wsgi.WSGIServer``.
    :return: the ``wsgi.WSGIServer`` instance; call ``start()`` to run it.
    '''
    middleware = ([x_forwarded_for] if headers_middleware is None
                  else headers_middleware)
    proxy_handler = ProxyServerWsgiHandler(middleware)
    kwargs['server_software'] = server_software or SERVER_SOFTWARE
    return wsgi.WSGIServer(proxy_handler, name=name, **kwargs)
if __name__ == '__main__':
    # Script entry point: run the proxy with the default configuration.
    proxy_server = server()
    proxy_server.start()