# Documentation for pulsar 0.9.2. For development docs, see the pulsar
# development documentation.
#
# Source code for examples.httpbin.manage

'''Pulsar HTTP test application::

    python manage.py

Implementation
======================

.. autoclass:: HttpBin
   :members:
   :member-order: bysource

Server Hooks
===================

This example shows how to use
:ref:`server hooks <setting-section-application-hooks>` to log each request.

.. automodule:: examples.httpbin.config
   :members:

'''
import os
import sys
import string
import mimetypes
from itertools import repeat, chain
from random import random

try:
    from pulsar.utils.pep import ispy3k, range
except ImportError:     # pragma    nocover
    sys.path.append('../../')
    from pulsar.utils.pep import ispy3k, range

from pulsar import HttpRedirect, HttpException, version, JAPANESE, CHINESE
from pulsar.utils.httpurl import (Headers, ENCODE_URL_METHODS,
                                  ENCODE_BODY_METHODS)
from pulsar.utils.html import escape
from pulsar.apps import wsgi, ws
from pulsar.apps.wsgi import (route, Html, Json, HtmlDocument, GZipMiddleware,
                              AsyncString)
from pulsar.utils.structures import MultiValueDict
from pulsar.utils.system import json

# Lower-case names of every HTTP method pulsar encodes in the url or the body.
METHODS = frozenset(
    method.lower()
    for method in chain(ENCODE_URL_METHODS, ENCODE_BODY_METHODS)
)
# Dotted major.minor.micro version of the running interpreter.
pyversion = '.'.join(str(number) for number in sys.version_info[:3])
# Static assets live in the ``assets`` directory next to this module.
ASSET_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'assets',
)
FAVICON = os.path.join(ASSET_DIR, 'favicon.ico')

# ``string.letters`` is only looked up when not running on python 3.
characters = (
    string.ascii_letters if ispy3k else string.letters  # pragma nocover
) + string.digits


def asset(name, mode='r', chunk_size=None):
    '''Load the file ``name`` from the :data:`ASSET_DIR` directory.

    :param name: file name, relative to ``ASSET_DIR``.
    :param mode: mode passed to :func:`open`.
    :param chunk_size: when truthy, the content is returned as a generator
        yielding pieces of at most ``chunk_size`` read from the file,
        otherwise the whole content is read at once.
    :return: a three-element tuple ``(data, content_type, encoding)`` where
        the last two entries are what :func:`mimetypes.guess_type` reports
        for the file, or ``None`` when the file does not exist.
    '''
    path = os.path.join(ASSET_DIR, name)
    if not os.path.isfile(path):
        return None
    content_type, encoding = mimetypes.guess_type(path)
    if not chunk_size:
        with open(path, mode) as stream:
            return stream.read(), content_type, encoding

    def _chunks():
        # The file is opened lazily, on the first iteration of the generator
        with open(path, mode) as stream:
            piece = stream.read(chunk_size)
            while piece:
                yield piece
                piece = stream.read(chunk_size)

    return _chunks(), content_type, encoding


class BaseRouter(wsgi.Router):
    '''Router with the helpers shared by the routers of this example.'''
    ########################################################################
    #    INTERNALS
    def bind_server_event(self, request, event, handler):
        '''Bind ``handler`` to ``event`` on the current consumer of the
        connection serving ``request``.'''
        connection = request.environ['pulsar.connection']
        connection.current_consumer().bind_event(event, handler)

    def info_data_response(self, request, **params):
        '''Return :meth:`info_data` as a :class:`Json` http response.'''
        info = self.info_data(request, **params)
        return Json(info).http_response(request)

    def info_data(self, request, **params):
        '''Build a dictionary describing ``request``.

        The dictionary always contains the ``method``, ``headers`` and
        ``pulsar`` entries. Methods in ``ENCODE_URL_METHODS`` add the url
        data as ``args``; any other method adds the body data as ``args``
        and the uploaded files as ``files``. ``params`` are merged last.
        '''
        headers = self.getheaders(request)
        info = {'method': request.method,
                'headers': headers,
                'pulsar': self.pulsar_info(request)}
        if request.method in ENCODE_URL_METHODS:
            info['args'] = dict(request.url_data)
        else:
            args, files = request.data_and_files()
            uploads = MultiValueDict()
            for name, parts in files.lists():
                for part in parts:
                    # fall back to base64 when the part is not valid text
                    try:
                        content = part.string()
                    except UnicodeError:
                        content = part.base64()
                    uploads[name] = content
            info['args'] = dict(args)
            info['files'] = dict(uploads)
        info.update(params)
        return info

    def getheaders(self, request):
        '''Collect the ``HTTP_*`` entries of the environ into a dictionary,
        with the ``HTTP_`` prefix removed and ``_`` replaced by ``-``.'''
        headers = Headers(kind='client')
        for key, value in request.environ.items():
            if key.startswith('HTTP_'):
                headers[key[5:].replace('_', '-')] = value
        return dict(headers)

    def pulsar_info(self, request):
        '''Return the ``info()`` of the connection serving ``request``.'''
        connection = request.get('pulsar.connection')
        return connection.info()


