  • Language
    Go
  • License
    MIT License
  • Created about 4 years ago
  • Updated 3 months ago

Repository Details

A client library for RabbitMQ streams

RabbitMQ Stream GO Client


Overview

Go client for RabbitMQ Stream Queues

Installing

 go get -u github.com/rabbitmq/rabbitmq-stream-go-client

Imports:

"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" // Main package
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" // amqp 1.0 package to encode messages
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" // messages interface package, you may not need to import it directly

Run server with Docker


You may need a server to test locally. Let's start the broker:

docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\
    -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \
    rabbitmq:3.9-management

The broker should start in a few seconds. When it’s ready, enable the stream plugin and stream_management:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream_management

Management UI: http://localhost:15672/
Stream uri: rabbitmq-stream://guest:guest@localhost:5552

Getting started for the impatient

See the getting started example.

Examples

See the examples directory for more use cases.

Usage

Connect

Standard way to connect to a single node:

env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5552).
			SetUser("guest").
			SetPassword("guest"))
	CheckErr(err)

You can define the number of producers per connection; the default value is 1:

stream.NewEnvironmentOptions().
	SetMaxProducersPerClient(2)

You can define the number of consumers per connection; the default value is 1:

stream.NewEnvironmentOptions().
	SetMaxConsumersPerClient(2)

For the best performance, use the default values. Note about multiple consumers per connection: the I/O threads are shared across the consumers, so a slow consumer can degrade the performance of the other consumers on the same connection.

Multi hosts

It is possible to define multiple hosts. If one fails to connect, the client tries another one, chosen at random.

addresses := []string{
		"rabbitmq-stream://guest:guest@host1:5552/%2f",
		"rabbitmq-stream://guest:guest@host2:5552/%2f",
		"rabbitmq-stream://guest:guest@host3:5552/%2f"}

env, err := stream.NewEnvironment(
			stream.NewEnvironmentOptions().SetUris(addresses))
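
The addresses above follow the usual scheme://user:password@host:port/vhost layout. As a rough illustration, independent of the client library, the sketch below splits such a URI with Go's standard net/url package. The streamAddress type and parseStreamURI function are hypothetical names, and reading "%2f" as the default virtual host "/" is an assumption borrowed from the AMQP URI convention.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// streamAddress holds the parts of a rabbitmq-stream:// URI.
// The type and its field names are hypothetical, for illustration only.
type streamAddress struct {
	TLS      bool
	Host     string
	Port     string
	User     string
	Password string
	VHost    string
}

// parseStreamURI splits a stream URI into its components using net/url.
func parseStreamURI(raw string) (streamAddress, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return streamAddress{}, err
	}
	if u.Scheme != "rabbitmq-stream" && u.Scheme != "rabbitmq-stream+tls" {
		return streamAddress{}, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	password, _ := u.User.Password()
	// Assumption: the vhost is the percent-encoded path without its leading
	// slash, so "/%2f" decodes to the default vhost "/".
	vhost, err := url.PathUnescape(strings.TrimPrefix(u.EscapedPath(), "/"))
	if err != nil {
		return streamAddress{}, err
	}
	return streamAddress{
		TLS:      u.Scheme == "rabbitmq-stream+tls",
		Host:     u.Hostname(),
		Port:     u.Port(),
		User:     u.User.Username(),
		Password: password,
		VHost:    vhost,
	}, nil
}

func main() {
	addr, err := parseStreamURI("rabbitmq-stream://guest:guest@host1:5552/%2f")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", addr)
}
```

A check like this can be handy for validating configuration before handing the list to SetUris.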

Load Balancer

The stream client is supposed to reach all the hostnames. Behind a load balancer, use the stream.AddressResolver parameter in this way:

addressResolver := stream.AddressResolver{
		Host: "load-balancer-ip",
		Port: 5552,
	}
env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost(addressResolver.Host).
			SetPort(addressResolver.Port).
			SetAddressResolver(addressResolver))

In this configuration the client retries the connection until it reaches the right node.

This RabbitMQ blog post explains the details.

See also the "Using a load balancer" example in the examples directory.
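
The retry idea can be sketched without any network code. The example below is a simplified simulation, not the client's implementation: dialFunc and connectToNode are hypothetical names, and the "load balancer" is a plain round-robin function that reports which node answered.

```go
package main

import (
	"errors"
	"fmt"
)

// dialFunc opens a connection through the load balancer and reports which
// node actually answered. It is a stand-in for a real network dial.
type dialFunc func() (node string, err error)

// connectToNode keeps dialing until the load balancer routes the connection
// to the wanted node, or until maxAttempts is exhausted. It returns the
// number of attempts made.
func connectToNode(dial dialFunc, wanted string, maxAttempts int) (int, error) {
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		node, err := dial()
		if err != nil {
			return attempt, err
		}
		if node == wanted {
			return attempt, nil
		}
		// Wrong node: a real client would close this connection and retry.
	}
	return maxAttempts, errors.New("wanted node not reached")
}

func main() {
	// Simulate a round-robin balancer in front of three nodes.
	nodes := []string{"node-0", "node-1", "node-2"}
	next := 0
	dial := func() (string, error) {
		node := nodes[next%len(nodes)]
		next++
		return node, nil
	}

	attempts, err := connectToNode(dial, "node-2", 10)
	fmt.Println("attempts:", attempts, "error:", err)
}
```

With a balancer that picks nodes at random rather than round-robin, the number of attempts varies, which is why a retry limit is part of the sketch.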

TLS

To configure TLS you need to set the IsTLS parameter:

env, err := stream.NewEnvironment(
		stream.NewEnvironmentOptions().
			SetHost("localhost").
			SetPort(5551). // standard TLS port
			SetUser("guest").
			SetPassword("guest").
			IsTLS(true).
			SetTLSConfig(&tls.Config{}),
	)

tls.Config is the type from Go's standard crypto/tls package: https://pkg.go.dev/crypto/tls
See also the "Getting started TLS" example in the examples directory.

It is also possible to configure TLS using the URI scheme:

env, err := stream.NewEnvironment(
				stream.NewEnvironmentOptions().
				SetUri("rabbitmq-stream+tls://guest:guest@localhost:5551/").
				SetTLSConfig(&tls.Config{}),
)

SASL Mechanisms

To configure SASL, set the mechanism with SetSaslConfiguration on the environment options:

cfg := new(tls.Config)
cfg.ServerName = "my_server_name"
cfg.RootCAs = x509.NewCertPool()

if ca, err := os.ReadFile("certs/ca_certificate.pem"); err == nil {
	cfg.RootCAs.AppendCertsFromPEM(ca)
}

if cert, err := tls.LoadX509KeyPair("certs/client/cert.pem", "certs/client/key.pem"); err == nil {
	cfg.Certificates = append(cfg.Certificates, cert)
}

env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
	SetUri("rabbitmq-stream+tls://my_server_name:5551/").
	IsTLS(true).
	SetSaslConfiguration(stream.SaslConfigurationExternal). // SASL EXTERNAL
	SetTLSConfig(cfg))

Streams

To define streams, use the environment interfaces DeclareStream and DeleteStream.

It is highly recommended to define stream retention policies during the stream creation, like MaxLengthBytes or MaxAge:

err = env.DeclareStream(streamName,
		stream.NewStreamOptions().
		SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))

DeclareStream doesn't return an error if a stream is already defined with the same parameters. It returns a precondition-failed error when the existing stream has different parameters. Use StreamExists to check whether a stream exists.

Streams Statistics

To get stream statistics, use the environment.StreamStats method.

stats, err := environment.StreamStats(testStreamName)

// FirstOffset returns the first offset in the stream.
// It returns an error if there is no first offset yet.
firstOffset, err := stats.FirstOffset()

// LastOffset returns the last offset in the stream.
// It returns an error if there is no last offset yet.
lastOffset, err := stats.LastOffset()

// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
//	It is the offset of the first message in the last chunk confirmed by a quorum of the stream
//	cluster members (leader and replicas).
//
//	The committed chunk ID is a good indication of what the last offset of a stream can be at a
//	given time. The value can be stale as soon as the application reads it though, as the committed
//	chunk ID for a stream that is published to changes all the time.

committedChunkId, err := stats.CommittedChunkId()

Publish messages

To publish a message you need a *stream.Producer instance:

producer, err :=  env.NewProducer("my-stream", nil)

With ProducerOptions it is possible to customize the producer behaviour:

type ProducerOptions struct {
	Name                 string // Producer name, useful for message deduplication
	QueueSize            int    // Internal queue to handle back-pressure; a low value reduces the back-pressure on the server
	BatchSize            int    // Batch-size aggregation; a low value reduces latency, a high value increases throughput
	BatchPublishingDelay int    // Period to send a batch of messages
}

The client provides two interfaces to send messages. Send:

var message message.StreamMessage
message = amqp.NewMessage([]byte("hello"))
err = producer.Send(message)

and BatchSend:

var messages []message.StreamMessage
for z := 0; z < 10; z++ {
  messages = append(messages, amqp.NewMessage([]byte("hello")))
}
err = producer.BatchSend(messages)

producer.Send:

  • accepts one message as parameter
  • automatically aggregates the messages
  • automatically splits the messages in case the size is bigger than requestedMaxFrameSize
  • automatically splits the messages based on batch-size
  • sends the messages in case nothing happens in producer-send-timeout
  • is asynchronous

producer.BatchSend:

  • accepts an array of messages as parameter
  • is synchronous

Close the producer with producer.Close(). The producer is removed from the server, and the TCP connection is closed if there are no other producers.
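
To make the splitting rules listed for producer.Send more concrete, here is a minimal sketch of grouping messages by a batch size and a byte limit. It is not the client's implementation: splitBatches is a hypothetical helper, it counts only payload bytes (protocol overhead is ignored), and the limits in main are arbitrary values.

```go
package main

import (
	"errors"
	"fmt"
)

// splitBatches groups messages so that no batch holds more than batchSize
// messages or more than maxFrameSize bytes of payload. A single message
// larger than maxFrameSize cannot fit in any batch and is reported as an error.
func splitBatches(msgs [][]byte, batchSize, maxFrameSize int) ([][][]byte, error) {
	if batchSize <= 0 || maxFrameSize <= 0 {
		return nil, errors.New("batchSize and maxFrameSize must be positive")
	}
	var batches [][][]byte
	var current [][]byte
	currentBytes := 0
	for i, m := range msgs {
		if len(m) > maxFrameSize {
			return nil, fmt.Errorf("message %d is %d bytes, larger than the limit of %d", i, len(m), maxFrameSize)
		}
		// Start a new batch when the current one is full by count or by size.
		if len(current) == batchSize || currentBytes+len(m) > maxFrameSize {
			batches = append(batches, current)
			current, currentBytes = nil, 0
		}
		current = append(current, m)
		currentBytes += len(m)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches, nil
}

func main() {
	msgs := make([][]byte, 5)
	for i := range msgs {
		msgs[i] = []byte("hello")
	}
	batches, err := splitBatches(msgs, 2, 1024)
	if err != nil {
		panic(err)
	}
	for i, b := range batches {
		fmt.Printf("batch %d: %d messages\n", i, len(b))
	}
}
```

Each resulting batch could then be handed to a synchronous call such as BatchSend; a smaller batch size lowers latency, a larger one favours throughput.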

Send vs BatchSend

BatchSend is the primitive to send messages. Send adds a smart layer to publish messages and internally uses BatchSend.

The Send interface works in most cases. In some conditions it is about 15-20% slower than BatchSend. See also this thread.

Publish Confirmation

For each publish the server sends back to the client the confirmation or an error. The client provides an interface to receive the confirmation:

//optional publish confirmation channel
chPublishConfirm := producer.NotifyPublishConfirmation()
handlePublishConfirm(chPublishConfirm)

func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
	go func() {
		for confirmed := range confirms {
			for _, msg := range confirmed {
				if msg.IsConfirmed() {
					fmt.Printf("message %s stored \n  ", msg.GetMessage().GetData())
				} else {
					fmt.Printf("message %s failed \n  ", msg.GetMessage().GetData())
				}
			}
		}
	}()
}

In the MessageStatus struct you can find two publishing IDs:

//first one
messageStatus.GetMessage().GetPublishingId()
// second one
messageStatus.GetPublishingId()

The first one is provided by the user for special cases like Deduplication. The second one is assigned automatically by the client. In case the user specifies the publishingId with:

msg = amqp.NewMessage([]byte("mymessage"))
msg.SetPublishingId(18) // <---

In that case, messageStatus.GetMessage().HasPublishingId() returns true and
the values messageStatus.GetMessage().GetPublishingId() and messageStatus.GetPublishingId() are the same.

See also the "Getting started" example in the examples directory.

Deduplication

The stream plugin can deduplicate messages; see this blog post for more details: https://blog.rabbitmq.com/posts/2021/07/rabbitmq-streams-message-deduplication/
You can find a "Deduplication" example in the examples directory.
Run it more than once: the message count will always be 10.

To retrieve the last publishing ID for a producer, use:

publishingId, err := producer.GetLastPublishingId()
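
Why the count stays at 10 across runs can be shown with a toy model. The sketch below is a heavy simplification and not the broker's code: it assumes duplicates are filtered by remembering, per producer name, the highest publishing ID seen and dropping any message whose ID is not strictly higher. The dedupLog type and its methods are hypothetical.

```go
package main

import "fmt"

// dedupLog is a toy stand-in for a stream that filters duplicates: it
// remembers the highest publishing ID seen per producer name and drops any
// message whose ID is not strictly higher.
type dedupLog struct {
	lastID   map[string]int64
	messages []string
}

func newDedupLog() *dedupLog {
	return &dedupLog{lastID: make(map[string]int64)}
}

// publish stores the message and returns true, or returns false when the
// publishing ID was already seen for this producer name.
func (l *dedupLog) publish(producer string, publishingID int64, body string) bool {
	if last, ok := l.lastID[producer]; ok && publishingID <= last {
		return false
	}
	l.lastID[producer] = publishingID
	l.messages = append(l.messages, body)
	return true
}

func main() {
	stream := newDedupLog()
	// Run the same "send 10 messages" job twice, as when the example is restarted.
	for run := 1; run <= 2; run++ {
		stored := 0
		for id := int64(0); id < 10; id++ {
			if stream.publish("my-producer", id, fmt.Sprintf("message %d", id)) {
				stored++
			}
		}
		fmt.Printf("run %d stored %d new messages\n", run, stored)
	}
	fmt.Println("messages in stream:", len(stream.messages))
}
```

In this model a restarted producer would read its last publishing ID first and continue from the next value, so that new messages are not mistaken for duplicates.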

Sub Entries Batching

Sub-entry batching sets the number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame, meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.
You can find a "Sub Entries Batching" example in the examples directory.

The default compression is None (no compression), but you can choose a different kind: GZIP, SNAPPY, LZ4, ZSTD.
Compression is valid only if SubEntrySize > 1.

producer, err := env.NewProducer(streamName, stream.NewProducerOptions().
		SetSubEntrySize(100).
		SetCompression(stream.Compression{}.Gzip()))
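
The benefit of compressing a group of messages rather than each message alone can be seen with the standard library. The sketch below is only an illustration: the length-prefixed layout and the packSubEntry/unpackSubEntry helpers are invented for this example and do not reproduce the stream protocol's wire format.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// packSubEntry concatenates the messages, each prefixed with a 4-byte length,
// and gzips the result. It returns the compressed bytes and the raw size.
func packSubEntry(msgs [][]byte) ([]byte, int, error) {
	var raw bytes.Buffer
	for _, m := range msgs {
		var size [4]byte
		binary.BigEndian.PutUint32(size[:], uint32(len(m)))
		raw.Write(size[:])
		raw.Write(m)
	}
	var out bytes.Buffer
	zw := gzip.NewWriter(&out)
	if _, err := zw.Write(raw.Bytes()); err != nil {
		return nil, 0, err
	}
	if err := zw.Close(); err != nil {
		return nil, 0, err
	}
	return out.Bytes(), raw.Len(), nil
}

// unpackSubEntry reverses packSubEntry.
func unpackSubEntry(compressed []byte) ([][]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return nil, err
	}
	var msgs [][]byte
	for len(raw) > 0 {
		if len(raw) < 4 {
			return nil, errors.New("truncated length prefix")
		}
		n := int(binary.BigEndian.Uint32(raw[:4]))
		raw = raw[4:]
		if n > len(raw) {
			return nil, errors.New("truncated message")
		}
		msgs = append(msgs, raw[:n])
		raw = raw[n:]
	}
	return msgs, nil
}

func main() {
	msgs := make([][]byte, 100)
	for i := range msgs {
		msgs[i] = []byte("hello sub-entry batching")
	}
	compressed, rawSize, err := packSubEntry(msgs)
	if err != nil {
		panic(err)
	}
	fmt.Printf("raw: %d bytes, gzip: %d bytes\n", rawSize, len(compressed))
}
```

Similar messages compress well together, while a lone small message gains little from compression, which matches the requirement that SubEntrySize be greater than 1.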

HA Producer (Experimental)

The HA producer is built on top of the standard producer.
Features:

  • auto-reconnect in case of disconnection
  • automatic handling of unconfirmed messages in case of failure

You can find a "HA producer" example in the examples directory.

haproducer := NewHAProducer(
	env *stream.Environment, // mandatory
	streamName string, // mandatory
	producerOptions *stream.ProducerOptions, //optional
	confirmMessageHandler ConfirmMessageHandler // mandatory
	)

Consume messages

To consume messages from a stream, use the NewConsumer interface, for example:

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
	fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data)
}

consumer, err := env.NewConsumer(
		"my-stream",
		handleMessages,
		....

With ConsumerOptions it is possible to customize the consumer behaviour.

  stream.NewConsumerOptions().
  SetConsumerName("my_consumer").                  // set a consumer name
  SetCRCCheck(false).  // Enable/Disable the CRC control.
  SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning

Disabling the CRC control can improve performance.

See also the "Offset Start" example in the examples directory.

Close the consumer with consumer.Close(). The consumer is removed from the server, and the TCP connection is closed if there are no other consumers.

Manual Track Offset

The server can store the current delivered offset for a given consumer, in this way:

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
		if atomic.AddInt32(&count, 1)%1000 == 0 {
			err := consumerContext.Consumer.StoreOffset()  // commit all messages up to the current message's offset
			....

consumer, err := env.NewConsumer(
..
stream.NewConsumerOptions().
			SetConsumerName("my_consumer"). // <------

A consumer must have a name to be able to store offsets.
Note: avoid storing the offset for every single message; it reduces performance.

See also the "Offset Tracking" example in the examples directory.

The server can also store a previously delivered offset rather than the current delivered offset, in this way:

processMessageAsync := func(consumer stream.Consumer, message *amqp.Message, offset int64) {
    ....
    err := consumer.StoreCustomOffset(offset)  // commit all messages up to this offset
    ....

This is useful when messages have to be processed asynchronously and the original message handler cannot be blocked. In that case we cannot store the current or latest delivered offset as in the handleMessages function above.
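
When messages finish out of order, one question is which offset is safe to pass to StoreCustomOffset. The sketch below shows one possible approach, not something the client provides: a hypothetical offsetTracker that reports the highest offset below which everything has been processed. It assumes the consumer sees consecutive offsets starting at a known value.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetTracker records which offsets have finished processing and reports
// the highest offset that is safe to store: every offset from start up to
// and including it has been marked done.
type offsetTracker struct {
	mu    sync.Mutex
	start int64          // first offset handed to the workers
	next  int64          // lowest offset not yet known to be done
	done  map[int64]bool // finished offsets at or above next
}

func newOffsetTracker(start int64) *offsetTracker {
	return &offsetTracker{start: start, next: start, done: make(map[int64]bool)}
}

// markDone registers a finished offset. It returns the highest offset that
// can be stored, with ok set to false when nothing can be stored yet.
func (t *offsetTracker) markDone(offset int64) (safe int64, ok bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if offset >= t.next {
		t.done[offset] = true
	}
	// Advance over the contiguous run of finished offsets.
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
	}
	if t.next == t.start {
		return 0, false
	}
	return t.next - 1, true
}

func main() {
	tracker := newOffsetTracker(0)
	// Workers finish out of order: 2 first, then 0, then 1.
	for _, offset := range []int64{2, 0, 1} {
		safe, ok := tracker.markDone(offset)
		fmt.Printf("done %d -> safe to store: %d (ok=%v)\n", offset, safe, ok)
	}
}
```

A worker would call markDone when it finishes a message and store the returned offset only when ok is true, ideally not on every message.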

Automatic Track Offset

The following snippet shows how to enable automatic tracking with the defaults:

stream.NewConsumerOptions().
			SetConsumerName("my_consumer").
			SetAutoCommit(stream.NewAutoCommitStrategy() ...

nil is also a valid value; the default values will be used:

stream.NewConsumerOptions().
			SetConsumerName("my_consumer").
			SetAutoCommit(nil) ...

Set the consumer name (mandatory for offset tracking).

The automatic tracking strategy has the following available settings:

  • message count before storage: the client will store the offset after the specified number of messages,
    right after the execution of the message handler. The default is every 10,000 messages.

  • flush interval: the client will make sure to store the last received offset at the specified interval.
    This avoids having pending, not stored offsets in case of inactivity. The default is 5 seconds.

Those settings are configurable, as shown in the following snippet:

stream.NewConsumerOptions().
	// set a consumer name
	SetConsumerName("my_consumer").
	SetAutoCommit(stream.NewAutoCommitStrategy().
		SetCountBeforeStorage(50). // store the offset every 50 messages
		SetFlushInterval(10*time.Second)). // store the offset every 10 seconds
	SetOffset(stream.OffsetSpecification{}.First()))

See also the "Automatic Offset Tracking" example in the examples directory.

Get consumer offset

It is possible to query the consumer offset using:

offset, err := env.QueryOffset("consumer_name", "streamName")

An error is returned if the offset doesn't exist.

Handle Close

The client provides an interface to handle the producer/consumer close.

channelClose := consumer.NotifyClose()
defer consumerClose(channelClose)
func consumerClose(channelClose stream.ChannelClose) {
	event := <-channelClose
	fmt.Printf("Consumer: %s closed on the stream: %s, reason: %s \n", event.Name, event.StreamName, event.Reason)
}

In this way it is possible to handle fail-over.

Performance test tool

The performance test tool is useful to execute tests. See also the Java Performance tool.

To install it, download the latest release from GitHub:

Mac:

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_darwin_amd64.tar.gz

Linux:

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_linux_amd64.tar.gz

Windows:

https://github.com/rabbitmq/rabbitmq-stream-go-client/releases/latest/download/stream-perf-test_windows_amd64.zip

Execute stream-perf-test --help to see the parameters. By default it executes a test with one producer and one consumer.

Here is an example:

stream-perf-test --publishers 3 --consumers 2 --streams my_stream --max-length-bytes 2GB --uris rabbitmq-stream://guest:guest@localhost:5552/  --fixed-body 400 --time 10

Performance test tool Docker

A Docker image is available: pivotalrabbitmq/go-stream-perf-test. To test it:

Run the server in host mode:

 docker run -it --rm --name rabbitmq --network host \
    rabbitmq:3.9-management

Enable the plugin:

 docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream

then run the docker image:

docker run -it --network host  pivotalrabbitmq/go-stream-perf-test

To see all the parameters:

docker run -it --network host  pivotalrabbitmq/go-stream-perf-test --help

Build from source

make build

To execute the tests you need a Docker image. You can use:

make rabbitmq-server

to run a ready rabbitmq-server with stream enabled for tests.

Then run make test.
