Stages

A stage is a pipeline element that creates data for the pipeline (e.g., by reading it from a file), manipulates it (e.g., by calculating its absolute value), or consumes it (e.g., by writing it to a file, or by calculating its average). The first type is a source, the second is a filter, and the third is a sink.

Note

  • This page describes the different types of stages, and how to create new ones.
  • Connecting Stages To DAGs describes how to connect stages to form DAGs.
  • Reference describes the available stages.

Sources

A source is a pipeline stage to which no elements are sent; it only sends elements on. A typical source reads text files, CSV files, a numpy array, and so forth.

A source is formed either using the dagpype.source() function, or using the dagpype.sources() decorator.

Creating A Source Via The dagpype.source() Function

The dagpype.source() function takes any iterable. For example, to create a counter counting from 0 up to (but not including) 5, we can use:

source(range(5))

or, more generally,

def counter(n):
    return source(range(n))

Creating A Source Via The dagpype.sources() Decorator

The dagpype.sources() decorator transforms a generator function into a source. Thus, another way to create a counter is:

def count(n):
    @sources
    def _act():
        for i in range(n):
            yield i
    return _act

The latter method is more appropriate for sources whose logic is more complex and stateful. Here is the classic Fibonacci example:

def fib(n):
    @sources
    def _act():
        a, b, counter = 0, 1, 0
        while True:
            if (counter > n): return
            yield a
            a, b = b, a + b
            counter += 1
    return _act
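
To see which values the generator inside fib produces, the same loop can be run as a plain Python generator, without dagpype. This is only a sketch of the generator logic; the name fib_values is hypothetical and not part of the library. Because the loop stops only once counter exceeds n, it yields n + 1 values.

```python
def fib_values(n):
    """Plain-generator copy of the loop inside fib's _act (hypothetical helper)."""
    a, b, counter = 0, 1, 0
    while True:
        if counter > n:
            return  # ends the generator, so no more elements are produced
        yield a
        a, b = b, a + b
        counter += 1


print(list(fib_values(5)))  # [0, 1, 1, 2, 3, 5]
```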

Filters

A filter is a pipeline stage to which elements are sent, and which sends elements as well.

A filter, in general, is formed using the dagpype.filters() decorator. Many common filters can be formed using the dagpype.filt() function.

Creating A Filter Via The dagpype.filters() Decorator

The dagpype.filters() decorator transforms a coroutine function to a filter. The coroutine obtains elements through (yield), and sends them on through send. The GeneratorExit exception indicates that no more elements are coming.

For example, to create an absolute-value filter, we can use

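def abs_():
    @filters
    def _act(target):
        try:
            while True:
                # Receive an element, and send its absolute value on.
                target.send(abs((yield)))
        except GeneratorExit:
            # No more elements are coming; propagate the shutdown.
            target.close()
    return _act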

A filter need not send exactly one element for each element sent to it. Here is an appending filter, which sends one extra element after the sequence ends:

def append(stuff):
    @filters
    def _act(target):
        try:
            while True:
                target.send((yield))
        except GeneratorExit:
            # The input is exhausted; send the extra element, then close.
            target.send(stuff)
            target.close()
    return _act
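
The coroutine protocol described above ((yield) to receive, send to pass on, GeneratorExit at the end) can be exercised in plain Python, without dagpype. The following is a sketch under assumptions: primed and collect are hypothetical helpers that stand in for whatever wiring the dagpype.filters() decorator performs, and append_act copies the body of _act from the append example.

```python
def primed(coroutine_function):
    """Hypothetical helper: create a coroutine and advance it to its first (yield)."""
    def start(*args):
        co = coroutine_function(*args)
        next(co)
        return co
    return start


@primed
def collect(out):
    """Stand-in target that stores whatever is sent to it in the list out."""
    try:
        while True:
            out.append((yield))
    except GeneratorExit:
        pass


@primed
def append_act(stuff, target):
    """Same logic as _act in append: pass elements through, then send stuff."""
    try:
        while True:
            target.send((yield))
    except GeneratorExit:
        target.send(stuff)
        target.close()


received = []
stage = append_act(4, collect(received))
for x in [1, 2, 3]:
    stage.send(x)
stage.close()  # raises GeneratorExit inside append_act
print(received)  # [1, 2, 3, 4]
```

Closing the stage is what triggers the GeneratorExit handler, which is why the appended element arrives only after all the others.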

Creating A Filter Via The dagpype.filt() Function

In the very common case where a filter sends at most one element for each element sent to it, possibly suppressing some of them, the helper function filt can be used:

def filt(trans = None, pre = None, post = None):
    """
    Filter (transform elements and / or suppress them).

    Keyword arguments:
    trans = Transformation function for each element (default None).
    pre = Suppression function checked against each element before
        transformation function, if any (default None).
    post = Suppression function checked against each element after
        transformation function, if any (default None).
    """

The dagpype.abs_() filter, for example, can be written as:

filt(lambda x : abs(x))
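
The way the three arguments interact can be modelled with a plain generator. This is a hypothetical sketch, not dagpype's implementation: the name filt_values is invented here, and it assumes that pre and post are predicates and that an element is kept only when the predicate returns a true value. The docstring above does not state which return value suppresses an element, so check that convention against the library before relying on it.

```python
def filt_values(seq, trans=None, pre=None, post=None):
    """Hypothetical plain-generator model of filt.

    Assumption: pre and post keep an element when they return a true value.
    """
    for x in seq:
        if pre is not None and not pre(x):
            continue  # suppressed before the transformation
        if trans is not None:
            x = trans(x)
        if post is not None and not post(x):
            continue  # suppressed after the transformation
        yield x


print(list(filt_values([-1, 2, -3], trans=abs)))  # [1, 2, 3]
print(list(filt_values([4, -3, 9], trans=lambda x: x * 10, pre=lambda x: x >= 0)))  # [40, 90]
```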

Sinks

A sink is a pipeline stage to which elements are sent for it to perform some final action.

A sink, in general, is formed using the dagpype.sinks() decorator. Many common sinks can be formed using the dagpype.sink() function.

Creating A Sink Via The dagpype.sinks() Decorator

The dagpype.sinks() decorator transforms a coroutine function into a sink. The coroutine obtains elements through (yield). It sends its final output in the GeneratorExit handler.

For example, to create a sink determining if the sequence has even or odd length, we can use:

def odd_count():
    @sinks
    def _act(target):
        try:
            n = 0
            while True:
                # Receive an element; only the count matters.
                (yield)
                n += 1
        except GeneratorExit:
            # This is the final result of the sink.
            target.send(n % 2 == 1)
            target.close()
    return _act
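
The sink's coroutine can also be driven by hand to see when the result appears. The sketch below uses plain Python only; run_sink and collect are hypothetical helpers standing in for the wiring that the dagpype.sinks() decorator presumably provides, and odd_count_act copies the body of _act above.

```python
def odd_count_act(target):
    """Same logic as _act in odd_count, run as a bare coroutine."""
    try:
        n = 0
        while True:
            (yield)
            n += 1
    except GeneratorExit:
        # The single result is sent only once the input has ended.
        target.send(n % 2 == 1)
        target.close()


def collect(out):
    """Stand-in target that stores whatever is sent to it in the list out."""
    try:
        while True:
            out.append((yield))
    except GeneratorExit:
        pass


def run_sink(act, elements):
    """Hypothetical driver: feed elements to the coroutine, return its result."""
    result = []
    target = collect(result)
    next(target)  # advance to the first (yield)
    stage = act(target)
    next(stage)
    for x in elements:
        stage.send(x)
    stage.close()  # raises GeneratorExit inside the sink
    return result[0]


print(run_sink(odd_count_act, [1, 2, 3]))  # True
print(run_sink(odd_count_act, [1, 2]))     # False
```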

Creating A Sink Via The dagpype.sink() Function

There are two common special cases in which a sink can be formed more easily using the dagpype.sink() function.

In the first case, the desired result is some Python object independent of the input sequence. Calling dagpype.sink() with a Python object (except a function) forms a sink whose output is unconditionally that object. For example:

>>> source([1, 2]) | sink('hello')
'hello'

>>> source([1, 2, 3]) | sink('hello')
'hello'

In the second case, the desired result is the application of a function to the last element of the sequence. Calling dagpype.sink() with a function forms a sink whose output is the application of the function to the last element of the sequence. For example:

>>> source([1, 2, 3]) | sink(lambda x : x ** 2)
9
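
The two cases can be summarised by a small plain-Python model. This is a hypothetical sketch, not dagpype's implementation: sink_result is an invented name, and raising ValueError on an empty sequence in the function case is an assumption of this sketch, since the text does not say what the library does there.

```python
_MISSING = object()  # sentinel marking "no element seen yet"


def sink_result(elements, arg):
    """Hypothetical model of the two sink() cases described above."""
    if not callable(arg):
        for _ in elements:
            pass  # consume the input; the result does not depend on it
        return arg
    last = _MISSING
    for last in elements:
        pass  # keep only the last element
    if last is _MISSING:
        # Assumption of this sketch only; the library's behaviour is not documented here.
        raise ValueError("empty sequence")
    return arg(last)


print(sink_result([1, 2], 'hello'))               # hello
print(sink_result([1, 2, 3], lambda x: x ** 2))   # 9
```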