###############################################################################
##
## Copyright (C) 2013-2014 Tavendo GmbH
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
###############################################################################
## Public API of this module: the names exported by a star-import.
## Covers the generic adapter protocol/factory, the WebSocket server and
## client protocols/factories, and their WAMP-over-WebSocket variants.
__all__ = (
'WebSocketAdapterProtocol',
'WebSocketServerProtocol',
'WebSocketClientProtocol',
'WebSocketAdapterFactory',
'WebSocketServerFactory',
'WebSocketClientFactory',
'WampWebSocketServerProtocol',
'WampWebSocketClientProtocol',
'WampWebSocketServerFactory',
'WampWebSocketClientFactory',
)
from collections import deque
try:
import asyncio
from asyncio import iscoroutine
from asyncio import Future
except ImportError:
## Trollius >= 0.3 was renamed
# noinspection PyUnresolvedReferences
import trollius as asyncio
from trollius import iscoroutine
from trollius import Future
from autobahn.wamp import websocket
from autobahn.websocket import protocol
from autobahn.websocket import http
def yields(value):
    """
    Tell whether ``value`` is something the event loop has to wait on.

    :param value: Any object, typically the return value of a user callback.
    :returns: ``True`` if ``value`` is a ``Future`` instance or a coroutine
       object, ``False`` otherwise.

    .. seealso:: http://stackoverflow.com/questions/20730248/maybedeferred-analog-with-asyncio
    """
    if isinstance(value, Future):
        return True
    return iscoroutine(value)
class WebSocketAdapterProtocol(asyncio.Protocol):
    """
    Adapter class for asyncio-based WebSocket client and server protocols.

    Maps the asyncio protocol callbacks (``connection_made``,
    ``connection_lost``, ``data_received``) onto the ``_connectionMade``,
    ``_connectionLost`` and ``_dataReceived`` hooks. Those hooks, like the
    ``on*`` user callbacks wrapped below, are not defined here and must be
    supplied by the class this adapter is combined with.
    """

    def connection_made(self, transport):
        """
        Store the transport, start the receive queue and derive ``self.peer``.

        :param transport: The asyncio transport for the new connection.
        """
        self.transport = transport

        self.receive_queue = deque()
        self._consume()

        try:
            peer = transport.get_extra_info('peername')
            try:
                ## FIXME: tcp4 vs tcp6
                self.peer = "tcp:%s:%d" % (peer[0], peer[1])
            except Exception:
                ## e.g. Unix Domain sockets don't have host/port
                self.peer = "unix:{0}".format(peer)
        except Exception:
            ## best effort only: the peer name is informational
            self.peer = "?"

        self._connectionMade()

    def connection_lost(self, exc):
        """
        Forward the loss of connection and drop the transport reference.

        :param exc: The exception object (or ``None``) handed in by asyncio.
        """
        self._connectionLost(exc)
        self.transport = None

    def _consume(self):
        """
        Arm a fresh waiter future whose completion drains the receive queue.
        """
        self.waiter = Future()

        def process(_):
            ## Hand over everything queued so far, then re-arm for the
            ## next batch of incoming data.
            while self.receive_queue:
                data = self.receive_queue.popleft()
                if self.transport:
                    self._dataReceived(data)
                else:
                    print("WebSocketAdapterProtocol._consume: no transport")
            self._consume()

        self.waiter.add_done_callback(process)

    def data_received(self, data):
        """
        Queue incoming data and wake up the consumer if it is still waiting.

        :param data: The chunk of data received from the transport.
        """
        self.receive_queue.append(data)
        if not self.waiter.done():
            self.waiter.set_result(None)

    def _closeConnection(self, abort=False):
        """
        Close the underlying transport.

        :param abort: If true and the transport offers ``abort()``, tear the
           connection down immediately instead of closing it gracefully.
        """
        if abort and hasattr(self.transport, 'abort'):
            self.transport.abort()
        else:
            self.transport.close()

    def _schedule_if_awaitable(self, res):
        """
        Schedule ``res`` on the event loop if it is a future or coroutine.

        :param res: The return value of a user callback.
        """
        if yields(res):
            ## ``asyncio.async`` cannot be spelled literally: ``async`` is a
            ## reserved keyword since Python 3.7. Prefer ``ensure_future``
            ## and fall back to the legacy name where it is all there is.
            schedule = getattr(asyncio, 'ensure_future', None)
            if schedule is None:
                schedule = getattr(asyncio, 'async')
            schedule(res)

    def _onOpen(self):
        self._schedule_if_awaitable(self.onOpen())

    def _onMessageBegin(self, isBinary):
        self._schedule_if_awaitable(self.onMessageBegin(isBinary))

    def _onMessageFrameBegin(self, length):
        self._schedule_if_awaitable(self.onMessageFrameBegin(length))

    def _onMessageFrameData(self, payload):
        self._schedule_if_awaitable(self.onMessageFrameData(payload))

    def _onMessageFrameEnd(self):
        self._schedule_if_awaitable(self.onMessageFrameEnd())

    def _onMessageFrame(self, payload):
        self._schedule_if_awaitable(self.onMessageFrame(payload))

    def _onMessageEnd(self):
        self._schedule_if_awaitable(self.onMessageEnd())

    def _onMessage(self, payload, isBinary):
        self._schedule_if_awaitable(self.onMessage(payload, isBinary))

    def _onPing(self, payload):
        self._schedule_if_awaitable(self.onPing(payload))

    def _onPong(self, payload):
        self._schedule_if_awaitable(self.onPong(payload))

    def _onClose(self, wasClean, code, reason):
        self._schedule_if_awaitable(self.onClose(wasClean, code, reason))

    def registerProducer(self, producer, streaming):
        """
        Producer registration is not supported by this adapter.

        :raises Exception: Always.
        """
        raise Exception("not implemented")
