• Stars
    star
    501
  • Rank 88,002 (Top 2 %)
  • Language
    Go
  • License
    MIT License
  • Created over 11 years ago
  • Updated over 2 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

a durable message queue system for go based on redis, see also https://github.com/adjust/rmq

Note: This project is no longer actively maintained. Please refer to its spiritual successor rmq.

--

redismq

Build Status godoc

What is this

This is a fast, persistent, atomic message queue implementation that uses redis as its storage engine written in go. It uses atomic list commands to ensure that messages are delivered only once in the right order without being lost by crashing consumers.

Details can be found in the blog post about its initial design: http://big-elephants.com/2013-09/building-a-message-queue-using-redis-in-go/

A second article desribes the performance improvements of the current version: http://big-elephants.com/2013-10/tuning-redismq-how-to-use-redis-in-go/

What it's not

It's not a standalone server that you can use as a message queue, at least not for now. The implementation is done purely client side. All message queue commands are "translated" into redis commands and then executed via a redis client.

If you want to use this with any other language than go you have to translate all of the commands into your language of choice.

How to use it

All most all use cases are either covered in the examples or in the tests.

So the best idea is just to read those and figure it from there. But in any case:

Basics

To get started you need a running redis server. Since the tests run FlushDB() an otherwise unused database is highly recommended The first step is to create a new queue:

package main

import (
	"fmt"
	"github.com/adjust/redismq"
)

func main() {
	testQueue := redismq.CreateQueue("localhost", "6379", "", 9, "clicks")
	...
}

To write into the queue you simply use Put():

	...
	testQueue := redismq.CreateQueue("localhost", "6379", "", 9, "clicks")
	testQueue.Put("testpayload")
	...
}

The payload can be any kind of string, yes even a 10MB one.

To get messages out of the queue you need a consumer:

	...
	consumer, err := testQueue.AddConsumer("testconsumer")
	if err != nil {
		panic(err)
	}
	package, err := consumer.Get()
	if err != nil {
		panic(err)
	}
	fmt.Println(package.Payload)
	...
}

Payload will hold the original string, while package will have some additional header information.

To remove a package from the queue you have to Ack() it:

	...
	package, err := consumer.Get()
	if err != nil {
		panic(err)
	}
	err = package.Ack()
	if err != nil {
		panic(err)
	}
	...
}

Buffered Queues

When input speed is of the essence BufferedQueues will scratch that itch. They pipeline multiple puts into one fast operation. The only issue is that upon crashing or restart the packages in the buffer that haven't been written yet will be lost. So it's advised to wait one second before terminating your program to flush the buffer.

The usage is as easy as it gets:

	...
	bufferSize := 100
	testQueue := redismq.CreateBufferedQueue("localhost", "6379", "", 9, "clicks", bufferSize)
	testQueue.Start()
	...
}

Put() and Get() stay exactly the same. I have found anything over 200 as bufferSize not to increase performance any further.

To ensure that no packages are left in the buffer when you shut down your program you need to call FlushBuffer() which will tell the queue to flush the buffer and wait till it's empty.

	testQueue.FlushBuffer()

Multi Get

Like BufferedQueues for Get() MultiGet() speeds up the fetching of messages. The good news it comes without the buffer loss issues.

Usage is pretty straight forward with the only difference being the MultiAck():

	...
	packages, err := consumer.MultiGet(100)
	if err != nil {
		panic(err)
	}
	for i := range packages {
		fmt.Println(p[i].Payload)
	}
	packages[len(p)-1].MultiAck()
	...
}

MultiAck() can be called on any package in the array with all the prior packages being "acked". This way you can Fail() single packages.

Reject and Failed Queues

Similar to AMQP redismq supports Failed Queues meaning that packages that are rejected by a consumer will be stored in separate queue for further inspection. Alternatively a consumer can also Requeue() a package and put it back into the queue:

	...
	package, err := consumer.Get()
	if err != nil {
		panic(err)
	}
	err = package.Requeue()
	if err != nil {
		panic(err)
	}
	...
}

