# Documentation for pulsar 0.9.2. For development docs, see the pulsar project documentation site.

# Source code for pulsar.apps.data.odm.transaction

from pulsar import (EventHandler, InvalidOperation, chain_future, multi_async,
                    task)
from pulsar.utils.pep import iteritems

from .model import Model

from ..store import Command


# Public names exported by ``from <this module> import *``.
# NOTE(review): ``Transaction`` is defined in this module but is not listed
# here -- confirm whether that omission is intentional.
__all__ = ['ModelDictionary']


class ModelDictionary(dict):
    '''A dictionary whose keys are normalised through :meth:`meta`.

    Every key handed to the overridden accessors is first passed to
    :meth:`meta`: an object exposing a ``_meta`` attribute is replaced by
    that attribute, any other object is used unchanged. An object and its
    ``_meta`` therefore address the same entry.

    Keys supplied through the ``dict`` constructor or :meth:`dict.update`
    are *not* normalised, because those do not go through
    :meth:`__setitem__`.
    '''

    def __contains__(self, model):
        return super(ModelDictionary, self).__contains__(self.meta(model))

    def __getitem__(self, model):
        return super(ModelDictionary, self).__getitem__(self.meta(model))

    def __setitem__(self, model, value):
        super(ModelDictionary, self).__setitem__(self.meta(model), value)

    def __delitem__(self, model):
        # Without this override ``del d[model]`` would look up the raw
        # ``model`` object instead of the normalised key it was stored under.
        super(ModelDictionary, self).__delitem__(self.meta(model))

    def get(self, model, default=None):
        return super(ModelDictionary, self).get(self.meta(model), default)

    def setdefault(self, model, default=None):
        # Normalise the key so that no second, un-normalised entry is
        # inserted next to the one stored via ``__setitem__``.
        return super(ModelDictionary, self).setdefault(self.meta(model),
                                                       default)

    def pop(self, model, *args):
        return super(ModelDictionary, self).pop(self.meta(model), *args)

    def meta(self, model):
        '''Return ``model._meta`` when available, otherwise ``model``.'''
        return getattr(model, '_meta', model)


class Transaction(EventHandler):
    '''Transaction class for pipelining commands to a :class:`.Store`.

    A :class:`Transaction` is usually obtained via the :meth:`.Mapper.begin`
    method::

        t = models.begin()

    or using the ``with`` context manager::

        with models.begin() as t:
            ...

    .. attribute:: name

        Optional :class:`Transaction` name

    .. attribute:: mapper

        the :class:`.Mapper` which initialised this transaction.

    .. attribute:: _commands

        dictionary of commands for each :class:`.Store` in this transaction.

    .. attribute:: deleted

        Dictionary of list of ids deleted from the backend server after a
        commit operation. This dictionary is only available once the
        transaction has :attr:`finished`.

    .. attribute:: saved

        Dictionary of list of ids saved in the backend server after a commit
        operation. This dictionary is only available once the transaction has
        :attr:`finished`.
    '''
    MANY_TIMES_EVENTS = ('pre_commit', 'pre_delete',
                         'post_commit', 'post_delete')

    def __init__(self, mapper, name=None):
        super(Transaction, self).__init__(mapper._loop)
        self.name = name or 'transaction'
        self.mapper = mapper
        # Maps each store to the object returned by ``store.transaction()``
        # (see :meth:`tstore`).
        self._commands = {}
        # Result of :meth:`_commit`; ``None`` until :meth:`commit` is called.
        self._executed = None
        self.copy_many_times_events(mapper)

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # Commit only when the ``with`` body finished without an exception.
        # Nothing is returned, so an exception raised in the body propagates.
        if type is None:
            self.commit()

    def execute(self, *args, **kw):
        '''Queue a command in the default data store.

        This method does not use the object data mapper.

        :param args: positional arguments wrapped into a :class:`.Command`.
        :param store: optional keyword argument selecting the store to use
            instead of the mapper default store.
        :return: this :class:`Transaction`, so that calls can be chained.
        '''
        ts = self.tstore(kw.get('store') or self.mapper._default_store)
        ts.commands.append(Command(args))
        return self

    def add(self, model):
        '''Add a ``model`` to the transaction.

        :param model: a :class:`.Model` instance. It must be registered
            with the :attr:`mapper` which created this :class:`Transaction`.
        :return: the ``model``.
        '''
        manager = self.mapper[model]
        ts = self.tstore(manager._store)
        ts.add(model)
        return model

    def update(self, instance_or_query, **kw):
        '''Queue an update of an ``instance`` or a ``query``.

        Only :class:`.Model` instances are handled; any other argument is
        currently ignored.
        '''
        if isinstance(instance_or_query, Model):
            # NOTE(review): the ``kw`` overrides are not forwarded to the
            # queued command (the original code merged them into a local
            # dictionary which was never used) -- confirm intended behaviour.
            manager = self.mapper[instance_or_query]
            # Go through ``tstore`` so that ``_commands`` always holds the
            # same kind of object, whichever method touched the store first.
            ts = self.tstore(manager._store)
            ts.commands.append(Command(instance_or_query, Command.UPDATE))

    def tstore(self, store):
        '''Returns the :class:`TransactionStore` for ``store``.

        The object is created with ``store.transaction()`` the first time
        ``store`` is requested and cached in :attr:`_commands` afterwards.
        '''
        if store not in self._commands:
            self._commands[store] = store.transaction()
        return self._commands[store]

    def commit(self):
        '''Commit the transaction.

        This method can be invoked once only otherwise an
        :class:`.InvalidOperation` occurs.

        :return: a :class:`~asyncio.Future` for the outcome of executing the
            queued commands on every store of this transaction.
        :raise: :class:`.InvalidOperation` if already committed.
        '''
        if self._executed is not None:
            raise InvalidOperation('Transaction already executed.')
        self._executed = self._commit()
        # Return the future: :meth:`wait` and callers rely on it.
        return self._executed

    def wait(self, callback=None):
        '''Wait for the transaction to have finished.

        The transaction is committed first if this has not happened yet.

        :param callback: optional function called back once the transaction
            has finished. The function receives one parameter only, the
            transaction.
        :return: a :class:`~asyncio.Future`
        '''
        executed = self._executed
        if executed is None:
            executed = self.commit()
        return chain_future(executed, callback) if callback else executed

    # INTERNAL COMMIT METHOD
    @task
    def _commit(self):
        # Run this method in the event loop so that it is thread safe
        executed = dict(((store, store.execute_transaction(commands))
                         for store, commands in iteritems(self._commands)))
        return multi_async(executed, loop=self._loop)