  • Language: Crystal
  • License: MIT License
  • Created over 5 years ago
  • Updated 4 months ago

amqp-client

An AMQP 0-9-1 client for Crystal.

Installation

  1. Add the dependency to your shard.yml:
dependencies:
  amqp-client:
    github: cloudamqp/amqp-client.cr
  2. Run shards install

Usage

require "amqp-client"

AMQP::Client.start("amqp://guest:guest@localhost") do |c|
  c.channel do |ch|
    # Always set a prefetch limit before consuming
    ch.prefetch(100)

    # Declare a temporary queue
    q = ch.queue("")

    # Declare a durable queue
    q = ch.queue("my-queue")

    # Subscribe to it, and manually acknowledge messages when processed
    q.subscribe(no_ack: false) do |msg|
      puts "Received: #{msg.body_io.to_s}"
      ch.basic_ack(msg.delivery_tag)
    end

    # Publish directly to a queue without confirm (fire and forget)
    q.publish "msg"

    # Publish directly to a queue and block while waiting for the confirm
    q.publish_confirm "msg"

    # Publish to any exchange/routing key (fire and forget)
    ch.basic_publish "msg", exchange: "amq.topic", routing_key: "a"

    # Publish to any exchange/routing key and block while waiting for the confirm
    ch.basic_publish_confirm "msg", exchange: "amq.topic", routing_key: "a"

    # When the Channel is in confirm mode a block can be given to the basic_publish
    # method and it will be executed when the message is confirmed by the server
    ch.confirm_select
    ch.basic_publish("msg", "amq.topic", "my.topic") do |ok|
      if ok
        puts "Message is confirmed by the server"
      else
        puts "Message was NOT confirmed by the server"
      end
    end

    # This statement will block until a message has arrived
    # The only way to "escape" the block is to unsubscribe
    q.subscribe(tag: "myconsumer", block: true) do |msg|
      q.unsubscribe("myconsumer")
    end

    # Consume and ack, nack or reject msgs
    ch.basic_consume("queue", tag: "consumer-tag", no_ack: false, exclusive: false, block: false) do |msg|
      case msg.body_io.to_s
      when "ack"
        ch.basic_ack(msg.delivery_tag)
      when "reject"
        ch.basic_reject(msg.delivery_tag, requeue: true)
      when "nack"
        ch.basic_nack(msg.delivery_tag, requeue: true, multiple: true)
      end
    end

    ch.prefetch(count: 1000) # alias for basic_qos

    name, message_count, consumer_count =
      ch.queue_declare(name: "myqueue", passive: false, durable: true,
                       exclusive: false, auto_delete: false,
                       arguments: AMQP::Client::Arguments.new)
    q = ch.queue # temporary queue that is deleted when the channel is closed
    ch.queue_purge("myqueue")
    ch.queue_bind("myqueue", "amq.topic", "routing-key")
    ch.queue_unbind("myqueue", "amq.topic", "routing-key")
    # basic_get returns nil when the queue is empty; only ack when no_ack is false
    if msg = ch.basic_get("myqueue", no_ack: false)
      ch.basic_ack(msg.delivery_tag)
    end
    ch.queue_delete("myqueue")
    ch.exchange_declare("my-exchange", type: "topic")
    ch.exchange_delete("my-exchange")
  end
end
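
The listing above is a catalogue of calls rather than a single flow. As a minimal end-to-end sketch built only from calls shown in it, the following publishes one message to a temporary queue and consumes it again. It assumes a broker is reachable on localhost with the default guest account; the consumer tag "roundtrip" and the payload are arbitrary choices for illustration.

```crystal
require "amqp-client"

# Assumption: a broker is running locally and accepts guest/guest
AMQP::Client.start("amqp://guest:guest@localhost") do |c|
  c.channel do |ch|
    # Limit unacknowledged deliveries before consuming
    ch.prefetch(1)

    # Temporary queue, declared the same way as in the listing above
    q = ch.queue("")

    # Block until the server confirms the publish
    q.publish_confirm "hello"

    # Block until the message arrives; unsubscribing is what ends the blocking call
    q.subscribe(tag: "roundtrip", no_ack: false, block: true) do |msg|
      puts "Received: #{msg.body_io.to_s}"
      msg.ack
      q.unsubscribe("roundtrip")
    end
  end
end
```

Publishing with a confirm before subscribing means the message is already in the queue when the consumer starts, so the blocking subscribe should return after a single delivery.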

You can consume stream queues too:

require "amqp-client"

AMQP::Client.start do |c|
  c.channel do |ch|
    # A prefetch limit is required when consuming from stream queues
    ch.prefetch(10)
    # Declare a stream queue using the x-queue-type argument
    q = ch.queue("stream1", args: AMQP::Client::Arguments.new({"x-queue-type": "stream"}))
    puts "Waiting for messages. To exit press CTRL+C"
    # Decide where to start reading with the x-stream-offset argument
    # (for example "first", "last" or "next")
    q.subscribe(block: true, no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
      puts "Received: #{msg.body_io}"
      msg.ack
    end
  end
end

Performance

1-byte messages, without properties/headers:

Publish rate        Consume rate
1,200,000 msgs/s    1,000,000 msgs/s
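
To put these rates in perspective, a small back-of-the-envelope sketch converts them into a per-message time budget and a payload bandwidth. It only restates the quoted figures and makes no claim about how they were measured.

```crystal
# Rates quoted above, in messages per second
publish_rate = 1_200_000
consume_rate = 1_000_000
payload_bytes = 1 # the benchmark uses 1-byte message bodies

# Average time available per message, in nanoseconds (Int / Int yields Float64)
ns_per_publish = 1_000_000_000 / publish_rate
ns_per_consume = 1_000_000_000 / consume_rate

# Payload bandwidth in megabytes per second
publish_mb_s = publish_rate * payload_bytes / 1_000_000

puts "publish: #{ns_per_publish.round(1)} ns/msg, #{publish_mb_s} MB/s of payload"
puts "consume: #{ns_per_consume.round(1)} ns/msg"
```

With bodies this small, the figures describe per-message overhead rather than network bandwidth.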

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create a new Pull Request
