DAG Pipelines are more natural than Python’s regular control structures (e.g., loops and conditionals) in some cases, less so in other cases. Fortunately, they can be combined.
For example, the following uses pipelines within a regular Python list comprehension, creating a list of the means and standard deviations of the contents of the text files in some directory:
>>> import glob
>>> stats = [stream_vals(f) | ave() + stddev() for f in glob.glob('dir/*.txt')]
As a further example, the following shows using stages with regular Python conditionals, specifically how a stage can be selected at runtime: if debug is true, a trace stage (which prints the elements piped through it) is used; otherwise, a relay stage (which simply passes elements through unchanged) is used:
>>> debug = True
>>> stream_vals('wind.txt') | (trace() if debug else relay()) | sum_()
0 : 2.0
1 : 4.0
2 : 7.0
3 : 23.0
4 : 0.0
5 : 2.0
6 : 4.0
7 : 7.0
8 : 23.0
9 : 0.0
10 : 2.0
11 : 4.0
12 : 7.0
13 : 23.0
14 : 0.0
15 : 2.0
16 : 4.0
17 : 7.0
18 : 23.0
19 : 0.0
20 : 2.0
21 : 4.0
22 : 7.0
23 : 23.0
24 : 0.0
25 : 2.0
26 : 4.0
27 : 7.0
28 : 23.0
29 : 0.0
30 : 2.0
31 : 4.0
32 : 7.0
33 : 23.0
34 : 0.0
35 : 2.0
36 : 4.0
37 : 7.0
38 : 23.0
39 : 0.0
40 : 2.0
41 : 4.0
42 : 7.0
43 : 23.0
44 : 0.0
45 : 2.0
46 : 4.0
47 : 7.0
48 : 23.0
49 : 0.0
50 : 2.0
51 : 4.0
52 : 7.0
53 : 23.0
54 : 0.0
55 : 2.0
56 : 4.0
57 : 7.0
58 : 23.0
59 : 0.0
432.0
>>> debug = False
>>> stream_vals('wind.txt') | (trace() if debug else relay()) | sum_()
432.0
Finally, the following shows functions returning sub-pipes; specifically, a function taking a file name and returning a sub-pipe producing the exponential average of the absolute values of that file's contents:
>>> def abs_exp_ave(f_name):
...     return stream_vals(f_name) | abs_() | exp_ave(0.5)
>>> abs_exp_ave('foo.dat') + abs_exp_ave('bar.dat') | corr()
Unfortunately, there are still cases which require further augmentation. `Cases`_ describes common situations where augmenting pipelines is necessary, `Dynamic Subpipes`_ shows a mechanism whereby stages assimilate these control structures within them, and `Frozen and Thawed Targets`_ shows the complementary mechanism, whereby standard control structures can use pipelines. The first approach is useful for very common cases, where the cost of writing new stages is offset by the ease of constructing pipelines with them once they are written.
Scientific data preparation and processing often involve SQL-like operations, even if the terminology is different. Here are three very common cases.
Consider for example the CSV file meteo.csv, in particular the ‘day’ and ‘wind’ columns. Each row contains a day and a wind measurement. Some consecutive rows give wind measurements for the same day, and we very often need to aggregate these similar entries.
Unfortunately, there is no single aggregation strategy that is always appropriate. We might aggregate all wind measurements of some day into a list of all their values, their average and standard deviation, their median, their sum, any combination of the preceding, and so on. We might do any of the preceding only for values not considered outliers, for example. In general, we might wish to construct a pipeline for each set of similar elements, and no static stage can capture this (at least not efficiently).
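To make the requirement concrete, here is a rough plain-Python sketch (no pipelines) of one such aggregation; it assumes meteo.csv is a standard CSV file with 'day' and 'wind' columns. Note that the aggregation strategy (here, the mean) is hard-wired into the loop, which is exactly the inflexibility we would like to avoid:

import csv
import itertools

# Group consecutive rows sharing a day, and aggregate each group's wind
# values; the aggregation (a mean) is fixed here, whereas we would like
# to plug in an arbitrary per-group pipeline.
with open('meteo.csv') as f:
    for day, rows in itertools.groupby(csv.DictReader(f), key = lambda r : r['day']):
        winds = [float(r['wind']) for r in rows]
        print((day, sum(winds) / len(winds)))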
This group case, where consecutive similar elements are aggregated, is not the only case where static pipelines need augmentation. The complementary chain operation, as well as join, are further examples.
The group case aggregates consecutive similar elements. The chain case is complementary: it expands each aggregate into a sequence of consecutive elements.
Suppose, for example, we have a directory containing files whose names are of the form ‘data?.csv’. We wish to find the total average of all values in these files.
Using static pipelines, it is easy to build a source that iterates over all such filenames:
os_walk() | filename_filt('data?.csv')
Using static pipelines, it is also easy to find the average of the contents of a specific file:
stream_vals(f_name) | ave()
However, using static pipelines, it is not possible to connect the two: there is no way to convey to the second pipeline that each element produced by the first is an aggregate (in this case, a filename) that should be expanded (in this case, into its contents).
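In plain Python, the desired expansion is simply a nested loop, as in the following sketch (which assumes, for illustration, that each matching file contains one numeric value per line):

import glob

# Expand each matching filename into its values, and average over all of them.
total, n = 0.0, 0
for f_name in glob.glob('data?.csv'):
    with open(f_name) as f:
        for line in f:
            total += float(line)
            n += 1
print(total / n)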
Join, and in particular equi-join, are very common operations. In this case, we have two sequences, and wish to output, for each element of the first sequence, something that depends on the elements similar to it in the second sequence. This very general operation too cannot be captured by static pipelines.
For example, consider the files employee.csv and dept.csv, which contain the following data, respectively:
| Name    | EmpId | DeptName      |
|---------|-------|---------------|
| Harry   | 3415  | Finance       |
| Sally   | 2241  | Sales         |
| George  | 3401  | Finance       |
| Harriet | 2202  | Sales         |
| Nelson  | 2455  | Entertainment |
and
| DeptName   | Manager |
|------------|---------|
| Finance    | George  |
| Sales      | Harriet |
| Production | Charles |
We might want to map each employee to his / her manager, or map each manager to the number of employees managed.
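Before turning to the pipeline-based solution, here is a plain-Python sketch of both mappings, using only the standard csv module (the 'data/' paths and column names follow the examples further below): employees whose department does not appear in dept.csv map to None, and managers of departments with no employees map to 0.

import csv

# Build a DeptName -> Manager lookup table from dept.csv.
with open('data/dept.csv') as f:
    manager_of = dict((r['DeptName'], r['Manager']) for r in csv.DictReader(f))

# Join employee.csv against the lookup table.
emp_to_manager = {}
managed_count = dict((m, 0) for m in manager_of.values())
with open('data/employee.csv') as f:
    for r in csv.DictReader(f):
        manager = manager_of.get(r['DeptName'])
        emp_to_manager[r['Name']] = manager
        if manager is not None:
            managed_count[manager] += 1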
In the first approach, dynamic subpipes, stages internally create sub-pipes on the fly. This section shows stages for the group, chain, and join cases above, as well as how to write such stages.
The dagpype.group() stage takes two functions: the first maps each element to a key used for deciding which elements are similar, and the second is used for dynamically constructing a sub-pipe for similar elements.
def group(key, key_pipe):
    """
    Groups consecutive similar elements by sending all such elements
    through an ad-hoc created pipe.

    Arguments:
    key -- Function mapping each element to a key. This key will
        be used to decide which elements are similar.
    key_pipe -- Function mapping each key to a pipe.
For aggregating the same-day wind measurements of meteo.csv, we can do the following:
>>> csv_vals('meteo.csv', ('day', 'wind')) | \
... group(
... key = lambda (day, wind) : day,
... key_pipe = lambda day : sink(day) + (select_inds(1) | ave()) + (select_inds(1) | stddev())) | \
... to_csv('day_wind.csv', ('day', 'ave', 'stddev'))
The line
key = lambda (day, wind) : day
indicates that elements are similar if they share the same day. The line
key_pipe = lambda day : sink(day) + (select_inds(1) | ave()) + (select_inds(1) | stddev())
indicates that we wish to pass all elements sharing the same day to a fanned sink consisting of three parts: the day itself, the average of the wind values, and their standard deviation. Note that the resulting triples can be streamed on in the regular manner; in this case, they are written to a CSV file.
To form a list of the per-day medians of the wind measurements, we can do this:
>>> import numpy
>>> csv_vals('meteo.csv', ('day', 'wind')) | \
... group(
... key = lambda (day, wind) : day,
... key_pipe = lambda day : select_inds(1) | to_array() | sink(lambda a : numpy.median(a))) | \
... to_list()
The dagpype.chain() stage takes a function mapping each element to a pipe emanating all elements which this element aggregates.
def chain(key_pipe):
    """
    Chains the result of applying an ad-hoc created pipe to each element.

    Arguments:
    key_pipe -- Function mapping each element to a pipe.
To find the total average of all values in files of the form ‘data?.csv’, we can do this:
>>> os_walk() | filename_filt('data?.csv') | \
... chain(lambda f_name : stream_vals(f_name)) | \
... ave()
The dagpype.dict_join() stage takes a dictionary, two functions, and two optional pipelines:
def dict_join(
        joined,
        key,
        common_pipe,
        out_of_dict_pipe = None,
        dict_only_pipe = None):
    """
    Performs an SQL-style join with a dictionary.

    Arguments:
    joined -- Dictionary of items with which to join.
    key -- Function mapping each element to a key. This key will
        be used to decide with which joined element (if any) to join.
    common_pipe -- Function taking a key and a value from the joined dictionary, and
        returning a pipe. This pipe will be used for all elements matching the key.

    Keyword Arguments:
    out_of_dict_pipe -- Pipe used for all elements not in the joined dictionary (default None).
    dict_only_pipe -- Pipe used for all elements only in the dictionary (default None).
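The routing logic may be easier to see in plain Python. The following is only a conceptual sketch (the function and argument names are made up), with one simplification: it handles each matched element individually, whereas dict_join builds one sub-pipe per matched key, so that all elements sharing a key can be aggregated together, as the examples below show.

# Conceptual sketch only; not dagpype's implementation.
def dict_join_sketch(elements, joined, key, on_common, on_out_of_dict, on_dict_only):
    seen = set()
    for e in elements:
        k = key(e)
        if k in joined:
            # The element's key appears in the dictionary.
            seen.add(k)
            yield on_common(k, joined[k], e)
        elif on_out_of_dict is not None:
            # The element's key is missing from the dictionary.
            yield on_out_of_dict(e)
    if on_dict_only is not None:
        # Dictionary entries whose key matched no element.
        for k in joined:
            if k not in seen:
                yield on_dict_only(k, joined[k])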
For creating a dictionary mapping employees to managers, we can do the following (refer to the tables above):
>>> d = csv_vals('data/employee.csv', ('Name', 'EmpId', 'DeptName'), (str, int, str)) | \
... dict_join(
... csv_vals('data/dept.csv', ('DeptName', 'Manager'), (str, str)) | to_dict(),
... lambda name_id_dept : name_id_dept[2],
... lambda dept, manager : filt(lambda name_id_dept : (name_id_dept[0], manager)),
... filt(lambda name_id_dept : (name_id_dept[0], None)),
... None) | \
... to_dict()
>>> assert d['Harriet'] == 'Harriet'
>>> assert d['Nelson'] is None
For creating a dictionary mapping managers to the number of people they manage, we can do the following:
>>> d = csv_vals('data/employee.csv', ('Name', 'EmpId', 'DeptName'), (str, int, str)) | \
... dict_join(
... csv_vals('data/dept.csv', ('DeptName', 'Manager'), (str, str)) | to_dict(),
... lambda name_id_dept : name_id_dept[2],
... lambda dept, manager : sink(manager) + count(),
... None,
... filt(lambda dept_manager : (dept_manager[1], 0))) | \
... to_dict()
>>> assert d['Harriet'] == 2
>>> assert d['Charles'] == 0
In the complementary approach, frozen and thawed targets, pipelines can be used within regular control structures.
Using the dagpype.freeze() function, any sink can be “frozen”. Connecting a pipeline to a regular sink causes immediate evaluation of the result. Conversely, connecting a pipeline to a frozen sink does not do so. Only “thawing” a sink, through the dagpype.thaw() function, causes a result evaluation.
For example, in order to calculate the average of all values in files of the form ‘data?.csv’, we can do this:
from glob import iglob
a = freeze(ave())
for f_name in iglob('data?.csv'):
    stream_vals(f_name) | a
print thaw(a)
The line
a = freeze(ave())
forms a frozen version of the ave sink. The line
stream_vals(f_name) | a
does not evaluate to a float; it merely updates the frozen sink. Only the line
print thaw(a)
thaws the sink (and, in this case, prints the result).
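Since frozen sinks are ordinary Python objects, they also combine naturally with the other control structures shown earlier. The following sketch (the size threshold and the idea of keeping two separate frozen averages are made up for illustration) routes each file's values to one of two frozen ave sinks, depending on the file's size:

import os
from glob import iglob

small, large = freeze(ave()), freeze(ave())
for f_name in iglob('data?.csv'):
    # Choose, per file, which frozen sink to update.
    stream_vals(f_name) | (small if os.path.getsize(f_name) < 1024 else large)
print((thaw(small), thaw(large)))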
Similarly, in order to calculate the average and standard deviation of all values in files of the form ‘data?.csv’ truncated to 10, we can do this:
from glob import iglob
a = freeze(filt(lambda x : min(x, 10)) | ave() + stddev())
for f_name in iglob('data?.csv'):
    stream_vals(f_name) | cast(float) | a
print thaw(a)