A Python concurrency scheduling library, compatible with asyncio and trio.

aiometer


aiometer is a Python 3.7+ concurrency scheduling library compatible with asyncio and trio and inspired by Trimeter. It makes it easier to execute lots of tasks concurrently while controlling concurrency limits (i.e. applying backpressure) and collecting results in a predictable manner.

Example

Let's use HTTPX to make web requests concurrently...

Try this code interactively using IPython.

>>> import asyncio
>>> import functools
>>> import random
>>> import aiometer
>>> import httpx
>>>
>>> client = httpx.AsyncClient()
>>>
>>> async def fetch(client, request):
...     response = await client.send(request)
...     # Simulate extra processing...
...     await asyncio.sleep(2 * random.random())
...     return response.json()["json"]
...
>>> requests = [
...     httpx.Request("POST", "https://httpbin.org/anything", json={"index": index})
...     for index in range(100)
... ]
>>> # Send requests, and process responses as they're made available:
>>> async with aiometer.amap(
...     functools.partial(fetch, client),
...     requests,
...     max_at_once=10,  # Limit maximum number of concurrently running tasks.
...     max_per_second=5,  # Limit request rate to not overload the server.
... ) as results:
...     async for data in results:
...         print(data)
...
{'index': 3}
{'index': 4}
{'index': 1}
{'index': 2}
{'index': 0}
...
>>> # Alternatively, fetch and aggregate responses into an (ordered) list...
>>> jobs = [functools.partial(fetch, client, request) for request in requests]
>>> results = await aiometer.run_all(jobs, max_at_once=10, max_per_second=5)
>>> results
[{'index': 0}, {'index': 1}, {'index': 2}, {'index': 3}, {'index': 4}, ...]

Installation

This project is in beta and still maturing. Be sure to pin your aiometer dependency to the latest minor version:

pip install "aiometer==0.4.*"

Features

  • Concurrency management and throttling helpers.
  • asyncio and trio support.
  • Fully type annotated.
  • 100% test coverage.

Usage

Flow control

The key highlight of aiometer is that it lets you apply flow control strategies to limit the degree of concurrency in your programs.

There are two knobs you can play with to fine-tune concurrency:

  • max_at_once: this is used to limit the maximum number of concurrently running tasks at any given time. (If you have 100 tasks and set max_at_once=10, then aiometer will ensure that no more than 10 run at the same time.)
  • max_per_second: this option limits the number of tasks spawned per second. This is useful to avoid overloading I/O resources, such as servers that may have a rate limiting policy in place.

Example usage:

>>> import asyncio
>>> import aiometer
>>> async def make_query(query):
...     await asyncio.sleep(0.05)  # Simulate a database request.
...
>>> queries = ['SELECT * from authors'] * 1000
>>> # Allow at most 5 queries to run concurrently at any given time:
>>> await aiometer.run_on_each(make_query, queries, max_at_once=5)
...
>>> # Make at most 10 queries per second:
>>> await aiometer.run_on_each(make_query, queries, max_per_second=10)
...
>>> # Run at most 10 concurrent jobs, spawning at most one new job every 5 seconds:
>>> async def job(id):
...     await asyncio.sleep(10)  # A very long task.
...
>>> await aiometer.run_on_each(job, range(100), max_at_once=10, max_per_second=0.2)
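For intuition only, here is a stdlib-only sketch of what the two knobs mean: an asyncio.Semaphore caps how many tasks run at once (max_at_once), and a fixed delay between spawns caps the start rate (max_per_second). The helper run_limited and the fake_query coroutine are hypothetical names; this is not how aiometer is implemented, and its internals may differ.

```python
import asyncio


async def run_limited(async_fn, args, max_at_once, max_per_second):
    """Hypothetical helper: run async_fn(arg) for each arg under both limits.

    A stdlib approximation of aiometer's knobs, not its implementation.
    """
    semaphore = asyncio.Semaphore(max_at_once)  # caps concurrently running tasks
    interval = 1 / max_per_second  # minimum delay between two task starts

    async def worker(arg):
        try:
            return await async_fn(arg)
        finally:
            semaphore.release()  # free a slot as soon as this task finishes

    tasks = []
    for arg in args:
        await semaphore.acquire()  # wait for a free slot (max_at_once)
        tasks.append(asyncio.create_task(worker(arg)))
        await asyncio.sleep(interval)  # space out spawns (max_per_second)
    # gather() returns results in the same order as args.
    return await asyncio.gather(*tasks)


async def fake_query(n):
    await asyncio.sleep(0.05)  # simulate a database request
    return n


results = asyncio.run(run_limited(fake_query, range(6), max_at_once=2, max_per_second=20))
print(results)  # [0, 1, 2, 3, 4, 5]
```

Spacing spawns by a fixed interval is the simplest possible rate limit; it never allows bursts, which is one of the places a real library may behave differently.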

Running tasks

aiometer provides four run functions, each offering a different way to run tasks concurrently. Each function accepts all the options documented in Flow control and runs tasks in a slightly different way, so together they cover a variety of use cases. Here's a handy table for reference (see also the API Reference):

Entrypoint       Use case
run_on_each()    Execute async callbacks in any order.
run_all()        Return results as an ordered list.
amap()           Iterate over results as they become available.
run_any()        Return the result of the first completed function.

To illustrate the behavior of each run function, let's first set up a hello world async program:

>>> import asyncio
>>> import random
>>> from functools import partial
>>> import aiometer
>>>
>>> async def get_greeting(name):
...     await asyncio.sleep(random.random())  # Simulate I/O
...     return f"Hello, {name}!"
...
>>> async def greet(name):
...     greeting = await get_greeting(name)
...     print(greeting)
...
>>> names = ["Robert", "Carmen", "Lucas"]

Let's start with run_on_each(). It executes an async function once for each item in a list passed as argument:

>>> await aiometer.run_on_each(greet, names)
Hello, Robert!
Hello, Lucas!
Hello, Carmen!

If we'd like to get the list of greetings in the same order as names, in a fashion similar to Promise.all(), we can use run_all():

>>> await aiometer.run_all([partial(get_greeting, name) for name in names])
['Hello, Robert!', 'Hello, Carmen!', 'Hello, Lucas!']
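The ordering guarantee is the same one asyncio.gather() gives in the standard library: results come back in argument order, whatever the completion order. The sketch below assumes hypothetical fixed delays (DELAYS) so that "Carmen" finishes first, yet the list still follows the input order. It illustrates the behavior only and is not aiometer's implementation.

```python
import asyncio
from functools import partial

# Hypothetical I/O durations: "Carmen" finishes first, "Robert" last.
DELAYS = {"Robert": 0.3, "Carmen": 0.1, "Lucas": 0.2}


async def get_greeting(name):
    await asyncio.sleep(DELAYS[name])  # simulate I/O of a known duration
    return f"Hello, {name}!"


async def main(names):
    jobs = [partial(get_greeting, name) for name in names]
    # gather() returns results in the order of its arguments,
    # no matter which call completes first.
    return await asyncio.gather(*(job() for job in jobs))


greetings = asyncio.run(main(["Robert", "Carmen", "Lucas"]))
print(greetings)  # ['Hello, Robert!', 'Hello, Carmen!', 'Hello, Lucas!']
```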

amap() lets us process each greeting as soon as it becomes available (which means results may arrive in any order):

>>> async with aiometer.amap(get_greeting, names) as greetings:
...     async for greeting in greetings:
...         print(greeting)
...
Hello, Lucas!
Hello, Robert!
Hello, Carmen!
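The closest stdlib analogue to this "as they become available" behavior is asyncio.as_completed(), which hands back awaitables in completion order. This sketch reuses hypothetical fixed delays (DELAYS) so the arrival order is predictable; it only illustrates the idea and does not reflect how amap() is built.

```python
import asyncio

# Hypothetical I/O durations: "Carmen" finishes first, "Robert" last.
DELAYS = {"Robert": 0.3, "Carmen": 0.1, "Lucas": 0.2}


async def get_greeting(name):
    await asyncio.sleep(DELAYS[name])  # simulate I/O of a known duration
    return f"Hello, {name}!"


async def main(names):
    tasks = [asyncio.create_task(get_greeting(name)) for name in names]
    arrived = []
    # as_completed() yields awaitables in completion order, not input order.
    for next_done in asyncio.as_completed(tasks):
        greeting = await next_done
        print(greeting)
        arrived.append(greeting)
    return arrived


arrived = asyncio.run(main(["Robert", "Carmen", "Lucas"]))
# Prints Hello, Carmen! then Hello, Lucas! then Hello, Robert!
```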

Lastly, run_any() can be used to run async functions until the first one completes, similarly to Promise.any():

>>> await aiometer.run_any([partial(get_greeting, name) for name in names])
'Hello, Carmen!'
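To see the "first one wins" idea in plain asyncio, the sketch below waits with asyncio.wait(..., return_when=FIRST_COMPLETED) and then cancels the calls still in flight. The helper first_completed and the DELAYS table are hypothetical; the cancellation step is a choice made in this sketch, not a documented description of run_any().

```python
import asyncio

# Hypothetical I/O durations: "Carmen" finishes first.
DELAYS = {"Robert": 0.3, "Carmen": 0.1, "Lucas": 0.2}


async def get_greeting(name):
    await asyncio.sleep(DELAYS[name])  # simulate I/O of a known duration
    return f"Hello, {name}!"


async def first_completed(names):
    tasks = [asyncio.create_task(get_greeting(name)) for name in names]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # the remaining calls are no longer needed
    # Wait for the cancellations to settle; CancelledError is collected, not raised.
    await asyncio.gather(*pending, return_exceptions=True)
    return done.pop().result()


winner = asyncio.run(first_completed(["Robert", "Carmen", "Lucas"]))
print(winner)  # Hello, Carmen!
```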

As a last fun example, let's use amap() to implement a no-threads async version of sleep sort:

>>> import asyncio
>>> from functools import partial
>>> import aiometer
>>> numbers = [0.3, 0.1, 0.6, 0.2, 0.7, 0.5, 0.5, 0.2]
>>> async def process(n):
...     await asyncio.sleep(n)
...     return n
...
>>> async with aiometer.amap(process, numbers) as results:
...     sorted_numbers = [n async for n in results]
...
>>> sorted_numbers
[0.1, 0.2, 0.2, 0.3, 0.5, 0.5, 0.6, 0.7]

How To

Multiple parametrized values in run_on_each and amap

run_on_each and amap only accept functions that accept a single positional argument (i.e. (Any) -> Awaitable).

So if you have a function that is parametrized by multiple values, you should refactor it to match this form.

This can generally be achieved like this:

  1. Build a proxy container type (e.g. a namedtuple), say T.
  2. Refactor your function so that its signature is now (T) -> Awaitable.
  3. Build a list of these proxy containers, and pass it to aiometer.

For example, assuming you have a function that processes X/Y coordinates...

async def process(x: float, y: float) -> None:
    pass

xs = list(range(100))
ys = list(range(100))

for x, y in zip(xs, ys):
    await process(x, y)

You could use it with amap by refactoring it like this:

from typing import NamedTuple

import aiometer

# Proxy container type:
class Point(NamedTuple):
    x: float
    y: float

# Rewrite to accept a proxy as a single positional argument:
async def process(point: Point) -> None:
    x = point.x
    y = point.y
    ...

xs = list(range(100))
ys = list(range(100))

# Build a list of proxy containers:
points = [Point(x, y) for x, y in zip(xs, ys)]

# Use it:
async with aiometer.amap(process, points) as results:
    ...
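If you would rather not change the signature of process() itself, a small adapter can produce the same (Any) -> Awaitable shape by unpacking a tuple into positional arguments. The unpack() helper below is a hypothetical name, not part of aiometer; you would then pass unpack(process) together with list(zip(xs, ys)) as the function and arguments. The demo calls the adapter directly so it runs with the standard library alone.

```python
import asyncio
from typing import Any, Awaitable, Callable, Tuple


def unpack(
    async_fn: Callable[..., Awaitable[Any]],
) -> Callable[[Tuple[Any, ...]], Awaitable[Any]]:
    """Hypothetical adapter: turn f(x, y, ...) into g((x, y, ...))."""

    async def wrapper(args: Tuple[Any, ...]) -> Any:
        return await async_fn(*args)  # spread the tuple into positional arguments

    return wrapper


async def process(x: float, y: float) -> float:
    await asyncio.sleep(0)  # stand-in for real async work
    return x + y


async def main():
    single_arg = unpack(process)  # now accepts exactly one positional argument
    pairs = list(zip(range(3), range(3)))  # [(0, 0), (1, 1), (2, 2)]
    return [await single_arg(pair) for pair in pairs]


sums = asyncio.run(main())
print(sums)  # [0, 2, 4]
```

The trade-off versus a NamedTuple is readability: named fields document what each value means, while the adapter keeps the original function untouched.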

API Reference

Common options

  • max_at_once (Optional, int): the maximum number of concurrently running tasks at any given time.
  • max_per_second (Optional, float): the maximum number of tasks spawned per second. Fractional values are allowed (e.g. 0.2 means at most one task every 5 seconds).

aiometer.run_on_each()

Signature: async aiometer.run_on_each(async_fn, args, *, max_at_once=None, max_per_second=None) -> None

Concurrently run the equivalent of async_fn(arg) for arg in args. Does not return any value. To get return values back, use aiometer.run_all().

aiometer.run_all()

Signature: async aiometer.run_all(async_fns, max_at_once=None, max_per_second=None) -> list

Concurrently run the async_fns functions, and return the list of results in the same order.

aiometer.amap()

Signature: aiometer.amap(async_fn, args, max_at_once=None, max_per_second=None) -> async context manager

Concurrently run the equivalent of async_fn(arg) for arg in args. Use it with async with, as in the examples above; the value it binds is an async iterator that yields results as they become available.

aiometer.run_any()

Signature: async aiometer.run_any(async_fns, max_at_once=None, max_per_second=None) -> Any

Concurrently run the async_fns functions, and return the first available result.

Contributing

See CONTRIBUTING.md.

License

MIT
