Documentation for pulsar 0.9.2. For documentation of the development version, see the development docs.
import os
import sys
from time import time
from collections import OrderedDict
from multiprocessing import Process, current_process
import pulsar
from pulsar import system, MonitorStarted, HaltServer, Config
from pulsar.utils.pep import itervalues, iteritems, range
from pulsar.utils.log import logger_fds
from pulsar.utils import autoreload
from pulsar.utils.tools import Pidfile
from .proxy import ActorProxyMonitor, get_proxy, actor_proxy_future
from .access import get_actor, set_actor, logger, _StopError, SELECTORS
from .threads import Thread
from .mailbox import MailboxClient, MailboxProtocol, ProxyMailbox
from .futures import async, add_errback, chain_future, Future, From
from .eventloop import EventLoop
from .protocols import TcpServer
from .actor import Actor, create_aid
from .consts import *
if sys.platform == 'win32': # pragma nocover
signal = None
else:
import signal
__all__ = ['arbiter']
def arbiter(**params):
    '''Obtain the ``arbiter``.

    It returns the arbiter instance only if we are on the arbiter
    context domain, otherwise it returns nothing.
    '''
    current = get_actor()
    if current is None:
        # No actor installed in this domain: create the arbiter,
        # install it as the current actor and return it.
        return set_actor(_spawn_actor('arbiter', None, **params))
    if current.is_arbiter():
        return current
    # An actor exists here but it is not the arbiter: return nothing.
    return None
