• Stars
    star
    28
  • Rank 852,272 (Top 18 %)
  • Language
    Go
  • License
    MIT License
  • Created over 5 years ago
  • Updated about 4 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Go client to reliable queues based on Redis Cluster Streams

Ami

Go client to reliable queues based on Redis Cluster Streams.

Godoc Coverage Status Go Report Card Go

Consume/produce performance

Performance is dependent from:

  • Redis Cluster nodes count;
  • ping RTT from client to Redis Cluster master nodes;
  • network speed between nodes;
  • message sizes;
  • Ami configuration.

As example, 10-nodes Redis Cluster with half of nodes in other datacenter (50 msec ping), 1 master/1 slave, with message "{}" got:

$ go run examples/performance/main.go
Produced 1000000 in 3.423883 sec, rps 292066.022156
Consumed 151000 in 1.049238 sec, rps 143913.931722
Acked 151000 in 0.973587 sec, rps 155096.612263

Producer example

	type errorLogger struct{}

	func (l *errorLogger) AmiError(err error) {
		println("Got error from Ami:", err.Error())
	}

	pr, err := ami.NewProducer(
		ami.ProducerOptions{
			ErrorNotifier:     &errorLogger{},
			Name:              "ruthie",
			PendingBufferSize: 10000000,
			PipeBufferSize:    50000,
			PipePeriod:        time.Microsecond * 1000,
			ShardsCount:       10,
		},
		&redis.ClusterOptions{
			Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
			ReadTimeout:  time.Second * 60,
			WriteTimeout: time.Second * 60,
		},
	)
	if err != nil {
		panic(err)
	}

	for i := 0; i < 10000; i++ {
		pr.Send("{}")
	}

	pr.Close()

Consumer example

	type errorLogger struct{}

	func (l *errorLogger) AmiError(err error) {
		println("Got error from Ami:", err.Error())
	}

	cn, err := ami.NewConsumer(
		ami.ConsumerOptions{
			Consumer:          "alice",
			ErrorNotifier:     &errorLogger{},
			Name:              "ruthie",
			PendingBufferSize: 10000000,
			PipeBufferSize:    50000,
			PipePeriod:        time.Microsecond * 1000,
			PrefetchCount:     100,
			ShardsCount:       10,
		},
		&redis.ClusterOptions{
			Addrs:        []string{"172.17.0.1:7001", "172.17.0.1:7002"},
			ReadTimeout:  time.Second * 60,
			WriteTimeout: time.Second * 60,
		},
	)
	if err != nil {
		panic(err)
	}

	c := cn.Start()

	wg := sync.WaitGroup{}
	wg.Add(1)

	go func() {
		for {
			m, more := <-c
			if !more {
				break
			}
			println("Got", m.Body, "ID", m.ID)
			cn.Ack(m)
		}
		wg.Done()
	}()

	time.Sleep(time.Second)

	cn.Stop()
	wg.Wait()

	cn.Close()