Documentation for pulsar 0.9.2. For development docs, go here.
from collections import namedtuple
from pulsar import task
from pulsar.utils.pep import to_string, iteritems
__all__ = ['Query', 'CompiledQuery', 'OdmError',
'ModelNotFound', 'QueryError']
def int_or_float(v):
v = float(v)
i = int(v)
return i if v == i else v
JSPLITTER = '__'
pass_through = lambda x: x
str_lower_case = lambda x: to_string(x).lower()
range_lookups = {
'gt': int_or_float,
'ge': int_or_float,
'lt': int_or_float,
'le': int_or_float,
'contains': pass_through,
'startswith': pass_through,
'endswith': pass_through,
'icontains': str_lower_case,
'istartswith': str_lower_case,
'iendswith': str_lower_case}
lookup_value = namedtuple('lookup_value', 'type value')
class OdmError(Exception):
pass
class QueryError(OdmError):
pass
class ModelNotFound(QueryError):
'''Raised when a :meth:`.Manager.get` method does not find any model
'''
pass
class AbstractQuery(object):
_meta = None
def filter(self, **kwargs):
raise NotImplementedError
def exclude(self, **kwargs):
raise NotImplementedError
def union(self, *queries):
raise NotImplementedError
def intersect(self, *queries):
raise NotImplementedError
def where(self, *expressions):
raise NotImplementedError
def count(self):
raise NotImplementedError
def all(self):
raise NotImplementedError
def query_op(f):
'''Decorator for a :class:`Query` operation.
'''
name = f.__name__
def _(self, *args, **kwargs):
if self._store.has_query(name):
q = self._clone()
return f(q, *args, **kwargs)
else:
raise QueryError('Cannot use "%s" query with %s' %
(name, self._store))
_.__doc__ = f.__doc__
_.__name__ = name
return _
def update_dict(a, b):
if a is None:
a = {}
a.update(b)
return a
def update_tuple(a, b):
if a is None:
a = ()
return a + b
[docs]class Query(object):
'''A query for data in a model store.
A :class:`Query` is produced in terms of a given :class:`.Manager`,
using the :meth:`~.Manager.query` method.
'''
_filters = None
_joins = None
_excludes = None
_unions = None
_intersections = None
_where = None
_compiled = None
def __init__(self, manager, store=None):
self._manager = manager
self._store = store or manager._read_store
@property
def _meta(self):
return self._manager._meta
@property
def _mapper(self):
return self._manager._mapper
@property
def _loop(self):
return self._store._loop
@query_op
[docs] def filter(self, **kwargs):
'''Create a new :class:`Query` with additional clauses.
The clauses corresponds to ``where`` or ``limit`` in a
``SQL SELECT`` statement.
:params kwargs: dictionary of limiting clauses.
For example::
qs = manager.query().filter(group='planet')
'''
if kwargs:
self._filters = update_dict(self._filters, kwargs)
return self
@query_op
[docs] def exclude(self, **kwargs):
'''Create a new :class:`Query` with additional clauses.
The clauses correspond to ``EXCEPT`` in a ``SQL SELECT`` statement.
Using an equivalent example to the :meth:`filter` method::
qs = manager.query()
result1 = qs.exclude(group='planet')
result2 = qs.exclude(group=('planet','stars'))
'''
if kwargs:
self._excludes = update_dict(self._excludes, kwargs)
return self
@query_op
[docs] def union(self, *queries):
'''Create a new :class:`Query` obtained form unions.
:param queries: positional :class:`Query` parameters to create an
union with.
For example, lets say we want to have the union
of two queries obtained from the :meth:`filter` method::
query = mymanager.query()
qs = query.filter(field1='bla').union(query.filter(field2='foo'))
'''
if queries:
self._unions = update(self._unions, queries)
return self
@query_op
[docs] def intersect(self, *queries):
'''Create a new :class:`Query` obtained form intersection.
:param queries: positional :class:`Query` parameters to create an
intersection with.
For example, lets say we want to have the intersection
of two queries obtained from the :meth:`filter` method::
query = mymanager.query()
q1 = query.filter(field2='foo')
qs = query.filter(field1='bla').intersect(q1)
'''
if queries:
self._intersections = update(self._intersections, queries)
return self
@query_op
def where(self, *expressions):
if expressions:
self._where = update(self._where, expressions)
return self
@query_op
def join(self):
raise NotImplementedError
@task
[docs] def count(self):
'''Count the number of objects selected by this :class:`Query`.
This method is efficient since the :class:`Query` does not
receive any data from the server apart from the number of
matched elements.'''
return self.compiled().count()
@task
[docs] def all(self):
'''All objects selected by this :class:`Query`.
'''
return self.compiled().all()
@task
[docs] def delete(self):
'''Delete all objects selected by this :class:`.Query`.
'''
return self.compiled().delete()
# INTERNALS
def compiled(self):
if not self._compiled:
self._compiled = self._manager._read_store.compile_query(self)
return self._compiled
def _clone(self):
cls = self.__class__
q = cls.__new__(cls)
d = q.__dict__
for name, value in self.__dict__.items():
if name not in ('_compiled',):
if isinstance(value, (list, dict)):
value = copy(value)
d[name] = value
return q
[docs]class CompiledQuery(object):
'''A signature class for implementing a :class:`.Query` in a
pulsar data :class:`.Store`.
.. attribute:: _query
The underlying :class:`.Query`
.. attribute:: _store
The :class:`.Store` executing the :attr:`query`
'''
def __init__(self, store, query):
self._store = store
self._query = query
self._build()
@property
def _meta(self):
return self._query._meta
@property
def _manager(self):
return self._query._manager
@property
def _mapper(self):
return self._query._mapper
[docs] def count(self):
'''Count the number of elements matching the :attr:`query`.
'''
raise NotImplementedError
[docs] def all(self):
'''Fetch all matching elements from the server.
Return a :class:`~asyncio.Future`
'''
raise NotImplementedError
[docs] def delete(self):
'''Delete all matching elements from the server.
Return a :class:`~asyncio.Future`
'''
raise NotImplementedError
[docs] def models(self, data):
'''Build a list of models from a list of dictionaries.
Uses the :meth:`.Store.build_model` method
:param data: list of dictionaries
:return: a list of models
'''
build = self._store.build_model
manager = self._manager
return [build(manager, params) for params in data]
def _build(self):
'''Compile the :attr:`query`
'''
raise NotImplementedError
[docs] def aggregate(self, kwargs):
'''Aggregate lookup parameters.'''
meta = self._meta
store = self._store
fields = meta.dfields
field_lookups = {}
for name, value in iteritems(kwargs):
bits = name.split(JSPLITTER)
field_name = bits.pop(0)
if field_name not in fields:
raise QueryError(('Could not filter on model "%s". Field '
'"%s" does not exist.' % (meta, field_name)))
field = fields[field_name]
store_name = field.store_name
lookup = None
if bits:
bits = [n.lower() for n in bits]
if bits[-1] == 'in':
bits.pop()
elif bits[-1] in range_lookups:
lookup = bits.pop()
remaining = JSPLITTER.join(bits)
if lookup: # this is a range lookup
store_name, nested = field.get_lookup(remaining,
QueryError)
lookups = get_lookups(store_name, field_lookups)
lookups.append(lookup_value(lookup, (value, nested)))
continue
elif remaining: # Not a range lookup, must be a nested filter
value = field.filter(self.session, remaining, value)
lookups = get_lookups(store_name, field_lookups)
if not isinstance(value, (list, tuple, set)):
value = (value,)
for v in value:
if isinstance(v, Query):
v = lookup_value('query', v.compiled())
else:
v = lookup_value('value', field.to_store(v, store))
lookups.append(v)
return field_lookups
def get_lookups(store_name, field_lookups):
lookups = field_lookups.get(store_name)
if lookups is None:
lookups = []
field_lookups[store_name] = lookups
return lookups