A stage is a pipeline element that creates data for the pipeline (e.g., by reading it from a file), manipulates it (e.g., by calculating its absolute value), or consumes it (e.g., by writing it to a file, or by calculating its average). The first type is a source, the second is a filter, and the third is a sink.
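To make the three roles concrete without depending on dagpype itself, here is a minimal plain-Python sketch. A source pushes numbers into a filter that takes absolute values, and the filter pushes them into a sink that computes their average. It uses generator-based coroutines in the style of the examples on this page. The helper names (primed, push, absolute, average) are hypothetical, and dagpype wires its stages internally in ways that may differ.

```python
# Hypothetical plain-Python model of a source -> filter -> sink chain.
# None of these names come from dagpype; they only mimic the push style
# used on this page: elements arrive through (yield), leave through target.send().

def primed(coro):
    """Advance a coroutine to its first (yield) so it can accept send()."""
    next(coro)
    return coro

def push(iterable, target):
    """Source role: send every element of `iterable` on, then close the target."""
    for x in iterable:
        target.send(x)
    target.close()

def absolute(target):
    """Filter role: send the absolute value of each received element on."""
    try:
        while True:
            target.send(abs((yield)))
    except GeneratorExit:
        target.close()

def average(result):
    """Sink role: when the stream ends, append the mean to `result`."""
    total, n = 0, 0
    try:
        while True:
            total += (yield)
            n += 1
    except GeneratorExit:
        result.append(total / n if n else None)

result = []
push([-2, 4, -6], primed(absolute(primed(average(result)))))
print(result)  # [4.0]
```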
A source is a pipeline stage to which no elements are sent; it only sends elements on. A typical source reads a text file, a CSV file, a NumPy array, and so forth.
A source is formed using either the dagpype.source() function or the dagpype.sources() decorator.
The dagpype.source() function takes any iterable. For example, to create a counter counting from 0 up to (but not including) 5, we can use:
source(range(5))
or, more generally,
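from dagpype import source

def counter(n):
    # Any iterable can back a source; here it is range(n), i.e. 0, ..., n - 1.
    return source(range(n))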
The dagpype.sources() decorator transforms a generator function into a source. Thus another way to create a counter is:
from dagpype import sources

def count(n):
    @sources
    def _act():
        # Each yielded value becomes one element of the pipeline.
        for i in range(n):
            yield i
    return _act
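Because dagpype.sources() wraps an ordinary generator function, the generator body can be exercised on its own in plain Python. The sketch below checks that it produces the same elements as the range(n) handed to dagpype.source() in the first approach. count_values is a hypothetical name, and no dagpype import is involved.

```python
def count_values(n):
    """Generator body of count(n) above, without the @sources decorator."""
    for i in range(n):
        yield i

# Both routes to a counter describe the same sequence: 0, 1, ..., n - 1.
print(list(count_values(5)))                    # [0, 1, 2, 3, 4]
print(list(count_values(5)) == list(range(5)))  # True
```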
The latter method is more appropriate for sources whose logic is more complex and stateful. Here is the classic Fibonacci example:
from dagpype import sources

def fib(n):
    @sources
    def _act():
        a, b, counter = 0, 1, 0
        while True:
            # Stop once counter exceeds n, i.e. after n + 1 numbers.
            if counter > n:
                return
            yield a
            a, b = b, a + b
            counter += 1
    return _act
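The loop stops only once counter exceeds n, so fib(n) emits n + 1 Fibonacci numbers. This is easy to confirm by running the undecorated generator body directly in plain Python. fib_values is a hypothetical name for that body.

```python
def fib_values(n):
    """The generator body of fib(n) above, without the @sources decorator."""
    a, b, counter = 0, 1, 0
    while True:
        if counter > n:
            return
        yield a
        a, b = b, a + b
        counter += 1

print(list(fib_values(6)))  # [0, 1, 1, 2, 3, 5, 8]  (n + 1 = 7 numbers)
```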
A filter is a pipeline stage to which elements are sent, and which sends elements as well.
A filter, in general, is formed using the dagpype.filters() decorator. Many common filters can be formed using the dagpype.filt() function.
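The dagpype.filters() decorator transforms a coroutine function into a filter. The coroutine receives elements through (yield) and sends them on through target.send(). A GeneratorExit exception, raised at the (yield) when the filter is closed, indicates that no more elements are coming; at that point the filter should close its own target.
For example, to create an absolute-value filter, we can use:
from dagpype import filters

def abs_():
    @filters
    def _act(target):
        try:
            while True:
                # Receive an element, then send its absolute value on.
                target.send(abs((yield)))
        except GeneratorExit:
            # No more elements: propagate the end of the stream downstream.
            target.close()
    return _act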
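To see the coroutine protocol in isolation, the undecorated body of abs_() can be driven by hand. It must be primed with next() to reach its first (yield). Each send() then delivers one element, and close() raises GeneratorExit inside it. The Recorder class below is a hypothetical stand-in for the downstream stage that dagpype would normally supply.

```python
class Recorder:
    """Hypothetical downstream stage: records elements and whether it was closed."""

    def __init__(self):
        self.items = []
        self.closed = False

    def send(self, x):
        self.items.append(x)

    def close(self):
        self.closed = True

def abs_body(target):
    # The body of _act in abs_() above, without the @filters decorator.
    try:
        while True:
            target.send(abs((yield)))
    except GeneratorExit:
        target.close()

rec = Recorder()
f = abs_body(rec)
next(f)                # prime: run up to the first (yield)
for x in [-1, 2, -3]:
    f.send(x)          # each send() resumes the coroutine with one element
f.close()              # raises GeneratorExit at the paused (yield)
print(rec.items, rec.closed)  # [1, 2, 3] True
```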
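A filter need not send exactly one element for each element sent to it. Here is a filter that passes every element through and then appends one more at the end:
from dagpype import filters

def append(stuff):
    @filters
    def _act(target):
        try:
            while True:
                # Pass every element through unchanged.
                target.send((yield))
        except GeneratorExit:
            # After the last element, send one extra element, then close.
            target.send(stuff)
            target.close()
    return _act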
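Driving the undecorated body of append() by hand shows that the extra element is emitted only when the stream closes, after every element received. Here the downstream stage is a hypothetical collecting coroutine rather than a real dagpype sink. append_body is likewise a hypothetical name.

```python
def collect(out):
    """Hypothetical downstream coroutine: appends everything it receives to `out`."""
    while True:
        out.append((yield))

def append_body(stuff, target):
    # The body of _act in append() above, without the @filters decorator;
    # `stuff` is passed explicitly instead of captured from the outer function.
    try:
        while True:
            target.send((yield))
    except GeneratorExit:
        target.send(stuff)
        target.close()

out = []
downstream = collect(out)
next(downstream)
f = append_body('end', downstream)
next(f)
for x in ['a', 'b']:
    f.send(x)
f.close()
print(out)  # ['a', 'b', 'end']
```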
In the very common case where a filter sends at most one element for each element sent to it, possibly suppressing some of them, the helper function dagpype.filt() can be used:
def filt(trans=None, pre=None, post=None):
    """
    Filter (transform elements and / or suppress them).

    Keyword arguments:
    trans = Transformation function for each element (default None).
    pre = Suppression function checked against each element before
        transformation function, if any (default None).
    post = Suppression function checked against each element after
        transformation function, if any (default None).
    """
The dagpype.abs_() filter, for example, can be written as:
filt(lambda x: abs(x))
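The contract of dagpype.filt() (at most one output per input) can be sketched as a plain coroutine. This is not dagpype's implementation: filt_like and ListTarget are hypothetical. The sketch also assumes that pre and post are predicates that keep an element when they return True and suppress it when they return False, which the docstring above does not spell out.

```python
class ListTarget:
    """Hypothetical downstream stage that collects whatever it is sent."""

    def __init__(self):
        self.items = []

    def send(self, x):
        self.items.append(x)

    def close(self):
        pass

def filt_like(target, trans=None, pre=None, post=None):
    # ASSUMPTION: pre/post return True to keep an element, False to drop it.
    try:
        while True:
            x = (yield)
            if pre is not None and not pre(x):
                continue              # suppressed before transformation
            if trans is not None:
                x = trans(x)
            if post is not None and not post(x):
                continue              # suppressed after transformation
            target.send(x)            # at most one output per input
    except GeneratorExit:
        target.close()

t = ListTarget()
f = filt_like(t, trans=abs, post=lambda x: x < 3)
next(f)
for x in [-1, 2, -3, 4]:
    f.send(x)
f.close()
print(t.items)  # [1, 2]
```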
A sink is a pipeline stage to which elements are sent for it to perform some final action.
A sink, in general, is formed using the dagpype.sinks() decorator. Many common sinks can be formed using the dagpype.sink() function.
The dagpype.sinks() decorator transforms a coroutine function into a sink. The coroutine receives elements through (yield) and sends its final result to target in its GeneratorExit handler.
For example, to create a sink that determines whether the sequence has an odd number of elements (True) or an even number (False), we can use:
from dagpype import sinks

def odd_count():
    @sinks
    def _act(target):
        try:
            n = 0
            while True:
                # Only the number of elements matters, not their values.
                (yield)
                n += 1
        except GeneratorExit:
            # This is the final result of the sink.
            target.send(n % 2 == 1)
            target.close()
    return _act
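Running the undecorated body of odd_count() by hand makes the sink's two phases visible. While elements arrive it only counts them, and its single output is produced when the stream is closed. The Result class and the run helper are hypothetical stand-ins for what dagpype would provide.

```python
class Result:
    """Hypothetical downstream target that keeps the sink's final output."""

    def __init__(self):
        self.value = None
        self.closed = False

    def send(self, x):
        self.value = x

    def close(self):
        self.closed = True

def odd_count_body(target):
    # The body of _act in odd_count() above, without the @sinks decorator.
    try:
        n = 0
        while True:
            (yield)
            n += 1
    except GeneratorExit:
        target.send(n % 2 == 1)
        target.close()

def run(items):
    res = Result()
    s = odd_count_body(res)
    next(s)            # prime the coroutine
    for x in items:
        s.send(x)
    s.close()          # triggers the final send in the GeneratorExit handler
    return res.value

print(run(['a', 'b', 'c']))  # True
print(run([1, 2]))           # False
```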
There are two common special cases in which a sink can be formed more easily using the dagpype.sink() function.
In the first case, the desired result is some Python object independent of the input sequence. Calling dagpype.sink() with a Python object (other than a function) forms a sink whose output is unconditionally that object. For example:
>>> source([1, 2]) | sink('hello')
'hello'
>>> source([1, 2, 3]) | sink('hello')
'hello'
In the second case, the desired result is the application of a function to the last element of the sequence. Calling dagpype.sink() with a function forms a sink whose output is the application of the function to the last element of the sequence. For example:
>>> source([1, 2, 3]) | sink(lambda x: x ** 2)
9
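The two shortcuts above can be modelled by a single plain-Python function over an iterable. This is a hypothetical sketch, not dagpype's code. It assumes callable() is the test that separates "a function" from "any other object". Because this page does not describe the empty-sequence case for the function form, the sketch simply raises there.

```python
_MISSING = object()

def sink_result(iterable, what):
    """Hypothetical model of the two dagpype.sink() shortcuts described above."""
    last = _MISSING
    for last in iterable:      # the sink consumes the whole sequence either way
        pass
    if not callable(what):     # ASSUMPTION: callable() means "a function"
        return what            # constant output, independent of the input
    if last is _MISSING:       # ASSUMPTION: empty input is an error here
        raise ValueError("empty sequence has no last element")
    return what(last)          # function applied to the last element

print(sink_result([1, 2], 'hello'))              # hello
print(sink_result([1, 2, 3], lambda x: x ** 2))  # 9
```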