• Stars
    1,195
  • Rank 38,871 (Top 0.8 %)
  • Language
    Python
  • License
    Apache License 2.0
  • Created almost 8 years ago
  • Updated 2 months ago

Repository Details

AMQP 0.9 client designed for asyncio and humans.

aio-pika

A wrapper around aiormq for asyncio and humans.

Check out the examples and the tutorial in the documentation.

If you are a newcomer to RabbitMQ, please start with the adapted official RabbitMQ tutorial.

Note

Since version 5.0.0 this library doesn't use pika as its AMQP connector. Versions below 5.0.0 contain or require pika's source code.

Note

Version 7.0.0 has breaking API changes; see CHANGELOG.md for migration hints.

Features

  • Completely asynchronous API.
  • Object oriented API.
  • Transparent auto-reconnect with complete state recovery via connect_robust (e.g. declared queues and exchanges, consuming state, and bindings).
  • Python 3.7+ compatible.
  • For Python 3.5 users, aio-pika is available via aio-pika<7.
  • Transparent publisher confirms support.
  • Transactions support.
  • Complete type-hints coverage.
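
To give a feel for what "transparent auto-reconnect" saves you from writing, here is a minimal stdlib-only sketch of a retry loop with exponential backoff. It is not aio-pika's implementation and it does not cover state recovery (redeclaring queues, exchanges and bindings). The names connect_with_retry and FlakyBroker are hypothetical and exist only for this illustration.

```python
import asyncio


async def connect_with_retry(connect, attempts=5, base_delay=0.01):
    """Call the `connect` coroutine function until it succeeds.

    Sleeps base_delay, 2*base_delay, 4*base_delay, ... between failed
    attempts and re-raises the last error once `attempts` is exhausted.
    """
    for attempt in range(attempts):
        try:
            return await connect()
        except ConnectionError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)


class FlakyBroker:
    """Hypothetical stand-in for a broker that refuses the first connections."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    async def connect(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("broker unavailable")
        return "connected"


broker = FlakyBroker(failures=2)
result = asyncio.run(connect_with_retry(broker.connect))
print(result, broker.calls)
```

With connect_robust this kind of loop, plus the restoration of declared objects, is handled by the library rather than by application code.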

Installation

pip install aio-pika

Usage example

Simple consumer:

import asyncio
import aio_pika
import aio_pika.abc


async def main(loop):
    # Connecting with explicit parameters is also possible:
    # aio_pika.connect_robust(host="host", login="login", password="password")
    # Choose only one way to create a connection: a URL or keyword parameters.
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    async with connection:
        queue_name = "test_queue"

        # Creating channel
        channel: aio_pika.abc.AbstractChannel = await connection.channel()

        # Declaring queue
        queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(
            queue_name,
            auto_delete=True
        )

        async with queue.iterator() as queue_iter:
            # Cancel consuming after __aexit__
            async for message in queue_iter:
                async with message.process():
                    print(message.body)

                    if queue.name in message.body.decode():
                        break


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()
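
The comments in the consumer mention two ways of describing a connection: a URL or keyword parameters. The sketch below shows, using only the standard library, how the pieces of an AMQP URL map onto the keyword names used in that comment (host, login, password). The helper amqp_url_to_kwargs is hypothetical and is not part of aio_pika; the default port 5672 is the one mentioned in the test instructions of this README.

```python
from urllib.parse import urlsplit


def amqp_url_to_kwargs(url):
    """Split an AMQP URL into keyword-style connection parameters."""
    parts = urlsplit(url)
    return {
        "host": parts.hostname,
        "port": parts.port or 5672,  # fall back to the default AMQP port
        "login": parts.username,
        "password": parts.password,
    }


params = amqp_url_to_kwargs("amqp://guest:guest@localhost/")
print(params)
```

Either form carries the same information, so pick whichever is easier to build from your configuration.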

Simple publisher:

import asyncio
import aio_pika
import aio_pika.abc


async def main(loop):
    # Explicit type annotation
    connection: aio_pika.RobustConnection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    routing_key = "test_queue"

    channel: aio_pika.abc.AbstractChannel = await connection.channel()

    await channel.default_exchange.publish(
        aio_pika.Message(
            body='Hello {}'.format(routing_key).encode()
        ),
        routing_key=routing_key
    )

    await connection.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

Get single message example:

import asyncio
from aio_pika import connect_robust, Message


async def main(loop):
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/",
        loop=loop
    )

    queue_name = "test_queue"
    routing_key = "test_queue"

    # Creating channel
    channel = await connection.channel()

    # Declaring exchange
    exchange = await channel.declare_exchange('direct', auto_delete=True)

    # Declaring queue
    queue = await channel.declare_queue(queue_name, auto_delete=True)

    # Binding queue
    await queue.bind(exchange, routing_key)

    await exchange.publish(
        Message(
            bytes('Hello', 'utf-8'),
            content_type='text/plain',
            headers={'foo': 'bar'}
        ),
        routing_key
    )

    # Receiving message
    incoming_message = await queue.get(timeout=5)

    # Confirm message
    await incoming_message.ack()

    await queue.unbind(exchange, routing_key)
    await queue.delete()
    await connection.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))

There are more examples and the RabbitMQ tutorial in the documentation.

See also

aiormq

aiormq is a pure Python AMQP client library. It is what runs under the hood of aio-pika, and you can use it directly when you want to work with the protocol at a low level. The following examples demonstrate the user API.

Simple consumer:

import asyncio
import aiormq

async def on_message(message):
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    print(f" [x] Received message {message!r}")
    print(f"Message body is: {message.body!r}")
    print("Before sleep!")
    await asyncio.sleep(5)   # Represents async I/O operations
    print("After sleep!")

async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('helo')
    consume_ok = await channel.basic_consume(
        declare_ok.queue, on_message, no_ack=True
    )

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()
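
The docstring above notes that on_message does not have to be async. One common way for a library to accept both plain functions and coroutine functions is to call the callback and await the result only when it is awaitable. The following is a minimal sketch of that idea under that assumption; it is not aiormq's actual dispatch code, and dispatch, sync_handler and async_handler are hypothetical names.

```python
import asyncio
import inspect


async def dispatch(callback, message):
    """Invoke the callback and await its result only if it is awaitable."""
    result = callback(message)
    if inspect.isawaitable(result):
        result = await result
    return result


def sync_handler(message):
    return f"sync got {message!r}"


async def async_handler(message):
    await asyncio.sleep(0)  # stands in for real async I/O
    return f"async got {message!r}"


async def main():
    return [
        await dispatch(sync_handler, b"hello"),
        await dispatch(async_handler, b"hello"),
    ]


results = asyncio.run(main())
print(results)
```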

Simple publisher:

import asyncio
from typing import Optional

import aiormq
from aiormq.abc import DeliveredMessage

MESSAGE: Optional[DeliveredMessage] = None

async def main():
    global MESSAGE
    body = b'Hello World!'

    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # Creating a channel
    channel = await connection.channel()
    declare_ok = await channel.queue_declare("hello", auto_delete=True)

    # Sending the message
    await channel.basic_publish(body, routing_key='hello')
    print(f" [x] Sent {body}")

    MESSAGE = await channel.basic_get(declare_ok.queue)
    print(f" [x] Received message from {declare_ok.queue!r}")

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

assert MESSAGE is not None
assert MESSAGE.routing_key == "hello"
assert MESSAGE.body == b'Hello World!'

The patio and the patio-rabbitmq

PATIO is an acronym for Python Asynchronous Tasks for AsyncIO. It is an easily extensible library for distributed task execution, similar to Celery, but designed around asyncio.

patio-rabbitmq lets you use RPC over RabbitMQ services with an extremely simple implementation:

from patio import Registry, ThreadPoolExecutor
from patio_rabbitmq import RabbitMQBroker

rpc = Registry(project="patio-rabbitmq", auto_naming=False)

@rpc("sum")
def sum_values(*args):
    # Named differently from the builtin so that sum() below is not a recursive call
    return sum(args)

async def main():
    async with ThreadPoolExecutor(rpc, max_workers=16) as executor:
        async with RabbitMQBroker(
            executor, amqp_url="amqp://guest:guest@localhost/",
        ) as broker:
            await broker.join()

And the caller side might be written like this:

import asyncio
from patio import NullExecutor, Registry
from patio_rabbitmq import RabbitMQBroker

async def main():
    async with NullExecutor(Registry(project="patio-rabbitmq")) as executor:
        async with RabbitMQBroker(
            executor, amqp_url="amqp://guest:guest@localhost/",
        ) as broker:
            print(await asyncio.gather(
                *[
                    # "sum" is the name registered on the worker side above
                    broker.call("sum", i, i, timeout=1) for i in range(10)
                ]
            ))

Propan🔥

Propan is a powerful and easy-to-use Python framework for building event-driven applications that interact with any MQ Broker.

If you don't need to dive deep into RabbitMQ details, you can use Propan's higher-level interfaces:

from propan import PropanApp, RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = PropanApp(broker)

@broker.handle("user")
async def user_created(user_id: int):
    assert isinstance(user_id, int)
    return f"user-{user_id}: created"

@app.after_startup
async def pub_smth():
    assert (
        await broker.publish(1, "user", callback=True)
    ) == "user-1: created"

Propan also validates messages with pydantic, generates an AsyncAPI spec for your project, lets you test the application locally, supports RPC calls, and more.

In fact, it is a high-level wrapper on top of aio-pika, so you can use both of these libraries' advantages at the same time.

python-socketio

Socket.IO is a transport protocol that enables real-time bidirectional event-based communication between clients (typically, though not always, web browsers) and a server. This package provides Python implementations of both, each with standard and asyncio variants.

This package is also suitable for building messaging services over RabbitMQ via the aio-pika adapter:

import socketio
from aiohttp import web

sio = socketio.AsyncServer(client_manager=socketio.AsyncAioPikaManager())
app = web.Application()
sio.attach(app)

@sio.event
async def chat_message(sid, data):
    print("message ", data)

if __name__ == '__main__':
    web.run_app(app)

And a client is able to call chat_message the following way:

import asyncio
import socketio

sio = socketio.AsyncClient()

async def main():
    await sio.connect('http://localhost:8080')
    await sio.emit('chat_message', {'response': 'my response'})

if __name__ == '__main__':
    asyncio.run(main())

The taskiq and the taskiq-aio-pika

Taskiq is an asynchronous distributed task queue for Python. The project takes inspiration from big projects such as Celery and Dramatiq, but taskiq can send and run both sync and async functions.

The library also provides an aio-pika broker for running tasks:

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker()

@broker.task
async def test() -> None:
    print("nothing")

async def main():
    await broker.startup()
    await test.kiq()

Rasa

With over 25 million downloads, Rasa Open Source is the most popular open source framework for building chat and voice-based AI assistants.

With Rasa, you can build contextual assistants on:

  • Facebook Messenger
  • Slack
  • Google Hangouts
  • Webex Teams
  • Microsoft Bot Framework
  • Rocket.Chat
  • Mattermost
  • Telegram
  • Twilio

You can also build them on your own custom conversational channels, or as voice assistants such as:

  • Alexa Skills
  • Google Home Actions

Rasa helps you build contextual assistants capable of having layered conversations with lots of back-and-forth. In order for a human to have a meaningful exchange with a contextual assistant, the assistant needs to be able to use context to build on things that were previously discussed – Rasa enables you to build assistants that can do this in a scalable way.

Rasa also uses aio-pika internally to interact with RabbitMQ!

Versioning

This software follows Semantic Versioning.

For contributors

Setting up development environment

Clone the project:

git clone https://github.com/mosquito/aio-pika.git
cd aio-pika

Create a new virtualenv for aio-pika:

python3 -m venv env
source env/bin/activate

Install all requirements for aio-pika:

pip install -e '.[develop]'

Running Tests

NOTE: In order to run the tests locally you need to run a RabbitMQ instance with default user/password (guest/guest) and port (5672).

The Makefile provides a command to run an appropriate RabbitMQ Docker image:

make rabbitmq

To test just run:

make test

Editing Documentation

To iterate quickly on the documentation live in your browser, try:

nox -s docs -- serve

Creating Pull Requests

Please feel free to create pull requests, but you should describe your use cases and add some examples.

Changes should follow a few simple rules:

  • When your changes break the public API, you must increase the major version.
  • When your changes are safe for the public API (e.g. an added argument with a default value), you should increase the minor version.
  • You have to add test cases (see tests/ folder)
  • You must add docstrings
  • Feel free to add yourself to the "thanks to" section
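
Since the project follows Semantic Versioning, the version rules above can be sketched as a small function: a breaking change increases the major number, a backward-compatible addition conventionally increases the minor number, and a backward-compatible fix increases the patch number. The helper bump_version and the version strings below are hypothetical and only illustrate the convention; the project's actual release tooling may differ.

```python
def bump_version(version, change):
    """Return the next version for a 'major', 'minor' or 'patch' change."""
    major, minor, patch = (int(part) for part in version.split("."))
    if change == "major":  # breaking public API change
        return f"{major + 1}.0.0"
    if change == "minor":  # backward-compatible addition
        return f"{major}.{minor + 1}.0"
    if change == "patch":  # backward-compatible bug fix
        return f"{major}.{minor}.{patch + 1}"
    raise ValueError(f"unknown change type: {change!r}")


print(bump_version("6.8.2", "major"))
print(bump_version("7.0.0", "minor"))
print(bump_version("7.1.0", "patch"))
```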