class Concurrency(object):
    '''Actor :class:`.Concurrency`.

    Responsible for the actual spawning of actors according to a
    concurrency implementation. Instances are picklable
    and are shared between the :class:`.Actor` and its
    :class:`.ActorProxyMonitor`.
    This is an abstract class, derived classes must implement the
    ``start`` method.

    :param concurrency: string indicating the concurrency implementation.
        Valid choices are ``monitor``, ``process``, ``thread``.
    :param actor_class: :class:`.Actor` or one of its subclasses.
    :param timeout: timeout in seconds for the actor.
    :param kwargs: additional key-valued arguments to be passed to the actor
        constructor.
    '''
    # Class-wide counter, incremented on every ``make`` call and stored
    # as the actor ``age``: a lower value means an older actor.
    _creation_counter = 0
    # Containers used by monitor/arbiter subclasses only; plain actors
    # leave them as None.
    monitors = None
    managed_actors = None
    terminated_actors = None
    registered = None
    actor_class = Actor

    def make(self, kind, cfg, name, aid, **kw):
        # Configure this concurrency instance and delegate the actual
        # creation to ``create_actor`` (overridden by subclasses).
        self.__class__._creation_counter += 1
        self.aid = aid
        self.age = self.__class__._creation_counter
        self.name = name or self.actor_class.__name__.lower()
        self.kind = kind
        self.cfg = cfg
        self.params = kw
        self.identity = aid
        return self.create_actor()

    @property
    def unique_name(self):
        # Human-readable unique identifier, e.g. ``worker(aid)``.
        return '%s(%s)' % (self.name, self.aid)

    def __repr__(self):
        return self.unique_name
    __str__ = __repr__

    def before_start(self, actor):
        # Hook invoked just before the actor starts; no-op by default.
        pass

    def is_process(self):
        return False

    def is_arbiter(self):
        return False

    def is_monitor(self):
        return False

    def selector(self):
        '''Return a selector instance.

        By default it return nothing so that the best handler for the
        system is chosen.
        '''
        # NOTE(review): contrary to the docstring above, this looks up
        # the selector class configured by ``cfg.selector`` and
        # instantiates it.
        return SELECTORS[self.cfg.selector]()

    def get_actor(self, actor, aid, check_monitor=True):
        # Resolve ``aid``: at this level only the actor itself or its
        # monitor are known; returns None for anything else.
        if aid == actor.aid:
            return actor
        elif aid == 'monitor':
            return actor.monitor

    def spawn(self, actor, aid=None, **params):
        '''Spawn a new actor from ``actor``.
        '''
        aid = aid or create_aid()
        # Delegate the actual spawning to the arbiter and wrap the
        # pending result in an actor-proxy future.
        future = actor.send('arbiter', 'spawn', aid=aid, **params)
        return actor_proxy_future(aid, future)

    def run_actor(self, actor):
        '''Start running the ``actor``.
        '''
        set_actor(actor)
        actor.mailbox.start_serving()
        # Blocks until the event loop is stopped.
        actor._loop.run_forever()

    def add_monitor(self, actor, monitor_name, **params):
        # Only the arbiter concurrency can add monitors.
        raise RuntimeError('Cannot add monitors to %s' % actor)

    def setup_event_loop(self, actor):
        '''Set up the event loop for ``actor``.
        '''
        actor._logger = self.cfg.configured_logger(actor.name)
        loop = EventLoop(self.selector(), logger=actor._logger,
                         iothreadloop=True, cfg=actor.cfg)
        actor.mailbox = self.create_mailbox(actor, loop)
        return loop

    def hand_shake(self, actor):
        '''Perform the hand shake for ``actor``

        The hand shake occurs when the ``actor`` is in starting state.
        It performs the following actions:

        * set the ``actor`` as the actor of the current thread
        * bind two additional callbacks to the ``start`` event
        * fire the ``start`` event

        If the hand shake is successful, the actor will eventually
        results in a running state.
        '''
        try:
            assert actor.state == ACTOR_STATES.STARTING
            if actor.cfg.debug:
                actor.logger.debug('starting handshake')
            a = get_actor()
            if a is not actor and a is not actor.monitor:
                set_actor(actor)
            actor.bind_event('start', self._switch_to_run)
            actor.bind_event('start', self.periodic_task)
            actor.bind_event('start', self._acknowledge_start)
            actor.fire_event('start')
        except Exception as exc:
            # Any failure during the handshake stops the actor.
            actor.stop(exc)

    def create_actor(self):
        self.daemon = False
        # Replace the monitor with its picklable proxy so that this
        # instance can be serialised to the new process/thread.
        self.params['monitor'] = get_proxy(self.params['monitor'])
        # make sure these parameters are picklable
        # pickle.dumps(self.params)
        return ActorProxyMonitor(self)

    def create_mailbox(self, actor, loop):
        '''Create the mailbox for ``actor``.'''
        client = MailboxClient(actor.monitor.address, actor, loop)
        # Perform the handshake once the loop is running.
        loop.call_soon_threadsafe(self.hand_shake, actor)
        # When the mailbox connection finishes, stop the event loop.
        client.bind_event('finish', lambda _, **kw: loop.stop())
        return client

    def periodic_task(self, actor, **kw):
        '''Implement the :ref:`actor period task <actor-periodic-task>`.

        This is an internal method called periodically by the
        :attr:`.Actor._loop` to ping the actor monitor.
        If successful return a :class:`~asyncio.Future` called
        back with the acknowledgement from the monitor.
        '''
        actor.next_periodic_task = None
        ack = None
        if actor.is_running():
            if actor.cfg.debug:
                actor.logger.debug('notify monitor')
            # if an error occurs, shut down the actor
            ack = actor.send('monitor', 'notify', actor.info())
            add_errback(ack, actor.stop)
            actor.fire_event('periodic_task')
            # ``next`` (shadows the builtin) is the delay in seconds
            # before the next monitor notification.
            next = max(ACTOR_TIMEOUT_TOLE*actor.cfg.timeout, MIN_NOTIFY)
        else:
            # Not running (yet, or any more): reschedule immediately.
            next = 0
        actor.next_periodic_task = actor._loop.call_later(
            min(next, MAX_NOTIFY), self.periodic_task, actor)
        return ack

    def stop(self, actor, exc=None, exit_code=0):
        '''Gracefully stop the ``actor``.
        '''
        if actor.state <= ACTOR_STATES.RUN:
            # The actor has not started the stopping process. Starts it now.
            actor.state = ACTOR_STATES.STOPPING
            actor.event('start').clear()
            if exc:
                if not exit_code:
                    exit_code = getattr(exc, 'exit_code', 1)
                if exit_code == 1:
                    # Unexpected failure: log with traceback if one is
                    # available on the current exception context.
                    exc_info = sys.exc_info()
                    if exc_info[0] is not None:
                        actor.logger.critical('Stopping', exc_info=exc_info)
                    else:
                        actor.logger.critical('Stopping: %s', exc)
                elif exit_code == 2:
                    actor.logger.error(str(exc))
                elif exit_code:
                    actor.stream.writeln(str(exc))
            #
            actor.exit_code = exit_code
            stopping = actor.fire_event('stopping')
            self.close_executor(actor)
            if not stopping.done() and actor._loop.is_running():
                # Wait for the 'stopping' event handlers to complete
                # before leaving the actor domain.
                actor.logger.debug('asynchronous stopping')
                cbk = lambda _: self._stop_actor(actor)
                return chain_future(stopping, callback=cbk, errback=cbk)
            else:
                if actor.logger:
                    actor.logger.debug('stopping')
                return self._stop_actor(actor)
        elif actor.stopped():
            # Already stopped: finalise the exit.
            return self._stop_actor(actor, True)

    def close_executor(self, actor):
        '''Close the :meth:`executor`'''
        executor = actor._loop._default_executor
        if executor:
            actor.logger.debug('Waiting for executor shutdown')
            executor.shutdown()
            actor._loop._default_executor = None

    def _install_signals(self, actor):
        # Set the process title and ownership, then install handlers
        # for the exit signals (``signal`` is None on Windows).
        proc_name = "%s-%s" % (actor.cfg.proc_name, actor.name)
        if system.set_proctitle(proc_name):
            actor.logger.debug('Set process title to %s',
                               system.get_proctitle())
        system.set_owner_process(actor.cfg.uid, actor.cfg.gid)
        if signal:
            actor.logger.debug('Installing signals')
            for sig in system.EXIT_SIGNALS:
                try:
                    actor._loop.add_signal_handler(
                        sig, self.handle_exit_signal, actor, sig)
                except ValueError:
                    # Signal not supported by this loop; skip it.
                    pass

    def _stop_actor(self, actor, finished=False):
        '''Exit from the :class:`.Actor` domain.'''
        if finished:
            if actor._loop.is_running():
                actor.logger.critical('Event loop still running when stopping')
                actor._loop.stop()
            return True
        #
        actor.state = ACTOR_STATES.CLOSE
        if actor._loop.is_running():
            actor.logger.debug('Closing mailbox')
            actor.mailbox.close()
        else:
            actor.logger.debug('Exiting actor with exit code 1')
            actor.exit_code = 1
            actor.mailbox.abort()
            return actor.stop()

    def _switch_to_run(self, actor, exc=None):
        # 'start' event callback: move the actor to the RUN state.
        if exc is None and actor.state < ACTOR_STATES.RUN:
            actor.state = ACTOR_STATES.RUN
        elif exc:
            actor.stop(exc)

    def _acknowledge_start(self, actor, exc=None):
        # 'start' event callback: log success or stop on failure.
        if exc is None:
            actor.logger.info('started')
        else:
            actor.stop(exc)

    def _remove_actor(self, monitor, actor, log=True):
        # Only monitor concurrencies manage actors; see
        # MonitorMixin._remove_actor.
        raise RuntimeError('Cannot remove actor')
