  • Language: Go
  • License: MIT License

Repository Details

Redis Streams queue driver for https://godoc.org/gocloud.dev/pubsub package

redispubsub

Redis driver for https://godoc.org/gocloud.dev/pubsub package.

A lightweight alternative to Kafka that keeps the option of switching to it later. The driver suits MVPs, local development, and small or medium projects. When your project grows, you can switch to another driver from https://pkg.go.dev/gocloud.dev/pubsub#section-directories, because the application code depends only on the gocloud.dev/pubsub API.

Using Redis Streams, this driver supports at-least-once delivery.

The driver uses these Redis commands:

  • XADD
  • XGROUP CREATE
  • XREADGROUP (reads the consumer's pending messages first, then new messages)
  • XACK
  • XAUTOCLAIM

Many other queue implementations built on Redis Streams share a serious bug: they do not handle a consumer reconnecting to a topic after it received a message but did not acknowledge it. They read only with the special ID ">", which returns messages that were never delivered to any consumer in the group, so unacknowledged messages are not delivered again. As a result, messages are lost when microservices restart. This library avoids the problem by reading pending messages before new ones.

Connection to Redis

The connection string must be defined in the REDIS_URL environment variable.

Warning about creating a topic consumer group for the first time

Every consumer belongs to a consumer group, even when it is the only consumer in that group.

Each consumer group receives every message from the topic. Within a group, each message is delivered to only one consumer.

Messages flow

A consumer can join with a new group name after the publisher has already sent messages to the topic, that is, before the group existed. Such a consumer receives all earlier messages that its group has not acknowledged, starting from the beginning of the topic.

How to open a topic and send a message

import (
    "context"
    "fmt"

    _ "github.com/covrom/redispubsub"
    "gocloud.dev/pubsub"
)

ctx := context.Background()
topic, err := pubsub.OpenTopic(ctx, "redis://topics/1")
if err != nil {
    return fmt.Errorf("could not open topic: %v", err)
}
defer topic.Shutdown(ctx)

m := &pubsub.Message{
    Body: []byte("Hello, World!\n"),
    // Metadata is optional and can be nil.
    Metadata: map[string]string{
        // These are examples of metadata.
        // There is nothing special about the key names.
        "language":   "en",
        "importance": "high",
    },
}

err = topic.Send(ctx, m)
if err != nil {
    return err
}

The host/path part of the OpenTopic connection string is required and must contain the topic name.

Connection string query parameters:

  • maxlen is the MAXLEN parameter for XADD (limits the stream length); the length is unlimited if it is not set.
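As a sketch of how such a connection string might be assembled, the hypothetical helper topicURL below appends maxlen only when a limit is requested. Only the redis:// scheme, the topic name in the host/path, and the maxlen parameter come from the text above; the helper itself is an assumption.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

// topicURL builds an OpenTopic connection string for the given topic name.
// A maxLen of 0 or less leaves the stream length unlimited (no maxlen parameter).
func topicURL(topic string, maxLen int) string {
	u := "redis://" + topic
	if maxLen > 0 {
		q := url.Values{}
		q.Set("maxlen", strconv.Itoa(maxLen))
		u += "?" + q.Encode()
	}
	return u
}

func main() {
	fmt.Println(topicURL("topics/1", 0))    // redis://topics/1
	fmt.Println(topicURL("topics/1", 1000)) // redis://topics/1?maxlen=1000
}
```

The resulting string can be passed to pubsub.OpenTopic in place of the literal used in the example above.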

How to subscribe to a topic

import (
    "context"
    "fmt"

    _ "github.com/covrom/redispubsub"
    "gocloud.dev/pubsub"
)

ctx := context.Background()
subs, err := pubsub.OpenSubscription(ctx, "redis://group1?consumer=cons1&topic=topics/1")
if err != nil {
    return err
}
defer subs.Shutdown(ctx)

msg, err := subs.Receive(ctx)
if err != nil {
    // Errors from Receive indicate that Receive will no longer succeed.
    return fmt.Errorf("receiving message: %w", err)
}
// Do work based on the message, for example:
fmt.Printf("Got message: %q\n", msg.Body)
// Messages must always be acknowledged with Ack.
msg.Ack()

The host/path part of the OpenSubscription connection string is required and must contain the consumer group name.

Connection string query parameters:

  • topic is the topic name, required
  • consumer is the unique consumer name, required
  • from is the starting ID passed to XGROUP CREATE when the consumer group does not exist yet, default is '0'
  • autoclaim is the min-idle-time option for XAUTOCLAIM, 30 min by default
  • noack is the NOACK option for XREADGROUP, not used by default

See basic_test.go for full usage example.

Monitoring with Prometheus & Grafana

Use the redis-exporter Prometheus exporter with its check-streams option.

See streams.go for details.
