Stages can be connected recursively to form any DAG (directed acyclic graph). The | (pipe) operator chains stages, and the + (fan) operator stacks them side by side.
<source> | <filter> forms a source.
For example,
source([1, 2, 3, 4]) | filt(lambda x : x ** 2)
is a source sending 1, then 4, then 9, then 16.
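One way to see how | builds a new source is a toy model based on Python operator overloading. The classes Source and Filter below are hypothetical stand-ins, not the library's implementation, and a filter is assumed here to be a plain per-element function.

```python
class Filter:
    """Hypothetical stand-in: a filter that maps a function over a stream."""

    def __init__(self, fn):
        self.fn = fn


class Source:
    """Hypothetical stand-in: a source that wraps an iterable."""

    def __init__(self, iterable):
        self._iterable = iterable

    def __iter__(self):
        return iter(self._iterable)

    def __or__(self, stage):
        # source | filter -> a new source that sends the filter's outputs
        if isinstance(stage, Filter):
            return Source(stage.fn(x) for x in self)
        return NotImplemented


squares = Source([1, 2, 3, 4]) | Filter(lambda x: x ** 2)
print(list(squares))  # [1, 4, 9, 16]
```

The result of the pipe is itself a Source, which is what lets further stages be chained onto it.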
<filter_1> | <filter_2> forms a filter.
For example,
select_inds((0, 1)) | cast((float, float))
is a filter that, fed a sequence of tuples, transforms ('1', '2', '3') into the tuple (1.0, 2.0).
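Chaining two per-element filters amounts to function composition. The sketch below assumes stateless filters; Filter, pick, and convert are hypothetical analogues of filt, select_inds, and cast, not the library's own code.

```python
class Filter:
    """Hypothetical stand-in: a filter that applies a function to each element."""

    def __init__(self, fn):
        self.fn = fn

    def __or__(self, other):
        # filter | filter -> one filter: apply self first, then other
        if isinstance(other, Filter):
            return Filter(lambda x: other.fn(self.fn(x)))
        return NotImplemented


def pick(inds):
    """Hypothetical analogue of select_inds: keep the given tuple positions."""
    return Filter(lambda t: tuple(t[i] for i in inds))


def convert(types):
    """Hypothetical analogue of cast: convert each position with its type."""
    return Filter(lambda t: tuple(ty(v) for ty, v in zip(types, t)))


to_floats = pick((0, 1)) | convert((float, float))
print(to_floats.fn(('1', '2', '3')))  # (1.0, 2.0)
```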
<filter> | <sink> forms a sink.
For example,
prepend('results:') | to_stream('out.txt')
is a sink that writes 'results:' to 'out.txt', followed by whatever is sent to it.
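A filter piped into a sink yields another sink: items sent to the combination pass through the filter first. In this hypothetical sketch, Prepend and StreamSink stand in for prepend and to_stream, an in-memory io.StringIO replaces the file 'out.txt', and writing one item per line is an assumption about the output format.

```python
import io


class StreamSink:
    """Hypothetical stand-in for to_stream: writes each item on its own line."""

    def __init__(self, stream):
        self.stream = stream

    def feed(self, item):
        self.stream.write(f"{item}\n")


class PrependSink:
    """Result of Prepend(first) | sink: sends `first`, then every item, on to sink."""

    def __init__(self, first, downstream):
        self.downstream = downstream
        downstream.feed(first)  # the prepended item goes out before anything else

    def feed(self, item):
        self.downstream.feed(item)


class Prepend:
    """Hypothetical stand-in for the prepend filter."""

    def __init__(self, first):
        self.first = first

    def __or__(self, sink):
        # filter | sink -> a new sink
        return PrependSink(self.first, sink)


buf = io.StringIO()  # stands in for the file 'out.txt'
snk = Prepend("results:") | StreamSink(buf)
for value in [1, 2, 3]:
    snk.feed(value)
print(buf.getvalue(), end="")
```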
<source> | <sink> results in the last object the sink sends on.
For example,
source(range(4)) | sum_of()
is 6.
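Piping a source into a sink runs the whole stream and produces a value rather than a new stage. A minimal sketch of that evaluation, with hypothetical Source and SumSink classes standing in for source and sum_of:

```python
class SumSink:
    """Hypothetical stand-in for sum_of(): keeps a running total."""

    def __init__(self):
        self.total = 0

    def feed(self, item):
        self.total += item

    def result(self):
        return self.total


class Source:
    """Hypothetical stand-in for source(): wraps an iterable."""

    def __init__(self, iterable):
        self._iterable = iterable

    def __or__(self, sink):
        # source | sink -> push every item into the sink, return its result
        for item in self._iterable:
            sink.feed(item)
        return sink.result()


print(Source(range(4)) | SumSink())  # 6
```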
<sink_1> | <sink_2> forms a sink.
For example,
to_array() | sink(lambda a : numpy.median(a))
is a sink that calculates the median of a sequence.
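Chaining two sinks means the stream goes into the first sink, and the first sink's result is then sent to the second. The sketch below is hypothetical: ToList, Apply, and Chained stand in for to_array, sink, and their combination, and statistics.median replaces numpy.median so that only the standard library is needed.

```python
import statistics


class ToList:
    """Hypothetical stand-in for to_array(): collects the stream into a list."""

    def __init__(self):
        self.items = []

    def feed(self, item):
        self.items.append(item)

    def result(self):
        return self.items

    def __or__(self, other):
        # sink | sink -> a combined sink
        return Chained(self, other)


class Apply:
    """Hypothetical stand-in for sink(fn): its result is fn of the last item received."""

    def __init__(self, fn):
        self.fn = fn
        self.last = None

    def feed(self, item):
        self.last = item

    def result(self):
        return self.fn(self.last)


class Chained:
    """sink_1 | sink_2: the stream goes to sink_1; sink_1's result is sent to sink_2."""

    def __init__(self, first, second):
        self.first, self.second = first, second

    def feed(self, item):
        self.first.feed(item)

    def result(self):
        self.second.feed(self.first.result())
        return self.second.result()


median_sink = ToList() | Apply(statistics.median)
for x in [1, 2, 3, 4, 5]:
    median_sink.feed(x)
print(median_sink.result())  # 3
```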
Note
Parentheses may be necessary when chaining sinks. For example,
>>> source([1, 2, 3, 4, 5]) | (to_array() | sink(lambda a : numpy.median(a)))
3.0
is fine, but
>>> source([1, 2, 3, 4, 5]) | to_array() | sink(lambda a : numpy.median(a))
TypeError: unsupported operand type(s) for |: 'int' and '_SnkPiped'
is not. Since | is left-associative, source([1, 2, 3, 4, 5]) | to_array() is evaluated first and yields an array, so the expression is the same as
>>> numpy.array([1, 2, 3, 4, 5]) | sink(lambda a : numpy.median(a))
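The grouping Python applies can be checked with a hypothetical Stage class that does nothing but record how | combined its operands:

```python
class Stage:
    """Hypothetical stage that only records how `|` grouped its operands."""

    def __init__(self, name):
        self.name = name

    def __or__(self, other):
        return Stage(f"({self.name} | {other.name})")


src, arr, med = Stage("source"), Stage("to_array"), Stage("sink")
print((src | arr | med).name)    # ((source | to_array) | sink)
print((src | (arr | med)).name)  # (source | (to_array | sink))
```

Without parentheses the source and the first sink are combined first, which runs the stream to completion before the second sink is ever involved.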
<source_1> + <source_2> + ... + <source_n> forms a source that sends tuples combining what each of the sources sends.
For example,
source([1, 2, 3]) + source([4, 5, 6])
sends the sequence of pairs (1, 4), (2, 5), (3, 6).
Similarly,
read_lines('wind.txt') + read_lines('rain.txt') + read_lines('hail.txt')
is a source that sends a sequence of triplets, each holding the corresponding line from each of the three files.
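Fanning sources behaves much like Python's zip, with a + b + c flattened into triplets rather than nested pairs. In this hypothetical sketch, Source and FanSource are illustrative classes, and stopping at the shortest branch (zip's behavior) is an assumption, since the text does not say what happens when the sources differ in length.

```python
class Source:
    """Hypothetical stand-in: a source that wraps an iterable."""

    def __init__(self, iterable):
        self._iterable = iterable

    def __iter__(self):
        return iter(self._iterable)

    def __add__(self, other):
        # source + source -> a fan of two branches
        return FanSource([self, other])


class FanSource:
    """Hypothetical fan of sources: sends one tuple per step, one item per branch."""

    def __init__(self, branches):
        self.branches = branches

    def __iter__(self):
        # Assumption: stop at the shortest branch, as zip does.
        return zip(*self.branches)

    def __add__(self, other):
        # (a + b) + c extends the branch list, giving flat triplets
        return FanSource(self.branches + [other])


print(list(Source([1, 2, 3]) + Source([4, 5, 6])))  # [(1, 4), (2, 5), (3, 6)]
```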
<filter_1> + <filter_2> + ... + <filter_n> forms a filter: whatever is sent to it is duplicated along the n branches, and the outputs of the branches are combined into tuples.
For example,
filt(lambda x : min(x, 5)) + filt(lambda x : min(x, 10))
forms a filter that transforms each element into a pair: the element capped at 5, and the element capped at 10.
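For stateless, per-element filters, fanning amounts to applying every branch to the same element and collecting the outputs in a tuple. The Filter class below is a hypothetical sketch; stateful filters such as skip_n would need more machinery than this.

```python
class Filter:
    """Hypothetical stand-in: one or more per-element functions run side by side."""

    def __init__(self, *fns):
        self.fns = list(fns)

    def __add__(self, other):
        # filter_1 + filter_2 -> one filter whose branches are all of them
        return Filter(*self.fns, *other.fns)

    def apply(self, stream):
        for x in stream:
            # duplicate x along every branch and combine the branch outputs
            outs = tuple(fn(x) for fn in self.fns)
            yield outs if len(outs) > 1 else outs[0]


capped = Filter(lambda x: min(x, 5)) + Filter(lambda x: min(x, 10))
print(list(capped.apply([3, 7, 12])))  # [(3, 3), (5, 7), (5, 10)]
```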
Similarly,
skip_n(-5) + skip_n(5)
forms a filter that transforms a sequence into pairs of the sequence shifted 5 elements into the past and 5 elements into the future.
<sink_1> + <sink_2> + ... + <sink_n> forms a sink whose result is the tuple of the branch results.
For example,
min_() + max_() + sum_()
is a sink that returns the triplet of the minimum, maximum, and sum of the sequence passed through it.
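Fanning sinks sends every item to every branch in a single pass, and the combined result is the tuple of the branch results. In this hypothetical sketch, each Sink folds the stream with a binary function (min, max, or addition) in place of min_, max_, and sum_. Note that an empty stream yields None for every branch here, an artifact of the toy model.

```python
class Sink:
    """Hypothetical stand-in: a sink that folds the stream with a binary function."""

    def __init__(self, fold):
        self.fold = fold
        self.acc = None
        self.seen = False

    def feed(self, item):
        self.acc = item if not self.seen else self.fold(self.acc, item)
        self.seen = True

    def result(self):
        return self.acc

    def __add__(self, other):
        # sink + sink -> a fan of two branches
        return FanSink([self, other])


class FanSink:
    """sink_1 + ... + sink_n: every item goes to every branch; result is a tuple."""

    def __init__(self, branches):
        self.branches = branches

    def feed(self, item):
        for b in self.branches:
            b.feed(item)

    def result(self):
        return tuple(b.result() for b in self.branches)

    def __add__(self, other):
        return FanSink(self.branches + [other])


stats = Sink(min) + Sink(max) + Sink(lambda a, b: a + b)
for x in [4, 1, 7, 3]:
    stats.feed(x)
print(stats.result())  # (1, 7, 15)
```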