class HttpBin(BaseRouter):
    '''The main :class:`.Router` for the HttpBin application
    '''
    def get(self, request):
        '''The home page of this router.

        Renders a list with one link per child route, ordered by creation,
        and inserts it into the ``template.html`` asset.
        '''
        ul = Html('ul')
        for router in sorted(self.routes, key=lambda r: r.creation_count):
            a = router.link(escape(router.route.path))
            a.addClass(router.name)
            # tag the link with every HTTP method the route responds to
            for method in METHODS:
                if router.getparam(method):
                    a.addClass(method)
            li = Html('li', a, ' %s' % router.getparam('title', ''))
            ul.append(li)
        title = 'Pulsar'
        html = request.html_document
        html.head.title = title
        html.head.links.append('httpbin.css')
        html.head.links.append('favicon.ico', rel="icon",
                               type='image/x-icon')
        html.head.scripts.append('jquery')
        html.head.scripts.append('httpbin.js')
        ul = ul.render(request)
        templ, _, _ = asset('template.html')
        body = templ % (title, JAPANESE, CHINESE, version, pyversion, ul)
        html.body.append(body)
        return html.http_response(request)

    @route(title='Returns GET data')
    def get_get(self, request):
        '''Return the request information as JSON.'''
        return self.info_data_response(request)

    @route(title='Returns POST data')
    def post_post(self, request):
        '''Return the request information as JSON.'''
        return self.info_data_response(request)

    @route(title='Returns PATCH data')
    def patch_patch(self, request):
        '''Return the request information as JSON.'''
        return self.info_data_response(request)

    @route(title='Returns PUT data')
    def put_put(self, request):
        '''Return the request information as JSON.'''
        return self.info_data_response(request)

    @route(title='Returns DELETE data')
    def delete_delete(self, request):
        '''Return the request information as JSON.'''
        return self.info_data_response(request)

    @route('redirect/<int(min=1,max=10):times>', defaults={'times': 5},
           title='302 Redirect n times')
    def redirect(self, request):
        '''Redirect to ``/redirect/<times - 1>`` while the counter is
        positive, then to ``/get``.'''
        num = request.urlargs['times'] - 1
        if num:
            raise HttpRedirect('/redirect/%s' % num)
        else:
            raise HttpRedirect('/get')

    @route('getsize/<int(min=1,max=8388608):size>',
           defaults={'size': 150000},
           title='Returns a preset size of data (limit at 8MB)')
    def getsize(self, request):
        '''Return the request information plus ``size`` characters of data.
        '''
        size = request.urlargs['size']
        data = {'size': size, 'data': 'd' * size}
        return self.info_data_response(request, **data)

    @route(title='Returns gzip encoded data')
    def gzip(self, request):
        '''Return the request information through :class:`GZipMiddleware`.
        '''
        response = self.info_data_response(request, gzipped=True)
        return GZipMiddleware(10)(request.environ, response)

    @route(title='Returns cookie data')
    def cookies(self, request):
        '''Return the request cookies as a JSON ``key -> value`` mapping.'''
        cookies = request.cookies
        d = dict(((c.key, c.value) for c in cookies.values()))
        return Json({'cookies': d}).http_response(request)

    @route('cookies/set/<name>/<value>', title='Sets a simple cookie',
           defaults={'name': 'package', 'value': 'pulsar'})
    def request_cookies_set(self, request):
        '''Set the ``name`` cookie to ``value`` and redirect to ``/cookies``.
        '''
        key = request.urlargs['name']
        value = request.urlargs['value']
        request.response.set_cookie(key, value=value)
        request.response.status_code = 302
        request.response.headers['location'] = '/cookies'
        return request.response

    @route('status/<int(min=100,max=505):status>',
           title='Returns given HTTP Status code',
           defaults={'status': 418})
    def status(self, request):
        '''Raise an :class:`HttpException` with the requested status code.'''
        request.response.content_type = 'text/html'
        raise HttpException(status=request.urlargs['status'])

    @route(title='Returns response headers')
    def response_headers(self, request):
        '''Return the headers of the response as JSON.

        The headers are captured by a handler bound to the ``on_headers``
        server event and serialised by the response content generator.
        '''
        class Gen:
            headers = None

            def __call__(self, server, **kw):
                self.headers = server.headers

            def generate(self):
                # yield a byte so that headers are sent
                yield b''
                # we must have the headers now
                yield json.dumps(dict(self.headers))

        gen = Gen()
        self.bind_server_event(request, 'on_headers', gen)
        request.response.content = gen.generate()
        request.response.content_type = 'application/json'
        return request.response

    @route('basic-auth/<username>/<password>',
           title='Challenges HTTPBasic Auth',
           defaults={'username': 'username', 'password': 'password'})
    def challenge_auth(self, request):
        '''Return a JSON confirmation when the request is authenticated
        against the url arguments, otherwise raise a ``basic`` challenge.'''
        auth = request.get('http.authorization')
        if auth and auth.authenticated(request.environ, **request.urlargs):
            return Json({'authenticated': True,
                         'username': auth.username}).http_response(request)
        raise wsgi.HttpAuthenticate('basic')

    @route('digest-auth/<username>/<password>/<qop>',
           title='Challenges HTTP Digest Auth',
           defaults={'username': 'username',
                     'password': 'password',
                     'qop': 'auth'})
    def challenge_digest_auth(self, request):
        '''Return a JSON confirmation when the request is authenticated
        against the url arguments, otherwise raise a ``digest`` challenge.'''
        auth = request.get('http.authorization')
        if auth and auth.authenticated(request.environ, **request.urlargs):
            return Json({'authenticated': True,
                         'username': auth.username}).http_response(request)
        raise wsgi.HttpAuthenticate('digest', qop=[request.urlargs['qop']])

    @route('stream/<int(min=1):m>/<int(min=1):n>',
           title='Stream m chunk of data n times',
           defaults={'m': 300, 'n': 20})
    def request_stream(self, request):
        '''Stream ``n`` chunks, each made of ``m`` ``a`` bytes.'''
        m = request.urlargs['m']
        n = request.urlargs['n']
        request.response.content_type = 'text/plain'
        request.response.content = repeat(b'a' * m, n)
        return request.response

    @route(title='A web socket graph')
    def websocket(self, request):
        '''Serve the ``websocket.html`` asset with the ``graph-data``
        websocket address filled in.'''
        path = os.path.join(os.path.dirname(__file__),
                            'assets', 'websocket.html')
        # read through a context manager so the file handle is always closed
        with open(path) as stream:
            data = stream.read()
        scheme = 'wss' if request.is_secure else 'ws'
        host = request.get('HTTP_HOST')
        data = data % {'address': '%s://%s/graph-data' % (scheme, host)}
        request.response.content_type = 'text/html'
        request.response.content = data
        return request.response

    @route(title='Live server statistics')
    def stats(self, request):
        '''Live stats for the server.

        Try sending lots of requests
        '''
        # scheme = 'wss' if request.is_secure else 'ws'
        # host = request.get('HTTP_HOST')
        # address = '%s://%s/stats' % (scheme, host)
        doc = HtmlDocument(title='Live server stats', media_path='/assets/')
        # docs.head.scripts
        return doc.http_response(request)

    @route('clip/<int(min=256,max=16777216):chunk_size>',
           defaults={'chunk_size': 4096},
           title='Show a video clip')
    def clip(self, request):
        '''Stream the ``clip.mp4`` asset in chunks of ``chunk_size``.'''
        c = request.urlargs['chunk_size']
        data, ct, encoding = asset('clip.mp4', 'rb', chunk_size=c)
        response = request.response
        response.content_type = ct
        response.encoding = encoding
        response.content = data
        return response

    ########################################################################
    #    BENCHMARK ROUTES
    @route()
    def json(self, request):
        '''Return a fixed JSON message.'''
        return Json({'message': "Hello, World!"}).http_response(request)

    @route()
    def plaintext(self, request):
        '''Return a fixed plain text message.'''
        return AsyncString('Hello, World!').http_response(request)