class ProcessMixin(object):
    '''Mixin for :class:`.Concurrency` implementations backed by an
    operating system process.'''

    def is_process(self):
        '''This concurrency runs on its own process.'''
        return True

    def before_start(self, actor):  # pragma nocover
        # Start coverage collection first, then wire up OS signal
        # handlers for the new process.
        actor.start_coverage()
        self._install_signals(actor)

    def handle_exit_signal(self, actor, sig):
        # Record the received signal on the event loop and abort the
        # current callback via ``_StopError``.
        actor.logger.warning("Got %s. Stopping.", system.SIG_NAMES.get(sig))
        actor._loop.exit_signal = sig
        raise _StopError
class MonitorMixin(object):
    '''Mixin for :class:`.Concurrency` implementations which manage a
    pool of actors (monitors and the arbiter).

    Adds the ``managed_actors`` dictionary of actor proxies and the
    machinery to spawn, monitor, stop and remove them.
    '''

    def create_actor(self):
        # Monitors run in the master process: create the actor in place
        # rather than returning a proxy-monitor.
        self.managed_actors = {}
        self.terminated_actors = []
        actor = self.actor_class(self)
        actor.bind_event('on_info', self._info_monitor)
        return actor

    def start(self):
        '''does nothing,'''
        pass

    def get_actor(self, actor, aid, check_monitor=True):
        # Resolve ``aid`` to the actor itself, its monitor, one of the
        # managed proxies, or (optionally) delegate to the monitor.
        if aid == actor.aid:
            return actor
        elif aid == 'monitor':
            return actor.monitor or actor
        elif aid in self.managed_actors:
            return self.managed_actors[aid]
        elif actor.monitor and check_monitor:
            return actor.monitor.get_actor(aid)

    @property
    def pid(self):
        # Monitors live in the master process.
        return current_process().pid

    def spawn(self, monitor, kind=None, **params):
        '''Spawn a new :class:`Actor` and return its
        :class:`.ActorProxyMonitor`.
        '''
        proxy = _spawn_actor(kind, monitor, **params)
        # Add to the list of managed actors if this is a remote actor
        if isinstance(proxy, Actor):
            # A monitor: lives in the master process and is registered
            # rather than managed.
            self._register(proxy)
            return proxy
        else:
            # A remote actor: manage it and start its process/thread.
            proxy.monitor = monitor
            self.managed_actors[proxy.aid] = proxy
            future = actor_proxy_future(proxy)
            proxy.start()
            return future

    def manage_actors(self, monitor, stop=False):
        '''Remove :class:`Actor` which are not alive from the
        :class:`PoolMixin.managed_actors` and return the number of actors
        still alive.

        :parameter stop: if ``True`` stops all alive actor.
        '''
        alive = 0
        if self.managed_actors:
            # Iterate over a copy: manage_actor may remove entries.
            for aid, actor in list(iteritems(self.managed_actors)):
                alive += self.manage_actor(monitor, actor, stop)
        return alive

    def manage_actor(self, monitor, actor, stop=False):
        '''If an actor failed to notify itself to the arbiter for more than
        the timeout, stop the actor.

        :param actor: the :class:`Actor` to manage.
        :param stop: if ``True``, stop the actor.
        :return: 1 if the actor is alive, 0 if it is not.
        '''
        if not monitor.is_running():
            stop = True
        if not actor.is_alive():
            if not actor.should_be_alive() and not stop:
                # Not expected to be alive yet: give it more time.
                return 1
            actor.join()
            monitor._remove_actor(actor)
            return 0
        timeout = None
        started_stopping = bool(actor.stopping_start)
        # if started_stopping is True, set stop to True
        stop = stop or started_stopping
        if not stop and actor.notified:
            # Stop the actor if it has not notified the monitor for
            # longer than its configured timeout.
            gap = time() - actor.notified
            stop = timeout = gap > actor.cfg.timeout
        if stop:   # we are stopping the actor
            dt = actor.should_terminate()
            if not actor.mailbox or dt:
                # No mailbox or the graceful stop took too long:
                # terminate the actor outright.
                if not actor.mailbox:
                    monitor.logger.warning('Terminating %s. No mailbox.',
                                           actor)
                else:
                    monitor.logger.warning('Terminating %s. Could not stop '
                                           'after %.2f seconds.', actor, dt)
                actor.terminate()
                self.terminated_actors.append(actor)
                self._remove_actor(monitor, actor)
                return 0
            elif not started_stopping:
                if timeout:
                    monitor.logger.warning('Stopping %s. Timeout %.2f',
                                           actor, timeout)
                else:
                    monitor.logger.info('Stopping %s.', actor)
                monitor.send(actor, 'stop')
        return 1

    def spawn_actors(self, monitor):
        '''Spawn new actors if needed.
        '''
        to_spawn = monitor.cfg.workers - len(self.managed_actors)
        if monitor.cfg.workers and to_spawn > 0:
            for _ in range(to_spawn):
                monitor.spawn()

    def stop_actors(self, monitor):
        """Maintain the number of workers by spawning or killing as required
        """
        if monitor.cfg.workers:
            num_to_kill = len(self.managed_actors) - monitor.cfg.workers
            for i in range(num_to_kill, 0, -1):
                # Select the oldest worker (lowest creation age).
                w, kage = 0, sys.maxsize
                for worker in itervalues(self.managed_actors):
                    age = worker.impl.age
                    if age < kage:
                        # BUGFIX: was ``w, kage = w, age`` which never
                        # selected the worker, leaving ``w`` equal to 0.
                        w, kage = worker, age
                self.manage_actor(monitor, w, True)

    def _close_actors(self, monitor):
        # Close all managed actors at once and wait for completion
        waiter = Future(loop=monitor._loop)

        def _finish():
            monitor.remove_callback('periodic_task', _check)
            waiter.set_result(None)

        def _check(_, **kw):
            # Resolve the waiter once every managed actor is gone.
            if not self.managed_actors:
                monitor._loop.call_soon(_finish)

        monitor.bind_event('periodic_task', _check)
        return waiter

    def _remove_actor(self, monitor, actor, log=True):
        removed = self.managed_actors.pop(actor.aid, None)
        if log and removed:
            # Log here and tell the parent monitor not to log again.
            log = False
            monitor.logger.warning('Removed %s', actor)
        if monitor.monitor:
            monitor.monitor._remove_actor(actor, log)
        return removed

    def _info_monitor(self, actor, info=None):
        # 'on_info' event callback: enrich the info dictionary with
        # pool statistics.
        if actor.started():
            info['actor'].update({'concurrency': actor.cfg.concurrency,
                                  'workers': len(self.managed_actors)})
            info['workers'] = [a.info for a in itervalues(self.managed_actors)
                               if a.info]
        return info

    def _register(self, arbiter):
        # Only the arbiter concurrency can register monitors.
        raise HaltServer('Critical error')
