Documentation for pulsar 0.9.2. For development docs, go here.

Source code for pulsar.apps.data.odm.model

import sys
from inspect import isclass
from copy import copy
from base64 import b64encode
from collections import Mapping

from pulsar.utils.structures import OrderedDict
from pulsar.utils.pep import ispy3k, iteritems, to_string

try:
    from pulsar.utils.libs import Model as CModelBase
except ImportError:
    CModelBase = None

from .manager import class_prepared, makeManyToManyRelatedManager
from .fields import (Field, AutoIdField, ForeignKey, CompositeIdField,
                     FieldError, NONE_EMPTY)
from .query import OdmError

from ..store import Command, REV_KEY


primary_keys = ('id', 'ID', 'pk', 'PK')


def rev_key(value):
    try:
        return int(value)
    except ValueError:
        return to_string(value)


def is_private_field(field):
    return field.startswith('_') or field == 'Type'


def get_fields(bases, attrs):
    #
    fields = []
    for name, field in list(attrs.items()):
        if isinstance(field, Field):
            fields.append((name, attrs.pop(name)))
    #
    fields = sorted(fields, key=lambda x: x[1].creation_counter)
    #
    for base in bases:
        if hasattr(base, '_meta'):
            fields = list((name, copy(field)) for name, field
                          in base._meta.dfields.items()) + fields
    #
    return OrderedDict(fields)


def make_app_label(new_class, app_label=None):
    if app_label is None:
        model_module = sys.modules[new_class.__module__]
        try:
            bits = model_module.__name__.split('.')
            app_label = bits.pop()
            if app_label == 'models':
                app_label = bits.pop()
        except:
            app_label = ''
    return app_label


