Documentation for pulsar 0.9.2. For development docs, go here.
import sys
import json
import logging
from pulsar import AsyncObject, task, coroutine_return
from pulsar.utils.security import gen_unique_id
from pulsar.utils.tools import checkarity
from pulsar.apps.wsgi import Json
from pulsar.apps.http import HttpClient
from .handlers import RpcHandler, InvalidRequest, exception
__all__ = ['JSONRPC', 'JsonProxy']
logger = logging.getLogger('pulsar.jsonrpc')
[docs]class JSONRPC(RpcHandler):
'''An :class:`.RpcHandler` for JSON-RPC services.
Design to comply with the `JSON-RPC 2.0`_ Specification.
JSON-RPC is a lightweight remote procedure call protocol
designed to be simple.
A remote method is invoked by sending a request to a remote service,
the request is a single object serialised using JSON.
.. _`JSON-RPC 2.0`: http://www.jsonrpc.org/specification
'''
version = '2.0'
def __call__(self, request):
return Json(self._call(request)).http_response(request)
@task
def _call(self, request):
response = request.response
data = {}
exc_info = None
callable = None
try:
try:
data = yield request.body_data()
except ValueError:
raise InvalidRequest(
status=415, msg='Content-Type must be application/json')
if data.get('jsonrpc') != self.version:
raise InvalidRequest(
'jsonrpc must be supplied and equal to "%s"' %
self.version)
params = data.get('params')
if isinstance(params, dict):
args, kwargs = (), params
else:
args, kwargs = tuple(params or ()), {}
#
callable = self.get_handler(data.get('method'))
result = yield callable(request, *args, **kwargs)
except Exception as exc:
result = exc
exc_info = sys.exc_info()
else:
try:
json.dumps(result)
except Exception as exc:
result = exc
exc_info = sys.exc_info()
#
res = {'id': data.get('id'), "jsonrpc": self.version}
if exc_info:
msg = None
code = getattr(result, 'fault_code', None)
if not code:
if isinstance(result, TypeError) and callable:
msg = checkarity(callable, args, kwargs, discount=1)
code = -32602 if msg else -32603
msg = msg or str(result) or 'JSON RPC exception'
code = getattr(result, 'fault_code', code)
if code == -32603:
logger.error(msg, exc_info=exc_info)
else:
logger.warning(msg)
error = {'code': code,
'message': msg,
'data': getattr(result, 'data', '')}
response.status_code = getattr(result, 'status', 400)
res['error'] = error
else:
res['result'] = result
coroutine_return(res)
class JsonCall:
slots = ('_client', '_name')
def __init__(self, client, name):
self._client = client
self._name = name
def __repr__(self):
return self._name
__str__ = __repr__
@property
def url(self):
return self._client.url
@property
def name(self):
return self._name
def __getattr__(self, name):
name = "%s%s%s" % (self._name, self._client.separator, name)
return self.__class__(self._client, name)
def __call__(self, *args, **kwargs):
return self._client._call(self._name, *args, **kwargs)
[docs]class JsonProxy(AsyncObject):
'''A python Proxy class for :class:`.JSONRPC` Servers.
:param url: server location
:param version: JSON-RPC server version. Default ``2.0``
:param id: optional request id, generated if not provided.
Default ``None``.
:param data: Extra data to include in all requests. Default ``None``.
:param full_response: return the full Http response rather than
just the content.
:param http: optional http client. If provided it must have the ``request``
method available which must be of the form::
http.request(url, body=..., method=...)
Default ``None``.
Lets say your RPC server is running at ``http://domain.name.com/``::
>>> a = JsonProxy('http://domain.name.com/')
>>> a.add(3,4)
7
>>> a.ping()
'pong'
'''
separator = '.'
default_version = '2.0'
default_timeout = 30
def __init__(self, url, version=None, data=None,
full_response=False, http=None, timeout=None, **kw):
self._url = url
self._version = version or self.__class__.default_version
self._full_response = full_response
self._data = data if data is not None else {}
if not http:
timeout = timeout if timeout is not None else self.default_timeout
http = HttpClient(timeout=timeout, **kw)
http.headers['accept'] = 'application/json, text/*; q=0.5'
http.headers['content-type'] = 'application/json'
self._http = http
@property
def url(self):
return self._url
@property
def version(self):
return self._version
@property
def _loop(self):
return self._http._loop
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, self.__url)
def __str__(self):
return self.__repr__()
def __getattr__(self, name):
return JsonCall(self, name)
@task
def _call(self, name, *args, **kwargs):
data = self._get_data(name, *args, **kwargs)
body = json.dumps(data).encode('utf-8')
resp = yield self._http.post(self._url, data=body)
if self._full_response:
coroutine_return(resp)
else:
content = resp.decode_content()
if resp.is_error:
if 'error' not in content:
resp.raise_for_status()
coroutine_return(self.loads(content))
def _get_data(self, func_name, *args, **kwargs):
id = self.makeid()
params = self.get_params(*args, **kwargs)
data = {'method': func_name, 'params': params, 'id': id,
'jsonrpc': self._version}
return data
[docs] def get_params(self, *args, **kwargs):
'''
Create an array or positional or named parameters
Mixing positional and named parameters in one
call is not possible.
'''
kwargs.update(self._data)
if args and kwargs:
raise ValueError('Cannot mix positional and named parameters')
if args:
return list(args)
else:
return kwargs
def loads(self, obj):
if isinstance(obj, dict):
if 'error' in obj:
error = obj['error']
raise exception(error.get('code'), error.get('message'))
else:
return obj.get('result')
return obj