class ExpectFail(BaseRouter):
    '''Router which fails the request body unless it is already done.'''

    def post(self, request):
        body = request.get('wsgi.input')
        if body.done():
            return self.info_data_response(request)
        body.fail()


class Graph(ws.WS):
    '''Websocket handler writing 100 random points for every message.'''

    def on_message(self, websocket, msg):
        points = [(index, random()) for index in range(100)]
        websocket.write(json.dumps(points))


class Site(wsgi.LazyWsgi):
    '''Lazily builds the WSGI handler of the HttpBin application.'''

    def setup(self, environ):
        # the main router is created before the other middleware
        httpbin = HttpBin('/')
        middleware = [
            ExpectFail('expect'),
            wsgi.wait_for_body_middleware,
            wsgi.clean_path_middleware,
            wsgi.authorization_middleware,
            wsgi.MediaRouter('media', ASSET_DIR, show_indexes=True),
            ws.WebSocket('/graph-data', Graph()),
            httpbin,
        ]
        return wsgi.WsgiHandler(middleware)


def server(description=None, **kwargs):
    '''Create the :class:`wsgi.WSGIServer` serving :class:`Site`.

    :param description: server description, ``'Pulsar HttpBin'`` when
        not given (or empty).
    :param kwargs: additional parameters passed to the server.
    '''
    if not description:
        description = 'Pulsar HttpBin'
    return wsgi.WSGIServer(Site(), description=description, **kwargs)


if __name__ == '__main__':  # pragma nocover
    server().start()