To push the message into the Failed Queue of this consumer simply use Fail():

	...
	package, err := consumer.Get()
	if err != nil {
		panic(err)
	}
	err = package.Fail()
	if err != nil {
		panic(err)
	}
	package, err = suite.consumer.GetUnacked()
	...
}

As you can see there is also a command to get messages from the Failed Queue.

How fast is it

Even though the original implementation wasn't aiming for high speeds the addition of BufferedQueues and MultiGet make it go something like this.

All of the following benchmarks were conducted on a MacBook Retina with a 2.4 GHz i7. The InputRate is the number of messages per second that get inserted, WorkRate the messages per second consumed.

Single Publisher, Two Consumers only atomic Get and Put

InputRate:	12183
WorkRate:	12397

Single Publisher, Two Consumers using BufferedQueues and MultiGet

InputRate:	46994
WorkRate:	25000

And yes that is a persistent message queue that can move over 70k messages per second.

If you want to find out for yourself checkout the example folder. The load.go or buffered_queue.go will start a web server that will display performance stats under http://localhost:9999/stats.

How persistent is it

As redis is the underlying storage engine you can set your desired persistence somewhere between YOLO and fsync(). With somewhat sane settings you should see no significant performance decrease.

Copyright

redismq is Copyright Β© 2014 adjust GmbH.

It is free software, and may be redistributed under the terms specified in the LICENSE file.

More Repositories

1

rmq

Message queue system written in Go and backed by Redis
Go
1,479
star
2

go-wrk

a small heavy duty http/https benchmark tool written in go
Go
816
star
3

ios_sdk

This is the iOS SDK of
Objective-C
579
star
4

android_sdk

This is the Android SDK of
Java
573
star
5

parquet_fdw

Parquet foreign data wrapper for PostgreSQL
C++
317
star
6

shrimp

a phantomjs based pdf renderer
Ruby
283
star
7

sdks

SDKs of Adjust
141
star
8

unity_sdk

This is the Unity SDK of
C#
133
star
9

react_native_sdk

This is the React Native SDK of
Objective-C
119
star
10

kafka_fdw

kafka foreign database wrapper for postresql
C
101
star
11

gohub

github webhook based deloyment server
Go
60
star
12

gorails

A set of go packages to integrate your Go app into existing Rails project.
Go
60
star
13

flutter_sdk

This is the Flutter SDK of
Dart
54
star
14

web_sdk

JavaScript
40
star
15

pgbundle

bundling postgres extension
Ruby
39
star
16

postgresql_extension_demo

37
star
17

cordova_sdk

This is the Cordova SDK of
JavaScript
35
star
18

istore

development repo for integer hstore replacement in postgres
C
34
star
19

goem

go extension manager
Go
33
star
20

AEProductController

Small wrapper for SKStoreProductViewController that handles tracking links.
Objective-C
33
star
21

pg_cryogen

Compressed append-only pluggable storage for PostgreSQL 12+
C
30
star
22

adobe_air_sdk

This is the Adobe AIR SDK of
Java
24
star
23

rport

Connection management and SQL parallelisation for R analytics on big database clusters
R
23
star
24

nvd3-rails

nvd3 reusable charts for rails 3
Ruby
23
star
25

goenv

go boilerplate code
Go
23
star
26

pg-telemetry

Useful monitoring views for PostgreSQL, packaged as an extension
PLpgSQL
22
star
27

hydra-curl

a bash script to download with hundreds of parallel curls
Shell
17
star
28

pg-base36

a base36 extension for postgres
C
16
star
29

wltree

adjust's patched version of postgres ltree
C
14
star
30

xamarin_sdk

This is the Xamarin SDK of
C#
14
star
31

api-client-r

an R client for the KPI service https://docs.adjust.com/en/kpi-service/
R
14
star
32

unreal_sdk

This is the Unreal SDK of
C++
14
star
33

pg-currency

1 Byte Currency ISO type for PostgreSQL
C
12
star
34

windows_sdk

This is the Windows SDK of
C#
10
star
35

