Documentation for pulsar 0.9.2. For development docs, go here.

Source code for pulsar.apps.data.stores.redis.client

from itertools import chain
from hashlib import sha1

import pulsar
from pulsar.utils.structures import mapping_iterator, Zset
from pulsar.utils.pep import native_str, zip, ispy3k, iteritems
from pulsar.apps.ds import COMMANDS_INFO

from .pubsub import PubSub


if ispy3k:
    str_or_bytes = (bytes, str)
else:   # pragma    nocover
    str_or_bytes = basestring

INVERSE_COMMANDS_INFO = dict(((i.method_name, i.name)
                              for i in COMMANDS_INFO.values()))


class CommandError(pulsar.PulsarException):
    pass


class Executor:
    __slots__ = ('client', 'command')

    def __init__(self, client, command):
        self.client = client
        self.command = command

    def __call__(self, *args, **options):
        return self.client.execute(self.command, *args, **options)


class ResponseError:
    __slots__ = ('exception',)

    def __init__(self, exception):
        self.exception = exception


def dict_merge(*dicts):
    merged = {}
    [merged.update(d) for d in dicts]
    return merged


def pairs_to_object(response, factory=None):
    it = iter(response)
    return (factory or dict)(zip(it, it))


def values_to_object(response, fields=None, factory=None):
    if fields is not None:
        return (factory or dict)(zip(fields, response))
    else:
        return response


def string_keys_to_dict(key_string, callback):
    return dict.fromkeys(key_string.split(), callback)


def parse_info(response):
    info = {}
    response = native_str(response)

    def get_value(value):
        if ',' not in value or '=' not in value:
            try:
                if '.' in value:
                    return float(value)
                else:
                    return int(value)
            except ValueError:
                return value
        else:
            sub_dict = {}
            for item in value.split(','):
                k, v = item.rsplit('=', 1)
                sub_dict[k] = get_value(v)
            return sub_dict

    for line in response.splitlines():
        if line and not line.startswith('#'):
            key, value = line.split(':', 1)
            info[key] = get_value(value)
    return info


def values_to_zset(response, withscores=False, **kw):
    if withscores:
        it = iter(response)
        return Zset(((float(score), value) for value, score in zip(it, it)))
    else:
        return response


def sort_return_tuples(response, groups=None, **options):
    """
    If ``groups`` is specified, return the response as a list of
    n-element tuples with n being the value found in options['groups']
    """
    if not response or not groups:
        return response
    return list(zip(*[response[i::groups] for i in range(groups)]))


def pubsub_callback(response, subcommand=None):
    if subcommand == 'numsub':
        it = iter(response)
        return dict(((k, int(v)) for k, v in zip(it, it)))
        return pairs_to_object(response)
    elif subcommand == 'numpat':
        return int(response)
    else:
        return response


class Consumer(pulsar.ProtocolConsumer):

    RESPONSE_CALLBACKS = dict_merge(
        string_keys_to_dict(
            'BGSAVE FLUSHALL FLUSHDB HMSET LSET LTRIM MSET RENAME RESTORE '
            'SAVE SELECT SHUTDOWN SLAVEOF SET WATCH UNWATCH',
            lambda r: r == b'OK'
        ),
        string_keys_to_dict('SORT', sort_return_tuples),
        string_keys_to_dict('BLPOP BRPOP', lambda r: r and tuple(r) or None),
        string_keys_to_dict('SMEMBERS SDIFF SINTER SUNION', set),
        string_keys_to_dict('ZINCRBY ZSCORE',
                            lambda v: float(v) if v is not None else v),
        string_keys_to_dict('ZRANGE ZRANGEBYSCORE ZREVRANGE ZREVRANGEBYSCORE',
                            values_to_zset),
        string_keys_to_dict('EXISTS EXPIRE EXPIREAT PEXPIRE PEXPIREAT '
                            'PERSIST RENAMENX',
                            lambda r: bool(r)),
        {
            'PING': lambda r: r == b'PONG',
            'PUBSUB': pubsub_callback,
            'INFO': parse_info,
            'TIME': lambda x: (int(float(x[0])), int(float(x[1]))),
            'HGETALL': pairs_to_object,
            'HINCRBYFLOAT': lambda r: float(r),
            'HMGET': values_to_object,
            'TYPE': lambda r: r.decode('utf-8')
        }
    )

    def start_request(self):
        conn = self._connection
        args = self._request[0]
        if len(self._request) == 2:
            chunk = conn.parser.pack_command(args)
        else:
            chunk = conn.parser.pack_pipeline(args)
        conn._transport.write(chunk)

    def parse_response(self, response, command, options):
        callback = self.RESPONSE_CALLBACKS.get(command.upper())
        return callback(response, **options) if callback else response

    def data_received(self, data):
        conn = self._connection
        parser = conn.parser
        parser.feed(data)
        response = parser.get()
        request = self._request
        if len(request) == 2:
            if response is not False:
                if not isinstance(response, Exception):
                    cmnd = request[0][0]
                    response = self.parse_response(response, cmnd, request[1])
                else:
                    response = ResponseError(response)
                self.finished(response)
        else:   # pipeline
            commands, raise_on_error, responses = request
            error = None
            while response is not False:
                if (isinstance(response, Exception) and raise_on_error
                        and not error):
                    error = response
                responses.append(response)
                response = parser.get()
            if len(responses) == len(commands):
                response = []
                for cmds, resp in zip(commands[1:-1], responses[-1]):
                    args, options = cmds
                    if isinstance(resp, Exception) and not error:
                        error = resp
                    resp = self.parse_response(resp, args[0], options)
                    response.append(resp)
                if error and raise_on_error:
                    response = ResponseError(error)
                self.finished(response)