############################################################################
# CONCURRENCY IMPLEMENTATIONS
class MonitorConcurrency(MonitorMixin, Concurrency):
    ''':class:`.Concurrency` class for a :class:`.Monitor`.

    Monitors live in the **main thread** of the master process and
    therefore do not require to be spawned.
    '''
    def is_monitor(self):
        return True

    def setup_event_loop(self, actor):
        # A monitor shares the arbiter's event loop via a proxy
        # mailbox; schedule the actor start on that loop.
        actor._logger = self.cfg.configured_logger(actor.name)
        actor.mailbox = ProxyMailbox(actor)
        loop = actor.mailbox._loop
        loop.call_soon(actor.start)
        return loop

    def run_actor(self, actor):
        # Monitors are not run in their own loop: perform the
        # handshake on the shared loop and signal the caller.
        actor._loop.call_soon(self.hand_shake, actor)
        raise MonitorStarted

    def create_mailbox(self, actor, loop):
        # Monitors use a ProxyMailbox created in setup_event_loop.
        raise NotImplementedError

    def close_executor(self, actor):
        # The executor belongs to the arbiter loop; nothing to close.
        pass

    def periodic_task(self, monitor, **kw):
        '''Override the :meth:`.Concurrency.periodic_task` to implement
        the :class:`.Monitor` :ref:`periodic task <actor-periodic-task>`.'''
        interval = 0
        monitor.next_periodic_task = None
        if monitor.started():
            interval = MONITOR_TASK_PERIOD
            self.manage_actors(monitor)
            #
            if monitor.is_running():
                # Keep the worker pool at the configured size.
                self.spawn_actors(monitor)
                self.stop_actors(monitor)
            elif monitor.cfg.debug:
                monitor.logger.debug('still stopping')
        #
        monitor.fire_event('periodic_task')
        #
        if not monitor.closed():
            monitor.next_periodic_task = monitor._loop.call_later(
                interval, self.periodic_task, monitor)

    def _stop_actor(self, actor, finished=False):
        if finished:
            return

        def _cleanup(_):
            # Runs once all managed actors are closed.
            if actor.cfg.debug:
                actor.logger.debug('monitor is now stopping')
            actor.state = ACTOR_STATES.CLOSE
            if actor.next_periodic_task:
                actor.next_periodic_task.cancel()
            self.stop(actor)

        return chain_future(self._close_actors(actor), callback=_cleanup,
                            errback=_cleanup)
