A framework for data-processing and data-preparation DAG (directed acyclic graph) pipelines.

The examples in the documentation assume

>>> from __future__ import print_function

if running pre-Py3K, as well as

>>> from dagpype import *
>>> import re
>>> import numpy
>>> import math

exception dagpype.Error(msg)

Base class for data-pipe errors.

exception dagpype.InvalidParamError(param, value, msg)

Invalid parameter passed.

param()

Returns offending parameter name.

value()

Returns offending parameter value.

exception dagpype.NoResultError

Indicates a pipeline terminated with no result.
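
A hedged illustration (an assumption, not a quoted doctest: a result-producing sink such as min_() is presumed to raise NoResultError when its pipeline feeds it no elements):

>>> # Assumption: min_() raises NoResultError for an empty stream.
>>> try:
...     source([]) | min_()
... except NoResultError:
...     print('no result')
no result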

dagpype.filters(fn)

Decorator signifying a (generator) function is a filter function.

Arguments:
fn – Decorated function.

Example:

>>> # Function adding 1 to anything (there are simpler ways of doing this):
>>> def add_1():
...     @filters
...     def _act(target):
...         try:
...             while True:
...                 target.send((yield) + 1)
...         except GeneratorExit:
...             target.close()
...     return _act    
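
A hedged usage sketch (an assumption based on the pattern above, not a quoted doctest: the stage returned by add_1() is presumed to compose with | like the built-in filters):

>>> # Assumption: the decorated stage composes with | like built-in stages.
>>> source([1, 2, 3]) | add_1() | to_list()
[2, 3, 4]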
dagpype.freeze(target)
freeze and thaw are used together to create a “frozen” target, which can be connected to pipelines multiple times. Feeding values to a frozen target does not result in anything. Only when the target is “thawed”, a result is returned.

Examples:

>>> # Print the smallest value in three files
>>> a = freeze(min_())
>>> for f_name in ['data1.txt', 'data2.txt', 'data3.txt']:
...     stream_vals(f_name) | cast(float) | a
>>> print(thaw(a))
2.0
>>> lists = [range(100), range(10), range(500)]
>>> # Calculate the mean and standard deviation of the elements in lists
>>> #   which are longer than 100.
>>> a = freeze(mean() + stddev())
>>> for l in lists:
...     if len(l) > 100:
...         source(l) | a
>>> # Get the mean and standard deviation
>>> m, s = thaw(a)
dagpype.sinks(fn)

Decorator signifying a (generator) function is a sink function.

Arguments:
fn – Decorated function.

Example:

>>> # Function returning the sum of the first two values sent to it:
>>> def sum_first_two():
...     @sinks
...     def _act(target):
...         i, total = 0, 0
...         try:
...             while True:
...                 if i < 2:
...                     total += (yield)
...                 i += 1
...         except GeneratorExit:
...             if i >= 2:
...                 target.send(total)
...             target.close()
...     return _act
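
A hedged usage sketch (assuming, as with the other decorators, that the returned sink stage terminates a pipeline and that the value it sends on close becomes the pipeline's result):

>>> # Assumption: the pipeline's result is the value the sink sends on close.
>>> source([5, 6, 7]) | sum_first_two()
11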
dagpype.source(iterable)

Creates a source from any iterable sequence.

Arguments:
iterable – Iterable sequence whose values will be sent on.
See Also:
dagpype.np.chunk_source()

Examples:

>>> source([1, 2, 3, 4]) | count()
4
>>> source(()) | count()
0
dagpype.sources(fn)

Decorator signifying a (generator) function is a source function.

Arguments:
fn – Decorated function.

Example:

>>> # Function returning 10 '1's (there are simpler ways of doing this):
>>> def ten_ones():
...     @sources
...     def _act():
...         for i in range(10):
...             yield '1'
...     return _act
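
A hedged usage sketch (assuming the stage returned by ten_ones() can head a pipeline just like source()):

>>> # Assumption: a @sources-decorated stage heads a pipeline like source().
>>> ten_ones() | count()
10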
dagpype.sub_pipe_target(pipe, target)

Call, from within an implementation of a filter, to dynamically create a sub-target formed by concatenating a pipe to the current target.

Arguments:
pipe – Typically a pipe created within the implementation.
target – The target provided to the implementation.
Returns:
If the pipe is a source pipe, None. Otherwise, a sub-target to which elements can be passed through send(), closed via close(), etc.

Example:

>>> def some_implementation(target):
...     pipe = filt(lambda x : 2 * x)
...     sub_target = sub_pipe_target(pipe, target)
...     try:
...         while True:
...             sub_target.send((yield))
...     except GeneratorExit:
...         sub_target.close()
...         target.close()                
dagpype.thaw(target)
freeze and thaw are used together to create a “frozen” target, which can be connected to pipelines multiple times. Feeding values to a frozen target does not result in anything. Only when the target is “thawed”, a result is returned.

Examples:

>>> # Print the smallest value in three files
>>> a = freeze(min_())
>>> for f_name in ['data1.txt', 'data2.txt', 'data3.txt']:
...     stream_vals(f_name) | cast(float) | a
>>> print(thaw(a))
2.0
>>> lists = [range(100), range(10), range(500)]
>>> # Calculate the mean and standard deviation of the elements in lists
>>> #   which are longer than 100.
>>> a = freeze(mean() + stddev())
>>> for l in lists:
...     if len(l) > 100:
...         source(l) | a
>>> # Get the mean and standard deviation
>>> m, s = thaw(a)
exception dagpype.UnknownNamedCSVColError(name)

Indicates a name was not found in a CSV header.

name()

Returns problematic name.

dagpype.os_walk(directory='.')

Recursively iterate through files.

Keyword Arguments:
directory – Directory in which to perform the search (default '.').

See Also:
dagpype.filename_filt()

Example:

>>> # Creates a list of files in the current directory.
>>> l = os_walk() | to_list()
dagpype.parse_xml(stream, events=('end', ))
Parses XML. Yields a sequence of (event, elem) pairs, where event is the event for yielding the element (e.g., 'end' for tag end), and elem is an xml.etree.ElementTree element whose tag and text can be obtained through elem.tag and elem.text, respectively.
Arguments:
stream – Either a stream, e.g., as returned by open(), or a name of a file.
Keyword Arguments:
events – Tuple of xml.etree.ElementTree events (default (‘end’,))

See the online documentation for an example.

dagpype.stream_lines(stream, rstrip=True)

Streams the lines from some stream.

Arguments:
stream – Either a stream, e.g., as returned by open(), or a name of a file.
Keyword Arguments:
rstrip – if True, right-strips lines (default True).
See Also:
dagpype.stream_vals() dagpype.np.chunk_stream_lines()

Examples:

>>> # Places a file's lines in a list.
>>> stream_lines('data.csv') | to_list()
['wind rain', '1 2', '3 4', '5 6']
dagpype.stream_vals(stream, cols=None, types_=None, delimit=b',', comment=None, skip_init_space=True)

Streams delimited (e.g., by commas for CSV files, or by tabs for TAB files) values as tuples.

Arguments:
stream – Either the name of a file or a binary stream.
Keyword Arguments:
cols – Indication of which columns to read. If either an integer or a tuple of integers, the corresponding columns will be read. If either a string or a tuple of strings, the columns whose header names (taken from the first row, which is not streamed) match will be read. If None (which is the default), everything will be read.
types_ – Either a type or a tuple of types. If given, the read values will be cast to these types. Otherwise, if this is None (which is the default), the read values will be cast to floats.
delimit – Delimiting binary character (default b',').
comment – Comment-starting binary character or None (default None). Any character from this one until the line end will be ignored.
skip_init_space – Whether spaces starting a field will be ignored (default True).

See Also:
dagpype.csv_split() dagpype.stream_lines() dagpype.np.chunk_stream_vals()

Examples:

>>> # Find the correlation between two named columns.
>>> stream_vals('meteo.csv', (b'wind', b'rain')) | corr()
-0.005981379045088809
>>> # Find the correlation between two indexed columns.
>>> stream_vals('neat_data.csv', (0, 3)) | corr()
-0.0840752963937695
dagpype.append(e)

Appends to the end of all elements.

Arguments:
e – What to append.
See Also:
dagpype.prepend()

Example:

>>> source([1, 2, 3, 4]) | append(5) | to_list()
[1, 2, 3, 4, 5]
dagpype.cast(types_)

Returns a cast of elements.

Arguments:
types_ – Either a type or a tuple of types. This corresponds to whether each element is a single item or a tuple of items: given a single type, each element is cast to it; given a tuple of types, each element must be a tuple whose items are cast pairwise.

Examples:

>>> source(['1']) | cast(float) | to_list()
[1.0]
>>> source([('1', '2')]) | cast((int, float)) | to_list()
[(1, 2.0)]
dagpype.csv_split(cols=None, types_=None, delimit=b',', comment=None, skip_init_space=True)

Splits the values in a delimited stream (e.g., by commas for CSV files, or by tabs for TAB files) as tuples.

Keyword Arguments:
cols – Indication of which columns to read. If either an integer or a tuple of integers, the corresponding columns will be read. If either a string or a tuple of strings, the columns whose header names (taken from the first row, which is not streamed) match will be read. If None (which is the default), everything will be read.
types_ – Either a type or a tuple of types. If given, the read values will be cast to these types. Otherwise, if this is None (which is the default), the read values will be cast to floats.
delimit – Delimiting binary character (default b',').
comment – Comment-starting binary character or None (default None). Any character from this one until the line end will be ignored.
skip_init_space – Whether spaces starting a field will be ignored (default True).

See Also:
dagpype.split() dagpype.stream_vals()

Examples:

>>> # Assume the file 'junky.txt' contains lines, those containing the string
>>> # 'moshe' are delimited by semicolons, and we wish to find the correlation
>>> # between the 3rd and 6th values in these lines.
>>> stream_lines('junky.txt') | grep(b'moshe') | csv_split((2, 5), delimit = b';') | corr()
0.4472135954999579
dagpype.cum_ave()

Transforms a sequence into a cumulative average of it.

If the input sequence is x[0], x[1], ..., then the output sequence is
y[i] = (x[0] + ... + x[i]) / (i + 1)
See Also:
dagpype.cum_sum() dagpype.window_simple_ave() dagpype.exp_ave() dagpype.np.cum_ave()

Examples:

>>> source([1., 2., 3., 4.]) | cum_ave() | to_list()
[1.0, 1.5, 2.0, 2.5]
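
For intuition, the example output can be reproduced by a plain-Python rendering of the formula above (an illustration only, not dagpype's implementation):

>>> x = [1., 2., 3., 4.]
>>> [sum(x[:i + 1]) / (i + 1) for i in range(len(x))]
[1.0, 1.5, 2.0, 2.5]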
dagpype.cum_sum()

Transforms a sequence into a cumulative sum of it.

If the input sequence is x[0], x[1], ..., then the output sequence is
y[i] = x[0] + ... + x[i]
See Also:
dagpype.cum_ave() dagpype.np.cum_sum()

Examples:

>>> source([1., 2., 3., 4.]) | cum_sum() | to_list()
[1.0, 3.0, 6.0, 10.0]
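
Likewise, a plain-Python rendering of the cumulative-sum formula (illustration only) reproduces the example output:

>>> x = [1., 2., 3., 4.]
>>> [sum(x[:i + 1]) for i in range(len(x))]
[1.0, 3.0, 6.0, 10.0]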
dagpype.enumerate_(start=0)
Enumerates a stream. Transforms it into a stream of pairs, where the first of each is a running index.
Arguments:
start – Starting value of running index (default 0).
See Also:
dagpype.np.enumerate_()

Examples:

>>> source(['a', 'b', 'c', 'd']) | enumerate_() | to_list()
[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd')]
dagpype.exp_ave(alpha)

Transforms a sequence into an exponential moving average of it.

If the input sequence is x[0], x[1], ..., then the output sequence is
y[0] = x[0]
y[i] = alpha * x[i] + (1 - alpha) * y[i - 1]
At each point, all but an epsilon of the relevant weight is contained in the last log(epsilon) / log(1 - alpha) time units.

Arguments:
alpha – Responsiveness factor; should be between 0 and 1.

See Also:
dagpype.window_simple_ave() dagpype.cum_ave() dagpype.np.exp_ave()

Example:

>>> source([1., 2., 3., 4.]) | exp_ave(0.75) | to_list()
[1.0, 1.75, 2.6875, 3.671875]
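
The example output follows directly from the recurrence above, as this plain-Python check (illustration only) shows:

>>> x, alpha = [1., 2., 3., 4.], 0.75
>>> y = [x[0]]
>>> for v in x[1:]:
...     y.append(alpha * v + (1 - alpha) * y[-1])
>>> y
[1.0, 1.75, 2.6875, 3.671875]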
dagpype.filename_filt(pattern, skip_files=False, skip_dirs=True)

Filters filenames - checks if they pass some criteria.

Arguments:
pattern – Glob type of pattern.
Keyword Arguments:
skip_files – Whether to skip regular files (default False).
skip_dirs – Whether to skip directories (default True).
See Also:
dagpype.os_walk()

Example:

>>> # Counts the number of files of the form 'data?.csv'
>>> print(os_walk() | filename_filt('data?.csv') | count())
0
dagpype.filt(trans=None, pre=None, post=None)

Filter (transform elements and / or suppress them).

Keyword Arguments:
trans – Transformation function for each element (default None).
pre – Suppression function checked against each element before the transformation function, if any (default None).
post – Suppression function checked against each element after the transformation function, if any (default None).
See Also:
dagpype.from_() dagpype.to() dagpype.from_to() dagpype.skip() dagpype.nth() dagpype.slice_() dagpype.tail() dagpype.sink() dagpype.grep()

Example:

>>> # square-root of non-negative elements
>>> source([-1, 4, -3, 16]) | filt(trans = lambda x : math.sqrt(x), pre = lambda x : x >= 0) | to_list()
[2.0, 4.0]
dagpype.from_(cond, inclusive=True)

Stream elements starting from one that fits some condition.

Arguments:
cond – Either a function or some other object. In the first case, the function will be applied to each element; in the second case, the object will be compared (using ==) with each element.
Keyword Arguments:
inclusive – Whether the first element matching the criterion is streamed (default True).
See Also:
dagpype.filt() dagpype.to() dagpype.from_to() dagpype.skip() dagpype.nth() dagpype.slice_() dagpype.tail()

Examples:

>>> source([1, 2, 3, 4, 3, 2, 1]) | from_(2) | to_list()
[2, 3, 4, 3, 2, 1]
>>> source([1, 2, 3, 4, 3, 2, 1]) | from_(2, False) | to_list()
[3, 4, 3, 2, 1]
>>> source([1, 2, 3, 4, 3, 2, 1]) | from_(lambda d: d % 3 == 0) | to_list()
[3, 4, 3, 2, 1]
dagpype.from_to(from_cond, to_cond, from_inclusive=True, to_inclusive=True, strict=False)
Streams runs of elements: each run starts at an element matching the from condition and ends at the next element matching the to condition.
Arguments:
from_cond – Either a function or some other object. In the first case, the function will be applied to each element; in the second case, the object will be compared (using ==) with each element.
to_cond – Either a function or some other object. In the first case, the function will be applied to each element; in the second case, the object will be compared (using ==) with each element.
Keyword Arguments:
from_inclusive – Whether the element matching the from condition is streamed (default True).
to_inclusive – Whether the element matching the to condition is streamed (default True).
strict – Whether a run opened by a from match must also be closed by a to match for its elements to be streamed; if True, a trailing unclosed run is discarded (default False).

See Also:
dagpype.filt() dagpype.from_() dagpype.to() dagpype.skip() dagpype.nth() dagpype.slice_() dagpype.tail()

Examples:

>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(2, 3) | to_list()
[2, 3, 2, 1, 3, 2]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(2, 3, from_inclusive = False) | to_list()
[3, 1, 3]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(2, 3, to_inclusive = False) | to_list()
[2, 2, 1, 2]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(2, 3, from_inclusive = False, to_inclusive = False) | to_list()
[1]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(2, 3, strict = True) | to_list()
[2, 3, 2, 1, 3]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(2, 3, from_inclusive = False, strict = True) | to_list()
[3, 1, 3]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(2, 3, to_inclusive = False, strict = True) | to_list()
[2, 2, 1]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(2, 3, from_inclusive = False, to_inclusive = False, strict = True) | to_list()
[1]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(lambda d: d % 2 == 0, lambda d: d % 4 == 0) | to_list()
[2, 3, 4, 2, 1, 3, 7, 2]
>>> source([1, 2, 3, 4, 3, 2, 1, 3, 7, 2]) | from_to(lambda d: d % 2 == 0, lambda d: d % 4 == 0, strict = True) | to_list()
[2, 3, 4]
dagpype.grep(what)

Filters strings based on the occurrence of a substring or a regular expression.

Arguments:
what – Either a string or a compiled regular expression.
See Also:
dagpype.filt()

Examples:

>>> source([b'aa', b'aab', b'b']) | grep(b'b') | to_list()
['aab', 'b']
>>> source(['aa', 'aab', 'b']) | grep(re.compile(r'(a+)b')) | to_list()
['aab']
dagpype.prepend(what)

Prepends to the start of all elements.

Arguments:
what – What to prepend.
See Also:
dagpype.append()

Example:

>>> source([1, 2, 3, 4]) | prepend(0) | to_list()
[0, 1, 2, 3, 4]
dagpype.prob_rand_sample(prob)

Randomly passes some of the elements, with a given probability.

Arguments:
prob – Probability an element will pass.
See Also:
dagpype.size_rand_sample()

Example:

>>> n = 99999
>>> assert 0.5 <= (source(range(n)) | prob_rand_sample(0.7) | count()) / float(n) <= 1
dagpype.relay()

Sends on whatever is passed to it.

Example:

>>> # Find the rain auto-correlation relative to the signal 5 time units in the future.
>>> stream_vals(open('meteo.csv'), 'rain') | relay() + skip(5) | corr()
1.0
dagpype.select_inds(inds)

Returns a selection of the selected indices of indexable elements.

Arguments:
inds – Either an integer, or an iterable of integers. If inds is an integer, this filter will pass on a single element for each element passed through it. Otherwise, it will pass a tuple.

Examples:

>>> source([(1, 2, 3), (4, 5, 6)]) | select_inds(2) | to_list()
[3, 6]
>>> source([(1, 2, 3), (4, 5, 6)]) | select_inds((0, 2)) | to_list()
[(1, 3), (4, 6)]
>>> source([(1, 2, 3), (4, 5, 6)]) | select_inds(()) | to_list()
[(), ()]
dagpype.skip(n)

Skips n elements.

Arguments:
n – If a positive integer, skips the first n elements; if negative, skips the last |n| elements.
See Also:
dagpype.nth() dagpype.np.skip()

Example:

>>> source([1, 2, 3, 4]) | skip(2) | to_list()
[3, 4]
>>> source([1, 2, 3, 4]) | skip(-2) | to_list()
[1, 2]
dagpype.slice_(start=None, stop=None, step=None)

Similar to itertools.islice.

See Also:
dagpype.from_() dagpype.to() dagpype.from_to() dagpype.nth() dagpype.skip() dagpype.tail()

Examples:

>>> source(range(100)) | slice_(10) | to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> source(range(100)) | slice_(0, 100, 10) | to_list()
[0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
>>> source(range(100)) | slice_(0, 10) | to_list()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
dagpype.split(delimit=',')
Splits a stream of strings into a stream of tuples resulting from the strings being split by a delimiter.
Keyword Arguments:
delimit – Delimiting character (default ‘,’)
See Also:
dagpype.csv_split()

Example:

>>> source(['a,b', 'c,d,e']) | split() | to_list()
[('a', 'b'), ('c', 'd', 'e')]
dagpype.tail(num)

Takes the tail of a stream.

Arguments:
num – How many elements from the tail to take.
See Also:
dagpype.from_() dagpype.to() dagpype.from_to() dagpype.skip() dagpype.nth() dagpype.slice_()

Examples:

>>> source(range(10)) | tail(4) | to_list()
[6, 7, 8, 9]
>>> source(range(10)) | tail(0) | to_list()
[]
>>> source(range(2)) | tail(4) | to_list()
[0, 1]
dagpype.to(cond, inclusive=True)

Stream elements until the one that fits some condition.

Arguments:
cond – Either a function or some other object. In the first case, the function will be applied to each element; in the second case, the object will be compared (using ==) with each element.
Keyword Arguments:
inclusive – Whether the first element matching the criterion is streamed (default True).
See Also:
dagpype.filt() dagpype.from_() dagpype.from_to() dagpype.skip() dagpype.nth() dagpype.slice_() dagpype.tail()

Examples:

>>> source([1, 2, 3, 4, 3, 2, 1]) | to(2) | to_list()
[1, 2]
>>> source([1, 2, 3, 4, 3, 2, 1]) | to(2, False) | to_list()
[1]
>>> source([1, 2, 3, 4, 3, 2, 1]) | to(lambda d: d % 3 == 0) | to_list()
[1, 2, 3]
dagpype.trace(stream=sys.stdout, enum=True, format_=lambda e: str(e))

Traces elements to a stream. Useful for debugging problematic streams.

Keyword Arguments:
stream – Stream to which to trace (default sys.stdout).
enum – Whether to enumerate each element by its order (default True).
format_ – Format function for elements (default lambda e : str(e)).

Example:

>>> source([1, 2, 3, 'd']) | trace() | sum_()
0 : 1
1 : 2
2 : 3
3 : d
Traceback (most recent call last):
  ...
TypeError: unsupported operand type(s) for +=: 'int' and 'str'
dagpype.window_max(wnd_len, lt=lambda x, y: x < y)

Transforms a sequence into its maximum within some window.

If the input sequence is x[0], x[1], ..., then the output sequence is
y[i] = max(x[i], x[i - 1], ..., x[max(0, i - wnd_len + 1)])

Arguments:
wnd_len – Window length.
Keyword Arguments:
lt – Comparison function (default: lambda x, y: x < y) used for deciding which is smaller than which.
See Also:
dagpype.window_min()

Examples:

>>> source([1, 2, 3, 4, 1, 0, 4, 4]) | window_max(2) | to_list()
[1, 2, 3, 4, 4, 1, 4, 4]
dagpype.window_min(wnd_len, lt=lambda x, y: x < y)

Transforms a sequence into its minimum within some window. Uses an algorithm from http://home.tiac.net/~cri/2001/slidingmin.html

If the input sequence is x[0], x[1], ..., then the output sequence is
y[i] = min(x[i], x[i - 1], ..., x[max(0, i - wnd_len + 1)])

Arguments:
wnd_len – Window length.

Keyword Arguments:
lt – Comparison function (default: lambda x, y: x < y) used for deciding which is smaller than which.

See Also:
dagpype.window_max()

Examples:

>>> source([1, 2, 3, 4, 1, 0, 4, 4]) | window_min(2) | to_list()
[1, 1, 2, 3, 1, 0, 0, 4]
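
For intuition, a naive plain-Python rendering of the definition above reproduces the example (the library itself uses the linked sliding-minimum algorithm):

>>> x, wnd_len = [1, 2, 3, 4, 1, 0, 4, 4], 2
>>> [min(x[max(0, i - wnd_len + 1) : i + 1]) for i in range(len(x))]
[1, 1, 2, 3, 1, 0, 0, 4]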
dagpype.window_quantile(wnd_len, quantile=0.5, lt=lambda x, y: x < y)

Transforms a sequence into its quantiles within some window.

If the input sequence is x[0], x[1], ..., then the output sequence is
y[i] = q_{quantile}(x[i], x[i - 1], ..., x[max(0, i - wnd_len + 1)])
where q_{p}(A) is the smallest element of A larger than a fraction p of A's elements (e.g., p = 0.5 gives the median).
Arguments:
wnd_len – Window length.
Keyword Arguments:
quantile – Quantile fraction; should be between 0 and 1 (default 0.5, which is the median).
lt – Comparison function (default: lambda x, y: x < y) used for deciding which is smaller than which.

Examples:

>>> source([1, 4, 2, 4, 6, 9, 2, 4, 5]) | window_quantile(2, 0.5) | to_list()
[1, 4, 4, 4, 6, 9, 9, 4, 5]
>>> source([1, 4, 2, 4, 6, 9, 2, 4, 5]) | window_quantile(3, 0.5) | to_list()
[1, 4, 2, 4, 4, 6, 6, 4, 4]
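
One plain-Python reading of the definition that reproduces the second example is to take the element at sorted index int(quantile * n) of each length-n window; this is an illustration consistent with the examples above, not necessarily the library's exact boundary rule:

>>> x, wnd_len, q = [1, 4, 2, 4, 6, 9, 2, 4, 5], 3, 0.5
>>> wnds = [x[max(0, i - wnd_len + 1) : i + 1] for i in range(len(x))]
>>> [sorted(w)[int(q * len(w))] for w in wnds]
[1, 4, 2, 4, 4, 6, 6, 4, 4]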
dagpype.window_simple_ave(wnd_len)
Transforms a sequence into a simple moving average of its values within some window.
If the input sequence is x[0], x[1], ..., then the output sequence is
y[i] = (x[max(0, i - wnd_len + 1)] + ... + x[i]) / min(i + 1, wnd_len)
Arguments:
wnd_len – Averaging window length.
See Also:
dagpype.cum_ave() dagpype.exp_ave()

Examples:

>>> source([1., 2., 3., 4.]) | window_simple_ave(2) | to_list()
[1.0, 1.5, 2.5, 3.5]
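
As a sanity check, a naive plain-Python rendering of the formula above (illustration only) reproduces the example output:

>>> x, wnd_len = [1., 2., 3., 4.], 2
>>> [sum(x[max(0, i - wnd_len + 1) : i + 1]) / min(i + 1, wnd_len) for i in range(len(x))]
[1.0, 1.5, 2.5, 3.5]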
dagpype.corr()

Calculates the Pearson correlation coefficient between tuples.

If:
n is the number of elements,
sx is the sum of xs,
sy is the sum of ys,
sxx is the sum of squared xs,
syy is the sum of squared ys,
sxy is the sum of x * y products,
then:
corr = (n * sxy - sx * sy) / math.sqrt(n * sxx - sx * sx) / math.sqrt(n * syy - sy * sy)
See Also:
dagpype.np.corr()

Examples:

>>> source([1, 2, 3, 4]) + source([1, 2, 3, 4]) | corr()
1.0
>>> source([(60, 3.1), (61, 3.6), (62, 3.8), (63, 4), (65, 4.1)]) | corr()
0.9118724953377315
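
The value in the second example follows from the formula above; a plain-Python check (illustration only, rounded for brevity):

>>> import math
>>> pairs = [(60, 3.1), (61, 3.6), (62, 3.8), (63, 4), (65, 4.1)]
>>> n = len(pairs)
>>> sx = sum(x for x, y in pairs); sy = sum(y for x, y in pairs)
>>> sxx = sum(x * x for x, y in pairs); syy = sum(y * y for x, y in pairs)
>>> sxy = sum(x * y for x, y in pairs)
>>> round((n * sxy - sx * sy) / math.sqrt(n * sxx - sx * sx) / math.sqrt(n * syy - sy * sy), 6)
0.911872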
dagpype.count()

Computes the number of piped elements.

See Also:
dagpype.np.count()

Example:

>>> source([1, 2, 3, 4]) | count()
4
dagpype.max_()

Computes the largest element.

See Also:
dagpype.min_()

Example:

>>> source([1, 2, 3, 4]) | max_()
4
dagpype.mean()

Calculates the mean of all elements.

See Also:
dagpype.stddev() dagpype.np.mean()

Example:

>>> int(source([2, 4, 4, 4, 5, 5, 7, 9]) | mean())
5
dagpype.min_()

Computes the smallest element.

See Also:
dagpype.max_() dagpype.np.min_()

Example:

>>> source([1, 2, 3, 4]) | min_()
1

dagpype.nth(n)

Returns the n-th piped element.

Arguments:
n – If a non-negative integer, returns the n-th element from the start; if negative, returns the n-th element from the end.
See Also:
dagpype.skip()

Example:

>>> source([1, 2, 3, 4]) | nth(0) 
1
>>> source([1, 2, 3, 4]) | nth(-1)
4
dagpype.sink(res)

General purpose sink.

Arguments:
res – Result. If this is a function, the result is the function applied to the last element piped. Otherwise, the result is this parameter itself, independent of the sequence.
See Also:
dagpype.filt()

Examples:

>>> source([1, 2, 3]) | sink(lambda x : x ** 2)
9
>>> source([1, 2]) | sink('hello')
'hello'
>>> source([1, 2, 3]) | sink('hello')
'hello'
dagpype.size_rand_sample(size)
Randomly samples (with replacement) a sample of a given size from the piped elements. Returns a list of the sampled elements.
Arguments:
size – Sample size.
See Also:
dagpype.prob_rand_sample()

Example:

>>> # Create a sample of size 2 from 0 : 100.
>>> sample = source(range(100)) | size_rand_sample(2) 
dagpype.stddev(ddof=1)

Calculates the sample standard deviation.

Keyword Arguments:
ddof – Degrees of freedom (default 1)
If:
s is the sum of xs,
ss is the sum of squared xs,
n is the number of xs,
then:
stddev = math.sqrt((ss - s * s / n) / (n - ddof))
See Also:
dagpype.mean()

Example:

>>> source([2, 4, 4, 4, 5, 5, 7, 9]) | stddev(0)
2.0
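
The example value follows from the formula above; a plain-Python check (illustration only):

>>> import math
>>> xs, ddof = [2, 4, 4, 4, 5, 5, 7, 9], 0
>>> s, ss, n = sum(xs), sum(x * x for x in xs), len(xs)
>>> math.sqrt((ss - s * s / float(n)) / (n - ddof))
2.0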
dagpype.sum_()

Computes the sum of piped elements.

See Also:
dagpype.np.sum_()

Example:

>>> source([1, 2, 3, 4]) | sum_()
10
>>> source(['1', '2', '3', '4']) | sum_()
'1234'
dagpype.to_dict()

Converts all elements to a dictionary. Given tuples, the first entry of each is the key, and the second is the data.

See Also:
dagpype.to_list() dagpype.np.to_array()

Example:

>>> source(((1, 'a'), (2, 'b'), (3, 'b'), (4, 'j'))) | to_dict()
{1: 'a', 2: 'b', 3: 'b', 4: 'j'}
dagpype.to_list()

Converts all elements to a list.

See Also:
dagpype.to_dict() dagpype.np.to_array()

Example:

>>> source((1, 2, 3, 4)) | to_list()
[1, 2, 3, 4]
dagpype.to_stream(stream, names=None, delimit=b',', line_terminator=b'\n', buf_size=256)
Writes elements to an output stream; computes the number of lines written (excluding names). If an element is a tuple, its sub-elements will be separated by the delimit byte. Elements will be separated by the line-terminator byte.
Arguments:
stream – Either a binary stream, e.g., as returned by open(..., ‘wb’), or a name of a file.
Keyword Arguments:
names – Either a byte array, a tuple of byte arrays, or None (default None). If not None, the names will be written (separated by the delimit byte if more than one), followed by the line_terminator byte.
delimit – Delimiting binary character between sub-elements (default b',').
line_terminator – Line-terminating binary character between elements (default b'\n').
buf_size – Number of elements buffered before writing (default 1024).
See Also:
dagpype.np.chunks_to_stream()

Examples:

>>> # Writes a CSV file of 3 lines, each containing 2 sub-elements
>>> source([(1, 2), (3, 4), (5, 6)]) | to_stream('data.csv')
3
>>> # Same but with headings
>>> source([(1, 2), (3, 4), (5, 6)]) | to_stream('data.csv', names = ('wind', 'rain'))
3
>>> # Same but with tab separators.
>>> source([(1, 2), (3, 4), (5, 6)]) | to_stream('data.csv', names = ('wind', 'rain'), delimit = b'\t')
3
dagpype.chain(key_pipe)

Chains the result of applying an ad-hoc created pipe to each element.

Arguments:
key_pipe - Function mapping each element to a pipe.

Example:

>>> # Chain each element twice.
>>> source([1, 2, 3]) | chain(lambda p : source([p] * 2)) | to_list()
[1, 1, 2, 2, 3, 3]
dagpype.consec_group(key, key_pipe)
Groups consecutive similar elements by sending all such elements through an ad-hoc created pipe.
Arguments:
key – Function mapping each element to a key. This key will be used to decide which elements are similar.

key_pipe – Function mapping each key to a pipe.

See Also:
dagpype.group()

Example:

>>> # Count number of tuples with same first item.
>>> source([(1, 1), (1, 455), (13, 0)]) | \
... consec_group(
...     lambda p : p[0], 
...     lambda k : sink(k) + count()) | \
... to_list()
[(1, 2), (13, 1)]
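
For intuition only, the example above is analogous to this plain-Python use of itertools.groupby; this is not dagpype's implementation:

>>> import itertools
>>> [(k, sum(1 for _ in g)) for k, g in itertools.groupby([(1, 1), (1, 455), (13, 0)], key = lambda p : p[0])]
[(1, 2), (13, 1)]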
dagpype.dict_join(joined, key, common_pipe, out_of_dict_pipe=None, dict_only_pipe=None)

Performs an SQL-style join with a dictionary.

Arguments:
joined – Dictionary of items with which to join.
key – Function mapping each element to a key. This key will be used to decide with which joined element (if any) to join.
common_pipe – Function taking a key and a value from the joined dictionary, and returning a pipe. This pipe will be used for all elements matching the key.
Keyword Arguments:
out_of_dict_pipe – Pipe used for all elements not in the joined dictionary (default None).
dict_only_pipe – Pipe used for all dictionary items not matched by any element (default None).

Examples:

>>> # Assume employee.csv has the following content:
>>> # Name,EmpId,DeptName
>>> # Harry,3415,Finance
>>> # Sally,2241,Sales
>>> # George,3401,Finance
>>> # Harriet,2202,Sales
>>> # Nelson,2455,Entertainment
>>> #
>>> # Assume dept.csv has the following content:
>>> # DeptName,Manager
>>> # Finance,George
>>> # Sales,Harriet
>>> # Production,Charles
>>> # Create a dictionary mapping employees to managers:
>>> d = stream_vals('employee.csv', (b'Name', b'EmpId', b'DeptName'), (bytes, int, bytes)) | \
...     dict_join(
...         stream_vals('dept.csv', (b'DeptName', b'Manager'), (bytes, bytes)) | to_dict(),
...         lambda name_id_dept : name_id_dept[2],
...         lambda dept, manager : filt(lambda name_id_dept : (name_id_dept[0], manager)),
...         filt(lambda name_id_dept : (name_id_dept[0], None)), 
...         None) | \
...     to_dict()
>>> assert d[b'Harriet'] == b'Harriet'
>>> assert d[b'Nelson'] is None
>>> # Create a dictionary mapping managers to the number of people they manage:
>>> d = stream_vals('employee.csv', (b'Name', b'EmpId', b'DeptName'), (bytes, int, bytes)) | \
...     dict_join(
...         stream_vals('dept.csv', (b'DeptName', b'Manager'), (bytes, bytes)) | to_dict(),
...         lambda name_id_dept : name_id_dept[2],
...         lambda dept, manager : sink(manager) + count(),
...         None, 
...         filt(lambda dept_manager : (dept_manager[1], 0))) | \
...     to_dict()
>>> assert d[b'Harriet'] == 2
>>> assert d[b'Charles'] == 0
dagpype.group(key, key_pipe)
Groups not-necessarily-consecutive similar elements by sending all such elements through an ad-hoc created pipe.
Arguments:
key – Function mapping each element to a key. This key will be used to decide which elements are similar.

key_pipe – Function mapping each key to a pipe.

See Also:
dagpype.consec_group()

Example:

>>> # Count number of tuples with same first item.
>>> source([(1, 1), (13, 0), (1, 455)]) | \
...     group(
...         lambda p : p[0], 
...         lambda k : sink(k) + count()) | \
...     to_list()
[(1, 2), (13, 1)]
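
For intuition only, the example above is analogous to counting first items with collections.Counter; this is not dagpype's implementation:

>>> from collections import Counter
>>> sorted(Counter(p[0] for p in [(1, 1), (13, 0), (1, 455)]).items())
[(1, 2), (13, 1)]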

Numpy stages.

The examples in this section assume

>>> import numpy
dagpype.np.chunk_source(seq, max_elems=1024)

Streams the elements of a sequence as chunks.

Arguments:
seq – Some sequence.
Keyword Arguments:
max_elems – Number of rows per chunk (last might have less) (default 1024).
See Also:
dagpype.source()
dagpype.np.chunk_stream_bytes(stream, max_elems=8192, dtype=numpy.float64, num_cols=1)
Reads a binary file containing a numpy.array, and emits a series of chunks. Each chunk is a numpy array with num_cols columns.
Arguments:
stream – Either the name of a file or a binary stream.
Keyword Arguments:
max_elems – Number of rows per chunk (last might have less) (default 8192).
dtype – Underlying element type (default numpy.float64).
num_cols – Number of columns in the chunks' arrays (default 1).
See Also:
dagpype.np.chunk_stream_vals() dagpype.np.chunks_to_stream_bytes()

Example:

>>> # Reads from a binary file, and writes the cumulative average to a different one.
>>> np.chunk_stream_bytes('meteo.dat') | np.cum_ave() | np.chunks_to_stream_bytes('wind_ave.dat')
5
dagpype.np.chunk_stream_vals(stream, cols, types_=None, missing_vals=None, delimit=b',', comment=None, skip_init_space=True, max_elems=8192)
Streams delimited (e.g., by commas for CSV files, or by tabs for TAB files) values as tuples of numpy.arrays.
Arguments:
stream – Either the name of a file or a binary stream.
cols – Indication of which columns to read. If either an integer or a tuple of integers, the corresponding columns will be read. If either a string or a tuple of strings, the columns whose header names (taken from the first row, which is not streamed) match will be read.
Keyword Arguments:
types_ – Either None, a type, or a tuple of types (must correspond to cols). The read values will be cast to these types. If None, this is a tuple of floats.
missing_vals – Either None, a value, or a tuple of values (must correspond to cols). Missing values will be filled from this parameter. If None, this is a tuple of 0s cast to types_.
delimit – Delimiting binary character (default b',').
comment – Comment-starting binary character or None (default None). Any character from this one until the line end will be ignored.
skip_init_space – Whether spaces starting a field will be ignored (default True).
max_elems – Number of rows per chunk (last might have less) (default 8192).

See Also:
dagpype.stream_vals() dagpype.np.chunk_stream_bytes()

Examples:

>>> # Find the correlation between two named columns.
>>> np.chunk_stream_vals('meteo.csv', (b'day', b'wind'), (float, float), (0, 0)) | np.corr()    
0.019720323758326334
>>> #Equivalent to:    
>>> np.chunk_stream_vals('meteo.csv', (b'day', b'wind')) | np.corr()    
0.019720323758326334
>>> # Find the correlation between two indexed columns.
>>> np.chunk_stream_vals('neat_data.csv', (3, 0)) | np.corr()    
-0.084075296393769511
dagpype.np.chunk(max_elems=8192, dtype=numpy.float64)

Transforms a sequence of elements into chunks.

Keyword Arguments:
max_elems – Number of elements per chunk (last might have less) (default 8192).
dtype – Underlying element type (default numpy.float64).
See Also:
dagpype.np.unchunk()

Example:

>>> source([0.000000133, 2.3, 9.2, 4.3, -5]) | np.chunk() | np.min_()
-5.0
dagpype.np.cum_ave()

Transforms a stream of chunks into its cumulative average (the chunk-based counterpart of dagpype.cum_ave()).

Example:

>>> # Reads from a CSV file, and writes the cumulative average to a different one.
>>> np.chunk_stream_vals('meteo.csv', b'wind') | np.cum_ave() | np.chunks_to_stream('wind_ave.csv', b'wind')
1
dagpype.np.cum_sum()

Transforms a stream of chunks into its cumulative sum (the chunk-based counterpart of dagpype.cum_sum()).

Example:

>>> # Reads from a CSV file, and writes the cumulative sum to a different one.
>>> np.chunk_stream_vals('meteo.csv', b'wind') | np.cum_sum() | np.chunks_to_stream('wind_ave.csv', b'wind')
1
dagpype.np.enumerate_(start=0)
Enumerates the elements of the chunks of a stream. Transforms it into a stream of pairs, where the first of each is a numpy.array of a running index.
Arguments:
start – Starting value of running index (default 0).
See Also:
dagpype.enumerate_()

Example:

>>> source(['a', 'b', 'c', 'd']) | np.chunk(dtype = str) | np.enumerate_() | nth(0)
(array([0, 1, 2, 3]), array(['a', 'b', 'c', 'd'], 
      dtype='|S1'))
dagpype.np.exp_ave(alpha)

Transforms a stream of chunks into its exponential moving average (the chunk-based counterpart of dagpype.exp_ave()).

Example:

>>> # Reads from a CSV file, and writes the exponential average to a different one.
>>> np.chunk_stream_vals('meteo.csv', b'wind') | np.exp_ave(0.75) | np.chunks_to_stream('wind_ave.csv', b'wind')
1
dagpype.np.skip(n)

Skips the first n elements (not chunks) from a stream of chunks.

See Also:
dagpype.skip()

Example:

>>> # Calculate the mean of 'wind' column elements except the first three.
>>> np.chunk_stream_vals('meteo.csv', b'wind') | np.skip(3) | np.mean()
7.3508771929824563
dagpype.np.unchunk()

Complementary action to chunk. Transforms the rows of an array to tuples.

See Also:
dagpype.np.chunk()

Example:

>>> l = source([numpy.array([[1, 2], [3, 4]])]) | np.unchunk() | to_list()
>>> assert l[0] == (1, 2)
dagpype.np.chunks_mean()

Finds the element-wise mean of chunks. They must have the same size.

See Also:
dagpype.np.chunks_stddev()

Example:

>>> source([numpy.array([1, 2, 3, 4], dtype = float), numpy.array([5, 6, 7, 8])]) | np.chunks_mean()
array([ 3.,  4.,  5.,  6.])
dagpype.np.chunks_stddev(ddof=1)

Calculates the sample standard deviation of chunks. They must have the same size. See the documentation of stddev in the parent module.

See Also:
dagpype.np.chunks_mean()

Example:

>>> source([numpy.array([1, 2, 3, 4]), numpy.array([5, 6, 7, 8])]) | np.chunks_stddev()
array([ 2.82842712,  2.82842712,  2.82842712,  2.82842712])
dagpype.np.chunks_sum()

Sums chunks. They must have the same size.

Example:

>>> source([numpy.array([1, 2, 3, 4]), numpy.array([5, 6, 7, 8])]) | np.chunks_sum()
array([ 6,  8, 10, 12])
dagpype.np.chunks_to_stream(stream, names=None, fmt='%.18e', delimit=b',', line_terminator=b'\n')
Writes elements to an output stream; computes the number of lines written (excluding names). If an element is a tuple, its sub-elements will be separated by the delimit byte. Elements will be separated by the line-terminator byte.
Arguments:
stream – Either a binary stream, e.g., as returned by open(..., ‘wb’), or a name of a file.
Keyword Arguments:
names – Either a byte array, a tuple of byte arrays, or None (default None). If not None, the names will be written (separated by the delimit byte if more than one), followed by the line_terminator byte.
fmt – See the corresponding parameter in numpy.savetxt.
delimit – Delimiting binary character between sub-elements (default b',').
line_terminator – Line-terminating binary character between elements (default b'\n').

See Also:
dagpype.to_stream()

Examples:

>>> # Reads from a CSV file, and writes the cumulative average to a different one.
>>> np.chunk_stream_vals('meteo.csv', b'wind') | np.cum_ave() | np.chunks_to_stream('wind_ave.csv', b'wind')
1
dagpype.np.chunks_to_stream_bytes(stream)

Writes chunks to a binary stream.

See Also:
dagpype.np.chunk_stream_bytes() dagpype.np.chunks_to_stream() dagpype.to_stream()

Example:

>>> # Reads from a binary file, and writes the cumulative average to a different one.
>>> np.chunk_stream_bytes('meteo.dat') | np.cum_ave() | np.chunks_to_stream_bytes('wind_ave.dat')
5
dagpype.np.concatenate_chunks()

Concatenates chunks.

See Also:
dagpype.np.vstack_chunks()

Example:

>>> assert numpy.allclose(source(range(30000)) | np.chunk() | np.concatenate_chunks(), numpy.array(range(30000)))
dagpype.np.corr()

Finds the correlation between streams of chunk-pairs, or between streams of 2-column chunks.

See Also:
dagpype.corr()

Examples:

>>> # Find the correlation between two indexed columns.
>>> np.chunk_stream_vals('neat_data.csv', cols = (3, 0)) | np.corr()    
-0.084075296393769511
dagpype.np.count()

Computes the number of piped chunks’ elements.

See Also:
dagpype.count()

Examples:

>>> np.chunk_stream_bytes('meteo.dat') | np.count()
5
>>> source([1, 2, 3, 4]) | np.chunk() | np.count()
4
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.count()
2
dagpype.np.max_(axis=None)

Computes the maximum of piped chunks’ elements.

Keyword Arguments:
axis – Axis over which the maximum is taken (default None)
See Also:
dagpype.np.min_() dagpype.max_()

Examples:

>>> np.chunk_stream_bytes('meteo.dat') | np.max_()
3.1400000000000001
>>> source([1, 2, 3, 4]) | np.chunk() | np.max_()
4.0
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.max_()
4.0
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.max_(axis = 0)
array([ 3.,  4.])
dagpype.np.mean(axis=None)

Computes the mean of piped chunks’ elements.

Keyword Arguments:
axis – Axis over which the mean is taken (default None, all elements summed)
See Also:
dagpype.mean()

Examples:

>>> np.chunk_stream_bytes('meteo.dat') | np.mean()
1.1879999999999999
>>> source([1, 2, 3, 4]) | np.chunk() | np.mean()
2.5
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.mean()
2.5
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.mean(axis = 0)
array([ 2.,  3.])
dagpype.np.min_(axis=None)

Computes the minimum of piped chunks’ elements.

Keyword Arguments:
axis – Axis over which the minimum is taken (default None)
See Also:
dagpype.np.max_() dagpype.min_()

Examples:

>>> np.chunk_stream_bytes('meteo.dat') | np.min_()
-1.0
>>> source([1, 2, 3, 4]) | np.chunk() | np.min_()
1.0
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.min_()
1.0
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.min_(axis = 0)
array([ 1.,  2.])
dagpype.np.sum_(axis=None)

Computes the sum of piped chunks’ elements.

Keyword Arguments:
axis – Axis over which the sum is taken (default None, all elements summed)
See Also:
dagpype.sum_() dagpype.np.chunks_sum()

Examples:

>>> np.chunk_stream_bytes('meteo.dat') | np.sum_()
5.9399999999999995
>>> source([1, 2, 3, 4]) | np.chunk() | np.sum_()
10.0
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.sum_()
10.0
>>> source([(1, 2), (3, 4)]) | np.chunk() | np.sum_(axis = 0)
array([ 4.,  6.])
dagpype.np.to_array(dtype=None)

Converts all elements to a numpy.array.

Keyword Arguments: dtype – Same as in the ctor of numpy.array (default None).

Examples:

>>> source(((1, 2), (3, 4))) | np.to_array()
array([[1, 2],
       [3, 4]])
>>> a = source([1, 2, 3, 4]) | np.to_array(dtype = numpy.float64)
dagpype.np.vstack_chunks()

Stacks chunks vertically. They must have the same size.

See Also:
dagpype.np.concatenate_chunks()

Example:

>>> source([numpy.array([1, 2, 3, 4]), numpy.array([5, 6, 7, 8])]) | np.vstack_chunks()
array([[1, 2, 3, 4],
       [5, 6, 7, 8]])
