  • Stars: 114
  • Rank 308,031 (Top 7 %)
  • Language: Python
  • License: BSD 3-Clause "New...
  • Created over 5 years ago
  • Updated about 1 year ago

Repository Details

Codd method-chained SQL generator and Pandas data processing in Python.

data_algebra

data_algebra is a piped data wrangling system based on Codd's relational algebra and experience working with data manipulation languages at scale. The primary purpose of the package is to support an easy to compose and maintain grammar of data processing steps that in turn can be used to generate database specific SQL. The package also implements the same transforms for Pandas DataFrames.

The package is available on PyPI, and can be installed with pip install data_algebra.

A good introduction can be found here, and many worked examples are here. A catalog of expression methods is found here. The pydoc documentation is here. And the README is a good place to check for news or updates.

Currently, the system is primarily adapted to and tested on Pandas, Polars, Google BigQuery, PostgreSQL, SQLite, and Spark. Porting and extension are designed to be easy.

This is to be the Python equivalent of the R packages rquery, rqdatatable, and cdata. This package supplies piped Codd-transform style notation that can perform data engineering in Pandas or (still in development) Polars and generate SQL queries from the same specification.

Installing

Install data_algebra with pip install data_algebra

Announcement

This article introduces the data_algebra project: a data processing tool family available in R and Python. These tools are designed to transform data either in-memory or on remote databases. For an example (with video) of using data_algebra to re-arrange data layout please see here. The key questions are: what operators (or major steps) are supported by the data algebra, and what methods (operations on columns) are supported? The operators are documented here, and which methods can be used in which contexts is listed here. Also, please check the README for news.

In particular, we will discuss the Python implementation (also called data_algebra) and its relation to the mature R implementations (rquery and rqdatatable).

Introduction

The project intent is to realize a method chained data processing language based on Codd's relational operators that is easy to maintain, has helpful tooling, and has very similar realizations (or dialects) for:

  • SQL databases accessed from Python, useful for working at scale with PostgreSQL or Apache Spark (Spark example here).
  • Pandas DataFrame objects in Python.
  • SQL databases accessed from R (implementation is here, and is mature and ready for production use).

The intent is that the notation should look idiomatic in each language. Working in Python should feel like working in Python, and working in R should feel like working in R. The data semantics, however, are designed to be close to the SQL realizations (given the close connection of SQL to the relational algebra; in particular row numbering starts at 1, and row and column order are not preserved except at row-order steps or select-columns steps respectively). The intent is: it should be very easy to use the system in either Python or R (a boon to multi-language data science projects) and it should be easy to port either code or experience from one system to another (a boon for porting projects, or for data scientists working with more than one code base or computer language).

Related work includes:

The data_algebra principles include:

  • Writing data transforms as a pipeline or method-chain of many simple transform steps.
  • Treating data transform pipelines or directed acyclic graphs (DAGs) as themselves being sharable data.
  • Being able to use the same transform specification many places (in memory, on databases, in R, in Python).

The data_algebra supplies two primary services:

  • Building composite data processing pipelines (which we demonstrate in this note).
  • Building record transforms (which we demonstrate here).

Example

Let's start with a pipeline example in Python (for a record transform example, please see here).

For our example we will assume we have a data set of how many points different subjects score in a psychological survey. The goal is to transform the data so that we see what fraction of each subject's answers is in each category (subject to an exponential transform, as often used in logistic regression). We then treat the per-subject renormalized data as a probability or diagnosis.

The exact meaning of such a scoring method is not the topic of this note. It is a notional example to show a non-trivial data transformation need. In particular: having to normalize per-subject (divide some set of scores per-subject by a per-subject total) is a classic pain point in data-processing. In classic SQL this can only be done by joining against a summary table, or in more modern SQL with a "window function." We want to show that by working in small enough steps this can be done simply.
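To make the two SQL options concrete, here is a minimal sketch using Python's standard sqlite3 module. It is not part of the data_algebra example: the table name scores and its column names are made up for illustration, and it assumes the bundled SQLite is version 3.25 or newer (needed for window functions). Both queries compute each score as a fraction of its subject's total.

```python
import sqlite3

# Hypothetical toy table (names are illustrative, not from data_algebra).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (subject_id INTEGER, category TEXT, score REAL)")
conn.executemany(
    "INSERT INTO scores VALUES (?, ?, ?)",
    [(1, "a", 5.0), (1, "b", 2.0), (2, "a", 3.0), (2, "b", 4.0)],
)

# Classic approach: join every row against a per-subject summary table.
join_sql = """
SELECT s.subject_id, s.category, s.score / t.total AS fraction
FROM scores AS s
JOIN (
    SELECT subject_id, SUM(score) AS total
    FROM scores
    GROUP BY subject_id
) AS t ON s.subject_id = t.subject_id
ORDER BY s.subject_id, s.category
"""

# Window-function approach: the per-subject total is computed in place.
window_sql = """
SELECT subject_id, category,
       score / SUM(score) OVER (PARTITION BY subject_id) AS fraction
FROM scores
ORDER BY subject_id, category
"""

join_rows = conn.execute(join_sql).fetchall()
window_rows = conn.execute(window_sql).fetchall()
conn.close()

for row in window_rows:
    print(row)
```

The window form avoids naming and joining an intermediate summary table, which is the simplification the pipeline below relies on.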

Set up

Let's start our Python example. First we import the packages we are going to use, and set a few options.

import polars as pl
import data_algebra as da
import data_algebra.BigQuery


da.__version__
'1.5.1'

Now let's type in our example data. Notice this is an in-memory Polars DataFrame.

d_local = pl.DataFrame({
    'subjectID':[1, 1, 2, 2],
    'surveyCategory': [ "withdrawal behavior", "positive re-framing", "withdrawal behavior", "positive re-framing"],
    'assessmentTotal': [5., 2., 3., 4.],
    'irrelevantCol1': ['irrel1']*4,
    'irrelevantCol2': ['irrel2']*4,
})

d_local
shape: (4, 5)
subjectID  surveyCategory     assessmentTotal  irrelevantCol1  irrelevantCol2
i64        str                f64              str             str
1          "withdrawal beh... 5.0              "irrel1"        "irrel2"
1          "positive re-fr... 2.0              "irrel1"        "irrel2"
2          "withdrawal beh... 3.0              "irrel1"        "irrel2"
2          "positive re-fr... 4.0              "irrel1"        "irrel2"

Let's also copy this data to a database. Normally big data is already in the system one wants to work with, so the copying over is just to simulate the data already being there.

db_handle = data_algebra.BigQuery.example_handle()

print(db_handle)
BigQuery_DBHandle(db_model=BigQueryModel, conn=<google.cloud.bigquery.client.Client object at 0x7fb1c0cad270>)

remote_table_description = db_handle.insert_table(
    d_local.to_pandas(), 
    table_name='d', 
    allow_overwrite=True)

remote_table_description.head
   subjectID       surveyCategory  assessmentTotal  irrelevantCol1  irrelevantCol2
0          1  withdrawal behavior              5.0          irrel1          irrel2
1          1  positive re-framing              2.0          irrel1          irrel2
2          2  withdrawal behavior              3.0          irrel1          irrel2
3          2  positive re-framing              4.0          irrel1          irrel2

Normally one does not read data back from a database, but instead materializes results in the database with SQL commands such as CREATE TABLE tablename AS SELECT .... Also note: case in column names is a bit of a nightmare. It is often best to lower-case them all.

Back to the data_algebra

Now we continue our example using the data_algebra components we imported above (as da).

Now we use the data_algebra to define our processing pipeline: ops. We are writing this pipeline using a method chaining notation. This notation will look very much like a pipe to R/magrittr users.

scale = 0.237

ops = (
    da.descr(d=d_local)
        .extend({'probability': f'(assessmentTotal * {scale}).exp()'})
        .extend({'total': 'probability.sum()'},
                partition_by='subjectID')
        .extend({'probability': 'probability / total'})
        .extend({'row_number': '(1).cumsum()'},
                partition_by=['subjectID'],
                order_by=['probability'], 
                reverse=['probability'])
        .select_rows('row_number == 1')
        .select_columns(['subjectID', 'surveyCategory', 'probability'])
        .rename_columns({'diagnosis': 'surveyCategory'})
    )

We are deliberately writing a longer pipeline of simple steps, so we can use the same pipeline locally with Pandas or Polars, and (potentially) at great scale with PostgreSQL or Apache Spark. A more concise variation of this pipeline can be found in the R example here.

The intent is: the user can build up very sophisticated processing pipelines using a small number of primitive steps. The pipelines tend to be long, but can still be very efficient, as they are well suited for use with Polars and with SQL query optimizers. Most of the heavy lifting is performed by the very powerful "window functions" (triggered by use of partition_by and order_by) available on the extend() step. Multiple statements can be combined into a single extend step, but only when they have the same window-structure, and don't create and use the same value name in the same statement (except for replacement, which is shown in this example). Many conditions are checked and enforced during pipeline construction, making debugging very easy.

For a more Pythonic way of writing the same pipeline we can show how the code would have been formatted by black.

py_source = ops.to_python(pretty=True)

print(py_source)
(
    TableDescription(
        table_name="d",
        column_names=[
            "subjectID",
            "surveyCategory",
            "assessmentTotal",
            "irrelevantCol1",
            "irrelevantCol2",
        ],
    )
    .extend({"probability": "(assessmentTotal * 0.237).exp()"})
    .extend({"total": "probability.sum()"}, partition_by=["subjectID"])
    .extend({"probability": "probability / total"})
    .extend(
        {"row_number": "(1).cumsum()"},
        partition_by=["subjectID"],
        order_by=["probability"],
        reverse=["probability"],
    )
    .select_rows("row_number == 1")
    .select_columns(["subjectID", "surveyCategory", "probability"])
    .rename_columns({"diagnosis": "surveyCategory"})
)

In either case, the pipeline is read as a sequence of operations (top to bottom, and left to right). What it is saying is:

  • We start with a table named "d" that is known to have columns "subjectID", "surveyCategory", "assessmentTotal", "irrelevantCol1", and "irrelevantCol2".

  • We produce a new table by transforming this table through a sequence of "extend" operations which add new columns.

    • The first extend computes probability = exp(scale*assessmentTotal); this is similar to the inverse-link step of a logistic regression. We assume when writing this pipeline we were given this math as a requirement.
    • The next few extend steps total the probability per-subject (this is controlled by the partition_by argument) and then rank the normalized probabilities per-subject (grouping again specified by the partition_by argument, and order controlled by the order_by and reverse arguments).
  • We then select the per-subject top-ranked rows by the select_rows step.

  • And finally we clean up the results for presentation with the select_columns and rename_columns steps. The names of these methods are intended to evoke what they do.

The point is: each step is deliberately so trivial one can reason about it. However the many steps in sequence do quite a lot.
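To check one's reading of the steps, the same arithmetic can be traced by hand in plain Python. The following is only a sketch of the calculation described above, using the example data and scale = 0.237; it does not use data_algebra, and the variable names totals, by_subject, and result are ours.

```python
import math
from collections import defaultdict

scale = 0.237
rows = [
    {"subjectID": 1, "surveyCategory": "withdrawal behavior", "assessmentTotal": 5.0},
    {"subjectID": 1, "surveyCategory": "positive re-framing", "assessmentTotal": 2.0},
    {"subjectID": 2, "surveyCategory": "withdrawal behavior", "assessmentTotal": 3.0},
    {"subjectID": 2, "surveyCategory": "positive re-framing", "assessmentTotal": 4.0},
]

# extend: probability = exp(assessmentTotal * scale)
for r in rows:
    r["probability"] = math.exp(r["assessmentTotal"] * scale)

# extend with partition_by subjectID: total = sum of probability per subject
totals = defaultdict(float)
for r in rows:
    totals[r["subjectID"]] += r["probability"]

# extend: probability = probability / total
for r in rows:
    r["probability"] = r["probability"] / totals[r["subjectID"]]

# extend with partition_by, order_by, reverse: number rows per subject,
# highest probability first (numbering starts at 1)
by_subject = defaultdict(list)
for r in rows:
    by_subject[r["subjectID"]].append(r)
for group in by_subject.values():
    group.sort(key=lambda r: r["probability"], reverse=True)
    for i, r in enumerate(group, start=1):
        r["row_number"] = i

# select_rows, select_columns, rename_columns
result = [
    {
        "subjectID": r["subjectID"],
        "diagnosis": r["surveyCategory"],
        "probability": r["probability"],
    }
    for r in rows
    if r["row_number"] == 1
]

for r in result:
    print(r["subjectID"], r["diagnosis"], round(r["probability"], 6))
```

Each comment names the pipeline step it imitates; the hand-written loops are what the pipeline notation lets us avoid writing separately for each back end.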

SQL

Once we have the ops object we can do quite a lot with it. We have already exhibited the pretty-printing of the pipeline. Next we demonstrate translating the operator pipeline into SQL.

sql = db_handle.to_sql(ops)

print(sql)
-- data_algebra SQL https://github.com/WinVector/data_algebra
--  dialect: BigQueryModel 1.5.1
--       string quote: "
--   identifier quote: `
WITH
 `table_reference_0` AS (
  SELECT
   `subjectID` ,
   `surveyCategory` ,
   `assessmentTotal`
  FROM
   `data-algebra-test.test_1.d`
 ) ,
 `extend_1` AS (
  SELECT  -- .extend({ 'probability': '(assessmentTotal * 0.237).exp()'})
   `subjectID` ,
   `surveyCategory` ,
   EXP(`assessmentTotal` * 0.237) AS `probability`
  FROM
   `table_reference_0`
 ) ,
 `extend_2` AS (
  SELECT  -- .extend({ 'total': 'probability.sum()'}, partition_by=['subjectID'])
   `subjectID` ,
   `surveyCategory` ,
   `probability` ,
   SUM(`probability`) OVER ( PARTITION BY `subjectID`  )  AS `total`
  FROM
   `extend_1`
 ) ,
 `extend_3` AS (
  SELECT  -- .extend({ 'probability': 'probability / total'})
   `subjectID` ,
   `surveyCategory` ,
   `probability` / `total` AS `probability`
  FROM
   `extend_2`
 ) ,
 `extend_4` AS (
  SELECT  -- .extend({ 'row_number': '(1).cumsum()'}, partition_by=['subjectID'], order_by=['probability'], reverse=['probability'])
   `subjectID` ,
   `surveyCategory` ,
   `probability` ,
   SUM(1) OVER ( PARTITION BY `subjectID` ORDER BY `probability` DESC  )  AS `row_number`
  FROM
   `extend_3`
 ) ,
 `select_rows_5` AS (
  SELECT  -- .select_rows('row_number == 1')
   `subjectID` ,
   `surveyCategory` ,
   `probability`
  FROM
   `extend_4`
  WHERE
   `row_number` = 1
 )
SELECT  -- .rename_columns({'diagnosis': 'surveyCategory'})
 `surveyCategory` AS `diagnosis` ,
 `subjectID` ,
 `probability`
FROM
 `select_rows_5`

Older SQL (without use of WITH or common table expressions) can be hard to read, as SQL expresses composition by inner-nesting (inner SELECT statements happen first). The operator pipeline expresses composition by sequencing or method-chaining, which can be a lot more legible. In this example we use the SQL-99 common table expression (WITH) notation to manage the composition in a more legible manner. A huge advantage of the SQL is: we can send it to the database for execution, as we do now.

Also notice the generated SQL has applied query narrowing: columns not used in the outer queries are removed from the inner queries. The "irrelevant" columns are not carried into the calculation as they would be with a SELECT *. This early optimization comes in quite handy.

db_handle.read_query(sql)
             diagnosis  subjectID  probability
0  positive re-framing          2     0.558974
1  withdrawal behavior          1     0.670622

What comes back is: one row per subject, with the highest-probability diagnosis for that subject and its estimated probability. Again, the math of this is outside the scope of this note (think of that as something coming from a specification); the ability to write such a pipeline is our actual topic.

The hope is that the data_algebra pipeline is easier to read, write, and maintain than the SQL query. If we wanted to change the calculation we would just add a stage to the data_algebra pipeline and then regenerate the SQL query.
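The difference between nested SQL and WITH-style SQL can be seen on a much smaller query. The sketch below uses Python's standard sqlite3 module rather than BigQuery, and the two queries are hand-written for illustration (they are not output of data_algebra); it assumes SQLite 3.25 or newer for window functions. The step names table_reference_0 and extend_1 simply mimic the naming style of the generated SQL above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE d (subjectID INTEGER, assessmentTotal REAL)")
conn.executemany(
    "INSERT INTO d VALUES (?, ?)",
    [(1, 5.0), (1, 2.0), (2, 3.0), (2, 4.0)],
)

# Nested form: the first step is the innermost SELECT, so it reads inside-out.
nested_sql = """
SELECT subjectID, assessmentTotal / total AS fraction
FROM (
    SELECT subjectID, assessmentTotal,
           SUM(assessmentTotal) OVER (PARTITION BY subjectID) AS total
    FROM (
        SELECT subjectID, assessmentTotal FROM d
    )
)
ORDER BY subjectID, fraction
"""

# WITH form: the same steps, named and listed in the order they happen.
cte_sql = """
WITH
 table_reference_0 AS (
  SELECT subjectID, assessmentTotal FROM d
 ),
 extend_1 AS (
  SELECT subjectID, assessmentTotal,
         SUM(assessmentTotal) OVER (PARTITION BY subjectID) AS total
  FROM table_reference_0
 )
SELECT subjectID, assessmentTotal / total AS fraction
FROM extend_1
ORDER BY subjectID, fraction
"""

nested_rows = conn.execute(nested_sql).fetchall()
cte_rows = conn.execute(cte_sql).fetchall()
conn.close()

print(cte_rows)
```

Both queries return the same rows; only the reading order of the steps differs, and that gap widens as more steps are added.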

Polars

An advantage of the pipeline is it can also be directly used on Pandas or Polars DataFrames. Let's see how that is achieved.

ops.eval({'d': d_local})
shape: (2, 3)
subjectID  diagnosis          probability
i64        str                f64
1          "withdrawal beh... 0.670622
2          "positive re-fr... 0.558974

There is also a shorthand notation for single table source pipelines:

ops.transform(d_local)
shape: (2, 3)
subjectID  diagnosis          probability
i64        str                f64
1          "withdrawal beh... 0.670622
2          "positive re-fr... 0.558974

eval takes a dictionary of DataFrames (names matching names specified in the pipeline) and returns the result of applying the pipeline to this data. Currently our Pandas and Polars implementations only allow very simple window functions. This is why we didn't write probability = probability/sum(probability), but instead broke the calculation into multiple steps by introducing the total column (the SQL realization does in fact support more complex window functions). This is a small issue with the grammar, but our feeling is that encouraging simple steps is in fact a good thing (it improves debuggability), and in SQL the query optimizers likely optimize the different query styles into very similar realizations anyway.

Pandas

The exact same pipeline can be applied directly to Pandas data frames.

ops.transform(d_local.to_pandas())
   subjectID            diagnosis  probability
0          1  withdrawal behavior     0.670622
1          2  positive re-framing     0.558974

Export/Import

Because our operator pipeline is a Python object with no references to external objects (such as the database connection), it can be saved through standard methods such as "pickling."

Some Advantages of data_algebra

A data_algebra operator pipeline carries around usable knowledge of the data transform.

For example:

# report all source table columns used by the query
ops.columns_used()
{'d': {'assessmentTotal', 'subjectID', 'surveyCategory'}}
# what columns does this operation produce?
ops.column_names
('subjectID', 'diagnosis', 'probability')
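One way to picture how a pipeline can report the source columns it needs (and how unused columns can be narrowed out of generated SQL) is a backward pass over the steps. The following is a toy sketch under our own assumptions, not the package's implementation: the steps list and the function source_columns_needed are hypothetical, and each step is hand-annotated with the columns it creates and reads.

```python
# Each step: (name, columns it creates, columns it reads).
# Hand-written annotations for the example pipeline (hypothetical structure).
steps = [
    ("extend", {"probability"}, {"assessmentTotal"}),
    ("extend", {"total"}, {"probability", "subjectID"}),
    ("extend", {"probability"}, {"probability", "total"}),
    ("extend", {"row_number"}, {"subjectID", "probability"}),
    ("select_rows", set(), {"row_number"}),
]

# Columns kept by the select_columns step (before the final rename).
final_columns = {"subjectID", "surveyCategory", "probability"}


def source_columns_needed(steps, final_columns):
    """Walk the steps last-to-first, tracking which columns are still needed."""
    needed = set(final_columns)
    for _name, creates, reads in reversed(steps):
        if creates and not (creates & needed):
            # This step only creates columns nobody uses later; skip it.
            continue
        # Columns created here need not come from upstream,
        # but whatever the step reads must.
        needed = (needed - creates) | reads
    return needed


print(sorted(source_columns_needed(steps, final_columns)))
```

On the example pipeline this toy pass arrives at the same three source columns that ops.columns_used() reports above; the two "irrelevant" columns never become needed.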

Conclusion

The data_algebra is part of a powerful cross-language and multi-implementation family of data manipulation tools. These tools can greatly reduce the development and maintenance cost of data science projects, while improving the documentation of project intent.

Win Vector LLC is looking for sponsors and partners to further the package. In particular, if your group is using both R and Python in big-data projects (where SQL is a need, including Apache Spark), or is porting a project from one of these languages to the other, please get in touch.

# be neat
db_handle.close()

Note: MySQL is not fully supported, as it doesn't name quoted common table expression columns in an obvious way. Current primary databases are PostgreSQL, Google BigQuery, SparkSQL, and SQLite.
