  • Language
    Go
  • License
    Apache License 2.0

Repository Details

Efficient reliable UDP unicast, UDP multicast, and IPC message transport - Go port

Join the chat at https://gitter.im/aeron-go/Lobby

aeron-go

An implementation of the Aeron messaging client in Go.

The architecture, design, and protocol of Aeron are described in the documentation of the main Aeron project.

Usage

An example subscriber is available in the repository's examples.

An example publication is available in the repository's examples.

Common

Instantiate Aeron with Context:

ctx := aeron.NewContext().MediaDriverTimeout(time.Second * 10)

a := aeron.Connect(ctx)

Subscribers

Create subscription:

subscription := <-a.AddSubscription("aeron:ipc", 10)

defer subscription.Close()

aeron.AddSubscription() returns a channel, so the caller can either block until the subscription is registered with the driver, or wait asynchronously by polling the channel in a select statement.

Define callback for message processing:

handler := func(buffer *buffers.Atomic, offset int32, length int32, header *logbuffer.Header) {
    bytes := buffer.GetBytesArray(offset, length)

    fmt.Printf("Received a fragment with payload: %s\n", string(bytes))
}

Poll for messages:

idleStrategy := idlestrategy.Sleeping{SleepFor: time.Millisecond}

for {
    fragmentsRead := subscription.Poll(handler, 10)
    idleStrategy.Idle(fragmentsRead)
}

Publications

Create publication:

publication := <-a.AddPublication("aeron:ipc", 10)

defer publication.Close()

aeron.AddPublication() returns a channel, so the caller can either block until the publication is registered with the driver, or wait asynchronously by polling the channel in a select statement.

Create an Aeron buffer that holds the message to send:

message := fmt.Sprintf("this is a message %d", counter)

srcBuffer := buffers.MakeAtomic(([]byte)(message))

Optionally make sure that there are connected subscriptions:

for !publication.IsConnected() {
    time.Sleep(time.Millisecond * 10)
}

Send the message by calling publication.Offer and check the returned code:

ret := publication.Offer(srcBuffer, 0, int32(len(message)), nil)
switch ret {
case aeron.NotConnected:
    log.Print("not connected yet")
case aeron.BackPressured:
    log.Print("back pressured")
default:
    if ret < 0 {
        log.Printf("Unrecognized code: %d", ret)
    } else {
        log.Print("success!")
    }
}
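Back pressure is usually transient, so a caller may want to retry the offer a bounded number of times rather than only logging it. The sketch below shows one way to do that under stated assumptions: `offerFunc`, `offerWithRetry`, and the numeric values of `notConnected` and `backPressured` are placeholders chosen for this example, not the library's definitions. As in the switch above, a non-negative result is treated as success.

```go
package main

import (
	"errors"
	"fmt"
)

// Placeholder result codes; the numeric values are assumptions.
const (
	notConnected  int64 = -1
	backPressured int64 = -2
)

var (
	errNoSubscriber = errors.New("publication is not connected")
	errGaveUp       = errors.New("still back pressured after all attempts")
	errUnrecognized = errors.New("unrecognized result code")
)

// offerFunc is a hypothetical stand-in for a call like publication.Offer:
// a negative result is a failure code, anything else means success.
type offerFunc func(payload []byte) int64

// offerWithRetry retries only on back pressure, at most maxAttempts times.
// A real loop would normally idle briefly between attempts.
func offerWithRetry(offer offerFunc, payload []byte, maxAttempts int) (int64, error) {
	for attempt := 0; attempt < maxAttempts; attempt++ {
		switch ret := offer(payload); {
		case ret >= 0:
			return ret, nil
		case ret == backPressured:
			continue // transient condition: try the next attempt
		case ret == notConnected:
			return ret, errNoSubscriber
		default:
			return ret, errUnrecognized
		}
	}
	return backPressured, errGaveUp
}

func main() {
	calls := 0
	// Simulated transport: back pressured twice, then accepts the message.
	offer := func(payload []byte) int64 {
		calls++
		if calls < 3 {
			return backPressured
		}
		return 64
	}
	ret, err := offerWithRetry(offer, []byte("this is a message"), 5)
	fmt.Println(ret, err, calls) // 64 <nil> 3
}
```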