class ArbiterConcurrency(MonitorMixin, ProcessMixin, Concurrency):
    '''Concurrency implementation for the ``arbiter``
    '''
    # Pidfile instance created at startup when ``cfg.pidfile`` is set.
    pidfile = None

    def is_arbiter(self):
        return True

    def create_actor(self):
        if self.cfg.daemon:
            # Daemonise the master process; a pid file is mandatory.
            if not self.cfg.pidfile:
                self.cfg.set('pidfile', 'pulsar.pid')
            system.daemonize(keep_fds=logger_fds())
        self.identity = self.name
        actor = super(ArbiterConcurrency, self).create_actor()
        self.monitors = OrderedDict()
        self.registered = {'arbiter': actor}
        actor.bind_event('start', self._start_arbiter)
        return actor

    def get_actor(self, actor, aid, check_monitor=True):
        '''Given an actor unique id return the actor proxy.'''
        a = super(ArbiterConcurrency, self).get_actor(actor, aid)
        if a is None:
            if aid in self.monitors:   # Check in monitors aid
                return self.monitors[aid]
            elif aid in self.managed_actors:
                return self.managed_actors[aid]
            elif aid in self.registered:
                return self.registered[aid]
            else:   # Finally check in workers in monitors
                for m in itervalues(self.monitors):
                    a = m.get_actor(aid, check_monitor=False)
                    if a is not None:
                        return a
        else:
            return a

    def add_monitor(self, actor, monitor_name, **params):
        '''Add a new ``monitor``.

        :param monitor_class: a :class:`.Monitor` class.
        :param monitor_name: a unique name for the monitor.
        :param kwargs: dictionary of key-valued parameters for the monitor.
        :return: the :class:`.Monitor` added.
        '''
        if monitor_name in self.registered:
            raise KeyError('Monitor "%s" already available' % monitor_name)
        params.update(actor.actorparams())
        params['name'] = monitor_name
        params['kind'] = 'monitor'
        return actor.spawn(**params)

    def before_start(self, actor):  # pragma nocover
        '''Daemonise the system if required.
        '''
        cfg = actor.cfg
        if cfg.reload:
            assert not cfg.daemon, "Autoreload not compatible with daemon mode"
            if autoreload.start():
                # The autoreload supervisor takes over; do not install
                # signals in this process.
                return
        actor.start_coverage()
        self._install_signals(actor)

    def create_mailbox(self, actor, loop):
        '''Override :meth:`.Concurrency.create_mailbox` to create the
        mailbox server.
        '''
        mailbox = TcpServer(MailboxProtocol, loop, ('127.0.0.1', 0),
                            name='mailbox')
        # when the mailbox stop, close the event loop too
        mailbox.bind_event('stop', lambda _, **kw: loop.stop())
        # Once the mailbox server is listening, perform the handshake.
        mailbox.bind_event(
            'start',
            lambda _, **kw: loop.call_soon(self.hand_shake, actor))
        return mailbox

    def periodic_task(self, actor, **kw):
        '''Override the :meth:`.Concurrency.periodic_task` to implement
        the :class:`.Arbiter` :ref:`periodic task <actor-periodic-task>`.'''
        interval = 0
        actor.next_periodic_task = None
        #
        if actor.started():
            # managed actors job
            self.manage_actors(actor)
            # Drop monitors which have closed.
            for m in list(itervalues(self.monitors)):
                if m.closed():
                    actor._remove_actor(m)
            interval = MONITOR_TASK_PERIOD
            if not actor.is_running() and actor.cfg.debug:
                actor.logger.debug('still stopping')
        #
        actor.fire_event('periodic_task')
        if not actor.closed():
            actor.next_periodic_task = actor._loop.call_later(
                interval, self.periodic_task, actor)
            # Restart the server if auto-reload detected code changes.
            if actor.cfg.reload and autoreload.check_changes():
                actor.stop(exit_code=autoreload.EXIT_CODE)

    def _stop_actor(self, actor, finished=False):
        '''Stop the pools the message queue and remaining actors
        '''
        if finished:
            self._stop_arbiter(actor)
        elif actor._loop.is_running():
            self._exit_arbiter(actor)
        else:
            # The loop already stopped; restart it just to run the
            # shutdown sequence.
            actor.logger.debug('Restarts event loop to stop actors')
            actor._loop.call_soon(self._exit_arbiter, actor)
            actor._run(False)

    def _exit_arbiter(self, actor, done=False):
        if done:
            actor.logger.debug('Closing mailbox server')
            actor.state = ACTOR_STATES.CLOSE
            actor.mailbox.close()
        else:
            monitors = len(self.monitors)
            managed = len(self.managed_actors)
            if monitors or managed:
                actor.logger.debug('Closing %d monitors and %d actors',
                                   monitors, managed)
                async(self._close_all(actor))
            else:
                self._exit_arbiter(actor, True)

    def _close_all(self, actor):
        # Close al monitors at once
        # (generator-based coroutine, executed via ``async``/``From``).
        try:
            for m in itervalues(self.monitors):
                yield From(m.stop())
            yield From(self._close_actors(actor))
        except Exception:
            actor.logger.exception('Exception while closing arbiter')
        self._exit_arbiter(actor, True)

    def _remove_actor(self, arbiter, actor, log=True):
        # An actor can live in the managed pool, the registered mapping
        # and/or the monitors mapping; remove it from all of them.
        a = super(ArbiterConcurrency, self)._remove_actor(arbiter, actor,
                                                          False)
        b = self.registered.pop(actor.name, None)
        c = self.monitors.pop(actor.aid, None)
        removed = a or b or c
        if removed and log:
            arbiter.logger.warning('Removed %s', actor)
        return removed

    def _stop_arbiter(self, actor):  # pragma nocover
        # Final shutdown: remove the pid file, collect coverage and
        # write the exit message before (possibly) exiting the process.
        p = self.pidfile
        if p is not None:
            actor.logger.debug('Removing %s' % p.fname)
            p.unlink()
            self.pidfile = None
        if self.managed_actors:
            # Some actors could not be stopped gracefully.
            actor.state = ACTOR_STATES.TERMINATE
        actor.collect_coverage()
        exit_code = actor.exit_code or 0
        if exit_code == autoreload.EXIT_CODE:
            actor.stream.writeln("\nCode changed, reloading server")
        else:
            actor.stream.writeln("\nBye (exit code = %s)" % exit_code)
        try:
            actor.cfg.when_exit(actor)
        except Exception:
            pass
        if exit_code:
            sys.exit(exit_code)

    def _start_arbiter(self, actor, exc=None):
        # 'start' event callback for the arbiter actor.
        if current_process().daemon:
            raise HaltServer('Cannot create the arbiter in a daemon process')
        if not os.environ.get('SERVER_SOFTWARE'):
            os.environ["SERVER_SOFTWARE"] = pulsar.SERVER_SOFTWARE
        pidfile = actor.cfg.pidfile
        if pidfile is not None:
            actor.logger.info('Create pid file %s', pidfile)
            try:
                p = Pidfile(pidfile)
                p.create(actor.pid)
            except RuntimeError as e:
                raise HaltServer('ERROR. %s' % str(e), exit_code=2)
            self.pidfile = p

    def _info_monitor(self, actor, info=None):
        # Build the server-wide info structure from the monitors' own
        # info dictionaries.
        data = info
        monitors = {}
        for m in itervalues(self.monitors):
            info = m.info()
            if info:
                actor = info['actor']   # NOTE: shadows the parameter
                monitors[actor['name']] = info
        server = data.pop('actor')
        server.update({'version': pulsar.__version__,
                       'name': pulsar.SERVER_NAME,
                       'number_of_monitors': len(self.monitors),
                       'number_of_actors': len(self.managed_actors)})
        server.pop('is_process', None)
        server.pop('ppid', None)
        server.pop('actor_id', None)
        server.pop('age', None)
        data['server'] = server
        data['workers'] = [a.info for a in itervalues(self.managed_actors)]
        data['monitors'] = monitors
        return data

    def _register(self, actor):
        # Register a monitor under both its name and its aid.
        self.registered[actor.name] = actor
        self.monitors[actor.aid] = actor
