• Stars
    star
    581
  • Rank 76,901 (Top 2 %)
  • Language
    Crystal
  • License
    Apache License 2.0
  • Created over 7 years ago
  • Updated 22 days ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Lightweight and fast AMQP (0-9-1) server

Build Status Build Status

LavinMQ

A message queue server that implements the AMQP 0-9-1 protocol. Written in Crystal.

Aims to be very fast, has low RAM requirements, handles very long queues, many connections, and requires minimal configuration.

Read more at LavinMQ.com

Installation

From source

Begin with installing Crystal. Refer to Crystal's installation documentation on how to install Crystal.

Clone the git repository and build the project.

git clone [email protected]:cloudamqp/lavinmq.git
cd lavinmq
make
sudo make install # optional

Now, LavinMQ is ready to be used. You can check the version with:

lavinmq -v

Debian/Ubuntu

curl -fsSL https://packagecloud.io/cloudamqp/lavinmq/gpgkey | gpg --dearmor | sudo tee /usr/share/keyrings/lavinmq.gpg > /dev/null
. /etc/os-release
echo "deb [signed-by=/usr/share/keyrings/lavinmq.gpg] https://packagecloud.io/cloudamqp/lavinmq/$ID $VERSION_CODENAME main" | sudo tee /etc/apt/sources.list.d/lavinmq.list
sudo apt-get update
sudo apt-get install lavinmq

If you need to install a specific version of LavinMQ, do so using the following command: sudo apt install lavinmq=<version>. This works for both upgrades and downgrades.

Fedora

sudo tee /etc/yum.repos.d/lavinmq.repo << 'EOF'
[lavinmq]
name=LavinMQ
baseurl=https://packagecloud.io/cloudamqp/lavinmq/fedora/$releasever/$basearch
gpgkey=https://packagecloud.io/cloudamqp/lavinmq/gpgkey
repo_gpgcheck=1
gpgcheck=0
EOF
sudo dnf install lavinmq

Usage

LavinMQ only requires one argument, and it's a path to a data directory.

Run LavinMQ with: lavinmq -D /var/lib/lavinmq

More configuration options can be viewed with -h, and you can specify a configuration file too, see extras/config.ini for an example.

Docker

Docker images are published to Docker Hub. Fetch and run the latest version with:

docker run --rm -it -p 5672:5672 -p 15672:15672 -v /var/lib/lavinmq:/tmp/amqp cloudamqp/lavinmq

You are then able to visit the management UI at http://localhost:15672 and start publishing/consuming messages to amqp://guest:guest@localhost.

Debugging

In Linux, perf is the tool of choice when tracing and measuring performance.

To see which syscalls that are made use:

perf trace -p $(pidof lavinmq)

To get a live analysis of the mostly called functions, run:

perf top -p $(pidof lavinmq)

A more detailed tutorial on perf is available here.

In OS X the app, Instruments that's bundled with Xcode can be used for tracing.

Memory garbage collection can be diagnosed with boehm-gc environment variables.

Contributing

Kindly read our contributing guide

LavinMQ with various plattforms

All AMQP client libraries work with LavinMQ and there are AMQP client libraries for almost every platform on the market. Here are guides for a couple of common plattforms.

  1. Ruby
  2. Node.js
  3. Java
  4. Python
  5. PHP
  6. Crystal

Performance

A single m6g.large EC2 instance, with a GP3 EBS drive (XFS formatted), can sustain about 700.000 messages/s (16 byte msg body, single queue, single producer, single consumer). A single producer can push 1.600.000 msgs/s and if there's no producers auto-ack consumers can receive 1.200.000 msgs/s.

Enqueueing 100M messages only uses 25 MB RAM. 8000 connection uses only about 400 MB RAM. Declaring 100.000 queues uses about 100 MB RAM. About 1.600 bindings per second can be made to non-durable queues, and about 1000 bindings/second to durable queues.

Implementation

LavinMQ is written in Crystal, a modern language built on the LLVM, with a Ruby-like syntax. It uses an event loop library for IO, is garbage collected, adopts a CSP-like concurrency model and compiles down to a single binary. You can liken it to Go, but with a nicer syntax.