class WebSocketServerProtocol(WebSocketAdapterProtocol, protocol.WebSocketServerProtocol):
    """
    Base class for asyncio-based WebSocket server protocols.
    """

    def _onConnect(self, request):
        """
        Run the user's ``onConnect`` and settle the opening handshake.

        ``onConnect()`` returns the selected subprotocol or ``None`` or a
        pair ``(protocol, headers)``, or raises an ``HttpException``.

        :param request: The connection request passed on to ``onConnect``.
        """
        # noinspection PyBroadException
        try:
            outcome = self.onConnect(request)
            #if yields(outcome):
            #  outcome = yield from outcome
        except http.HttpException as failure:
            ## the user callback rejected the connection explicitly
            self.failHandshake(failure.reason, failure.code)
            return
        except Exception:
            ## anything else is reported as an internal server error
            status = http.INTERNAL_SERVER_ERROR
            self.failHandshake(status[1], status[0])
            return

        self.succeedHandshake(outcome)
class WebSocketClientProtocol(WebSocketAdapterProtocol, protocol.WebSocketClientProtocol):
    """
    Base class for asyncio-based WebSocket client protocols.
    """

    def _onConnect(self, response):
        """
        Run the user's ``onConnect`` and schedule its result if it yields.

        :param response: The connection response passed on to ``onConnect``.
        """
        res = self.onConnect(response)
        if yields(res):
            ## ``asyncio.async`` cannot be spelled literally: ``async`` is a
            ## reserved keyword since Python 3.7. Prefer ``ensure_future``
            ## and fall back to the legacy name where it is all there is.
            schedule = getattr(asyncio, 'ensure_future', None)
            if schedule is None:
                schedule = getattr(asyncio, 'async')
            schedule(res)
class WebSocketAdapterFactory:
    """
    Adapter class for asyncio-based WebSocket client and server factories.

    Relies on ``self.loop`` and ``self.protocol``, neither of which is set
    in this class.
    """

    def _log(self, msg):
        """
        Write ``msg`` to standard output.
        """
        print(msg)

    def _callLater(self, delay, fun):
        """
        Schedule ``fun`` on ``self.loop`` after ``delay``.

        :returns: Whatever ``self.loop.call_later`` returns.
        """
        loop = self.loop
        return loop.call_later(delay, fun)

    def __call__(self):
        """
        Create a new ``self.protocol`` instance bound to this factory.

        :returns: The protocol instance, with its ``factory`` attribute set.
        """
        instance = self.protocol()
        instance.factory = self
        return instance
