Connecting Stages To DAGs

Stages can be connected recursively to form any DAG (Directed Acyclic Graph). The | (pipe) operator chains stages, and the + (fan) operator stacks them.

| (Pipe)

  1. <source> | <filter> forms a source.

    source | filter

    For example,

    source([1, 2, 3, 4]) | filt(lambda x : x ** 2)
    

    is a source sending 1, then 4, then 9, then 16.

  2. <filter_1> | <filter_2> forms a filter.

    filter | filter

    For example,

    select_inds((0, 1)) | cast((float, float))
    

    is a filter that, when fed a sequence of tuples, transforms ('1', '2', '3') into the tuple (1.0, 2.0).

  3. <filter> | <sink> forms a sink.

    filter | sink

    For example,

    prepend('results:') | to_stream('out.txt')

    is a sink that first writes 'results:' to 'out.txt', then whatever is sent to it.

  4. <source> | <sink> evaluates to the result of the sink, that is, the last object the sink sends on.

    source | sink

    For example,

    source(range(4)) | sum_()
    

    is 6.

  5. <sink_1> | <sink_2> forms a sink.

    sink | sink

    For example,

    to_array() | sink(lambda a : numpy.median(a))
    

    is a sink that calculates the median of a sequence.

    Note

    Parentheses may be necessary when chaining sinks. For example,

    >>> source([1, 2, 3, 4, 5]) | (to_array() | sink(lambda a : numpy.median(a)))
    3

    is fine, but

    >>> source([1, 2, 3, 4, 5]) | to_array() | sink(lambda a : numpy.median(a))
    TypeError: unsupported operand type(s) for |: 'int' and '_SnkPiped'

    is not. Python evaluates | from left to right, so source | to_array() runs first and yields an array, which makes the expression the same as

    >>> numpy.array([1, 2, 3, 4, 5]) | sink(lambda a : numpy.median(a))
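
The pipe rules and the grouping issue in the note can be illustrated with a toy model. The sketch below is not DAGPype's implementation: the classes Source, Filt, and Sink are hypothetical stand-ins that mimic the rules above just enough to show that source | sink is evaluated immediately and yields a plain value, so a further | sink no longer has a stage on its left.

```python
class Sink:
    """Hypothetical sink: reduces an iterable to a single result."""
    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # sink | sink: pass this sink's result on to the next sink.
        if isinstance(other, Sink):
            return Sink(lambda items: other.func(self.func(items)))
        return NotImplemented


class Filt:
    """Hypothetical filter: transforms each element."""
    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # filter | filter: apply one function after the other.
        if isinstance(other, Filt):
            return Filt(lambda x: other.func(self.func(x)))
        # filter | sink: transform the elements, then reduce them.
        if isinstance(other, Sink):
            return Sink(lambda items: other.func(self.func(x) for x in items))
        return NotImplemented


class Source:
    """Hypothetical source: wraps an iterable."""
    def __init__(self, items):
        self.items = items

    def __or__(self, other):
        # source | filter: still a source, with transformed elements.
        if isinstance(other, Filt):
            return Source(other.func(x) for x in self.items)
        # source | sink: runs immediately and returns the sink's result.
        if isinstance(other, Sink):
            return other.func(self.items)
        return NotImplemented


to_list = Sink(list)
median = Sink(lambda a: sorted(a)[len(a) // 2])  # valid for odd lengths only

squares = list((Source([1, 2, 3, 4]) | Filt(lambda x: x ** 2)).items)
total = Source(range(4)) | Sink(sum)

# Grouping the sinks first builds one combined sink, then runs it.
grouped = Source([1, 2, 3, 4, 5]) | (to_list | median)

# Without parentheses, Source | to_list runs first and returns a list,
# and a plain list does not know how to pipe into a Sink.
try:
    Source([1, 2, 3, 4, 5]) | to_list | median
    ungrouped_failed = False
except TypeError:
    ungrouped_failed = True
```

In this toy model, squares is [1, 4, 9, 16], total is 6, grouped is 3, and the ungrouped expression raises TypeError for the same structural reason as in the note.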

+ (Fan)

  1. <source_1> + <source_2> + ... + <source_n> forms a source that sends tuples, each combining one element from every source.

    source + source

    For example,

    source([1, 2, 3]) + source([4, 5, 6])
    

    sends the sequence of pairs (1, 4), (2, 5), (3, 6).

    Similarly,

    read_lines('wind.txt') + read_lines('rain.txt') + read_lines('hail.txt')
    

    is a source which sends a sequence of triplets of lines from files.

  2. <filter_1> + <filter_2> + ... + <filter_n> forms a filter that duplicates whatever is sent to it along the n branches, and combines what the branches send into tuples.

    filter + filter

    For example,

    filt(lambda x : min(x, 5)) + filt(lambda x : min(x, 10))
    

    forms a filter that transforms each element into a pair: the first value is the element capped at 5, and the second value is the element capped at 10.

    Similarly,

    skip_n(-5) + skip_n(5)
    

    forms a filter that transforms a sequence into pairs of itself shifted 5 elements into the past and 5 elements into the future.

  3. <sink_1> + <sink_2> + ... + <sink_n> forms a sink whose result is the tuple of the branch results.

    sink + sink

    For example,

    min_() + max_() + sum_()
    

    is a sink that returns the triplet of the minimum, maximum, and sum of the sequence passed through it.
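
The three fan rules have close plain-Python analogues, sketched below without DAGPype: zip for stacked sources, a function returning a tuple for stacked filters, and a single loop updating several accumulators for stacked sinks. The helper names fan_filters and fan_sinks are hypothetical and exist only for this illustration.

```python
def fan_filters(*funcs):
    """Return a function that sends one element through every branch."""
    return lambda x: tuple(f(x) for f in funcs)


def fan_sinks(items):
    """Make one pass over a non-empty iterable; return (min, max, sum)."""
    it = iter(items)
    lo = hi = total = next(it)
    for x in it:
        lo = min(lo, x)
        hi = max(hi, x)
        total += x
    return lo, hi, total


# source + source: elements are paired up position by position.
pairs = list(zip([1, 2, 3], [4, 5, 6]))

# filter + filter: each element is duplicated along both branches.
cap_both = fan_filters(lambda x: min(x, 5), lambda x: min(x, 10))
capped = [cap_both(x) for x in [3, 7, 12]]

# sink + sink + sink: one traversal yields all three results.
stats = fan_sinks([4, 1, 3])
```

Here pairs is [(1, 4), (2, 5), (3, 6)], capped is [(3, 3), (5, 7), (5, 10)], and stats is (1, 4, 8).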
