Augmenting Pipelines

DAG pipelines are more natural than Python’s regular control structures (e.g., loops and conditionals) in some cases, and less so in others. Fortunately, the two can be combined.

For example, the following uses pipelines within a regular Python list comprehension to create a list of the means and standard deviations of the contents of the files in some directory:

>>> import glob
>>> stats = [stream_vals(f) | ave() + stddev() for f in glob.glob('dir/*.txt')]

As a further example, the following shows using stages with regular Python conditionals, specifically showing how a stage can be selected at runtime: if debug is true, a trace stage (which prints each element piped through it, along with its index) is used; otherwise, a relay stage (which passes elements through unchanged) is used:

>>> debug = True
>>> stream_vals('wind.txt') | (trace() if debug else relay()) | sum_()
0 : 2.0
1 : 4.0
2 : 7.0
3 : 23.0
4 : 0.0
5 : 2.0
6 : 4.0
7 : 7.0
8 : 23.0
9 : 0.0
10 : 2.0
11 : 4.0
12 : 7.0
13 : 23.0
14 : 0.0
15 : 2.0
16 : 4.0
17 : 7.0
18 : 23.0
19 : 0.0
20 : 2.0
21 : 4.0
22 : 7.0
23 : 23.0
24 : 0.0
25 : 2.0
26 : 4.0
27 : 7.0
28 : 23.0
29 : 0.0
30 : 2.0
31 : 4.0
32 : 7.0
33 : 23.0
34 : 0.0
35 : 2.0
36 : 4.0
37 : 7.0
38 : 23.0
39 : 0.0
40 : 2.0
41 : 4.0
42 : 7.0
43 : 23.0
44 : 0.0
45 : 2.0
46 : 4.0
47 : 7.0
48 : 23.0
49 : 0.0
50 : 2.0
51 : 4.0
52 : 7.0
53 : 23.0
54 : 0.0
55 : 2.0
56 : 4.0
57 : 7.0
58 : 23.0
59 : 0.0
432.0
>>> debug = False
>>> stream_vals('wind.txt') | (trace() if debug else relay()) | sum_()
432.0
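
For intuition, the behavior of these two stages is easy to approximate with plain generators. The following is a rough sketch, not DagPype’s actual implementation:

def trace_gen(it):
    # Print each element with its index, then pass it on unchanged.
    for i, x in enumerate(it):
        print('%d : %s' % (i, x))
        yield x

def relay_gen(it):
    # Pass elements through untouched.
    for x in it:
        yield x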

Finally, the following shows functions returning sub-pipes; specifically, a function taking a file name and returning a pipe computing the exponential moving average of the absolute values of the file’s contents:

>>> def abs_exp_ave(f_name):
...     return stream_vals(f_name) | abs_() | exp_ave(0.5)

>>> abs_exp_ave('foo.dat') + abs_exp_ave('bar.dat') | corr()
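
For reference, the underlying computation is simple to write in plain Python. The following sketch uses a hypothetical helper, abs_exp_ave_py(), and assumes exp_ave(a) follows the common recursion ave = a * x + (1 - a) * ave; the library’s exact convention may differ:

def abs_exp_ave_py(vals, a = 0.5):
    # Yield the running exponential moving average of the absolute values.
    ave = None
    for x in vals:
        x = abs(x)
        ave = x if ave is None else a * x + (1 - a) * ave
        yield ave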

Unfortunately, there are still cases which require further augmentation. Cases describes common cases where augmenting pipelines is necessary, Dynamic Subpipes shows a mechanism whereby stages assimilate these control structures within them, and Frozen and Thawed Targets shows the complementary mechanism, whereby standard control structures can use pipelines. The first approach is useful for very common cases, where the cost of writing new stages is offset by the ease of constructing pipelines with them once they are written; the second is useful for more ad-hoc cases.

Cases

Scientific data preparation and processing often involves SQL-like operations, even if the terminology is different. Here are three very common cases.

Group

Consider, for example, the CSV file meteo.csv, in particular its ‘day’ and ‘wind’ columns. Each row contains a day and a wind measurement. Some consecutive rows give wind measurements for the same day, and we very often need to aggregate these similar entries.

Unfortunately, no single aggregation strategy is always appropriate. We might aggregate all wind measurements of a given day into a list of their values, their average and standard deviation, their median, their sum, any combination of the preceding, and so on; we might also restrict any of these to values not considered outliers. In general, we might wish to construct a pipeline for each set of similar elements, and no fixed stage can be written to do this (at least not efficiently).
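
To see the difficulty, consider a plain-Python version using itertools.groupby (a sketch; assumes meteo.csv has ‘day’ and ‘wind’ columns):

import csv
import itertools

with open('meteo.csv') as f:
    for day, rows in itertools.groupby(csv.DictReader(f), key = lambda r : r['day']):
        winds = [float(r['wind']) for r in rows]
        # The per-day aggregation must be coded by hand here (e.g., the average),
        # and the group's values are materialized into a list first.
        print('%s : %f' % (day, sum(winds) / len(winds)))

A dynamic sub-pipe per group avoids both problems: the aggregation is itself a pipeline, and the group’s elements are streamed rather than materialized.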

This group case, where consecutive similar elements are aggregated, is not the only one where static pipelines need augmentation. The complementary chain operation, as well as join, are further examples.

Chain

The group case aggregates consecutive similar elements. The chain case is complementary, in that it expands aggregates to similar consecutive elements.

Suppose, for example, we have a directory containing files whose names are of the form ‘data?.csv’. We wish to find the total average of all values in these files.

Using static pipelines, it is easy to build a source that iterates over all such filenames:

os_walk() | filename_filt('data?.csv')

Using static pipelines, it is also easy to find the average of the contents of a specific file:

stream_vals(f_name) | ave()

However, using static pipelines, it is not possible to connect the two: there is no way to convey to the second pipeline that the output of the first is an aggregate (in this case, filenames) that should be expanded (in this case, to their contents).
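
In plain Python, the expansion is just a nested iteration. The following sketch uses glob in place of os_walk() | filename_filt(...), and a hypothetical stream_vals_py() helper standing in for stream_vals():

import glob

def stream_vals_py(f_name):
    # Yield the values in a file, assuming one float per line.
    with open(f_name) as f:
        for line in f:
            yield float(line)

# Average over the concatenated contents (assumes at least one value).
total, n = 0.0, 0
for f_name in glob.glob('data?.csv'):
    for v in stream_vals_py(f_name):
        total += v
        n += 1
print(total / n)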

Join

Join operations, in particular equi-joins, are very common. In this case, we have two sequences, and wish to output, for each element of the first sequence, something that depends on the elements similar to it in the second sequence. This very general operation, too, cannot be captured by static pipelines.

For example, consider the files employee.csv and dept.csv, which contain, respectively, the following data:

Name     EmpId  DeptName
Harry    3415   Finance
Sally    2241   Sales
George   3401   Finance
Harriet  2202   Sales
Nelson   2455   Entertainment

and

DeptName    Manager
Finance     George
Sales       Harriet
Production  Charles

We might want to map each employee to his or her manager, or each manager to the number of employees he or she manages.
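
Using the tables above, the equi-join itself is easy to sketch in plain Python: build a dictionary from the department table, then probe it for each employee row (literal data standing in for the CSV files):

depts = {'Finance' : 'George', 'Sales' : 'Harriet', 'Production' : 'Charles'}
employees = [
    ('Harry', 3415, 'Finance'), ('Sally', 2241, 'Sales'),
    ('George', 3401, 'Finance'), ('Harriet', 2202, 'Sales'),
    ('Nelson', 2455, 'Entertainment')]

# Map each employee to his or her manager; None if the department is unknown.
emp_to_mgr = dict((name, depts.get(dept)) for (name, emp_id, dept) in employees)
assert emp_to_mgr['Harriet'] == 'Harriet'
assert emp_to_mgr['Nelson'] is None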

Dynamic Subpipes

In this approach, stages internally create sub-pipes on the fly. This section shows stages for the group, chain, and join cases above, as well as how to write such stages.

Group

The dagpype.group() stage takes two functions: the first maps each element to a key used for deciding which elements are similar, and the second is used for dynamically constructing a sub-pipe for similar elements.

def group(key, key_pipe):
    """
    Groups consecutive similar elements by sending all such elements
    through an ad-hoc created pipe.

    Arguments:
        key -- Function mapping each element to a key. This key will
            be used to decide which elements are similar.
        key_pipe -- Function mapping each key to a pipe.
    """

For aggregating the same-day wind measurements of meteo.csv, we can do the following:

>>> csv_vals('meteo.csv', ('day', 'wind')) | \
... group(
...     key = lambda (day, wind) : day,
...     key_pipe = lambda day : sink(day) + (select_inds(1) | ave()) + (select_inds(1) | stddev())) | \
... to_csv('day_wind.csv', ('day', 'ave', 'stddev'))

The line

key = lambda (day, wind) : day

indicates that elements are similar if they share the same day. The line

key_pipe = lambda day : sink(day) + (select_inds(1) | ave()) + (select_inds(1) | stddev())

indicates that we wish to pass all elements sharing the same day to a fanned sink consisting of three parts: the day itself, the average of the wind values, and their standard deviation. Note that the resulting triples can be streamed on in the regular manner; in this case, they are written to a CSV file.

To form a list of the medians of the wind measurements of each day, we can do this:

>>> csv_vals('meteo.csv', ('day', 'wind')) | \
...     group(
...         key = lambda (day, wind) : day,
...         key_pipe = lambda day : select_inds(1) | to_array() | sink(lambda a : numpy.median(a))) | \
...     to_list()
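
For comparison, a plain-Python counterpart of this median computation (a sketch, again assuming the meteo.csv column layout):

import csv
import itertools
import numpy

with open('meteo.csv') as f:
    medians = [numpy.median([float(r['wind']) for r in rows])
               for day, rows in itertools.groupby(csv.DictReader(f), key = lambda r : r['day'])]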

Chain

The dagpype.chain() stage takes a function mapping each element to a pipe emitting all the elements which that element aggregates.

def chain(key_pipe):
    """
    Chains the results of applying an ad-hoc created pipe to each element.

    Arguments:
        key_pipe -- Function mapping each element to a pipe.
    """

To find the total average of all values in files of the form ‘data?.csv’, we can do this:

>>> os_walk() | filename_filt('data?.csv') | \
...     chain(lambda f_name : stream_vals(f_name)) | \
...     ave()

Join

The dagpype.dict_join() stage takes a dictionary, two functions, and two optional pipelines:

def dict_join(
        joined,
        key,
        common_pipe,
        out_of_dict_pipe = None,
        dict_only_pipe = None):
    """
    Performs an SQL-style join with a dictionary.

    Arguments:
        joined -- Dictionary of items with which to join.
        key -- Function mapping each element to a key. This key will
            be used to decide with which joined element (if any) to join.
        common_pipe -- Function taking a key and a value from the joined dictionary, and
            returning a pipe. This pipe will be used for all elements matching the key.

    Keyword Arguments:
        out_of_dict_pipe -- Pipe used for all elements not in the joined dictionary (default None).
        dict_only_pipe -- Pipe used for all elements only in the dictionary (default None).
    """

For creating a dictionary mapping employees to managers, we can do the following (refer to the tables above):

>>> d = csv_vals('data/employee.csv', ('Name', 'EmpId', 'DeptName'), (str, int, str)) | \
...     dict_join(
...         csv_vals('data/dept.csv', ('DeptName', 'Manager'), (str, str)) | to_dict(),
...         lambda name_id_dept : name_id_dept[2],
...         lambda dept, manager : filt(lambda name_id_dept : (name_id_dept[0], manager)),
...         filt(lambda name_id_dept : (name_id_dept[0], None)),
...         None) | \
...     to_dict()
>>> assert d['Harriet'] == 'Harriet'
>>> assert d['Nelson'] is None

For creating a dictionary mapping managers to the number of people they manage, we can do the following:

>>> d = csv_vals('data/employee.csv', ('Name', 'EmpId', 'DeptName'), (str, int, str)) | \
...     dict_join(
...         csv_vals('data/dept.csv', ('DeptName', 'Manager'), (str, str)) | to_dict(),
...         lambda name_id_dept : name_id_dept[2],
...         lambda dept, manager : sink(manager) + count(),
...         None,
...         filt(lambda dept_manager : (dept_manager[1], 0))) | \
...     to_dict()
>>> assert d['Harriet'] == 2
>>> assert d['Charles'] == 0
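
For comparison, a plain-Python counterpart of this head-count join (a sketch, using the same literal data as before):

from collections import Counter

depts = {'Finance' : 'George', 'Sales' : 'Harriet', 'Production' : 'Charles'}
employees = [
    ('Harry', 3415, 'Finance'), ('Sally', 2241, 'Sales'),
    ('George', 3401, 'Finance'), ('Harriet', 2202, 'Sales'),
    ('Nelson', 2455, 'Entertainment')]

# Count employees per manager, then include managers with no employees.
counts = Counter(depts[dept] for (name, emp_id, dept) in employees if dept in depts)
d = dict((manager, counts.get(manager, 0)) for manager in depts.values())
assert d['Harriet'] == 2
assert d['Charles'] == 0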

Frozen And Thawed Targets

In this approach, pipelines can be used within regular control structures.

Using the dagpype.freeze() function, any sink can be “frozen”. Connecting a pipeline to a regular sink causes immediate evaluation of the result; connecting a pipeline to a frozen sink does not. Only “thawing” the sink, through the dagpype.thaw() function, evaluates the result.

For example, in order to calculate the average of all values in files of the form ‘data?.csv’, we can do this:

from glob import iglob

a = freeze(ave())
for f_name in iglob('data?.csv'):
    stream_vals(f_name) | a
print thaw(a)

The line

a = freeze(ave())

forms a frozen version of the ave sink. The line

stream_vals(f_name) | a

does not evaluate into a float, it merely updates the frozen sink. Only the line

print thaw(a)

thaws the sink (and, in this case, prints the result).
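
Conceptually, a frozen sink is just an accumulator object whose result is extracted at the end. A rough plain-Python analogue (not DagPype’s implementation) might look like this:

class FrozenAve(object):
    # Accumulates a running sum and count across any number of streams.
    def __init__(self):
        self._sum, self._n = 0.0, 0

    def consume(self, it):
        # Like piping a stream into the frozen sink: update, but do not evaluate.
        for x in it:
            self._sum += x
            self._n += 1

    def thaw(self):
        # Like dagpype.thaw(): produce the final result.
        return self._sum / self._n

Each call to consume() plays the role of stream_vals(f_name) | a above; the final thaw() call plays the role of thaw(a).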

Similarly, in order to calculate the average and standard deviation of all values in files of the form ‘data?.csv’, with each value truncated to 10, we can do this:

from glob import iglob

a = freeze(filt(lambda x : min(x, 10)) | ave() + stddev())
for f_name in iglob('data?.csv'):
    stream_vals(f_name) | cast(float) | a
print thaw(a)