Instead of trying to cache messages in RAM, we write all messages as fast as we can to disk and let the OS cache do the caching.

Each queues is backed by a message store on disk, which is just a series of files (segments), by default 8MB each. Message segments are memory-mapped files allocated using the mmap syscall. However, to prevent unnecessary memory usage, we unmap these files and free up the allocated memory when they are not in use. When a file needs to be written or read, we re-map it and use only the memory needed for that specific segment. Each incoming message is appended to the last segment, prefixed with a timestamp, its exchange name, routing key and message headers.

When a message is being consumed it reads sequentially from the segments. Each acknowledged (or rejected) message position in the segment is written to an "ack" file (per segment). If a message is requeued its position is added to a in memory queue. On boot all acked message positions are read from the "ack" files and then when deliviering messages skip those when reading sequentially from the message segments. Segments are deleted when all message in them are acknowledged.

Declarations of queues, exchanges and bindings are written to a definitions file (if the target is durable), encoded as the AMQP frame they came in as. Periodically this file is compacted/garbage-collected by writing only the current in-memory state to the file (getting rid of all delete events). This file is read on boot to restore all definitions.

All non-AMQP objects like users, vhosts, policies, etc. are stored in JSON files. Most often these type of objects does not have a high turnover rate, so we believe that JSON in this case makes it easy for operators to modify things when the server is not running, if ever needed.

In the data directory we store users.json and vhosts.json as mentioned earlier, and each vhost has a directory in which we store definitions.amqp (encoded as AMQP frames), policies.json and the messages named such as msgs.0000000124. Each vhost directory is named after the sha1 hash of its real name. The same goes for the queue directories in the vhost directory. The queue directories only has two files, ack and enq, also described earlier.

Flows

Here is an architectural description of the different flows in the server.

Publish

Client#read_loop reads from the socket, it calls Channel#start_publish for the Basic.Publish frame and Channel#add_content for Body frames. When all content has been received (and appended to an IO::Memory object) it calls VHost#publish with a Message struct. VHost#publish finds all matching queues, writes the message to the message store and then calls Queue#publish with the segment position. Queue#publish writes to the message store.

Consume

When Client#read_loop receives a Basic.Consume frame it will create a Consumer class and add it to the queue's list of consumers. Each consumer has a deliver_loop fiber that will be notified by an internal Channel when new messages are available in the queue.

Getting help

For questions or suggestions:

Features

  • AMQP 0-9-1 compatible
  • AMQPS (TLS)
  • HTTP API
  • Publisher confirm
  • Transactions
  • Policies
  • Shovels
  • Queue federation
  • Exchange federation
  • Dead-lettering
  • TTL support on queue, message, and policy level
  • CC/BCC
  • Alternative exchange
  • Exchange to exchange bindings
  • Direct-reply-to RPC
  • Users and ACL rules
  • VHost separation
  • Consumer cancellation
  • Queue max-length
  • Importing/export definitions
  • Priority queues
  • Delayed exchanges
  • AMQP WebSocket
  • Single active consumer
  • Replication
  • Stream queues

Currently missing but planned features

  • Automatic leader election in clusters

Known differences to other AMQP servers

There are a few edge-cases that are handled a bit differently in LavinMQ compared to other AMQP servers.

  • When comparing queue/exchange/binding arguments all number types (e.g. 10 and 10.0) are considered equivalent
  • When comparing queue/exchange/binding arguments non-effective parameters are also considered, and not ignored
  • TTL of queues and messages are correct to the 0.1 second, not to the millisecond
  • Newlines are not removed from Queue or Exchange names, they are forbidden

Replication

LavinMQ supports replication between a leader server and one or more followers. All changes on the leader is replicated to followers.

Replication configuration

A shared secret is used to allow nodes in a cluster to communicate, make sure that the .replication_secret file is the same in all data directores of all nodes.

Then enable the replication listener on the leader:

[replication]
bind = 0.0.0.0
port = 5679

or start LavinMQ with:

lavinmq --data-dir /var/lib/lavinmq --replication-bind 0.0.0.0 --replication-port 5679

Configure the follower(s) to connect to the leader:

[replication]
follow = tcp://hostname:port

or start LavinMQ with:

