alchy

version travis coveralls license

A SQLAlchemy extension for its declarative ORM that provides enhancements for model classes, queries, and sessions.

Quickstart

Let’s see alchy in action. We’ll start with some model definitions.

from alchy import ModelBase, make_declarative_base
from sqlalchemy import orm, Column, types, ForeignKey

class Base(ModelBase):
    # extend/override ModelBase if necessary
    pass

Model = make_declarative_base(Base=Base)

class User(Model):
    __tablename__ = 'user'

    _id = Column(types.Integer(), primary_key=True)
    name = Column(types.String())
    email = Column(types.String())
    level = Column(types.Integer())

    items = orm.relationship('UserItem')

class UserItem(Model):
    # when no __tablename__ defined,
    # one is autogenerated using class name
    # like this:
    #__tablename__ = 'user_item'

    _id = Column(types.Integer(), primary_key=True)
    user_id = Column(types.Integer(), ForeignKey('user._id'))
    name = Column(types.String())

    user = orm.relationship('User')

Next, we need to interact with our database. For that we will use a alchy.manager.Manager.

from alchy import Manager

# Config can be either (1) dict, (2) class, or (3) module.
config = {
    'SQLALCHEMY_DATABASE_URI': 'sqlite://'
}

# Be sure to pass in our declarative base defined previously.
# This is needed so that Model.metadata operations like
# create_all(), drop_all(), and reflect() work.
db = Manager(config=config, Model=Model)

Create our database tables.

db.create_all()

Now, create some records.

# initialize using keyword args
user1 = User(name='Fred', email='fred@example.com')
# print('user1:', user1)

# ...or initialize using a dict
user2 = User({'name': 'Barney'})
# print('user2:', user2)

# update using either method as well
user2.update(email='barney@example.org')
user2.update({'email': 'barney@example.com'})
# print('user2 updated:', user2)

Add them to the database.

# there are several options for adding records

# add and commit in one step using positional args
db.add_commit(user1, user2)

# ...or add/commit using a list
users = [user1, user2]
db.add_commit(users)

# ...or separate add and commit calls
db.add(user1, user2)
db.commit()

# ...or with a list
db.add(users)
db.commit()

# ...or separate adds and commit
db.add(user1)
db.add(user2)
db.commit()

Fetch model and operate.

# create user
db.add_commit(User(name='Wilma', email='wilma@example.com'))

# fetch from database
user = User.get(user1._id)
# print('user:', user)

# convert to dict
user_dict = user.to_dict()
# print('user dict:', user_dict)

# ...or just pass object directly to dict()
user_dict = dict(user)

# make some changes
user.update(level=5)

# and refresh
user.refresh()

# or flush
user.flush()

# access the session that loaded the model instance
user.session == db.object_session(user)

# delete user
user.delete()
db.commit()

# ...or via db
db.delete(user)
db.commit()

# ...or all-in-one step
db.delete_commit(user)

Query records from the database.

# add some more users
db.add_commit(
    User(items=[UserItem()]),
    User(items=[UserItem()]),
    User(items=[UserItem()]),
    User(items=[UserItem()]),
    User(items=[UserItem()])
)

# there are several syntax options for querying records

# using db.session directly
# print('all users:', db.session.query(User).all())

# ...or using db directly (i.e. db.session proxy)
assert db.query(User).all() == db.session.query(User).all()

# ...or via query property on model class
assert User.query.all() == db.session.query(User).all()

Use features from the enhanced query class.

q = User.query.join(UserItem)

# entities
assert q.entities == [User]
assert q.join_entities == [UserItem]
assert q.all_entities == [User, UserItem]

# paging
assert str(q.page(2, per_page=2)) == str(q.limit(2).offset((2-1) * 2))

# pagination
page2 = q.paginate(2, per_page=2)
assert str(page2.query) == str(q)
assert page2.page == 2
assert page2.per_page == 2
assert page2.total == q.count()
assert page2.items == q.limit(2).offset((2-1) * 2).all()
assert page2.prev_num == 1
assert page2.has_prev == True
assert page2.next_num == 3
assert page2.has_next == True
page_1 = page2.prev()
page_3 = page2.next()

# searching

# ...extend class definitions to support advanced and simple searching
User.__advanced_search__ = User.__simple_search__ = {
    'user_email': lambda value: User.email.like('%{0}%'.format(value)),
    'user_name': lambda value: User.name.like('%{0}%'.format(value))
}

UserItem.__advanced_search__ = {
    'item_name': lambda value: UserItem.name.like('%{0}%'.format(value))
}

search = User.query.search('example.com', {'user_name': 'wilma'})
# print('search:', str(search))
assert search.count() > 0

# entity loading
User.query.join_eager(User.items)
User.query.joinedload(User.items)
User.query.lazyload(User.items)
User.query.immediateload(User.items)
User.query.noload(User.items)
User.query.subqueryload(User.items)

# column loading
User.query.load_only('_id', 'name')
User.query.defer('email')
User.query.undefer('email') # if User.email undeferred in class definition
User.query.undefer_group('group1', 'group2') # if under groups defined in class

# utilities
User.query.map(lambda user: user.level)
User.query.pluck('level')
User.query.reduce(
    lambda result, user: result + 1 if user.level > 5 else result,
    initial=0
)

Utilize ORM events.

from alchy import events

class User(Model):
    __table_args__ = {
        # this is needed since we're replacing the ``User`` class defined above
        'extend_existing': True
    }

    _id = Column(types.Integer(), primary_key=True)
    name = Column(types.String())
    email = Column(types.String())
    level = Column(types.Integer())

    @events.before_insert_update()
    def validate(self, *args, **kargs):
        '''Validate model instance'''
        # do validation
        return

    @events.on_set('email')
    def on_set_email(self, value, oldvalue, initator):
        if self.query.filter(User.email==value, User._id!=self._id).count() > 0:
            raise ValueError('Email already exists in database')

user = User(email='one@example.com')
db.add_commit(user)

try:
    User(email=user.email)
except ValueError as ex:
    pass

Finally, clean up after ourselves.

db.drop_all()

See also

For further details consult API Reference.

API Reference

Includes links to source code.

Indices and Tables