Source code for thriftasyncioserver
# -*- coding: utf-8 -*-
__author__ = 'Thomas Bartelmess'
__email__ = 'tbartelmess@marketcircle.com'
__version__ = '0.1.7'
__all__ = ['Server']
import asyncio
from thrift.transport import TTransport
from thrift.transport.TTransport import TTransportBase
from struct import pack, unpack
class Protocol(asyncio.Protocol, TTransportBase):
in_protocol = None
out_protocol = None
processor = None
read_buffer = None
write_buffer = None
current_frame_size = None
stream_writer = None
def __init__(self, protocol_factory, processor):
self.loop = asyncio.get_event_loop()
self.in_protocol = protocol_factory.getProtocol(self)
self.out_protocol = protocol_factory.getProtocol(self)
self.processor = processor
self.read_buffer = b''
self.write_buffer = b''
def data_received(self, data):
self.read_buffer += data
self.processor.process(self.in_protocol, self.out_protocol)
def connection_made(self, transport):
self.stream_writer = asyncio.StreamWriter(transport, self,
None,
self.loop)
self.transport = transport
loop = asyncio.get_event_loop()
def open(self):
pass
def isOpen(self):
return True
def read(self, length):
data = self.read_buffer[:length]
if length >= len(self.read_buffer):
self.read_buffer = b''
else:
self.read_buffer = self.read_buffer[length:]
return data
def write(self, data):
self.stream_writer.write(data)
def flush(self):
yield from self.stream_writer.drain()
[docs]class Server(object):
"""
Thrift Server using the Python asyncio module.
"""
server_ready_event = None
server_stop_event = None
def __init__(self, host, port, protocol_factory, processor, ssl=None):
self.host = host
self.port = port
self.protocol_factory = protocol_factory
self.processor = processor
self.ssl = ssl
[docs] def make_protocol(self):
return Protocol(self.protocol_factory, self.processor)
[docs] def make_server(self, loop):
return loop.create_server(self.make_protocol,
host=self.host,
port=self.port,
ssl=self.ssl)
[docs] def serve(self):
loop = asyncio.get_event_loop()
coro = self.make_server(loop)
server = loop.run_until_complete(coro)
if self.server_ready_event:
self.server_ready_event.set()
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
loop.run_until_complete(server.wait_closed())
if self.server_stop_event:
self.server_stop_event.set()
class UnixServer(Server):
def __init__(self, path, ssl=None):
self.path = path
self.ssl = ssl
def make_server(self, loop):
return loop.create_unix_server(self.make_protocol, path=self.path)