lavinmq --data-dir /var/lib/lavinmq-follower --follow tcp://leader.example.com:5679

Stream queues

Stream queues are like append-only logs and can be consumed multiple times. Each consumer can start to read from anywhere in the queue (using the x-stream-offset consumer argument) over and over again. Stream queues are different from normal queues in that messages are not deleted (see #retention) when a consumer acknowledge them.

Retention

Messages are only deleted when max-length, max-length-bytes or max-age are applied, either as queue arguments or as policies. The limits are checked only when new messages are published to the queue, and only act on whole segments (which by default are 8MiB), so the limits aren't necessarily exact. So even if a max-age limit is set, but no messages are published to the queue, messages might still be available in the stream queue that is way older that the limit specified.

Contributors

License

The software is licensed under the Apache License 2.0.

Copyright 2018-2024 84codes AB

LavinMQ is a trademark of 84codes AB

More Repositories

1

amqproxy

An intelligent AMQP proxy, with connection and channel pooling/reusing
Crystal
335
star
2

amqp-client.js

AMQP 0-9-1 TypeScript client both for Node.js and browsers (using WebSocket)
TypeScript
200
star
3

amqp-client.cr

An AMQP 0-9-1 client for Crystal
Crystal
69
star
4

android-example

Java
53
star
5

terraform-provider-cloudamqp

Terraform Provider for CloudAMQP
Go
35
star
6

websocket-tcp-relay

Expose any TCP server as a WebSocket endpoint
Crystal
29
star
7

rabbitmq-vshovel

RabbitMQ vShovel plugin
Erlang
24
star
8

amqp-client.rb

Modern AMQP 0-9-1 Ruby client
Ruby
19
star
9

amqpcat

CLI tool for publishing to and consuming from AMQP servers
Crystal
18
star
10

amqp-sse

Ruby Sinatra app illustrating how to use Server Sent Events and AMQP for realtime updates
Ruby
17
star
11

python-amqp-example

How to connect from Python to CloudAMQP
Python
14
star
12

java-amqp-example

Java
11
star
13

rabbitmq-delayed-messages

Ruby snippet, demonstrates how to implement delayed messages in RabbitMQ
10
star
14

nodejs-amqp-example

Example project showing how to connect to CloudAMQP from Node.js
JavaScript
10
star
15

amq-protocol.cr

An AMQP 0.9.1 serialization library for Crystal
Crystal
10
star
16

msg_store_eleveldb_index

eLevelDB backend for RabbitMQ's message index store
Erlang
9
star
17

web-stomp-example

Exampel app in sinatra/javascript to use RabbitMQ Web-stomp
HTML
7
star
18

php-amqplib-example

How to connect to CloudAMQP from a PHP app
PHP
6
star
19

rmqrecover

Recovers messages from a RabbitMQ data directory
Crystal
6
star
20

dotnetcore-amqp-example

C#
4
star
21

DotNetAmqpExample

Simple usage of AMQP demonstrated in a ASP.NET MVC app
C#
4
star
22

clojure-amqp-example

A Clojure example project showing how to connect, publish and consume a message via CloudAMQP
Clojure
2
star
23

get-me-hired-producer

Making the job search process easier
Python
2
star
24

php-amqplib-ssl-example

2
star
25

presentation-demo

Crystal
2
star
26

rabbitmq-integration-demos

Integrating RabbitMQ with other technologies
Python
2
star
27

erlang-packages

Erlang debian packages
Dockerfile
1
star
28

amqpshovel

High performance AMQP shovel
Crystal
1
star
29

cloudamqp-connector

Deprecated, please use https://github.com/mulesoft/mule-transport-amqp instead
Java
1
star
30

lavinmq-action

GitHub Action to run LavinMQ
1
star
31

lavinmq-tutorials

Beginner friendly LavinMQ tutorials that you can't refuse :)
JavaScript
1
star
32

rabbitmq-summit

Website for the rabbitmq-summit
SCSS
1
star
33

mesque

A Resque compatible work library using RabbitMQ as backend for relability and performance
Ruby
1
star
34

websockets-rabbitmq-benchmarks

Benchmark web-mqtt, web-stomp, and web-amqp
JavaScript
1
star