Documentation for pulsar 0.9.2.

Datastore Clients

The main component of pulsar datastore clients is the Store class, which encapsulates the essential API for communicating with remote servers and executing commands on them. A Store can also implement several methods for managing the higher level object data mapper.

Getting Started

To create a data Store client one uses the create_store() function with a valid connection string:

>>> from pulsar.apps.data import create_store
>>> store = create_store('redis://user:password@127.0.0.1:6500/11')
>>> store.name
'redis'
>>> store.database
'11'
>>> store.dns
'redis://user:password@127.0.0.1:6500/11'

Additional parameters can be passed via the connection string or as key-valued parameters. For example:

>>> store = create_store('redis://user:password@127.0.0.1:6500/11',
                         namespace='test_')
>>> store.dns
'redis://user:password@127.0.0.1:6500/11?namespace=test_'
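The way key-valued parameters end up in the connection string can be pictured with the standard library alone. The sketch below is not pulsar's implementation: with_params is a hypothetical helper which folds extra parameters into the query part of a url, mirroring what the store.dns value above suggests.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def with_params(url, **params):
    """Hypothetical helper: fold key-valued parameters into the query string."""
    parts = urlsplit(url)
    # Keep parameters already present in the url, then add the new ones
    query = dict(parse_qsl(parts.query))
    query.update(params)
    return urlunsplit(parts._replace(query=urlencode(query)))


dns = with_params('redis://user:password@127.0.0.1:6500/11', namespace='test_')
print(dns)
```

Parameters already present in the url are kept, and a keyword with the same name replaces the url value in this sketch.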

Connection String

The connection string specifies the parameters of the data store connection. All Store implementations support a connection string of the form:

<scheme>://<username>:<password>@<host>/<database>?param1=...&param2=...

where:

  • scheme is the store name
  • database is the database name (or number for redis)
  • username is an optional database username
  • password is an optional database password

Implement a Store

When implementing a new Store there are several methods which need to be covered:

  • ping() to check if the server is available
  • connect() to create a new connection
  • execute() to execute a command on the store server

Additional methods must be implemented only if the store supports the object data mapper; the Store API below marks them as used or required by the object data mapper.

A new store needs to be registered via the register_store() function.

All registered data stores are stored in the data_stores dictionary:

from pulsar.apps.data import data_stores
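The idea of registering a store by name and dotted path can be sketched without pulsar. The registry dictionary and the register and load functions below are hypothetical stand-ins, not pulsar's data_stores or register_store(); they only show how a dotted path can be resolved to a class when it is needed.

```python
from importlib import import_module

# Hypothetical stand-in for a registry mapping scheme names to dotted paths
registry = {}


def register(name, dotted_path):
    """Remember where the class handling scheme ``name`` lives."""
    registry[name] = dotted_path


def load(name):
    """Import and return the class registered for scheme ``name``."""
    module_path, _, class_name = registry[name].rpartition('.')
    return getattr(import_module(module_path), class_name)


# A standard library class is used so the example runs without extra packages
register('ordered', 'collections.OrderedDict')
store_class = load('ordered')
```

Storing the dotted path rather than the class itself means the module is imported only when a store with that scheme is actually requested.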

Pulsar provides two implementations, the redis client and the pulsards client.

Publish/Subscribe

A Store can implement the pubsub() method to return a valid PubSub handler.

API

The main component of the data store API is the create_store() function, which creates a new Store object in one function call. The store can be used to interact directly with the backend database or indirectly via the object data mapper.

Create store

pulsar.apps.data.store.create_store(url, **kw)

Create a new Store for a valid url.

Parameters:
  • url

    a valid url takes the following forms:

    Pulsar datastore:

    pulsar://user:password@127.0.0.1:6410
    

    Redis:

    redis://user:password@127.0.0.1:6500/11?namespace=testdb
    

    CouchDb:

    couchdb://user:password@127.0.0.1:6500/testdb
    https+couchdb://user:password@127.0.0.1:6500/testdb
    
  • kw – additional key-valued parameters to pass to the Store initialisation method. It can contain parameters such as database, user and password to override the url values. Additional parameters are processed by the Store._init() method.
Returns:

a Store.
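The precedence between the url and the key-valued parameters can be illustrated as follows. This is a sketch under stated assumptions: connection_settings is a hypothetical helper, not part of pulsar, and timeout is a made-up extra parameter standing for anything the store initialisation would process.

```python
from urllib.parse import urlsplit


def connection_settings(url, **kw):
    """Hypothetical helper: keyword values take precedence over the url."""
    parts = urlsplit(url)
    settings = {
        'name': parts.scheme,
        'host': (parts.hostname, parts.port),
        'database': parts.path.lstrip('/') or None,
        'user': parts.username,
        'password': parts.password,
    }
    # Only the keys named in the text are overridden; the rest is kept apart
    for key in ('database', 'user', 'password'):
        if key in kw:
            settings[key] = kw.pop(key)
    return settings, kw


settings, extra = connection_settings(
    'couchdb://user:password@127.0.0.1:6500/testdb',
    database='other', timeout=5)
```

Here settings['database'] is taken from the keyword while the user and password still come from the url, and the remaining parameters are returned separately.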

Start store

pulsar.apps.data.stores.pulsards.startds.start_store(*args, **kwargs)

Equivalent to create_store() in most cases, except when the url is for a pulsar store which has not been started yet. In that case, a PulsarDS is started.

Register a new store

pulsar.apps.data.store.register_store(name, dotted_path)

Register a new Store with scheme name which can be found at the python dotted_path.

Store

class pulsar.apps.data.store.Store(name, host, loop=None, database=None, user=None, password=None, encoding=None, **kw)

Base class for asynchronous data stores.

It is a Producer for accessing and retrieving data from remote data servers such as redis, couchdb and so forth. A Store should not be created directly; the high level create_store() function should be used instead.

_host

The remote host, tuple or string

_user

The user name

_password

The user password

name

Store name

database

Database name/number associated with this store.

encoding

Store encoding (usually utf-8)

dns

The connection string of the store, including any additional parameters

connect()

Connect with store server

execute(*args, **options)

Execute a command

ping()

Used to check if the data server is available

client()

Get a client for the Store if implemented

pubsub(**kw)

Obtain a PubSub handler for the Store if implemented

create_database(dbname=None, **kw)

Create a new database in this store.

By default it does nothing, stores must implement this method only if they support database creation.

Parameters: dbname – optional database name. If not supplied, a database named database is created.

delete_database(dbname=None)

Delete a database dbname

By default it does nothing, stores must implement this method only if they support database deletion.

Parameters: dbname – optional database name. If not supplied, a database named database is deleted.

close()

Close all open connections

flush()

Flush the store.

encode_bytes(data)

Encode bytes data

Parameters: data – a bytes string
Returns: bytes or string

dencode_bytes(data)

Decode bytes data

Parameters: data – bytes or string
Returns: bytes

transaction()

Create a transaction for this store.

create_table(model, remove_existing=False)

Create the table for model.

This method is used by the object data mapper. By default it does nothing.

drop_table(model, remove_existing=False)

Drop the table for model.

This method is used by the object data mapper. By default it does nothing.

table_info(model)

Information about the table/collection mapping model

create_model(manager, *args, **kwargs)

Create a new model from a manager

Method used by the Manager callable method.

execute_transaction(transaction)

Execute a transaction() in a multi-store Transaction.

This method is used by the object data mapper and should not be invoked directly. It returns a list of models committed to the backend server.

compile_query(query)

Compile the Query query.

Method required by the Object data mapper.

Returns: an instance of CompiledQuery if implemented

get_model(manager, pkvalue)

Fetch an instance of a model with primary key pkvalue.

This method is required by the object data mapper.

Parameters:
  • manager – the Manager calling this method
  • pkvalue – the primary key of the model to retrieve

has_query(query_type)

Check if this Store supports query_type.

Parameters: query_type – a string indicating the query type to check (filter, exclude, search).

This method is used by the object data mapper.

model_data(model, action)

Generator of (field, value) pairs for the data store.

By default invokes the ModelMeta.store_data method.

Command

class pulsar.apps.data.store.Command(args, action=0)

A command executed during an execute_transaction() call.

action

Type of action:

  • 0 custom command
  • 1 equivalent to an SQL INSERT
  • 2 equivalent to an SQL DELETE
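The numeric action codes can be given readable names when working with them. The sketch below is purely illustrative: only the values 0, 1 and 2 come from the description above, while the Action names, the ToyCommand tuple and the sample arguments are made up and are not part of pulsar.

```python
from collections import namedtuple
from enum import IntEnum


class Action(IntEnum):
    # Names are illustrative; only the numeric values come from the text
    CUSTOM = 0
    INSERT = 1
    DELETE = 2


# Toy stand-in for a command: some arguments plus an action code
ToyCommand = namedtuple('ToyCommand', 'args action')

commands = [
    ToyCommand({'id': 1, 'name': 'a'}, Action.INSERT),
    ToyCommand({'id': 2}, Action.DELETE),
    ToyCommand(('flushdb',), Action.CUSTOM),
]

# Group the command arguments by action name
by_action = {}
for command in commands:
    by_action.setdefault(Action(command.action).name, []).append(command.args)
```

Grouping by action is one way a backend could decide how to handle each command; whether a given store does so is not stated in this documentation.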

PubSub

class pulsar.apps.data.store.PubSub(store, protocol=None)

A Publish/Subscriber interface.

A PubSub handler is never initialised directly, instead, the pubsub() method of a data Store is used.

To listen for messages one adds clients to the handler:

def do_something(channel, message):
    ...

# store is a Store obtained via create_store()
pubsub = store.pubsub()
pubsub.add_client(do_something)
pubsub.subscribe('mychannel')

You can add as many listening clients as you like. Clients are functions which receive two parameters only, the channel sending the message and the message.

A PubSub handler can be used to publish messages too:

pubsub.publish('mychannel', 'Hello')

An additional protocol object can be supplied. The protocol must implement the encode and decode methods.
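As an illustration of what such a protocol object could look like, here is a minimal sketch which serialises messages as JSON. JsonProtocol is a hypothetical class, and the assumption that encode is applied to outgoing messages and decode to received data is ours; the text above only states that both methods must exist.

```python
import json


class JsonProtocol:
    """Hypothetical protocol: JSON text on the wire, Python objects in code."""

    def encode(self, message):
        # Assumed to be applied to a message before it is published
        return json.dumps(message)

    def decode(self, payload):
        # Assumed to be applied to data received from a channel
        if isinstance(payload, bytes):
            payload = payload.decode('utf-8')
        return json.loads(payload)


protocol = JsonProtocol()
wire = protocol.encode({'event': 'signup', 'id': 7})
message = protocol.decode(wire.encode('utf-8'))
```

With a protocol of this kind, listening clients would receive Python objects rather than raw strings or bytes.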

publish(channel, message)

Publish a new message to a channel.

count(*channels)

Returns the number of subscribers (not counting clients subscribed to patterns) for the specified channels.

channels(pattern=None)

Lists the currently active channels.

An active channel is a Pub/Sub channel with one or more subscribers (not including clients subscribed to patterns). If no pattern is specified, all the channels are listed; otherwise only channels matching the specified glob-style pattern are listed.
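The filtering rule can be pictured with a small in-process example. This is a sketch only: the subscribers table is made-up data, active_channels is a hypothetical function, and the standard library fnmatch module is used as an approximation of glob-style matching, which may differ in details from what a given server implements.

```python
from fnmatch import fnmatchcase

# channel name -> number of direct subscribers (made-up data)
subscribers = {'news.sport': 2, 'news.tech': 1, 'chat': 3, 'idle': 0}


def active_channels(pattern=None):
    """List channels with at least one subscriber, optionally filtered."""
    names = [name for name, count in subscribers.items() if count > 0]
    if pattern is not None:
        names = [name for name in names if fnmatchcase(name, pattern)]
    return sorted(names)


all_channels = active_channels()
news_channels = active_channels('news.*')
```

The 'idle' channel never shows up because it has no subscribers, with or without a pattern.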

psubscribe(pattern, *patterns)

Subscribe to a list of patterns.

punsubscribe(*channels)

Unsubscribe from a list of patterns.

subscribe(channel, *channels)

Subscribe to a list of channels.

unsubscribe(*channels)

Unsubscribe from a list of channels.

close()

Stop listening for messages.

add_client(client)

Add a new client to the set of all clients.

Clients must be callables accepting two parameters, the channel and the message. When a new message is received from the publisher, the broadcast() method notifies all clients by calling them.
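The relationship between clients and broadcasting can be sketched with a toy handler that runs entirely in process. ToyPubSub is a stand-in written for this example, not pulsar's PubSub: there is no server involved, and its broadcast takes the channel and message directly instead of a server response.

```python
class ToyPubSub:
    """In-process stand-in; no server and no pulsar classes are involved."""

    def __init__(self):
        self._clients = set()

    def add_client(self, client):
        self._clients.add(client)

    def remove_client(self, client):
        self._clients.discard(client)

    def broadcast(self, channel, message):
        # Notify every registered client by calling it
        for client in self._clients:
            client(channel, message)


received = []


def log_message(channel, message):
    # A client is any callable taking the channel and the message
    received.append((channel, message))


pubsub = ToyPubSub()
pubsub.add_client(log_message)
pubsub.broadcast('mychannel', 'Hello')
pubsub.remove_client(log_message)
pubsub.broadcast('mychannel', 'ignored')
```

Keeping clients in a set means adding the same callable twice has no effect, and a removed client stops receiving messages immediately.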

remove_client(client)

Remove client from the set of all clients.

broadcast(response)

Broadcast message to all clients.


