This is a Python framework for scientific data-processing and data-preparation DAG (directed acyclic graph) pipelines.
It is designed to work well within Python scripts or IPython, provide an in-Python alternative for sed, awk, perl, and grep, and complement libraries such as NumPy/SciPy, SciKits, pandas, MayaVi, PyTables, and so forth. Those libraries process data once it has been assembled. This library is for flexible data assembly and quick exploration, or for aggregating huge data which cannot be reasonably assembled.
Suppose we wish to check the correlation between the ‘wind’ and ‘rain’ columns in a CSV file, excluding all entries with values larger than 10 as outliers.
Here is some straightforward code that does this:
import csv
import math

r = csv.DictReader(open('meteo.csv'), ('wind', 'rain'))
sx, sxx, sy, syy, sxy, n = 0, 0, 0, 0, 0, 0
for row in r:
    x, y = float(row['wind']), float(row['rain'])
    if x < 10 and y < 10:
        sx += x
        sxx += x * x
        sy += y
        sxy += x * y
        syy += y * y
        n += 1
c = (n * sxy - sx * sy) / math.sqrt(n * sxx - sx * sx) / math.sqrt(n * syy - sy * sy)
This code is relatively long, and the source-reading, filtering, and calculation logic are not cleanly separated; for example, the code cannot easily accommodate a change of data format from CSV to something else.
Alternatively, here is some NumPy-based code that does this:
import numpy

data = numpy.genfromtxt('meteo.csv', names = ('wind', 'rain'), delimiter = ',', skip_header = 1)
wind = [wind for (wind, rain) in data if wind < 10 and rain < 10]
rain = [rain for (wind, rain) in data if wind < 10 and rain < 10]
c = numpy.corrcoef(wind, rain)[0, 1]
This code is shorter and better separated, but the numpy.genfromtxt call loads the entire dataset into memory, which can be very inefficient for large files.
Conversely, using this library, the code becomes (assuming from dagpype import * has been typed):
>>> c = stream_vals('meteo.csv', (b'wind', b'rain')) | \
... filt(pre = lambda (wind, rain) : wind < 10 and rain < 10) | \
... corr()
which processes the data efficiently, and, moreover, is short enough to use from the command line for quick data exploration.
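The streaming style behind this one-liner can be sketched with plain Python generators: each stage consumes a stream and yields a stream, so only one row is in memory at a time. This is a simplified illustration of the pipeline idea, not DAGPype's actual implementation; the in-memory `lines` data stands in for a file.

```python
import math

def stream_pairs(lines):
    # Parse each CSV line into a (wind, rain) float pair.
    for line in lines:
        wind, rain = line.split(',')
        yield float(wind), float(rain)

def filt(pairs, pred):
    # Pass through only the pairs satisfying the predicate.
    for pair in pairs:
        if pred(*pair):
            yield pair

def corr(pairs):
    # Single-pass Pearson correlation over a stream of pairs.
    sx = sxx = sy = syy = sxy = n = 0.0
    for x, y in pairs:
        sx += x
        sxx += x * x
        sy += y
        syy += y * y
        sxy += x * y
        n += 1
    return (n * sxy - sx * sy) / (
        math.sqrt(n * sxx - sx * sx) * math.sqrt(n * syy - sy * sy))

# In-memory data standing in for 'meteo.csv'; the last row is an outlier.
lines = ['1,2', '2,4', '3,6', '11,50']
c = corr(filt(stream_pairs(lines), lambda w, r: w < 10 and r < 10))
# The surviving three rows are perfectly correlated, so c is 1.0
# (up to floating point).
```

Chaining generators this way keeps memory use constant regardless of file size, which is the property the library's `|` operator packages up.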
The package is available at PyPI.
The usual setup for Python libraries is used. Type:
$ pip install dagpype
or
$ sudo pip install dagpype
Note
To install this package from the source distribution, the system must have a C++ compiler installed. The setup script will invoke this compiler.
Using Python 2.* on Windows will attempt to invoke Visual Studio 2008. If you are using Visual Studio 2010 or 2012, download and extract the archive. From within the DAGPype directory, use
> SET VS90COMNTOOLS=%VS100COMNTOOLS%
or
> SET VS90COMNTOOLS=%VS110COMNTOOLS%
(for Visual Studio 2010 and 2012, respectively), followed by
> python setup.py install
The documentation is hosted at PyPI Docs and can also be found in the ‘docs’ directory of the distribution.
Bugtracking is on Google Code.
(See more sed-like, perl-like, and awk-like examples online.)
Note
The following examples assume first typing from dagpype import *
>>> stream_vals('wind.txt') + stream_vals('rain.txt') | corr()
0.74
>>> stream_vals('wind.txt') | mean() + stddev() + min_() + max_()
(3, 0.4, 0, 9)
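The fan-out of single-pass statistics above can be mimicked in plain Python. This sketch uses Welford's online algorithm for the mean and (population) standard deviation; it is an illustration of one-pass aggregation, not the library's code.

```python
import math

def running_stats(values):
    # Welford's online algorithm: one pass, numerically stable.
    n, mean, m2 = 0, 0.0, 0.0
    lo = hi = None
    for x in values:
        n += 1
        delta = x - mean
        mean += delta / n
        m2 += delta * (x - mean)
        lo = x if lo is None else min(lo, x)
        hi = x if hi is None else max(hi, x)
    stddev = math.sqrt(m2 / n) if n else 0.0  # population std. dev.
    return mean, stddev, lo, hi

stats = running_stats([0.0, 3.0, 6.0])
# -> (3.0, ~2.449, 0.0, 6.0)
```

All four statistics come out of a single traversal, which is why such stages can be combined with `+` on one stream.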
>>> stream_vals('meteo.csv', (b'wind', b'rain')) | \
... filt(lambda (wind, rain) : (min(wind, 10), min(rain, 10))) | \
... to_csv('fixed_data.csv', (b'wind', b'rain'))
>>> v = stream_vals('wind.txt') | np.to_array()
>>> v = stream_vals('rain.txt') | skip(3) | skip(-4) | to_list()
>>> stream_vals('meteo.csv', (b'day', b'wind')) | \
... group(
... key = lambda (day, wind) : day,
... key_pipe = lambda day : sink(day) + (select_inds(1) | mean()) + (select_inds(1) | stddev())) | \
... to_csv('day_wind.csv', (b'day', b'mean', b'stddev'))
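For comparison, the per-key aggregation above can be sketched with itertools.groupby over consecutive keys. The row data here is hypothetical, and population standard deviation is used for illustration.

```python
import itertools
import statistics

# (day, wind) rows; groupby assumes rows with the same key are
# consecutive, as in a day-ordered log -- sort first otherwise.
rows = [('mon', 2.0), ('mon', 4.0), ('tue', 1.0), ('tue', 3.0), ('tue', 5.0)]

summary = []
for day, grp in itertools.groupby(rows, key=lambda r: r[0]):
    winds = [wind for _, wind in grp]
    summary.append((day, statistics.mean(winds), statistics.pstdev(winds)))

# summary == [('mon', 3.0, 1.0), ('tue', 3.0, ~1.633)]
```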
>>> c = stream_vals('wind.txt') | skip_n(-5) + skip_n(5) | corr()
>>> c = stream_vals('meteo.csv', (b'wind', b'rain')) | \
... (select_inds(0) | low_pass_filter(0.5)) + (select_inds(1) | low_pass_filter(0.5)) | \
... corr()
>>> stream_vals('meteo.csv', b'rain') | prob_rand_sample(0.01) | (np.to_array() | sink(lambda a : numpy.median(a)))
>>> stream_vals('meteo.csv', b'rain') | size_rand_sample(100) | (np.to_array() | sink(lambda a : numpy.median(a)))
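Taking a fixed-size uniform sample of a stream of unknown length, as size_rand_sample does, is classically solved by reservoir sampling. A minimal sketch (not the library's implementation):

```python
import random

def reservoir_sample(stream, k, rng=random):
    # Keep a uniform random sample of at most k items from a stream
    # of unknown length, using O(k) memory (Vitter's Algorithm R).
    sample = []
    for i, x in enumerate(stream):
        if i < k:
            sample.append(x)
        else:
            j = rng.randint(0, i)  # inclusive on both ends
            if j < k:
                sample[j] = x
    return sample

sample = reservoir_sample(range(10000), 100)
# len(sample) == 100; each element was retained with equal probability
```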
>>> debug = True
>>> stream_vals('wind.txt') | (trace() if debug else relay()) | sum_()
0 : 2.0
1 : 4.0
2 : 7.0
3 : 23.0
...
57 : 7.0
58 : 23.0
59 : 0.0
432.0
>>> debug = False
>>> stream_vals('wind.txt') | (trace() if debug else relay()) | sum_()
432.0
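The trace/relay pattern shown above is easy to emulate with pass-through generators: one yields items unchanged while printing them, the other just forwards. A sketch, not the library's stages:

```python
def trace(stream):
    # Forward each item unchanged, printing its index and value.
    for i, x in enumerate(stream):
        print('%d : %s' % (i, x))
        yield x

def relay(stream):
    # Pure pass-through stage; useful as a no-op placeholder.
    for x in stream:
        yield x

debug = False
vals = [2.0, 4.0, 7.0]
stage = trace if debug else relay
total = sum(stage(iter(vals)))
# total == 13.0; with debug = True, each value is also printed
```

Because both stages have the same shape, they can be swapped by a flag without disturbing the rest of the pipeline.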
>>> def abs_exp_ave(f_name):
... return stream_vals(f_name) | abs_() | exp_ave(0.5)
>>> abs_exp_ave('foo.dat') + abs_exp_ave('bar.dat') | corr()
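Exponential averaging, as used by exp_ave above, follows the recurrence ave_t = alpha * x_t + (1 - alpha) * ave_{t-1}. A generator sketch of the idea (the library's parameterization may differ):

```python
def exp_ave(stream, alpha):
    # Exponentially weighted moving average of a stream.
    ave = None
    for x in stream:
        ave = x if ave is None else alpha * x + (1 - alpha) * ave
        yield ave

smoothed = list(exp_ave([1.0, 2.0, 3.0], 0.5))
# smoothed == [1.0, 1.5, 2.25]
```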
>>> stats = [stream_vals(f) | mean() + stddev() for f in glob.glob('dir/*.txt')]
>>> stream_vals('meteo.csv', (b'day', b'wind')) | \
... group(lambda (day, wind) : day, lambda day : select_inds(1) | mean()) | \
... cum_ave() | (plot.plot() | plot.show())
>>> stream_vals('meteo.csv', (b'day', b'wind')) | \
... group(
... lambda (day, wind) : day,
... lambda day : select_inds(1) | (np.to_array() | sink(lambda a : numpy.median(a)))) | \
... cum_ave() | (plot.plot() | plot.show())
Version | Date | Description |
---|---|---|
0.1.5.1 | 19/04/2013 | NumPy 1.7.1 Compilation bugfix |
0.1.5 | 18/04/2013 | Doctest in unittests: numerous doc bugfixes, illegal floats parse bugfix |
0.1.4.0 | 18/02/2013 | CSV parser IEEE exception bugfix, offline docs bugfix |
0.1.3.9 | 03/02/2013 | Python 2.6 backport |
0.1.3.8 | 28/01/2013 | OS X build bugfixes |
0.1.3.7 | 18/01/2013 | Windows build / IO bugfixes |
0.1.3.6 | 16/01/2013 | Py3K |
0.1.3.5 | 15/01/2013 | CSV parser bugfix, Perl, Awk, and Sed type stages, GNU/Linux gather-IO backend |
0.1.3.4 | 05/01/2013 | Moving to PyPI |
0.1.3.3 | 02/01/2013 | Consecutive vs. non-consecutive grouping, import bugfix |
0.1.3.2 | 20/12/2012 | CSV filter stage |
0.1.3.1 | 12/12/2012 | Missing extension bugfix |
0.1.3 | 12/12/2012 | Optimized C numpy IO |
0.1.2.3 | 07/12/2012 | Bugfix (stream_vals) |
0.1.2.2 | 03/12/2012 | Bugfixes (VC++, select_inds) |
0.1.2.1 | 02/12/2012 | Better pyplot keyword args support |
0.1.2 | 29/11/2012 | Various plotting stages |
0.1.1.3 | 29/11/2012 | Rand size samples, text/regex grep |
0.1.1.1 | 20/10/2012 | More moving averages, min/max, quantiles, etc. |
0.1.1 | 20/10/2012 | Optimized C CSV reader |
0.1.0.3 | 30/09/2012 | Py3 compatibility |
0.1.0.2 | 28/09/2012 | More numpy stages |
0.1.0.1 | 22/09/2012 | Setup bugfix |
0.1.0 | 18/08/2012 | Initial release |
This library uses many ideas from David Beazley's generator talk [Beazley08] and coroutine talk [Beazley09].
[Beazley08] | http://www.dabeaz.com/generators/ |
[Beazley09] | http://www.dabeaz.com/coroutines/ |
Many thanks to Anand Jeyahar, Brad Reisfeld, Tal Kremerman, Eran Segal, and Simon Pantzare for patches.