  • Stars: 114
  • Rank: 308,031 (top 7%)
  • Language: Python
  • License: MIT License
  • Created about 4 years ago
  • Updated about 4 years ago

Repository Details

Utility function to parallelise pipelines of Python asyncio iterators/generators

asyncio-buffered-pipeline

Parallelise pipelines of Python async iterables/generators.

Installation

pip install asyncio-buffered-pipeline

Usage / What problem does this solve?

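If you have a chain of async generators, only one of them runs at any given time, even though each is async. For example, the code below runs in (just over) 30 seconds: each of the 10 values passes through three steps that each take 1 second, and only one step runs at a time.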

import asyncio

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    it_1 = gen_1()
    it_2 = gen_2(it_1)
    it_3 = gen_3(it_2)

    async for val in it_3:
        print(val)

asyncio.run(main())

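The buffered_pipeline function lets you make a small change, passing each generator through the function it returns, to parallelise the generators. This reduces the total to (just over) 12 seconds: the first value takes 3 seconds to pass through all three steps, and each of the remaining 9 values arrives 1 second after the previous one.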

import asyncio
from asyncio_buffered_pipeline import buffered_pipeline

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_1())
    it_2 = buffer_iterable(gen_2(it_1))
    it_3 = buffer_iterable(gen_3(it_2))

    async for val in it_3:
        print(val)

asyncio.run(main())
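The 30 and 12 second figures follow from a simple model. The helper below is hypothetical (estimated_runtime is not part of the library): it assumes every step takes a fixed time per value and ignores scheduling overhead, which is why the real timings are "just over" these numbers.

```python
def estimated_runtime(n_values, stage_delays, pipelined):
    """Idealised runtime of a chain of steps, ignoring all overhead.

    Assumes every step takes a fixed time per value (a simplification).
    """
    if not pipelined:
        # Only one step runs at a time, so every value pays for every step
        return n_values * sum(stage_delays)
    # The first value passes through every step; after that, the slowest
    # step sets the pace for each of the remaining values
    return sum(stage_delays) + (n_values - 1) * max(stage_delays)


if __name__ == "__main__":
    print(estimated_runtime(10, [1, 1, 1], pipelined=False))  # 30
    print(estimated_runtime(10, [1, 1, 1], pipelined=True))   # 12
```

Under this model the gain is largest when the steps take similar amounts of time, since the slowest step sets the overall pace once the pipeline is full.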

The pipeline created by buffered_pipeline ensures its internal tasks are cancelled on any exception.

Buffer size

The default buffer size is 1. This is suitable if each iteration takes approximately the same amount of time. If this is not the case, you may wish to change it using the buffer_size parameter of buffer_iterable.

it = buffer_iterable(gen(), buffer_size=2)
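One way to picture buffer_size is as a bound on how far a step can run ahead of whatever consumes it. The sketch below models a single step as a producer task feeding an asyncio.Queue(maxsize=buffer_size). This is an assumption made for illustration rather than a description of the library's internals, and run_ahead is a hypothetical name.

```python
import asyncio


async def run_ahead(buffer_size, n_values=10):
    # Assumed model of one buffered step (not the library's code):
    # a producer task feeding a bounded queue
    queue = asyncio.Queue(maxsize=buffer_size)

    async def producer():
        for value in range(n_values):
            await queue.put(value)  # suspends while the buffer is full

    task = asyncio.create_task(producer())
    # The consumer is "busy" for a moment and takes nothing from the queue
    await asyncio.sleep(0.01)
    buffered = queue.qsize()  # how many values are waiting in the buffer
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return buffered


if __name__ == "__main__":
    for size in (1, 2, 4):
        print(size, asyncio.run(run_ahead(size)))
```

With buffer_size=1 at most one value waits in the buffer. A larger buffer lets a step keep producing through a temporary slowdown in the next step, and lets the next step keep consuming through a temporary slowdown in this one, at the cost of holding more values in memory.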

Features

  • Only one task is created for each call to buffer_iterable; in it, the iterable is iterated over and its values are stored in an internal buffer.

  • All the tasks of the pipeline are cancelled if any of the generators raise an exception.

  • If a generator raises an exception, the exception is propagated to calling code.

  • The buffer size of each step in the pipeline is configurable.

  • The "chaining" is not abstracted away. You still have full control over the arguments passed to each step, and you don't need to buffer each iterable in the pipeline if you don't want to: just don't pass those through buffer_iterable.
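To make these features concrete, here is a hypothetical re-implementation written for illustration only; it is not the library's actual source and may differ from it in important ways. Each call to buffer_iterable starts one task that iterates its source into an asyncio.Queue of size buffer_size, errors travel downstream as queue items, and the final consumer (code not running in one of the step tasks) cancels every task when an exception reaches it. The _DONE sentinel, the "buffered-step" task name, the fail_on parameter and failing_main are all made up for this sketch.

```python
import asyncio

_DONE = object()  # sentinel marking the end of a step's output


def buffered_pipeline():
    # Hypothetical sketch for illustration only; not the library's source
    tasks = []

    def buffer_iterable(iterable, buffer_size=1):
        queue = asyncio.Queue(maxsize=buffer_size)

        async def fill():
            # The one task for this step: iterate the source into the buffer
            try:
                async for value in iterable:
                    await queue.put((value, None))  # waits while the buffer is full
            except Exception as exc:
                await queue.put((_DONE, exc))  # hand the error downstream
            else:
                await queue.put((_DONE, None))

        tasks.append(asyncio.create_task(fill(), name="buffered-step"))

        async def drain():
            try:
                while True:
                    value, exc = await queue.get()
                    if exc is not None:
                        raise exc
                    if value is _DONE:
                        return
                    yield value
            except BaseException:
                # Only the final consumer (code not running in a step task)
                # cancels, so steps forward errors rather than being cancelled
                if asyncio.current_task() not in tasks:
                    for task in tasks:
                        task.cancel()
                raise

        return drain()

    return buffer_iterable


# The README's three steps, with a configurable delay and an optional failure
async def gen_1(delay):
    for value in range(10):
        await asyncio.sleep(delay)  # stands in for a slow HTTP request
        yield value


async def gen_2(it, delay):
    async for value in it:
        await asyncio.sleep(delay)
        yield value * 2


async def gen_3(it, delay, fail_on=None):
    async for value in it:
        await asyncio.sleep(delay)
        if value == fail_on:
            raise ValueError(f"gen_3 failed on {value}")
        yield value + 3


async def main(delay=0.01, fail_on=None):
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_1(delay))
    it_2 = buffer_iterable(gen_2(it_1, delay))
    it_3 = buffer_iterable(gen_3(it_2, delay, fail_on), buffer_size=2)
    return [val async for val in it_3]


async def failing_main():
    # gen_3 raises on 4: the error should reach us and no step task should linger
    error = None
    try:
        await main(fail_on=4)
    except ValueError as exc:
        error = str(exc)
    await asyncio.sleep(0.05)  # let the cancelled tasks finish
    pending = [t for t in asyncio.all_tasks() if t.get_name() == "buffered-step"]
    return error, len(pending)


if __name__ == "__main__":
    print(asyncio.run(main()))
    print(asyncio.run(failing_main()))
```

In this sketch only the final consumer cancels, so an intermediate step that is forwarding an error downstream is never cancelled before the error arrives; had every step cancelled the whole pipeline, the caller could end up seeing a CancelledError instead of the original exception.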
