RabbitMQ Delayed Message Plugin

Consider the Limitations

This plugin adds delayed messaging (or scheduled messaging) to RabbitMQ. Its current design has plenty of limitations (documented below), so consider using an external scheduler and a data store that fits your needs first.

This plugin badly needs a new design and a reimplementation from the ground up.

If you accept the limitations, please read on.

The Basics

With this plugin enabled, a user can declare an exchange with the type x-delayed-message and then publish messages with the custom header x-delay, which expresses the delay for the message in milliseconds. The message will be delivered to the respective queues after x-delay milliseconds.

Intended Use Cases

This plugin was designed for delaying message publishing for a number of seconds, minutes, or hours. A day or two at most.

It is not a longer term scheduling solution. If you need to delay publishing by days, weeks, months, or years, consider using a data store suitable for long-term storage, and an external scheduling tool of some kind.

Supported RabbitMQ Versions

The most recent release of this plugin targets RabbitMQ 3.13.x.

Supported Erlang/OTP Versions

The latest version of this plugin requires Erlang 26.0 or later, the same as RabbitMQ 3.13.x.

Project Maturity

The current design of this plugin is mature and potentially suitable for production use as long as the user is aware of its limitations and the intended use cases.

This plugin is not commercially supported by VMware at the moment, but that doesn't mean it will be abandoned or that team RabbitMQ is not interested in improving it in the future. It is not, however, a high priority for our small team.

So, give it a try with your workload and decide for yourself.

Installation

Download a Binary Build

Binary builds are distributed via GitHub releases.

As with all 3rd party plugins, the .ez file must be placed into a node's plugins directory and be readable by the effective user of the RabbitMQ process.

To find out what the plugins directory is, use rabbitmq-plugins directories:

rabbitmq-plugins directories -s

Enabling the Plugin

Then run the following command:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Usage

To use the delayed-messaging feature, declare an exchange with the type x-delayed-message:

// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...

Note that we pass an extra argument called x-delayed-type; more on it under the Routing section.

Once the exchange is declared, we can publish messages with a header that tells the plugin how long to delay each message:

// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

byte[] messageBodyBytes2 = "more delayed payload".getBytes("UTF-8");
Map<String, Object> headers2 = new HashMap<String, Object>();
headers2.put("x-delay", 1000);
AMQP.BasicProperties.Builder props2 = new AMQP.BasicProperties.Builder().headers(headers2);
channel.basicPublish("my-exchange", "", props2.build(), messageBodyBytes2);
// ... more code ...

In the above example we publish two messages, specifying the delay time with the x-delay header. For this example, the plugin will deliver to our queues first the message with the body "more delayed payload" and then the one with the body "delayed payload".
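The ordering in this example can be reasoned about with a small model. The sketch below is only an illustration, not the plugin's implementation: it assumes each message becomes due at its publish time plus its x-delay, and the class and method names (DelayOrderingSketch, Scheduled, deliveryOrder) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of delivery ordering; NOT the plugin's implementation.
// Assumption: a message becomes due at (publish time + x-delay).
final class DelayOrderingSketch {
    private DelayOrderingSketch() {}

    /** One published message: its body, publish time, and x-delay (both in ms). */
    static final class Scheduled {
        final String body;
        final long publishedAtMs;
        final long delayMs;

        Scheduled(String body, long publishedAtMs, long delayMs) {
            this.body = body;
            this.publishedAtMs = publishedAtMs;
            this.delayMs = delayMs;
        }

        long dueAtMs() {
            return publishedAtMs + delayMs;
        }
    }

    /** Returns message bodies ordered by the time they become due. */
    static List<String> deliveryOrder(List<Scheduled> published) {
        List<Scheduled> byDueTime = new ArrayList<>(published);
        byDueTime.sort(Comparator.comparingLong(Scheduled::dueAtMs));
        List<String> bodies = new ArrayList<>();
        for (Scheduled s : byDueTime) {
            bodies.add(s.body);
        }
        return bodies;
    }
}
```

With the two messages from the example (delays of 5000 and 1000 milliseconds, published almost simultaneously), this model puts "more delayed payload" ahead of "delayed payload".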

If the x-delay header is not present, then the plugin will proceed to route the message without delay.
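Building the headers map can be wrapped in a small helper so that the x-delay header is only set when a delay is actually wanted. This is a minimal sketch using only the JDK; DelayHeaders and withDelay are hypothetical names, not part of the plugin or the RabbitMQ Java client.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of the plugin or the RabbitMQ Java client.
final class DelayHeaders {
    private DelayHeaders() {}

    /**
     * Returns a mutable header map carrying x-delay in milliseconds.
     * A null delay, or one that rounds down to zero or less, yields a map
     * without the header, so the message would be routed without delay.
     */
    static Map<String, Object> withDelay(Duration delay) {
        Map<String, Object> headers = new HashMap<>();
        long millis = (delay == null) ? 0L : delay.toMillis();
        if (millis > 0) {
            headers.put("x-delay", millis);
        }
        return headers;
    }
}
```

The returned map can be passed to new AMQP.BasicProperties.Builder().headers(...) in the same way as the hand-built maps in the publishing example above.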

Routing

This plugin allows for flexible routing via the x-delayed-type argument that can be passed during exchange.declare. In the example above we used "direct" as the exchange type. That means the plugin will have the same routing behavior as the direct exchange.

If you want a different routing behavior, then you could provide a different exchange type, like "topic" for example. You can also specify exchange types provided by plugins. Note that this argument is required and must refer to an existing exchange type.
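Because x-delayed-type is required, it can help to build the declaration arguments in one place and fail early when the type is missing. The following is a minimal sketch using only the JDK; DelayedExchangeArgs and forType are hypothetical names, and the check only catches an empty value, not a type that is unknown to the broker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; not part of the plugin or the RabbitMQ Java client.
final class DelayedExchangeArgs {
    private DelayedExchangeArgs() {}

    /**
     * Builds the arguments map for declaring an x-delayed-message exchange.
     * Rejects a missing type locally; whether the type actually exists
     * can only be verified by the broker.
     */
    static Map<String, Object> forType(String delayedType) {
        if (delayedType == null || delayedType.isBlank()) {
            throw new IllegalArgumentException("x-delayed-type is required");
        }
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", delayedType);
        return args;
    }
}
```

For topic-style routing, the result of DelayedExchangeArgs.forType("topic") would be passed as the last argument of channel.exchangeDeclare, in place of the args map from the Usage example.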

Performance Impact

Due to the "x-delayed-type" argument, one could use this exchange in place of other exchanges, since the "x-delayed-message" exchange will just act as a proxy. Note that there might be some performance implications if you do this.

For each message that crosses an "x-delayed-message" exchange, the plugin will try to determine if the message has to be delayed by making sure the delay is within range, i.e. Delay > 0 and Delay =< ?ERL_MAX_T (in Erlang, a timer can be set up to (2^32)-1 milliseconds in the future).

If the previous condition holds, then the message will be persisted to Mnesia and some other logic will kick in to determine if this particular message delay needs to replace the current scheduled timer and so on.

This means that while one could use this exchange in place of a direct or fanout exchange (or any other exchange for that matter), it will be slower than using the actual exchange. If you don't need to delay messages, then use the actual exchange.
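The range check described in this section can be mirrored on the publisher side before a message is sent. This is a minimal sketch of the stated condition only (Delay > 0 and Delay at most (2^32)-1 milliseconds); DelayRange, MAX_DELAY_MS, and isInRange are hypothetical names, and the code is not taken from the plugin.

```java
// Hypothetical publisher-side check; not code from the plugin.
final class DelayRange {
    private DelayRange() {}

    /** Upper bound quoted in the text: (2^32)-1 milliseconds. */
    static final long MAX_DELAY_MS = (1L << 32) - 1;

    /** True when the delay satisfies Delay > 0 and Delay <= MAX_DELAY_MS. */
    static boolean isInRange(long delayMs) {
        return delayMs > 0 && delayMs <= MAX_DELAY_MS;
    }
}
```

The upper bound works out to 4,294,967,295 milliseconds, or roughly 49.7 days, which is well beyond the "day or two at most" that the plugin is intended for.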

Limitations

Delayed messages are stored in a Mnesia table with a single disk replica on the current node. They will survive a node restart. While the timers that trigger scheduled delivery are not persisted, they will be re-initialised during plugin activation on node start. Because only one copy of a scheduled message exists in a cluster, losing that node or disabling the plugin on it will lose the messages residing on that node.

This plugin was created with disk nodes in mind. RAM nodes are currently unsupported and adding support for them is not a priority (if you aren't sure what RAM nodes are and whether you need to use them, you almost certainly don't).

The plugin only performs one attempt at publishing each message but since publishing is local, in practice the only issue that may prevent delivery is the lack of queues (or bindings) to route to.

Closely related to the above, the mandatory flag is not supported by this exchange: we cannot be sure that at the future publishing point in time:

  • there is at least one queue we can route to
  • the original connection is still around to send a basic.return to

The current design of this plugin doesn't really fit scenarios with a high number of delayed messages (e.g. hundreds of thousands or millions). See #72 for details.

Disabling the Plugin

You can disable this plugin by calling rabbitmq-plugins disable rabbitmq_delayed_message_exchange but note that ALL DELAYED MESSAGES THAT HAVEN'T BEEN DELIVERED WILL BE LOST.

Building the Plugin

bazel build //:erlang_app
bazel build :ez

The EZ file is created in the bazel-bin directory.

Creating a Release

  1. Update broker_version_requirements in helpers.bzl & Makefile (Optional)
  2. Update the plugin version in MODULE.bazel
  3. Push a tag (e.g. v3.13.0) with the matching version
  4. Allow the Release workflow to run and create a draft release
  5. Review and publish the release

LICENSE

See the LICENSE file.
