Pure Python asynchronous AMQP 0.9.1 client library

AIORMQ

aiormq is a pure Python AMQP client library.

Status

  • 3.x.x branch - Production/Stable
  • 4.x.x branch - Unstable (Experimental)
  • 5.x.x and later - Production/Stable releases only

Features

  • Connecting by URL
  • Buffered queue for received frames

  • Supports only the PLAIN auth mechanism

  • Publisher confirms support

  • Transactions support

  • Channel based asynchronous locks

    Note

    AMQP 0.9.1 requires some frame types to be sent serially on a channel. For example, a content body frame must follow its content header. Frames on other channels may still be sent concurrently.

  • Tracking unroutable messages (use connection.channel(on_return_raises=False) to disable)

  • Full SSL/TLS support, using your choice of:
    • amqps:// url query parameters:
      • cafile= - path to a CA certificate file
      • capath= - path to a directory of CA certificates
      • cadata= - base64-encoded CA certificate data
      • keyfile= - path to the key file
      • certfile= - path to the certificate file
      • no_verify_ssl - boolean; disables certificate validation
    • the context= keyword argument to connect(), which takes an SSLContext.
  • Python type hints

  • Uses pamqp as an AMQP 0.9.1 frame encoder/decoder
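
The note on channel-based locks in the feature list can be illustrated without a broker. The sketch below is conceptual and is not aiormq's implementation: SketchChannel, its wire list, and the frame strings are hypothetical stand-ins. It shows one asyncio.Lock per channel keeping each header/body pair in order on its own channel while two channels publish concurrently.

```python
import asyncio


class SketchChannel:
    """Hypothetical stand-in for a channel; not aiormq's real class."""

    def __init__(self, number: int, wire: list):
        self.number = number
        self.wire = wire            # shared list standing in for the socket
        self.lock = asyncio.Lock()  # one lock per channel

    async def _write(self, frame: str) -> None:
        await asyncio.sleep(0)      # yield so other tasks can interleave
        self.wire.append((self.number, frame))

    async def publish(self, body: str) -> None:
        # Hold the channel lock so that, on this channel, the body frame
        # is always the next frame after its header.
        async with self.lock:
            await self._write("header")
            await self._write(f"body:{body}")


async def demo() -> list:
    wire: list = []
    channels = [SketchChannel(n, wire) for n in (1, 2)]
    # Three concurrent publishes per channel, six tasks in total.
    await asyncio.gather(*(
        ch.publish(f"msg{i}") for ch in channels for i in range(3)
    ))
    return wire


def frames_for(wire: list, number: int) -> list:
    """Return the frames written on one channel, in wire order."""
    return [frame for ch, frame in wire if ch == number]


wire = asyncio.run(demo())
for number in (1, 2):
    print(number, frames_for(wire, number))
```

Frames from the two channels may interleave on the shared wire, but within each channel every header is followed by its body. Removing the lock would let two publishes on the same channel write both headers before either body.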

Tutorial

Introduction

Simple consumer

import asyncio
import aiormq

async def on_message(message):
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    print(f" [x] Received message {message!r}")
    print(f"Message body is: {message.body!r}")
    print("Before sleep!")
    await asyncio.sleep(5)   # Represents async I/O operations
    print("After sleep!")


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('hello')
    consume_ok = await channel.basic_consume(
        declare_ok.queue, on_message, no_ack=True
    )


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

Simple publisher

import asyncio
from typing import Optional

import aiormq
from aiormq.abc import DeliveredMessage


MESSAGE: Optional[DeliveredMessage] = None


async def main():
    global MESSAGE

    body = b'Hello World!'

    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # Creating a channel
    channel = await connection.channel()

    declare_ok = await channel.queue_declare("hello", auto_delete=True)

    # Sending the message
    await channel.basic_publish(body, routing_key='hello')
    print(f" [x] Sent {body}")

    MESSAGE = await channel.basic_get(declare_ok.queue)
    print(f" [x] Received message from {declare_ok.queue!r}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

assert MESSAGE is not None
assert MESSAGE.routing_key == "hello"
assert MESSAGE.body == b'Hello World!'

Work Queues

Create new task

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # sys.argv holds str values, so encode them before joining as bytes
    body = b' '.join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body,
        routing_key='task_queue',
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1,
        )
    )

    print(f" [x] Sent {body!r}")

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Simple worker

import asyncio
import aiormq
import aiormq.abc


async def on_message(message: aiormq.abc.DeliveredMessage):
    print(f" [x] Received message {message!r}")
    print(f"     Message body is: {message.body!r}")


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")


    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declaring queue
    declare_ok = await channel.queue_declare('task_queue', durable=True)

    # Start consuming from the queue named 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Publish Subscribe

Publisher

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    # sys.argv holds str values, so encode them before joining as bytes
    body = b' '.join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body, routing_key='info', exchange='logs'
    )

    print(f" [x] Sent {body!r}")

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Subscriber

import asyncio
import aiormq
import aiormq.abc


async def on_message(message: aiormq.abc.DeliveredMessage):
    print(f"[x] {message.body!r}")

    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    # Declaring queue
    declare_ok = await channel.queue_declare(exclusive=True)

    # Binding the queue to the exchange
    await channel.queue_bind(declare_ok.queue, 'logs')

    # Start consuming from the exclusive queue declared above
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(' [*] Waiting for logs. To exit press CTRL+C')
loop.run_forever()

Routing

Direct consumer

import sys
import asyncio
import aiormq
import aiormq.abc


async def on_message(message: aiormq.abc.DeliveredMessage):
    print(f" [x] {message.delivery.routing_key!r}:{message.body!r}")
    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = aiormq.Connection("amqp://guest:guest@localhost/")
    await connection.connect()

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    severities = sys.argv[1:]

    if not severities:
        sys.stderr.write(f"Usage: {sys.argv[0]} [info] [warning] [error]\n")
        sys.exit(1)

    # Declare an exchange
    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    # Declaring random queue
    declare_ok = await channel.queue_declare(durable=True, auto_delete=True)

    for severity in severities:
        await channel.queue_bind(
            declare_ok.queue, 'logs', routing_key=severity
        )

    # Start consuming from the random queue
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Emitter

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'

    await channel.basic_publish(
        body, exchange='logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(f" [x] Sent {body!r}")

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Topics

Publisher

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare('topic_logs', exchange_type='topic')

    routing_key = (
        sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    await channel.basic_publish(
        body, exchange='topic_logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(f" [x] Sent {body!r}")

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Consumer

import asyncio
import sys
import aiormq
import aiormq.abc


async def on_message(message: aiormq.abc.DeliveredMessage):
    print(f" [x] {message.delivery.routing_key!r}:{message.body!r}")
    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = await aiormq.connect(
        "amqp://guest:guest@localhost/", loop=loop
    )

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declare an exchange
    await channel.exchange_declare('topic_logs', exchange_type='topic')

    # Declaring queue
    declare_ok = await channel.queue_declare('task_queue', durable=True)

    binding_keys = sys.argv[1:]

    if not binding_keys:
        sys.stderr.write(
            f"Usage: {sys.argv[0]} [binding_key]...\n"
        )
        sys.exit(1)

    for binding_key in binding_keys:
        await channel.queue_bind(
            declare_ok.queue, 'topic_logs', routing_key=binding_key
        )

    # Start consuming from the queue named 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for
# data and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
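
The binding keys passed to the consumer above follow the AMQP topic exchange rules: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The broker does the real matching. The helper below, topic_matches, is a hypothetical local sketch (not part of aiormq) for reasoning about which routing keys a given binding key would receive.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Return True if routing_key would match binding_key on a topic exchange.

    Hypothetical helper for illustration; the broker performs the real match.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p: int, w: int) -> bool:
        if p == len(pattern):
            # Pattern exhausted: match only if all words are consumed too
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more of the remaining words
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            # Words exhausted but a non-'#' pattern word remains
            return False
        if pattern[p] == "*" or pattern[p] == words[w]:
            # '*' matches exactly one word; otherwise words must be equal
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


for binding, routing in [
    ("*.info", "anonymous.info"),
    ("kern.*", "kern.critical.disk"),
    ("kern.#", "kern.critical.disk"),
]:
    print(binding, routing, topic_matches(binding, routing))
```

Under these rules, starting the consumer with the binding key "#" receives every message, while "*.info" receives only two-word keys ending in "info", such as the publisher's default 'anonymous.info'.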

Remote procedure call (RPC)

RPC server

import asyncio
import aiormq
import aiormq.abc


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


async def on_message(message: aiormq.abc.DeliveredMessage):
    n = int(message.body.decode())

    print(f" [.] fib({n})")
    response = str(fib(n)).encode()

    await message.channel.basic_publish(
        response, routing_key=message.header.properties.reply_to,
        properties=aiormq.spec.Basic.Properties(
            correlation_id=message.header.properties.correlation_id
        ),
    )

    await message.channel.basic_ack(message.delivery.delivery_tag)
    print('Request complete')


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('rpc_queue')

    # Start consuming from the queue named 'rpc_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()

RPC client

import asyncio
import uuid
import aiormq
import aiormq.abc


class FibonacciRpcClient:
    def __init__(self):
        self.connection = None      # type: aiormq.Connection
        self.channel = None         # type: aiormq.Channel
        self.callback_queue = ''
        self.futures = {}
        self.loop = loop

    async def connect(self):
        self.connection = await aiormq.connect("amqp://guest:guest@localhost/")

        self.channel = await self.connection.channel()
        declare_ok = await self.channel.queue_declare(
            exclusive=True, auto_delete=True
        )

        await self.channel.basic_consume(declare_ok.queue, self.on_response)

        self.callback_queue = declare_ok.queue

        return self

    async def on_response(self, message: aiormq.abc.DeliveredMessage):
        future = self.futures.pop(message.header.properties.correlation_id)
        future.set_result(message.body)

    async def call(self, n):
        correlation_id = str(uuid.uuid4())
        future = self.loop.create_future()

        self.futures[correlation_id] = future

        await self.channel.basic_publish(
            str(n).encode(), routing_key='rpc_queue',
            properties=aiormq.spec.Basic.Properties(
                content_type='text/plain',
                correlation_id=correlation_id,
                reply_to=self.callback_queue,
            )
        )

        return int(await future)


async def main():
    fibonacci_rpc = await FibonacciRpcClient().connect()
    print(" [x] Requesting fib(30)")
    response = await fibonacci_rpc.call(30)
    print(f" [.] Got {response!r}")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
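
The RPC client above pairs each reply with its request through a correlation id and a future. That bookkeeping can be tried without a broker. In the sketch below, PendingReplies and its method names are hypothetical and are not part of aiormq; resolve() plays the role of on_response, and the replies are delivered by hand, deliberately out of order.

```python
import asyncio
import uuid


class PendingReplies:
    """Hypothetical registry mapping correlation ids to awaiting futures."""

    def __init__(self):
        self.futures = {}

    def register(self):
        # Create a fresh correlation id and a future to await the reply on
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self.futures[correlation_id] = future
        return correlation_id, future

    def resolve(self, correlation_id: str, body: bytes) -> bool:
        # Unlike the client above, unknown ids are ignored rather than
        # raising KeyError; this is a design choice of the sketch.
        future = self.futures.pop(correlation_id, None)
        if future is None:
            return False
        future.set_result(body)
        return True


async def demo():
    pending = PendingReplies()
    id_a, reply_a = pending.register()
    id_b, reply_b = pending.register()

    # Replies arrive in the opposite order to the requests
    pending.resolve(id_b, b"832040")
    pending.resolve(id_a, b"55")
    unknown = pending.resolve("no-such-id", b"?")

    return int(await reply_a), int(await reply_b), unknown


print(asyncio.run(demo()))
```

Each caller still receives its own answer because the lookup is by correlation id, not by arrival order.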
