• Stars
    star
    2,356
  • Rank 19,528 (Top 0.4 %)
  • Language
    Go
  • License
    BSD 3-Clause "New...
  • Created over 7 years ago
  • Updated 2 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.

Goka

License Unit Tests/System Tests GoDoc Go Report Card

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go. Goka aims to reduce the complexity of building highly scalable and highly available microservices.

Goka extends the concept of Kafka consumer groups by binding a state table to them and persisting them in Kafka. Goka provides sane defaults and a pluggable architecture.

Features

  • Message Input and Output

    Goka handles all the message input and output for you. You only have to provide one or more callback functions that handle messages from any of the Kafka topics you are interested in. You only ever have to deal with deserialized messages.

  • Scaling

    Goka automatically distributes the processing and state across multiple instances of a service. This enables effortless scaling when the load increases.

  • Fault Tolerance

    In case of a failure, Goka will redistribute the failed instance's workload and state across the remaining healthy instances. All state is safely stored in Kafka and messages delivered with at-least-once semantics.

  • Built-in Monitoring and Introspection

    Goka provides a web interface for monitoring performance and querying values in the state.

  • Modularity

    Goka fosters a pluggable architecture which enables you to replace for example the storage layer or the Kafka communication layer.

Documentation

This README provides a brief, high level overview of the ideas behind Goka.

Package API documentation is available at GoDoc and the Wiki provides several tips for configuring, extending, and deploying Goka applications.

Installation

You can install Goka by running the following command:

$ go get -u github.com/lovoo/goka

Configuration

Goka relies on Sarama to perform the actual communication with Kafka, which offers many configuration settings. The config is documented here.

In most cases, you need to modify the config, e.g. to set the Kafka Version.

cfg := goka.DefaultConfig()
cfg.Version = sarama.V2_4_0_0
goka.ReplaceGlobalConfig(cfg)

This makes all goka components use the updated config.

If you do need specific configuration for different components, you need to pass customized builders to the component's constructor, e.g.

cfg := goka.DefaultConfig()
// modify the config with component-specific settings


// use the config by creating a builder which allows to override global config
goka.NewProcessor(// ...,
	goka.WithConsumerGroupBuilder(
		goka.ConsumerGroupBuilderWithConfig(cfg),
	),
	// ...
)

Concepts

Goka relies on Kafka for message passing, fault-tolerant state storage and workload partitioning.

  • Emitters deliver key-value messages into Kafka. As an example, an emitter could be a database handler emitting the state changes into Kafka for other interested applications to consume.

  • Processor is a set of callback functions that consume and perform state transformations upon delivery of these emitted messages. Processor groups are formed of one or more instances of a processor. Goka distributes the partitions of the input topics across all processor instances in a processor group. This enables effortless scaling and fault-tolerance. If a processor instance fails, its partitions and state are reassigned to the remaining healthy members of the processor group. Processors can also emit further messages into Kafka.

  • Group table is the state of a processor group. It is a partitioned key-value table stored in Kafka that belongs to a single processor group. If a processor instance fails, the remaining instances will take over the group table partitions of the failed instance recovering them from Kafka.

  • Views are local caches of a complete group table. Views provide read-only access to the group tables and can be used to provide external services for example through a gRPC interface.

  • Local storage keeps a local copy of the group table partitions to speedup recovery and reduce memory utilization. By default, the local storage uses LevelDB, but in-memory map and Redis-based storage are also available.

Get Started

An example Goka application could look like the following. An emitter emits a single message with key "some-key" and value "some-value" into the "example-stream" topic. A processor processes the "example-stream" topic counting the number of messages delivered for "some-key". The counter is persisted in the "example-group-table" topic. To locally start a dockerized Zookeeper and Kafka instances, execute make start with the Makefile in the examples folder.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

var (
	brokers             = []string{"localhost:9092"}
	topic   goka.Stream = "example-stream"
	group   goka.Group  = "example-group"
)

// Emit messages forever every second
func runEmitter() {
	emitter, err := goka.NewEmitter(brokers, topic, new(codec.String))
	if err != nil {
		log.Fatalf("error creating emitter: %v", err)
	}
	defer emitter.Finish()
	for {
		time.Sleep(1 * time.Second)
		err = emitter.EmitSync("some-key", "some-value")
		if err != nil {
			log.Fatalf("error emitting message: %v", err)
		}
	}
}

