aeron-go
Implementation of Aeron messaging client in Go.
The architecture, design, and protocol of Aeron can be found here.
Usage
Example subscriber can be found here.
Example publication can be found here.
Common
Instantiate Aeron with Context:
ctx := aeron.NewContext().MediaDriverTimeout(time.Second * 10)
a := aeron.Connect(ctx)
Subscribers
Create subscription:
subscription := <-a.AddSubscription("aeron:ipc", 10)
defer subscription.Close()
aeron.AddSubscription() returns a channel, so the caller can either block until the subscription is registered with the driver or wait asynchronously by polling the channel in a select.
Define callback for message processing:
handler := func(buffer *buffers.Atomic, offset int32, length int32, header *logbuffer.Header) {
	bytes := buffer.GetBytesArray(offset, length)
	fmt.Printf("Received a fragment with payload: %s\n", string(bytes))
}
Poll for messages:
idleStrategy := idlestrategy.Sleeping{time.Millisecond}
for {
	fragmentsRead := subscription.Poll(handler, 10)
	idleStrategy.Idle(fragmentsRead)
}
Publications
Create publication:
publication := <-a.AddPublication("aeron:ipc", 10)
defer publication.Close()
aeron.AddPublication() returns a channel, so the caller can either block until the publication is registered with the driver or wait asynchronously by polling the channel in a select.
Create Aeron buffer to send the message:
message := fmt.Sprintf("this is a message %d", counter)
srcBuffer := buffers.MakeAtomic(([]byte)(message))
Optionally make sure that there are connected subscriptions:
for !publication.IsConnected() {
	time.Sleep(time.Millisecond * 10)
}
Send the message by calling publication.Offer:
ret := publication.Offer(srcBuffer, 0, int32(len(message)), nil)
switch ret {
case aeron.NotConnected:
	log.Print("not connected yet")
case aeron.BackPressured:
	log.Print("back pressured")
default:
	if ret < 0 {
		log.Printf("Unrecognized code: %d", ret)
	} else {
		log.Print("success!")
	}
}