[docs]class ModelMeta(object): '''A class for storing meta data for a :class:`.Model` class. To override default behaviour you can specify the ``Meta`` class as an inner class of :class:`.Model` in the following way:: from pulsar.apps.data import odm class MyModel(odm.Model): timestamp = odm.FloatField() ... class Meta: name = 'custom' :parameter register: if ``True`` (default), this :class:`ModelMeta` is registered in the global models hashtable. :parameter abstract: Check the :attr:`abstract` attribute. :parameter app_label: Check the :attr:`app_label` attribute. :parameter name: Check the :attr:`name` attribute. :parameter table_name: Check the :attr:`table_name` attribute This is the list of attributes and methods available. All attributes, but the ones mentioned above, are initialised by the object relational mapper. .. attribute:: abstract If ``True``, This is an abstract Meta class. .. attribute:: model :class:`Model` for which this instance is the database metadata container. .. attribute:: name Usually it is the :class:`.Model` class name in lower-case, but it can be customised. .. attribute:: app_label Unless specified it is the name of the directory or file (if at top level) containing the :class:`.Model` definition. It can be customised. .. attribute:: table_name The table_name which is by default given by ``<app_label>_<name>``. .. attribute:: dfields dictionary of all :class:`.Field` in :attr:`model` .. attribute:: scalarfields Ordered list of all :class:`Field` which are not :class:`.StructureField`. The order is the same as in the :class:`Model` definition. .. attribute:: indexes List of :class:`.Field` which are indexes (:attr:`~.Field.index` attribute set to ``True``). .. attribute:: pk The :class:`.Field` representing the primary key. .. attribute:: related Dictionary of :class:`.RelatedManager` for the :attr:`model`. It is created at runtime by the object data mapper. .. attribute:: manytomany List of :class:`ManyToManyField` names for the :attr:`model`. This information is useful during registration. ''' def __init__(self, model, fields, app_label=None, table_name=None, name=None, register=True, pkname=None, abstract=False, **kwargs): self.model = model self.abstract = abstract self.scalarfields = [] self.manytomany = [] self.indexes = [] self.manytomany = [] self.dfields = {} self.converters = {REV_KEY: rev_key} self.related = {} self.model._meta = self self.app_label = make_app_label(model, app_label) self.name = (name or model.__name__).lower() if not table_name: if self.app_label: table_name = '%s_%s' % (self.app_label, self.name) else: table_name = self.name self.table_name = table_name # # Check if PK field exists pk = None pkname = pkname or primary_keys[0] scalarfields = [] for name in fields: field = fields[name] if is_private_field(name): raise FieldError("%s is a reserved field name" % name) if field.primary_key: if pk is not None: raise FieldError("Primary key already available %s." % name) pk = field pkname = name elif name in primary_keys: raise FieldError('%s is a reserved field name for primary keys' % name) if pk is None and not self.abstract: # ID field not available, create one pk = AutoIdField() if not self.abstract: fields.pop(pkname, None) for name, field in fields.items(): field.register_with_model(name, model) pk.register_with_model(pkname, model) def __repr__(self): return self.table_name def __str__(self): return self.__repr__() @property def _meta(self): return self
[docs] def pkname(self): '''Primary key name. A shortcut for ``self.pk.name``.''' return self.pk.name
[docs] def pk_to_python(self, value, backend): '''Convert the primary key ``value`` to a valid python representation. ''' return self.pk.to_python(value, backend)
[docs] def store_data(self, instance, store, action): '''Generator of ``field, value`` pair for the data ``store``. Perform validation for ``instance`` and can raise :class:`.FieldError` if invalid values are stored in ``instance``. ''' fields = instance._meta.dfields if action == Command.INSERT: for field in fields.values(): name = field.store_name value = field.to_store(instance.get_raw(name), store) if ((value in NONE_EMPTY) and field.required and not isinstance(field, AutoIdField)): raise FieldError("Field '%s' is required for '%s'." % (name, self)) if value is not None: yield name, value rest = set(instance) - set(fields) else: rest = instance # for name in rest: if name in fields: value = fields[name].to_store(instance[name], store) elif name in self.converters: value = self.converters[name](instance[name]) elif not is_private_field(name): value = instance[name] else: continue if value is not None: yield name, value
class ModelType(type(dict)): '''Model metaclass''' def __new__(cls, name, bases, attrs): meta = attrs.pop('Meta', None) if isclass(meta): meta = dict(((k, v) for k, v in meta.__dict__.items() if not k.startswith('__'))) else: meta = meta or {} cls.extend_meta(meta, attrs) fields = get_fields(bases, attrs) attrs['__slots__'] = ('_access_cache', '_modified') new_class = super(ModelType, cls).__new__(cls, name, bases, attrs) ModelMeta(new_class, fields, **meta) class_prepared.fire(new_class) return new_class @classmethod def extend_meta(cls, meta, attrs): for name in ('register', 'abstract'): if name in attrs: meta[name] = attrs.pop(name)
[docs]class Model(ModelType('ModelBase', (dict,), {'abstract': True})): '''A model is a python ``dict`` which represents and item/row in a data-store collection/table. Fields values can be accessed via the dictionary interface:: model['field1'] or the dotted interface:: model.field1 which is equivalent to:: model.get('field1') .. attribute:: _meta Class attribute which represents the :class:`.ModelMeta` for this model. ''' abstract = True def __init__(self, *args, **kwargs): self._access_cache = set() self._modified = None self.update(*args, **kwargs) # reset the modified field set self._modified = set() def __getitem__(self, field): field = mstr(field) if field in primary_keys: field = self._meta.pkname() value = super(Model, self).__getitem__(field) if field not in self._access_cache: self._access_cache.add(field) if field in self._meta.converters: value = self._meta.converters[field](value) super(Model, self).__setitem__(field, value) return value def __setitem__(self, field, value): field, value = self._get_field_value(field, value) super(Model, self).__setitem__(field, value) def __getattr__(self, field): try: return self.__getitem__(field) except KeyError as e: return None def __eq__(self, other): if other.__class__ == self.__class__: return self.id == other.id else: return False def __ne__(self, other): return not self.__eq__(other) def get(self, field, default=None): try: return self.__getitem__(field) except KeyError: return default
[docs] def set(self, field, value, modify=True): '''Set the ``value`` at ``field`` If ``modify`` is ``True``, this method is equivalent to:: model[field] = value ''' if modify: self[field] = value else: super(Model, self).__setitem__(field, value)
[docs] def get_raw(self, field, default=None): '''Get the raw value at ``field`` This function does not apply field conversion. ''' try: return super(Model, self).__getitem__(field) except KeyError: return default
def update(self, *args, **kwargs): if len(args) == 1: iterable = args[0] super(Model, self).update(self._update_modify(iterable)) elif args: raise TypeError('expected at most 1 arguments, got %s' % len(args)) if kwargs: super(Model, self).update(self._update_modify(kwargs))
[docs] def to_json(self): '''Return a JSON serialisable dictionary representation. ''' return dict(self._to_json())
[docs] def save(self): '''Commit changes to backend data store. ''' mapper = self.get('_mapper') if mapper: return mapper[self._meta].save(self) else: raise OdmError('_mapper not available in %s' % self)
[docs] def delete(self): '''Delete this model from backend data store ''' mapper = self.get('_mapper') if mapper: return mapper[self._meta].delete(self) else: raise OdmError('_mapper not available in %s' % self)
# INTERNALS def _to_json(self): pk = self.get('id') if pk: yield self._meta.pk.name, pk for key in self: if not is_private_field(key): value = self[key] if value is not None: if key in self._meta.dfields: value = self._meta.dfields[key].to_json(value) elif isinstance(value, bytes): try: value = value.decode('utf-8') except Exception: value = b64encode(value).decode('utf-8') yield key, value def _update_modify(self, iterable): if isinstance(iterable, Mapping): iterable = iteritems(iterable) for field, value in iterable: yield self._get_field_value(field, value) def _get_field_value(self, field, value): field = mstr(field) if not is_private_field(field): if field in primary_keys: field = self._meta.pkname() if field in self._meta.dfields: f = self._meta.dfields[field] value = f.get_value(self, value) field = f.store_name if self._modified is not None: self._access_cache.discard(field) if field in self: if super(Model, self).__getitem__(field) != value: self._modified.add(field) else: self._modified.add(field) return field, value @classmethod def _many2many_through_model(cls, field): field.relmodel = cls if not field.related_name: field.related_name = '%s_set' % field.model._meta.name name_model = field.model._meta.name name_relmodel = field.relmodel._meta.name # The two models are the same. if name_model == name_relmodel: name_relmodel += '2' through = field.through # Create the through model if through is None: name = '{0}_{1}'.format(name_model, name_relmodel) class Meta: app_label = field.model._meta.app_label through = ModelType(name, (Model,), {'Meta': Meta}) field.through = through # The first field field1 = ForeignKey(field.model, related_name=field.name, related_manager_class=makeManyToManyRelatedManager( field.relmodel, name_model, name_relmodel) ) field1.register_with_model(name_model, through) # The second field field2 = ForeignKey(field.relmodel, related_name=field.related_name, related_manager_class=makeManyToManyRelatedManager( field.model, name_relmodel, name_model) ) field2.register_with_model(name_relmodel, through) pk = CompositeIdField(name_model, name_relmodel) pk.register_with_model('id', through)
PyModel = Model def create_model(name, **params): '''Create a :class:`.Model` class. :param name: Name of the model class. :param params: key-valued parameter to pass to the :class:`ModelMeta` constructor. :return: a local :class:`Model` class. ''' return ModelType(name, (Model,), params) if ispy3k: def mstr(s): if isinstance(s, bytes): return s.decode('utf-8') elif not isinstance(s, str): return str(s) else: return s else: def mstr(s): if isinstance(s, (bytes, unicode)): return s else: return str(s)