class WebSocketServerFactory(WebSocketAdapterFactory, protocol.WebSocketServerFactory):
    """
    Base class for asyncio-based WebSocket server factories.
    """

    def __init__(self, *args, **kwargs):
        """
        In addition to all arguments to the constructor of
        :class:`autobahn.websocket.protocol.WebSocketServerFactory`,
        you can supply a ``loop`` keyword argument to specify the
        asyncio event loop to be used.
        """
        ## ``loop`` is consumed here and not passed on to the base class;
        ## a missing or falsy value selects the current event loop.
        chosen = kwargs.pop('loop', None)
        if chosen:
            self.loop = chosen
        else:
            self.loop = asyncio.get_event_loop()

        protocol.WebSocketServerFactory.__init__(self, *args, **kwargs)
class WebSocketClientFactory(WebSocketAdapterFactory, protocol.WebSocketClientFactory):
    """
    Base class for asyncio-based WebSocket client factories.
    """

    def __init__(self, *args, **kwargs):
        """
        In addition to all arguments to the constructor of
        :class:`autobahn.websocket.protocol.WebSocketClientFactory`,
        you can supply a ``loop`` keyword argument to specify the
        asyncio event loop to be used.
        """
        ## ``loop`` is consumed here and not passed on to the base class;
        ## a missing or falsy value selects the current event loop.
        chosen = kwargs.pop('loop', None)
        if chosen:
            self.loop = chosen
        else:
            self.loop = asyncio.get_event_loop()

        protocol.WebSocketClientFactory.__init__(self, *args, **kwargs)
class WampWebSocketServerProtocol(websocket.WampWebSocketServerProtocol, WebSocketServerProtocol):
    """
    Base class for asyncio-based WAMP-over-WebSocket server protocols.

    Adds no members of its own: it only combines the WAMP server protocol
    from ``autobahn.wamp.websocket`` with the asyncio server protocol.
    """
class WampWebSocketServerFactory(websocket.WampWebSocketServerFactory, WebSocketServerFactory):
    """
    Base class for asyncio-based WAMP-over-WebSocket server factories.
    """

    protocol = WampWebSocketServerProtocol

    def __init__(self, factory, *args, **kwargs):
        """
        Initialize the WAMP part first, then the WebSocket part.

        :param factory: Passed through to the WAMP factory initializer.
        :param serializers: Optional keyword argument (default ``None``),
           passed to the WAMP factory initializer.
        :param debug_wamp: Optional keyword argument (default ``False``),
           passed to the WAMP factory initializer.

        All remaining arguments go to :class:`WebSocketServerFactory`.
        """
        ## These two keywords belong to the WAMP initializer only.
        serializers = kwargs.pop('serializers', None)
        debug_wamp = kwargs.pop('debug_wamp', False)

        websocket.WampWebSocketServerFactory.__init__(self, factory, serializers, debug_wamp=debug_wamp)

        ## ``self._protocols`` is presumably set by the WAMP initializer
        ## above; it overrides any ``protocols`` supplied by the caller.
        kwargs['protocols'] = self._protocols

        # noinspection PyCallByClass
        WebSocketServerFactory.__init__(self, *args, **kwargs)
class WampWebSocketClientProtocol(websocket.WampWebSocketClientProtocol, WebSocketClientProtocol):
    """
    Base class for asyncio-based WAMP-over-WebSocket client protocols.

    Adds no members of its own: it only combines the WAMP client protocol
    from ``autobahn.wamp.websocket`` with the asyncio client protocol.
    """
class WampWebSocketClientFactory(websocket.WampWebSocketClientFactory, WebSocketClientFactory):
    """
    Base class for asyncio-based WAMP-over-WebSocket client factories.
    """

    protocol = WampWebSocketClientProtocol

    def __init__(self, factory, *args, **kwargs):
        """
        Initialize the WAMP part first, then the WebSocket part.

        :param factory: Passed through to the WAMP factory initializer.
        :param serializers: Optional keyword argument (default ``None``),
           passed to the WAMP factory initializer.
        :param debug_wamp: Optional keyword argument (default ``False``),
           passed to the WAMP factory initializer.

        All remaining arguments go to :class:`WebSocketClientFactory`.
        """
        ## These two keywords belong to the WAMP initializer only.
        serializers = kwargs.pop('serializers', None)
        debug_wamp = kwargs.pop('debug_wamp', False)

        websocket.WampWebSocketClientFactory.__init__(self, factory, serializers, debug_wamp=debug_wamp)

        ## ``self._protocols`` is presumably set by the WAMP initializer
        ## above; it overrides any ``protocols`` supplied by the caller.
        kwargs['protocols'] = self._protocols

        WebSocketClientFactory.__init__(self, *args, **kwargs)