def run_actor(self):
    '''Create the :class:`.Actor` for the concurrency ``self`` and run it.

    Used as the entry point of actor threads and processes; always runs
    the configured ``when_exit`` callback and logs the exit message.
    '''
    actor = self.actor_class(self)
    self._actor = actor
    try:
        actor.start()
    finally:
        # Always run the exit callback, swallowing any error it raises.
        try:
            actor.cfg.when_exit(actor)
        except Exception:  # pragma nocover
            pass
        bye_logger = actor.logger or logger()
        self.stop_coverage(actor)
        bye_logger.info('Bye from "%s"', actor)
class ActorProcess(ProcessMixin, Concurrency, Process):
    '''Actor running on its own operating system process, created with
    the standard library :mod:`multiprocessing` module.
    '''
    def run(self):  # pragma nocover
        # Entry point of the child process; coverage collection for this
        # process has not started yet.
        run_actor(self)

    def stop_coverage(self, actor):
        actor.stop_coverage()
class ActorThread(Concurrency, Thread):
    '''Actor running on a thread of the master process.
    '''
    _actor = None

    def before_start(self, actor):
        # Make the actor's event loop the loop of this thread.
        self.set_loop(actor._loop)

    def run(self):
        run_actor(self)

    def stop_coverage(self, actor):
        # Coverage is collected by the master process; nothing to do.
        pass