[docs]class RedisClient(object): '''Client for :class:`.RedisStore`. .. attribute:: store The :class:`.RedisStore` for this client. ''' def __init__(self, store): self.store = store def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self.store) __str__ = __repr__ def pubsub(self, **kw): return PubSub(self.store, **kw)
[docs] def pipeline(self): '''Create a :class:`.Pipeline` for pipelining commands ''' return Pipeline(self.store)
def execute(self, command, *args, **options): return self.store.execute(command, *args, **options) execute_command = execute # special commands # STRINGS def decr(self, key, ammount=None): if ammount is None: return self.execute('decr', key) else: return self.execute('decrby', key, ammount) def incr(self, key, ammount=None): if ammount is None: return self.execute('incr', key) else: return self.execute('incrby', key, ammount) # HASHES def hmget(self, key, *fields): return self.execute('hmget', key, *fields, fields=fields) def hmset(self, key, iterable): args = [] [args.extend(pair) for pair in mapping_iterator(iterable)] return self.execute('hmset', key, *args) # LISTS def blpop(self, keys, timeout=0): if timeout is None: timeout = 0 if isinstance(keys, str_or_bytes): keys = [keys] else: keys = list(keys) keys.append(timeout) return self.execute_command('BLPOP', *keys) def brpop(self, keys, timeout=0): if timeout is None: timeout = 0 if isinstance(keys, str_or_bytes): keys = [keys] else: keys = list(keys) keys.append(timeout) return self.execute_command('BRPOP', *keys) # SORTED SETS
[docs] def zadd(self, name, *args, **kwargs): """ Set any number of score, element-name pairs to the key ``name``. Pairs can be specified in two ways: As ``*args``, in the form of:: score1, name1, score2, name2, ... or as ``**kwargs``, in the form of:: name1=score1, name2=score2, ... The following example would add four values to the 'my-key' key:: client.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4) """ pieces = [] if args: if len(args) % 2 != 0: raise ValueError("ZADD requires an equal number of " "values and scores") pieces.extend(args) for pair in iteritems(kwargs): pieces.append(pair[1]) pieces.append(pair[0]) return self.execute_command('ZADD', name, *pieces)
def zinterstore(self, des, keys, weights=None, aggregate=None): numkeys = len(keys) pieces = list(keys) if weights: pieces.append(b'WEIGHTS') pieces.extend(weights) if aggregate: pieces.append(b'AGGREGATE') pieces.append(aggregate) return self.execute_command('ZINTERSTORE', des, numkeys, *pieces) def zunionstore(self, des, keys, weights=None, aggregate=None): numkeys = len(keys) pieces = list(keys) if weights: pieces.append(b'WEIGHTS') pieces.extend(weights) if aggregate: pieces.append(b'AGGREGATE') pieces.append(aggregate) return self.execute_command('ZUNIONSTORE', des, numkeys, *pieces) def zrange(self, key, start, stop, withscores=False): if withscores: return self.execute_command('ZRANGE', key, start, stop, b'WITHSCORES', withscores=True) else: return self.execute_command('ZRANGE', key, start, stop) def zrangebyscore(self, key, min, max, withscores=False, offset=None, count=None): pieces = [] if withscores: pieces.append(b'WITHSCORES') if offset: pieces.append(b'LIMIT') pieces.append(offset) pieces.append(count) return self.execute_command('ZRANGEBYSCORE', key, min, max, *pieces, withscores=withscores) def zrevrange(self, key, start, stop, withscores=False): if withscores: return self.execute_command('ZREVRANGE', key, start, stop, 'WITHSCORES', withscores=True) else: return self.execute_command('ZRANGE', key, start, stop) def zrevrangebyscore(self, key, min, max, withscores=False, offset=None, count=None): pieces = [] if withscores: pieces.append(b'WITHSCORES') if offset: pieces.append(b'LIMIT') pieces.append(offset) pieces.append(count) return self.execute_command('ZREVRANGEBYSCORE', key, min, max, *pieces, withscores=withscores) def eval(self, script, keys=None, args=None): return self._eval('eval', script, keys, args) def evalsha(self, sha, keys=None, args=None): return self._eval('evalsha', sha, keys, args)
[docs] def sort(self, key, start=None, num=None, by=None, get=None, desc=False, alpha=False, store=None, groups=False): '''Sort and return the list, set or sorted set at ``key``. ``start`` and ``num`` allow for paging through the sorted data ``by`` allows using an external key to weight and sort the items. Use an "*" to indicate where in the key the item value is located ``get`` allows for returning items from external keys rather than the sorted data itself. Use an "*" to indicate where int he key the item value is located ``desc`` allows for reversing the sort ``alpha`` allows for sorting lexicographically rather than numerically ``store`` allows for storing the result of the sort into the key ``store`` ``groups`` if set to True and if ``get`` contains at least two elements, sort will return a list of tuples, each containing the values fetched from the arguments to ``get``. ''' if ((start is not None and num is None) or (num is not None and start is None)): raise CommandError("``start`` and ``num`` must both be specified") pieces = [key] if by is not None: pieces.append('BY') pieces.append(by) if start is not None and num is not None: pieces.append('LIMIT') pieces.append(start) pieces.append(num) if get is not None: # If get is a string assume we want to get a single value. # Otherwise assume it's an interable and we want to get multiple # values. We can't just iterate blindly because strings are # iterable. if isinstance(get, str): pieces.append('GET') pieces.append(get) else: for g in get: pieces.append('GET') pieces.append(g) if desc: pieces.append('DESC') if alpha: pieces.append('ALPHA') if store is not None: pieces.append('STORE') pieces.append(store) if groups: if not get or isinstance(get, str) or len(get) < 2: raise CommandError('when using "groups" the "get" argument ' 'must be specified and contain at least ' 'two keys') options = {'groups': len(get) if groups else None} return self.execute_command('SORT', *pieces, **options)
def __getattr__(self, name): command = INVERSE_COMMANDS_INFO.get(name) if command: return Executor(self, command) else: raise AttributeError("'%s' object has no attribute '%s'" % (type(self), name)) def _eval(self, command, script, keys, args): all = keys if keys is not None else () num_keys = len(all) if args: all = tuple(chain(all, args)) return self.execute(command, script, num_keys, *all)
[docs]class Pipeline(RedisClient): '''A :class:`.RedisClient` for pipelining commands ''' def __init__(self, store): self.store = store self.reset() def execute(self, *args, **kwargs): self.command_stack.append((args, kwargs)) execute_command = execute def reset(self): self.command_stack = []
[docs] def commit(self, raise_on_error=True): '''Send commands to redis. ''' cmds = list(chain([(('multi',), {})], self.command_stack, [(('exec',), {})])) self.reset() return self.store.execute_pipeline(cmds, raise_on_error)
class RedisScriptMeta(type): def __new__(cls, name, bases, attrs): super_new = super(RedisScriptMeta, cls).__new__ abstract = attrs.pop('abstract', False) new_class = super_new(cls, name, bases, attrs) if not abstract: o = new_class(new_class.script, new_class.__name__) new_class._scripts[o.name] = o return new_class class RedisScript(RedisScriptMeta('_RS', (object,), {'abstract': True})): '''Class which helps the sending and receiving lua scripts. It uses the ``evalsha`` command. .. attribute:: script The lua script to run .. attribute:: required_scripts A list/tuple of other :class:`RedisScript` names required by this script to properly execute. .. attribute:: sha1 The SHA-1_ hexadecimal representation of :attr:`script` required by the ``EVALSHA`` redis command. This attribute is evaluated by the library, it is not set by the user. .. _SHA-1: http://en.wikipedia.org/wiki/SHA-1 ''' abstract = True script = None _scripts = {} required_scripts = () def __init__(self, script, name): if isinstance(script, (list, tuple)): script = '\n'.join(script) self.__name = name self.script = script rs = set((name,)) rs.update(self.required_scripts) self.required_scripts = rs @property def name(self): return self.__name @property def sha1(self): if not hasattr(self, '_sha1'): self._sha1 = sha1(self.script.encode('utf-8')).hexdigest() return self._sha1 def __repr__(self): return self.name if self.name else self.__class__.__name__ __str__ = __repr__ def preprocess_args(self, client, args): return args def callback(self, response, **options): '''Called back after script execution. This is the only method user should override when writing a new :class:`RedisScript`. By default it returns ``response``. :parameter response: the response obtained from the script execution. :parameter options: Additional options for the callback. ''' return response def __call__(self, client, keys, args, options): args = self.preprocess_args(client, args) numkeys = len(keys) keys_args = tuple(keys) + args options.update({'script': self, 'redis_client': client}) return client.execute_command('EVALSHA', self.sha1, numkeys, *keys_args, **options)