schaufel

C
10
star
36

cocos2dx_sdk

This is the Cocos2d-x SDK of
C++
8
star
37

pg-country

country type for postgres
C
7
star
38

marmalade_sdk

This is the Marmalade SDK of http://www.adjust.com
C++
7
star
39

pg_c_dev

Repo for Extending postgresql with C
PLpgSQL
7
star
40

corona_sdk

This is the Corona SDK of
Java
7
star
41

pg-roleman

Role Management Extension with some Nice Common Functions
PLpgSQL
6
star
42

ajbool

triple bool for postgres
C
6
star
43

adjust_signature_sdk

6
star
44

go_conf

a go package to simplify configuring golang apps with database.yml rails style
Go
5
star
45

gentoo-overlay

adjust Gentoo overlay
Shell
5
star
46

AEPriceMatrix

Tier based currency conversion for iOS
Objective-C
4
star
47

pg-numhstore

a postgres extension to support inthstore and floathstore types
C
4
star
48

postgresql_exporter

Go
4
star
49

csv-gists-r

Facilitates maintaining CSV on gist.github.com directly from R objects
R
4
star
50

react-and-rockets

React & Rockets - Challenge for Adjust Frontend Developer
JavaScript
4
star
51

pg-device_type

a device_type extension for postgres
C
3
star
52

iOS6AdTracking

Objective-C
3
star
53

titanium_sdk

This is the Titanium SDK of
JavaScript
3
star
54

pg_querylog

Show queries running on PostgreSQL backends
C
3
star
55

pg_type_template

An extension template for Postgres type
Jinja
3
star
56

redis_failover

redis failover scripts
Perl
3
star
57

adjust_anes

Google Play Services ANE Builder
Makefile
2
star
58

pg_intmap

Compressed integer-to-integer map
C
2
star
59

githubWorkflows

Shared workflows
2
star
60

pg-ext-actions

Github action to build and test PostgreSQL extensions
Shell
2
star
61

rport_demo

A Demo Rport App
R
2
star
62

goautoneg

fork of goautoneg from https://bitbucket.org/ww/goautoneg
Go
2
star
63

pg-geoip2lookup

A PL/Perl based extension for PostgreSQL for lookup in data in geoip2 dos for PostgreSQL
SQLPL
2
star
64

all_substrings_tokenizer

C Postgres extension for extracting all substrings of a string
C
2
star
65

pg-language

Postgres extension of language enumeration type
PLpgSQL
2
star
66

pg_lock_pool

A postgres Extension to wait on a lock pool
PLpgSQL
2
star
67

file_cache

Filesystem-backed caching for data, big and small
Elixir
2
star
68

pg_spec

ruby minitest based pg_tap like test runner for postgres
PLpgSQL
1
star
69

webdev-assignment

Vue
1
star
70

design-tokens

CSS
1
star
71

airbrake-client-r

An R client for the Airbrake API
R
1
star
72

dumbo

postgres extension fun
Ruby
1
star
73

ios_adobe_extension

Adjust SDK extension for Adobe Experience Platform Mobile
Objective-C
1
star
74

postgres_tools

adjust postgres tools
Perl
1
star
75

michaelbot

Go
1
star
76

pg-ajversion

simple semantic version type for postgres
PLpgSQL
1
star
77

go_demo

gogo demo app
Ruby
1
star
78

mailbot

Perl
1
star
79

hubot

smart ass bot
CoffeeScript
1
star
80

zabbix-api

perl zabbix bindings
Perl
1
star
81

postgres_agg_funcs

aggregation helper functions written in c
C
1
star
82

pg-mvtbl

A postgres Extension to easily move tables around tablespaces
PLpgSQL
1
star
83

smart_banner_sdk

Adjust Smart Banner SDK
TypeScript
1
star
84

dev-docs

Developer documentation for Adjust's SDKs and APIs.
MDX
1
star
85

PGObject-Util-Replication-Slot

Replication Slot monitoring and management for PostgreSQL via Perl/CPAN
Makefile
1
star