• Stars
    star
    1,546
  • Rank 30,047 (Top 0.6 %)
  • Language
    Python
  • License
    MIT License
  • Created about 6 years ago
  • Updated about 1 year ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Concurrent data pipelines in Python >>>

Pypeln

Coverage


Pypeln (pronounced as "pypeline") is a simple yet powerful Python library for creating concurrent data pipelines.

Main Features

  • Simple: Pypeln was designed to solve medium data tasks that require parallelism and concurrency where using frameworks like Spark or Dask feels exaggerated or unnatural.
  • Easy-to-use: Pypeln exposes a familiar functional API compatible with regular Python code.
  • Flexible: Pypeln enables you to build pipelines using Processes, Threads and asyncio.Tasks via the exact same API.
  • Fine-grained Control: Pypeln allows you to have control over the memory and cpu resources used at each stage of your pipelines.

For more information take a look at the Documentation.

diagram

Installation

Install Pypeln using pip:

pip install pypeln

Basic Usage

With Pypeln you can easily create multi-stage data pipelines using 3 type of workers:

Processes

You can create a pipeline based on multiprocessing.Process workers by using the process module:

import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.process.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.process.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

At each stage the you can specify the numbers of workers. The maxsize parameter limits the maximum amount of elements that the stage can hold simultaneously.

Threads

You can create a pipeline based on threading.Thread workers by using the thread module:

import pypeln as pl
import time
from random import random

def slow_add1(x):
    time.sleep(random()) # <= some slow computation
    return x + 1

def slow_gt3(x):
    time.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.thread.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.thread.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

Here we have the exact same situation as in the previous case except that the worker are Threads.

Tasks

You can create a pipeline based on asyncio.Task workers by using the task module:

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.task.filter(slow_gt3, stage, workers=2)

data = list(stage) # e.g. [5, 6, 9, 4, 8, 10, 7]

Conceptually similar but everything is running in a single thread and Task workers are created dynamically. If the code is running inside an async task can use await on the stage instead to avoid blocking:

import pypeln as pl
import asyncio
from random import random

async def slow_add1(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x + 1

async def slow_gt3(x):
    await asyncio.sleep(random()) # <= some slow computation
    return x > 3


def main():
    data = range(10) # [0, 1, 2, ..., 9] 

    stage = pl.task.map(slow_add1, data, workers=3, maxsize=4)
    stage = pl.task.filter(slow_gt3, stage, workers=2)

    data = await stage # e.g. [5, 6, 9, 4, 8, 10, 7]

asyncio.run(main())

Sync

The sync module implements all operations using synchronous generators. This module is useful for debugging or when you don't need to perform heavy CPU or IO tasks but still want to retain element order information that certain functions like pl.*.ordered rely on.

import pypeln as pl
import time
from random import random

def slow_add1(x):
    return x + 1

def slow_gt3(x):
    return x > 3

data = range(10) # [0, 1, 2, ..., 9] 

stage = pl.sync.map(slow_add1, data, workers=3, maxsize=4)
stage = pl.sync.filter(slow_gt3, stage, workers=2)

data = list(stage) # [4, 5, 6, 7, 8, 9, 10]

Common arguments such as workers and maxsize are accepted by this module's functions for API compatibility purposes but are ignored.

Mixed Pipelines

You can create pipelines using different worker types such that each type is the best for its given task so you can get the maximum performance out of your code:

data = get_iterable()
data = pl.task.map(f1, data, workers=100)
data = pl.thread.flat_map(f2, data, workers=10)
data = filter(f3, data)
data = pl.process.map(f4, data, workers=5, maxsize=200)

Notice that here we even used a regular python filter, since stages are iterables Pypeln integrates smoothly with any python code, just be aware of how each stage behaves.

Pipe Operator

In the spirit of being a true pipeline library, Pypeln also lets you create your pipelines using the pipe | operator:

data = (
    range(10)
    | pl.process.map(slow_add1, workers=3, maxsize=4)
    | pl.process.filter(slow_gt3, workers=2)
    | list
)

Run Tests

A sample script is provided to run the tests in a container (either Docker or Podman is supported), to run tests:

$ bash scripts/run-tests.sh

This script can also receive a python version to check test against, i.e

$ bash scripts/run-tests.sh 3.7

Related Stuff

Contributors

License

MIT

More Repositories

1

treex

A Pytree Module system for Deep Learning in JAX
Python
215
star
2

karma

An MVC framework for Unity3D
C#
167
star
3

phi

Functional Programming + Python - Pain
Python
132
star
4

tensorbuilder

TensorBuilder is a TensorFlow library enables you to easily create complex deep neural networks by leveraging the phi DSL to help define their structure.
Python
92
star
5

ciclo

A functional training loops library for JAX
Python
83
star
6

nnx

Neural Networks for JAX
Python
82
star
7

NDArray

A Multidimensional Array library for Swift
Swift
64
star
8

simple-pytree

A dead simple Python package for creating custom JAX pytree objects
Python
61
star
9

treeo

A small library for creating and manipulating custom JAX Pytree classes
Python
59
star
10

cybrain

Neural Networks in Cython, inspired by PyBrain.
HTML
58
star
11

einop

Python
56
star
12

jax_metrics

A metrics library for the JAX ecosystem
Python
35
star
13

Stream

A simple data processing library for Swift
Swift
28
star
14

jax-gravity

Jupyter Notebook
24
star
15

quantile-regression

Jupyter Notebook
19
star
16

dataget

A framework-agnostic datasets library for Machine Learning research and education.
Python
18
star
17

dyn_plot

Jupyter Notebook
13
star
18

resume

Machine Learning Engineer with background in Maths & Physics
CSS
9
star
19

simple-detr

Python
8
star
20

python_path

Python
7
star
21

stop_iter

Python
7
star
22

umvc

C#
6
star
23

TypeJS

Automatic Type-checking for JavaScript Functions!
JavaScript
5
star
24

jax-streams

Jupyter Notebook
5
star
25

simple-self-supervised

Python
4
star
26

mnist-estimator

Jupyter Notebook
4
star
27

refx

Python
4
star
28

algorithms

Python
3
star
29

object-detection

Jupyter Notebook
3
star
30

simple-ddpm

Jupyter Notebook
3
star
31

aoc-2023

Mojo
3
star
32

synthetic-casino

Shell
2
star
33

flax-tools

Python
2
star
34

jax-differentiable

Python
2
star
35

simple-mixture-models

Python
2
star
36

simple-attention

Python
2
star
37

NeuralJS

Neural Networks in JavaScript, inspired by PyBrain.
JavaScript
2
star
38

swift-test

Swift
1
star
39

tree_collections

Rust
1
star
40

tensorrl

Python
1
star
41

ml-meetup-1-intro-tensorflow

1
star
42

flax-xor-demo

Jupyter Notebook
1
star
43

simple-perceiver

Python
1
star
44

point-cloud-attention

Python
1
star
45

holberton

Python
1
star
46

tfinterface

Python
1
star
47

supervised-avanzado-german-traffic-signs

Jupyter Notebook
1
star
48

udacity-simulator-dataset

Python
1
star
49

feature-engineering-blog

Jupyter Notebook
1
star
50

nfl

Jupyter Notebook
1
star
51

Atoms

molecules
C#
1
star
52

test_tf2

Jupyter Notebook
1
star
53

live-q-table

Jupyter Notebook
1
star
54

arista-server

JavaScript
1
star
55

domain-randomization

Python
1
star
56

media

1
star