# Map each concurrency ``kind`` (the value of ``cfg.concurrency`` or the
# kind passed to ``_spawn_actor``) to its implementation class.
concurrency_models = {'arbiter': ArbiterConcurrency,
                      'monitor': MonitorConcurrency,
                      'thread': ActorThread,
                      'process': ActorProcess}
def _spawn_actor(kind, monitor, cfg=None, name=None, aid=None, **kw):
    # Internal function which spawns a new Actor and return its
    # ActorProxyMonitor.
    # *cls* is the Actor class
    # *monitor* can be either the arbiter or a monitor
    if monitor:
        # Inherit defaults (name, aid, cfg) from the spawning monitor.
        params = monitor.actorparams()
        name = params.pop('name', name)
        aid = params.pop('aid', aid)
        cfg = params.pop('cfg', cfg)
    # get config if not available
    if cfg is None:
        if monitor:
            cfg = monitor.cfg.copy()
        else:
            cfg = Config()
    if not aid:
        aid = create_aid()
    if not monitor:   # monitor not available, this is the arbiter
        assert kind == 'arbiter'
        name = kind
        params = {}
        if not cfg.exc_id:
            cfg.set('exc_id', aid)
    #
    # Split keyword arguments between configuration settings and plain
    # actor parameters.
    for key, value in iteritems(kw):
        if key in cfg.settings:
            cfg.set(key, value)
        else:
            params[key] = value
    #
    if monitor:
        kind = kind or cfg.concurrency
    if not kind:
        raise TypeError('Cannot spawn')
    maker = concurrency_models.get(kind)
    if maker:
        c = maker()
        return c.make(kind, cfg, name, aid, monitor=monitor, **params)
    else:
        raise ValueError('Concurrency %s not supported in pulsar' % kind)