• This repository has been archived on 09/Jul/2024

RabbitMQ Consistent Hash Exchange Type

This was migrated to https://github.com/rabbitmq/rabbitmq-server

This repository has been moved to the main unified RabbitMQ "monorepo", including all open issues. You can find the source under /deps/rabbitmq_consistent_hash_exchange. All issues have been transferred.

Overview

This plugin adds a consistent-hash exchange type to RabbitMQ. This exchange type uses consistent hashing (intro blog posts: one, two, three) to distribute messages between the bound queues. It is recommended to get a basic understanding of the concept before evaluating this plugin and its alternatives.

rabbitmq-sharding is another plugin that provides a way to partition a stream of messages among a set of consumers while trading off total stream ordering for processing parallelism.

Problem Definition

In various scenarios it may be desired to ensure that messages sent to an exchange are reasonably uniformly distributed across a number of queues based on the routing key of the message, a nominated header, or a message property. Technically this can be accomplished using a direct or topic exchange, binding queues to that exchange and then publishing messages to that exchange that match the various binding keys.

However, arranging things this way can be problematic:

  1. It is difficult to ensure that all queues bound to the exchange will receive a (roughly) equal number of messages (distribution uniformity) without baking in to the publishers quite a lot of knowledge about the number of queues and their bindings.

  2. When the number of queues changes, it is not easy to ensure that the new topology still distributes messages between the different queues evenly.

Consistent Hashing is a hashing technique whereby each bucket appears at multiple points throughout the hash space, and the bucket selected is the nearest higher (or lower, it doesn't matter, provided it's consistent) bucket to the computed hash (and the hash space wraps around). The effect of this is that when a new bucket is added or an existing bucket removed, only a very few hashes change which bucket they are routed to.
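The property described above can be illustrated with a small, self-contained Python sketch. It is a generic textbook hash ring, not the plugin's implementation; the `HashRing` class, the SHA-256 based `h` function and the choice of 100 points per bucket are assumptions made purely for illustration.

```python
import bisect
import hashlib


def h(value):
    """Map a string to a point in a 32-bit hash space (illustrative hash choice)."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:4], "big")


class HashRing:
    """Hypothetical textbook ring: every bucket appears at many points."""

    def __init__(self, points_per_bucket=100):
        self.points_per_bucket = points_per_bucket
        self._points = []  # sorted hash positions
        self._owner = {}   # hash position -> bucket name

    def add(self, bucket):
        for i in range(self.points_per_bucket):
            p = h(f"{bucket}#{i}")
            if p not in self._owner:  # skip the rare position collision
                bisect.insort(self._points, p)
                self._owner[p] = bucket

    def lookup(self, key):
        # pick the nearest higher point, wrapping around at the end of the space
        i = bisect.bisect_right(self._points, h(key))
        return self._owner[self._points[i % len(self._points)]]


ring = HashRing()
for name in ["q1", "q2", "q3", "q4"]:
    ring.add(name)

keys = [str(k) for k in range(10_000)]
before = {k: ring.lookup(k) for k in keys}
ring.add("q5")
after = {k: ring.lookup(k) for k in keys}

moved = [k for k in keys if before[k] != after[k]]
print(f"{len(moved) / len(keys):.1%} of keys changed bucket after adding q5")
```

In this model every key that moves lands on the newly added bucket; keys never shuffle between the buckets that were already present.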

Supported RabbitMQ Versions

This plugin ships with RabbitMQ.

Supported Erlang Versions

This plugin supports the same Erlang versions as RabbitMQ core.

Enabling the Plugin

This plugin ships with RabbitMQ. Like all other RabbitMQ plugins, it has to be enabled before it can be used:

rabbitmq-plugins enable rabbitmq_consistent_hash_exchange

Provided Exchange Type

The exchange type is "x-consistent-hash".

How It Works

In the case of Consistent Hashing as an exchange type, the hash is calculated from a message property (most commonly the routing key).

When a queue is bound to this exchange, it is assigned one or more partitions on the consistent hashing ring depending on its binding weight (covered below).

For every hashed property value (e.g. the routing key), a hash position is computed and a corresponding hash ring partition is picked. That partition corresponds to a bound queue, and the message is routed to that queue.

Assuming a reasonably even routing key distribution of inbound messages, routed messages should be reasonably evenly distributed across all ring partitions, and thus queues according to their binding weights.

Binding Weights

When a queue is bound to a Consistent Hash exchange, the binding key is a number-as-a-string which indicates the binding weight: the number of buckets (sections of the range) that will be associated with the target queue.

Consistent Hashing-based Routing

The hashing distributes routing keys among queues, not message payloads among queues; all messages with the same routing key will go to the same queue. So, if you wish for queue A to receive twice as many routing keys as queue B, bind queue A with a binding key of twice the number (as a string: binding keys are always strings) used in the binding to queue B. Note this is only the case if your routing keys are evenly distributed in the hash space. If, for example, only two distinct routing keys are used on all the messages, there's a chance both keys will route (consistently!) to the same queue, even though other queues have higher values in their binding key. With a larger set of routing keys used, the statistical distribution of routing keys approaches the ratios of the binding keys.
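The effect of binding weights can be simulated without a broker. The sketch below is a simplified model, not the plugin's code: it gives each queue as many equal-sized sections of a 32-bit hash range as its binding key says, and uses SHA-256 as an illustrative hash. The names `position`, `build_buckets` and `route` are hypothetical.

```python
import hashlib
from collections import Counter


def position(value):
    """Illustrative hash: map a routing key to a point in a 32-bit space."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:4], "big")


def build_buckets(bindings):
    """Expand (queue, binding_key) pairs into one bucket entry per unit of weight."""
    buckets = []
    for queue, binding_key in bindings:
        buckets.extend([queue] * int(binding_key))  # binding keys are strings
    return buckets


def route(buckets, routing_key):
    """Pick the equal-sized section of the hash range that the key falls into."""
    index = (position(routing_key) * len(buckets)) >> 32
    return buckets[index]


bindings = [("q1", "1"), ("q2", "1"), ("q3", "2"), ("q4", "2")]
buckets = build_buckets(bindings)

counts = Counter(route(buckets, str(rk)) for rk in range(100_000))
for queue in ["q1", "q2", "q3", "q4"]:
    print(queue, counts[queue])
```

With 100,000 distinct routing keys, q3 and q4 should each receive roughly twice as many keys as q1 and q2, while any single key always maps to the same queue.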

Each message gets delivered to at most one queue. In normal operation a message gets delivered to exactly one queue; concurrent binding changes and queue primary replica failures can occasionally affect this.

Node Restart Effects

The consistent hashing ring is stored in memory and will be re-populated from exchange bindings when the node boots. Relative positioning of queues on the ring is not guaranteed to be the same between restarts. In practice this means that after a restart, all queues will still receive roughly the same number of messages routed to them (assuming routing key distribution does not change) but a given routing key may now route to a different queue.

In other words, this exchange type provides consistent message distribution between queues but cannot guarantee stable routing [queue] locality for a message with a fixed routing key.

Usage Example

The Topology

In the example below, the queues q0 and q1 are each bound to the exchange e with a weight of 1 in the hash space, which means they'll each get roughly the same number of routing keys. The queues q2 and q3, however, get 2 buckets each (their weight is 2), which means they'll also each get roughly the same number of routing keys, but approximately twice as many as q0 and q1. (The Python, Java and Ruby versions name the queues q1 through q4 and bind q1 and q2 with weight 1 and q3 and q4 with weight 2.)

Note the routing_keys in the bindings are numbers-as-strings. This is because AMQP 0-9-1 specifies the routing_key field must be a string.

Choosing Appropriate Weight Values

The example uses low weight values intentionally. Higher values will reduce throughput of the exchange, primarily for workloads that experience a high binding churn (queues are bound to and unbound from a consistent hash exchange frequently). Single digit weight values are recommended (and usually sufficient).

Inspecting Message Counts

The example then publishes 100,000 messages to the exchange with varying routing keys. Each queue's share of the messages will be roughly proportional to its binding weight. After publishing has completed, message distribution between queues can be inspected using RabbitMQ's management UI and rabbitmqctl list_queues.
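Once the counts are available, the shares can be computed with a few lines of Python. This is a minimal sketch that assumes tab-separated `name` and `messages` columns, as printed by `rabbitmqctl list_queues name messages`; the helper names `parse_list_queues` and `shares` are hypothetical, and the sample numbers are made up for illustration rather than taken from a real run.

```python
def parse_list_queues(output):
    """Extract {queue: message_count} from tab-separated 'name<TAB>messages' rows.

    Lines that do not look like a data row (banners, column headers) are skipped.
    """
    counts = {}
    for line in output.splitlines():
        fields = line.split("\t")
        if len(fields) == 2 and fields[1].strip().isdigit():
            counts[fields[0]] = int(fields[1])
    return counts


def shares(counts):
    """Return each queue's fraction of the total number of messages."""
    total = sum(counts.values())
    return {q: c / total for q, c in counts.items()} if total else {}


# Made-up sample output, for illustration only
sample = "name\tmessages\nq1\t16612\nq2\t16701\nq3\t33390\nq4\t33297\n"

counts = parse_list_queues(sample)
for queue, share in shares(counts).items():
    print(f"{queue}: {share:.1%}")
```

With weights of 1, 1, 2 and 2, shares close to one sixth for q1 and q2 and one third for q3 and q4 would match the binding key ratios.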

Routing Keys and Uniformity of Distribution

It is important to ensure that the messages being published to the exchange have varying routing keys: if only a very small set of routing keys is used, messages may not be evenly distributed between the bound queues. With a large number of bound queues, some queues may get no messages routed to them at all.

If pseudo-random or unique values such as client/session/request identifiers are used for routing keys (or another property used for hashing) then reasonably uniform distribution should be observed.
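The difference between a small and a large key set can be seen in a short simulation. This is a simplified model (equal-sized sections of a 32-bit hash range, SHA-256 as an illustrative hash), not the plugin's code; `position`, `queues_hit` and the key names are hypothetical.

```python
import hashlib


def position(value):
    """Illustrative hash: map a routing key to a point in a 32-bit space."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:4], "big")


def queues_hit(queues, routing_keys):
    """Return the set of queues that receive at least one of the given keys."""
    hit = set()
    for rk in routing_keys:
        index = (position(rk) * len(queues)) >> 32
        hit.add(queues[index])
    return hit


queues = [f"q{i}" for i in range(50)]  # 50 queues, each with weight 1

few = queues_hit(queues, [f"tenant-{i}" for i in range(10)])
many = queues_hit(queues, [f"session-{i}" for i in range(100_000)])

print(f"10 distinct keys reach {len(few)} of {len(queues)} queues")
print(f"100,000 distinct keys reach {len(many)} of {len(queues)} queues")
```

Ten distinct keys can reach at most ten queues, so at least forty of the fifty queues stay idle no matter how many messages are published with those keys.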

Executable Versions

Executable versions of some of the code examples can be found under ./examples.

Code Example in Python

This version of the example uses Pika, the most widely used Python client for RabbitMQ:

#!/usr/bin/env python

import pika
import time

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch   = conn.channel()

ch.exchange_declare(exchange="e", exchange_type="x-consistent-hash", durable=True)

for q in ["q1", "q2", "q3", "q4"]:
    ch.queue_declare(queue=q, durable=True)
    ch.queue_purge(queue=q)

for q in ["q1", "q2"]:
    ch.queue_bind(exchange="e", queue=q, routing_key="1")

for q in ["q3", "q4"]:
    ch.queue_bind(exchange="e", queue=q, routing_key="2")

n = 100000

for rk in map(str, range(n)):
    ch.basic_publish(exchange="e", routing_key=rk, body="")
print("Done publishing.")

print("Waiting for routing to finish...")
# in order to keep this example simpler and focused,
# wait for a few seconds instead of using publisher confirms and waiting for those
time.sleep(5)

print("Done.")
conn.close()

Code Example in Java

Below is a version of the example that uses the official RabbitMQ Java client:

package com.rabbitmq.examples;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;

public class ConsistentHashExchangeExample1 {
  private static String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash";

  public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory cf = new ConnectionFactory();
    Connection conn = cf.newConnection();
    Channel ch = conn.createChannel();

    for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
      ch.queueDeclare(q, true, false, false, null);
      ch.queuePurge(q);
    }

    ch.exchangeDeclare("e1", CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);

    for (String q : Arrays.asList("q1", "q2")) {
      ch.queueBind(q, "e1", "1");
    }

    for (String q : Arrays.asList("q3", "q4")) {
      ch.queueBind(q, "e1", "2");
    }

    ch.confirmSelect();

    AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
    for (int i = 0; i < 100000; i++) {
      ch.basicPublish("e1", String.valueOf(i), bldr.build(), "".getBytes("UTF-8"));
    }

    ch.waitForConfirmsOrDie(10000);

    System.out.println("Done publishing!");
    System.out.println("Evaluating results...");
    // wait for one stats emission interval (5 seconds by default) so that
    // queue counters are up-to-date in the management UI
    Thread.sleep(5000);

    System.out.println("Done.");
    conn.close();
  }
}

Code Example in Ruby

Below is a version that uses Bunny, the most widely used Ruby client for RabbitMQ:

#!/usr/bin/env ruby

require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
ch.confirm_select

q1 = ch.queue("q1", durable: true)
q2 = ch.queue("q2", durable: true)
q3 = ch.queue("q3", durable: true)
q4 = ch.queue("q4", durable: true)

[q1, q2, q3, q4].each(&:purge)

x  = ch.exchange("chx", type: "x-consistent-hash", durable: true)

[q1, q2].each { |q| q.bind(x, routing_key: "1") }
[q3, q4].each { |q| q.bind(x, routing_key: "2") }

n = 100_000
n.times do |i|
  x.publish(i.to_s, routing_key: i.to_s)
end

ch.wait_for_confirms
puts "Done publishing!"

# wait for queue stats to be emitted so that management UI numbers
# are up-to-date
sleep 5
conn.close
puts "Done"

Code Example in Erlang

Below is a version of the example that uses the RabbitMQ Erlang client:

-include_lib("amqp_client/include/amqp_client.hrl").

test() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    Queues = [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
    amqp_channel:call(Chan,
                  #'exchange.declare'{
                    exchange = <<"e">>, type = <<"x-consistent-hash">>
                  }),
    [amqp_channel:call(Chan, #'queue.declare'{queue = Q}) || Q <- Queues],
    [amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                           exchange = <<"e">>,
                                           routing_key = <<"1">>})
        || Q <- [<<"q0">>, <<"q1">>]],
    [amqp_channel:call(Chan, #'queue.bind' {queue = Q,
                                            exchange = <<"e">>,
                                            routing_key = <<"2">>})
        || Q <- [<<"q2">>, <<"q3">>]],
    Msg = #amqp_msg{props = #'P_basic'{}, payload = <<>>},
    %% pick a new random routing key for every message: a single key
    %% computed up front would send all messages to the same queue
    [amqp_channel:call(Chan,
                   #'basic.publish'{
                     exchange = <<"e">>,
                     routing_key = integer_to_binary(rand:uniform(1000000))
                   }, Msg) || _ <- lists:seq(1, 100000)],
    amqp_connection:close(Conn),
    ok.

Configuration

Routing on a Header

Under most circumstances the routing key is a good choice for something to hash. However, in some cases it is necessary to use the routing key for some other purpose (for example with more complex routing involving exchange to exchange bindings). In this case it is possible to configure the consistent hash exchange to route based on a named header instead. To do this, declare the exchange with a string argument called "hash-header" naming the header to be used.

When a "hash-header" is specified, the chosen header must be provided. If published messages do not contain the header, they will all get routed to the same arbitrarily chosen queue.

Code Example in Python

#!/usr/bin/env python

import pika
import time

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch   = conn.channel()

args = {u'hash-header': u'hash-on'}
ch.exchange_declare(exchange='e2',
                    exchange_type='x-consistent-hash',
                    arguments=args,
                    durable=True)

for q in ['q1', 'q2', 'q3', 'q4']:
    ch.queue_declare(queue=q, durable=True)
    ch.queue_purge(queue=q)

for q in ['q1', 'q2']:
    ch.queue_bind(exchange='e2', queue=q, routing_key='1')

for q in ['q3', 'q4']:
    ch.queue_bind(exchange='e2', queue=q, routing_key='2')

n = 100000

for rk in map(str, range(n)):
    hdrs = {u'hash-on': rk}
    ch.basic_publish(exchange='e2',
                     routing_key='',
                     body='',
                     properties=pika.BasicProperties(content_type='text/plain',
                                                     delivery_mode=2,
                                                     headers=hdrs))
print('Done publishing.')

print('Waiting for routing to finish...')
# in order to keep this example simpler and focused,
# wait for a few seconds instead of using publisher confirms and waiting for those
time.sleep(5)

print('Done.')
conn.close()

Code Example in Java

package com.rabbitmq.examples;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ConsistentHashExchangeExample2 {
  public static final String EXCHANGE = "e2";
  private static String EXCHANGE_TYPE = "x-consistent-hash";

  public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory cf = new ConnectionFactory();
    Connection conn = cf.newConnection();
    Channel ch = conn.createChannel();

    for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
      ch.queueDeclare(q, true, false, false, null);
      ch.queuePurge(q);
    }

    Map<String, Object> args = new HashMap<>();
    args.put("hash-header", "hash-on");
    ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);

    for (String q : Arrays.asList("q1", "q2")) {
      ch.queueBind(q, EXCHANGE, "1");
    }

    for (String q : Arrays.asList("q3", "q4")) {
      ch.queueBind(q, EXCHANGE, "2");
    }

    ch.confirmSelect();


    for (int i = 0; i < 100000; i++) {
      AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
      Map<String, Object> hdrs = new HashMap<>();
      hdrs.put("hash-on", String.valueOf(i));
      ch.basicPublish(EXCHANGE, "", bldr.headers(hdrs).build(), "".getBytes("UTF-8"));
    }

    ch.waitForConfirmsOrDie(10000);

    System.out.println("Done publishing!");
    System.out.println("Evaluating results...");
    // wait for one stats emission interval (5 seconds by default) so that
    // queue counters are up-to-date in the management UI
    Thread.sleep(5000);

    System.out.println("Done.");
    conn.close();
  }
}

Code Example in Ruby

#!/usr/bin/env ruby

require 'bundler'
Bundler.setup(:default, :test)
require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
ch.confirm_select

q1 = ch.queue("q1", durable: true)
q2 = ch.queue("q2", durable: true)
q3 = ch.queue("q3", durable: true)
q4 = ch.queue("q4", durable: true)

[q1, q2, q3, q4].each(&:purge)

x  = ch.exchange("x2", type: "x-consistent-hash", durable: true, arguments: {"hash-header" => "hash-on"})

[q1, q2].each { |q| q.bind(x, routing_key: "1") }
[q3, q4].each { |q| q.bind(x, routing_key: "2") }

n = 100_000
n.times do |i|
  # the routing key is ignored for hashing; only the "hash-on" header is used
  x.publish(i.to_s, routing_key: rand.to_s, headers: {"hash-on" => i.to_s})
end

ch.wait_for_confirms
puts "Done publishing!"

# wait for queue stats to be emitted so that management UI numbers
# are up-to-date
sleep 5
conn.close
puts "Done"

Code Example in Erlang

With RabbitMQ Erlang client:

-include_lib("amqp_client/include/amqp_client.hrl").

test() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    Queues = [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
    amqp_channel:call(
      Chan, #'exchange.declare'{
              exchange  = <<"e">>,
              type      = <<"x-consistent-hash">>,
              arguments = [{<<"hash-header">>, longstr, <<"hash-on">>}]
            }),
    [amqp_channel:call(Chan, #'queue.declare'{queue = Q}) || Q <- Queues],
    [amqp_channel:call(Chan, #'queue.bind' {queue = Q,
                                            exchange = <<"e">>,
                                            routing_key = <<"1">>})
        || Q <- [<<"q0">>, <<"q1">>]],
    [amqp_channel:call(Chan, #'queue.bind' {queue = Q,
                                            exchange = <<"e">>,
                                            routing_key = <<"2">>})
        || Q <- [<<"q2">>, <<"q3">>]],
    %% build a message with a new random "hash-on" header value for every
    %% publish: a single value computed up front would send all messages
    %% to the same queue
    [begin
         V = integer_to_binary(rand:uniform(1000000)),
         Msg = #amqp_msg{props = #'P_basic'{headers = [{<<"hash-on">>, longstr, V}]},
                         payload = <<>>},
         amqp_channel:call(Chan,
                           #'basic.publish'{exchange = <<"e">>,
                                            routing_key = <<"">>},
                           Msg)
     end || _ <- lists:seq(1, 100000)],
    amqp_connection:close(Conn),
    ok.

Routing on a Message Property

In addition to a value in the header property, you can also route on the message_id, correlation_id, or timestamp message properties. To do so, declare the exchange with a string argument called "hash-property" naming the property to be used.

When a "hash-property" is specified, the chosen property must be provided. If published messages do not contain the property, they will all get routed to the same arbitrarily chosen queue.
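The choice of the value to hash can be summarised in a small model. This is a sketch of the behaviour described in this section, not the plugin's code: the function name `value_to_hash` is hypothetical, representing a missing header or property as `None` is an assumption, and the sketch does not address an exchange declared with both arguments.

```python
def value_to_hash(exchange_args, routing_key, headers=None, properties=None):
    """Return the value a consistent hash exchange would hash for one message.

    exchange_args: arguments the exchange was declared with
    headers:       the message's headers table
    properties:    other message properties (message_id, correlation_id, timestamp)
    """
    headers = headers or {}
    properties = properties or {}
    if "hash-header" in exchange_args:
        # a missing header yields None for every such message (assumption)
        return headers.get(exchange_args["hash-header"])
    if "hash-property" in exchange_args:
        return properties.get(exchange_args["hash-property"])
    return routing_key  # default: hash the routing key


# default behaviour
print(value_to_hash({}, "order-17"))
# hash on a header
print(value_to_hash({"hash-header": "hash-on"}, "", headers={"hash-on": "abc"}))
# hash on a property
print(value_to_hash({"hash-property": "message_id"}, "",
                    properties={"message_id": "m-1"}))
# header missing: every such message yields the same value
print(value_to_hash({"hash-header": "hash-on"}, "order-17"))
```

Because every message that lacks the configured header or property produces the same value in this model, all such messages hash identically, which is consistent with them ending up in one queue.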

Code Example in Python

#!/usr/bin/env python

import pika
import time

conn = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
ch   = conn.channel()

args = {u'hash-property': u'message_id'}
ch.exchange_declare(exchange='e3',
                    exchange_type='x-consistent-hash',
                    arguments=args,
                    durable=True)

for q in ['q1', 'q2', 'q3', 'q4']:
    ch.queue_declare(queue=q, durable=True)
    ch.queue_purge(queue=q)

for q in ['q1', 'q2']:
    ch.queue_bind(exchange='e3', queue=q, routing_key='1')

for q in ['q3', 'q4']:
    ch.queue_bind(exchange='e3', queue=q, routing_key='2')

n = 100000

for rk in map(str, range(n)):
    ch.basic_publish(exchange='e3',
                     routing_key='',
                     body='',
                     properties=pika.BasicProperties(content_type='text/plain',
                                                     delivery_mode=2,
                                                     message_id=rk))
print('Done publishing.')

print('Waiting for routing to finish...')
# in order to keep this example simpler and focused,
# wait for a few seconds instead of using publisher confirms and waiting for those
time.sleep(5)

print('Done.')
conn.close()

Code Example in Java

package com.rabbitmq.examples;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class ConsistentHashExchangeExample3 {
  public static final String EXCHANGE = "e3";
  private static String EXCHANGE_TYPE = "x-consistent-hash";

  public static void main(String[] argv) throws IOException, TimeoutException, InterruptedException {
    ConnectionFactory cf = new ConnectionFactory();
    Connection conn = cf.newConnection();
    Channel ch = conn.createChannel();

    for (String q : Arrays.asList("q1", "q2", "q3", "q4")) {
      ch.queueDeclare(q, true, false, false, null);
      ch.queuePurge(q);
    }

    Map<String, Object> args = new HashMap<>();
    args.put("hash-property", "message_id");
    ch.exchangeDeclare(EXCHANGE, EXCHANGE_TYPE, true, false, args);

    for (String q : Arrays.asList("q1", "q2")) {
      ch.queueBind(q, EXCHANGE, "1");
    }

    for (String q : Arrays.asList("q3", "q4")) {
      ch.queueBind(q, EXCHANGE, "2");
    }

    ch.confirmSelect();


    for (int i = 0; i < 100000; i++) {
      AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
      ch.basicPublish(EXCHANGE, "", bldr.messageId(String.valueOf(i)).build(), "".getBytes("UTF-8"));
    }

    ch.waitForConfirmsOrDie(10000);

    System.out.println("Done publishing!");
    System.out.println("Evaluating results...");
    // wait for one stats emission interval (5 seconds by default) so that
    // queue counters are up-to-date in the management UI
    Thread.sleep(5000);

    System.out.println("Done.");
    conn.close();
  }
}

Code Example in Ruby

#!/usr/bin/env ruby

require 'bundler'
Bundler.setup(:default, :test)
require 'bunny'

conn = Bunny.new
conn.start

ch = conn.create_channel
ch.confirm_select

q1 = ch.queue("q1", durable: true)
q2 = ch.queue("q2", durable: true)
q3 = ch.queue("q3", durable: true)
q4 = ch.queue("q4", durable: true)

[q1, q2, q3, q4].each(&:purge)

x  = ch.exchange("x3", type: "x-consistent-hash", durable: true, arguments: {"hash-property" => "message_id"})

[q1, q2].each { |q| q.bind(x, routing_key: "1") }
[q3, q4].each { |q| q.bind(x, routing_key: "2") }

n = 100_000
n.times do |i|
  # the routing key is ignored for hashing; only message_id is used
  x.publish(i.to_s, routing_key: rand.to_s, message_id: i.to_s)
end

ch.wait_for_confirms
puts "Done publishing!"

# wait for queue stats to be emitted so that management UI numbers
# are up-to-date
sleep 5
conn.close
puts "Done"

Code Example in Erlang

-include_lib("amqp_client/include/amqp_client.hrl").

test() ->
    {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
    {ok, Chan} = amqp_connection:open_channel(Conn),
    Queues = [<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>],
    amqp_channel:call(Chan,
                  #'exchange.declare'{
                    exchange = <<"e">>, type = <<"x-consistent-hash">>,
                    %% arguments is a list of {Name, Type, Value} tuples
                    arguments = [{<<"hash-property">>, longstr, <<"message_id">>}]
                  }),
    [amqp_channel:call(Chan, #'queue.declare'{queue = Q}) || Q <- Queues],
    [amqp_channel:call(Chan, #'queue.bind'{queue = Q,
                                           exchange = <<"e">>,
                                           routing_key = <<"1">>})
        || Q <- [<<"q0">>, <<"q1">>]],
    [amqp_channel:call(Chan, #'queue.bind' {queue = Q,
                                            exchange = <<"e">>,
                                            routing_key = <<"2">>})
        || Q <- [<<"q2">>, <<"q3">>]],
    %% use a new random message_id for every publish: a single value
    %% computed up front would send all messages to the same queue
    [begin
         Id = integer_to_binary(rand:uniform(1000000)),
         Msg = #amqp_msg{props = #'P_basic'{message_id = Id}, payload = <<>>},
         amqp_channel:call(Chan,
                           #'basic.publish'{exchange = <<"e">>,
                                            routing_key = <<"">>},
                           Msg)
     end || _ <- lists:seq(1, 100000)],
    amqp_connection:close(Conn),
    ok.

Getting Help

If you have questions or need help, feel free to ask on the RabbitMQ mailing list.

Implementation Details

The hash function used in this plugin as of RabbitMQ 3.7.8 is jump consistent hash, described in "A Fast, Minimal Memory, Consistent Hash Algorithm" by Lamping and Veach. Erlang's phash2 function is used to convert non-integer values to integers that the jump consistent hash function can use.
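For readers unfamiliar with the algorithm, here is a Python transcription of the jump consistent hash function from the Lamping and Veach paper. It is a sketch for illustration, not the plugin's Erlang code. The `to_int` helper is a hypothetical stand-in for phash2 that uses SHA-256, so it does not produce the same integers as Erlang would.

```python
import hashlib

MASK64 = 0xFFFFFFFFFFFFFFFF


def jump_consistent_hash(key, num_buckets):
    """Map a 64-bit integer key to a bucket number in [0, num_buckets)."""
    if num_buckets < 1:
        raise ValueError("num_buckets must be at least 1")
    key &= MASK64
    b, j = -1, 0
    while j < num_buckets:
        b = j
        # 64-bit linear congruential step, as in the paper
        key = (key * 2862933555777941757 + 1) & MASK64
        j = int((b + 1) * (float(1 << 31) / float((key >> 33) + 1)))
    return b


def to_int(value):
    """Stand-in for Erlang's phash2: derive an integer from a string (assumption)."""
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")


for rk in ["order-1", "order-2", "order-3"]:
    print(rk, jump_consistent_hash(to_int(rk), 6))
```

When the number of buckets grows from n to n + 1, a key either keeps its bucket or moves to the new bucket n, which is what keeps remapping small.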

Distribution Uniformity

A Chi-squared test was used to evaluate distribution uniformity. Below are the results for 19 bucket counts (2 through 20) and how they compare to the critical values for two commonly used p-value thresholds. In every case the test statistic is below both critical values, so the hypothesis of a uniform distribution is not rejected.

Number of buckets | Chi-squared test result | Degrees of freedom | Critical value at p = 0.05 | Critical value at p = 0.01
2 | 0.5 | 1 | 3.84 | 6.64
3 | 0.946 | 2 | 5.99 | 9.21
4 | 2.939 | 3 | 7.81 | 11.35
5 | 2.163 | 4 | 9.49 | 13.28
6 | 2.592 | 5 | 11.07 | 15.09
7 | 4.654 | 6 | 12.59 | 16.81
8 | 7.566 | 7 | 14.07 | 18.48
9 | 5.847 | 8 | 15.51 | 20.09
10 | 9.790 | 9 | 16.92 | 21.67
11 | 13.448 | 10 | 18.31 | 23.21
12 | 12.432 | 11 | 19.68 | 24.73
13 | 12.338 | 12 | 21.02 | 26.22
14 | 9.898 | 13 | 22.36 | 27.69
15 | 8.513 | 14 | 23.69 | 29.14
16 | 6.997 | 15 | 24.99 | 30.58
17 | 6.279 | 16 | 26.30 | 32.00
18 | 10.373 | 17 | 27.59 | 33.41
19 | 12.935 | 18 | 28.87 | 34.81
20 | 11.895 | 19 | 30.14 | 36.19
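A similar check can be run on your own queue counts. The sketch below computes Pearson's chi-squared statistic against a uniform expectation; the function name `chi_squared` and the observed counts are made up for illustration, and the 7.81 threshold is the 3 degrees of freedom, p = 0.05 value from the table above.

```python
def chi_squared(observed):
    """Pearson's chi-squared statistic against a uniform expected distribution."""
    total = sum(observed)
    expected = total / len(observed)
    return sum((o - expected) ** 2 / expected for o in observed)


# Hypothetical message counts for four equally weighted queues
observed = [25100, 24950, 24890, 25060]

statistic = chi_squared(observed)
degrees_of_freedom = len(observed) - 1
critical_value = 7.81  # 3 degrees of freedom, p = 0.05

print(f"statistic = {statistic:.3f}, degrees of freedom = {degrees_of_freedom}")
print("uniformity not rejected" if statistic < critical_value
      else "uniformity rejected")
```

For queues bound with different weights, the expected count per queue would need to be scaled by each queue's share of the total weight instead of being split evenly.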

Binding Operations and Bucket Management

When a queue is bound to a consistent hash exchange, the protocol method, queue.bind, carries a weight in the routing (binding) key. The binding is given a number of buckets on the hash ring (hash space) equal to the weight. When a queue is unbound, the buckets added for the binding are deleted. These two operations use linear algorithms to update the ring.

To perform routing, the exchange extracts the appropriate value for hashing, hashes it and retrieves a bucket number from the ring, then looks up the bucket and its associated queue.
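The bookkeeping described above can be modelled in a few lines of Python. This is an illustrative sketch, not the plugin's data structure: the `BucketRing` class is hypothetical, and CRC32 with a modulo is used as a simple stand-in for the bucket selection step, which in the plugin is jump consistent hash.

```python
import zlib


class BucketRing:
    """Toy model of the ring: bucket number (list index) -> queue name."""

    def __init__(self):
        self.buckets = []

    def bind(self, queue, binding_key):
        """Add as many buckets for the queue as its weight says."""
        weight = int(binding_key)  # the binding key is a number-as-a-string
        self.buckets.extend([queue] * weight)

    def unbind(self, queue):
        """Delete the queue's buckets; the scan is linear in the ring size."""
        self.buckets = [q for q in self.buckets if q != queue]

    def route(self, value):
        """Hash the value, pick a bucket number, return the bucket's queue."""
        if not self.buckets:
            return None  # no bindings: the message is not routed
        bucket = zlib.crc32(value.encode()) % len(self.buckets)  # stand-in hash
        return self.buckets[bucket]


ring = BucketRing()
ring.bind("q1", "1")
ring.bind("q3", "2")
print(len(ring.buckets), ring.route("order-17"))

ring.unbind("q3")
print(len(ring.buckets), ring.route("order-17"))
```

Unlike jump consistent hash, the modulo used here remaps many keys when the number of buckets changes, so the sketch only illustrates the bind, unbind and lookup steps.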

The implementation assumes there is only one binding between a consistent hash exchange and a queue. Having more than one binding is unnecessary because queue weight can be provided at the time of binding.

Clustered Environments

The state of the hash space is distributed across all cluster nodes.

Copyright and License

(c) 2013-2020 VMware, Inc. or its affiliates.

Released under the Mozilla Public License 2.0, same as RabbitMQ. See LICENSE for details.