// process messages until ctrl-c is pressed
func runProcessor() {
	// process callback is invoked for each message delivered from
	// "example-stream" topic.
	cb := func(ctx goka.Context, msg interface{}) {
		var counter int64
		// ctx.Value() gets from the group table the value that is stored for
		// the message's key.
		if val := ctx.Value(); val != nil {
			counter = val.(int64)
		}
		counter++
		// SetValue stores the incremented counter in the group table for in
		// the message's key.
		ctx.SetValue(counter)
		log.Printf("key = %s, counter = %v, msg = %v", ctx.Key(), counter, msg)
	}

	// Define a new processor group. The group defines all inputs, outputs, and
	// serialization formats. The group-table topic is "example-group-table".
	g := goka.DefineGroup(group,
		goka.Input(topic, new(codec.String), cb),
		goka.Persist(new(codec.Int64)),
	)

	p, err := goka.NewProcessor(brokers, g)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}
	ctx, cancel := context.WithCancel(context.Background())
	done := make(chan bool)
	go func() {
		defer close(done)
		if err = p.Run(ctx); err != nil {
			log.Fatalf("error running processor: %v", err)
		} else {
			log.Printf("Processor shutdown cleanly")
		}
	}()

	wait := make(chan os.Signal, 1)
	signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
	<-wait   // wait for SIGINT/SIGTERM
	cancel() // gracefully stop processor
	<-done
}

func main() {
	go runEmitter() // emits one message and stops
	runProcessor()  // press ctrl-c to stop
}

A very similar example is also in 1-simplest. Just run go run examples/1-simplest/main.go.

Note that tables have to be configured in Kafka with log compaction. For details check the Wiki.

How to contribute

Contributions are always welcome. Please fork the repo, create a pull request against master, and be sure tests pass. See the GitHub Flow for details.

More Repositories

1

NSFWDetector

A NSFW (aka porn) detector with CoreML
Swift
1,584
star
2

jenkins_exporter

Prometheus Metrics exporter for Jenkins
Python
163
star
3

nsq_exporter

Prometheus Metrics exporter for NSQ
Go
94
star
4

ipmi_exporter

IPMI Exporter for prometheus.io, written in Go.
Go
80
star
5

gcloud-opentracing

OpenTracing Tracer implementation for GCloud StackDriver in Go.
Go
45
star
6

nats_exporter

Prometheus Metrics exporter for NATS
Go
43
star
7

grpc-android-demo

A minimal GRPC demo project with automatic generation of GRPC implementations from a proto file. Also includes a minimalistic server implementation. Designed to be a dead simple playground for your first GRPC on Android experiments.
Java
42
star
8

xenstats_exporter

Prometheus exporter for xen (Storage, Host CPUs, Memory usage)
Go
21
star
9

android-tutorial-bubbles

A little ui framework that displays a styled tutorial bubble, which positions and scales itself based on a given anchor view.
Java
19
star
10

LVForgivingAssert

Create unit tests for methods that use NSAssert as pre-condition check.
Objective-C
16
star
11

goka-tools

Goka Tools provides different tools to be used with Goka
Go
11
star
12

android-pickpic

Ready to use library that allows people to select pictures from their device and Facebook account.
Kotlin
11
star
13

Github-action-confluence-sync

Github action for syncing files with confluence pages.
Python
10
star
14

LVModalQueue

This fixes 'NSInternalInconsistencyException's when presentViewController: and dismissViewController: are called, while a transition is already in progress. Transitions are queued for later execution.
Objective-C
9
star
15

cofire

An online Collaborative Filtering system for recommendations
Go
7
star
16

protoc-gen-go-grpcmock

Google protocol buffer compiler plugin to generate Mocks for gRPC Services in Go.
Go
6
star
17

tinycsv

Tiny CSV helper toolkit
Go
6
star
18

RuledScrollView

Android ScrollView implementation with additional Touch-Interception-Rules
Java
5
star
19

mozaik-ext-bitrise

A Mozaik extension for bitrise CI
JavaScript
5
star
20

nscd_exporter

Exports statistics from NSCD (Name service caching daemon) and publishes them for scraping by Prometheus.
Go
4
star
21

careers

Want to work at LOVOO?
2
star
22

tailor

Tailor sews buttons on any Android EditText.
Kotlin
2
star
23

opsgenie-cardiogram

Simple Heartbeat Reporter for Opsgenie
Go
2
star
24

bitrise-step-github-release

Bitrise step for making automated Github releases.
Go
2
star
25

drone-gcr

Drone 0.5 plugin for publishing Docker images to the Google Container Registry.
Go
1
star
26

lastwords

lastwords is a little library written solely in Kotlin which notifies you when your app is terminated - that is all activities are either finishing or have been destroyed.
Kotlin
1
star