# Kafka Konsumer

## Description

Kafka Konsumer provides an easy implementation of a Kafka consumer with a built-in retry/exception manager (kafka-cronsumer).

## Guide

### Installation

```sh
go get github.com/Trendyol/kafka-konsumer@latest
```

### Examples

You can find a number of ready-to-run examples in the examples directory of the repository. After running the `docker-compose up` command, you can run any example application you want.
### Simple Consumer

```go
package main

import (
	"fmt"

	"github.com/Trendyol/kafka-konsumer"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		ConsumeFn:    consumeFn,
		RetryEnabled: false,
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func consumeFn(message kafka.Message) error {
	fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value))
	return nil
}
```
### Simple Consumer With Retry/Exception Option

```go
package main

import (
	"fmt"
	"time"

	"github.com/Trendyol/kafka-konsumer"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		ConsumeFn: consumeFn,
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

// Returning a non-nil error sends the message to the retry topic.
func consumeFn(message kafka.Message) error {
	fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value))
	return nil
}
```
### With Batch Option

```go
package main

import (
	"fmt"
	"time"

	"github.com/Trendyol/kafka-konsumer"
)

func main() {
	consumerCfg := &kafka.ConsumerConfig{
		Reader: kafka.ReaderConfig{
			Brokers: []string{"localhost:29092"},
			Topic:   "standart-topic",
			GroupID: "standart-cg",
		},
		LogLevel:     kafka.LogLevelDebug,
		RetryEnabled: true,
		RetryConfiguration: kafka.RetryConfiguration{
			Brokers:       []string{"localhost:29092"},
			Topic:         "retry-topic",
			StartTimeCron: "*/1 * * * *",
			WorkDuration:  50 * time.Second,
			MaxRetry:      3,
		},
		BatchConfiguration: kafka.BatchConfiguration{
			MessageGroupLimit:    1000,
			MessageGroupDuration: time.Second,
			BatchConsumeFn:       batchConsumeFn,
		},
	}

	consumer, _ := kafka.NewConsumer(consumerCfg)
	defer consumer.Stop()

	consumer.Consume()
}

func batchConsumeFn(messages []kafka.Message) error {
	fmt.Printf("%d messages received; first value: %s\n", len(messages), messages[0].Value)
	return nil
}
```
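Conceptually, a batch is flushed either when `MessageGroupLimit` messages have accumulated or when `MessageGroupDuration` has elapsed, whichever comes first. The sketch below illustrates the size-based grouping only; it is a pure-Go illustration of the idea, not the library's actual implementation.

```go
package main

import "fmt"

// chunk groups messages into slices of at most messageGroupLimit entries.
// (Illustration only; the library additionally flushes a partial batch
// once MessageGroupDuration has passed.)
func chunk(messages []string, messageGroupLimit int) [][]string {
	var batches [][]string
	for len(messages) > 0 {
		n := messageGroupLimit
		if len(messages) < n {
			n = len(messages)
		}
		batches = append(batches, messages[:n])
		messages = messages[n:]
	}
	return batches
}

func main() {
	msgs := []string{"m1", "m2", "m3", "m4", "m5"}
	for i, batch := range chunk(msgs, 2) {
		fmt.Printf("batch %d: %v\n", i, batch)
	}
}
```

With a limit of 2, the five messages above are delivered as three batches: two full ones and a final partial one.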
### With Grafana & Prometheus

This example demonstrates how to create a Grafana dashboard and how to define alerts in Prometheus. To try it, go to the with-grafana folder under the examples folder, start the infrastructure with `docker compose up`, and then run the application.
### With SASL-PLAINTEXT Authentication

Under the examples/with-sasl-plaintext folder, you can find an example of a consumer integration with the SASL/PLAIN mechanism. To try it, run `docker compose up` under that folder and then start the application.
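Based on the `sasl.*` keys in the configuration table below, enabling SASL/PLAIN on the consumer might look like the following sketch. The struct and field names here are assumptions inferred from those keys, and the broker address and credentials are placeholders; check the package documentation for the exact types.

```go
// NOTE: SASL struct/field names are assumptions inferred from the
// sasl.authType, sasl.username, and sasl.password keys in the
// configuration table; consult the package docs for the actual API.
consumerCfg := &kafka.ConsumerConfig{
	Reader: kafka.ReaderConfig{
		Brokers: []string{"localhost:29093"}, // placeholder SASL listener address
		Topic:   "standart-topic",
		GroupID: "standart-cg",
	},
	SASL: kafka.SASLConfig{
		AuthType: "PLAIN",         // "SCRAM" or "PLAIN", per the table
		Username: "client",        // placeholder credential
		Password: "client-secret", // placeholder credential
	},
	ConsumeFn: consumeFn,
}
```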
## Configurations

| config | description | default |
|---|---|---|
| `reader` | Describes all `segmentio/kafka-go` reader configurations | |
| `consumeFn` | Kafka consumer function; if retry is enabled, it is also used to consume retriable messages | |
| `logLevel` | Describes the log level; valid options are `debug`, `info`, `warn`, and `error` | info |
| `concurrency` | Number of goroutines used by the listeners | 1 |
| `retryEnabled` | Whether the retry/exception consumer is enabled | false |
| `rack` | see doc | |
| `clientId` | see doc | |
| `retryConfiguration.clientId` | see doc | |
| `retryConfiguration.startTimeCron` | Cron expression defining when the retry consumer (kafka-cronsumer) starts to work | |
| `retryConfiguration.workDuration` | Duration for which the exception consumer actively consumes messages | |
| `retryConfiguration.topic` | Retry/exception topic name | |
| `retryConfiguration.brokers` | Retry topic broker URLs | |
| `retryConfiguration.maxRetry` | Maximum number of retry attempts for a message | 3 |
| `retryConfiguration.tls.rootCAPath` | see doc | "" |
| `retryConfiguration.tls.intermediateCAPath` | Same as `rootCAPath`; use it together with `rootCAPath` if you need to specify two root CAs | "" |
| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | |
| `retryConfiguration.sasl.username` | SCRAM or PLAIN username | |
| `retryConfiguration.sasl.password` | SCRAM or PLAIN password | |
| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | |
| `batchConfiguration.messageGroupDuration` | Maximum time to wait for a batch | |
| `tls.rootCAPath` | see doc | "" |
| `tls.intermediateCAPath` | Same as `rootCAPath`; use it together with `rootCAPath` if you need to specify two root CAs | "" |
| `sasl.authType` | `SCRAM` or `PLAIN` | |
| `sasl.username` | SCRAM or PLAIN username | |
| `sasl.password` | SCRAM or PLAIN password | |
| `logger` | Lets you set a custom logger | info |
| `apiEnabled` | Whether the metrics API is enabled | false |
| `apiConfiguration.port` | API port | 8090 |
| `apiConfiguration.healtCheckPath` | Health check path | healthcheck |
| `metricConfiguration.path` | Metrics endpoint path | /metrics |
## Monitoring

Kafka Konsumer offers an API that exposes several metrics.

### Exposed Metrics

| Metric Name | Description | Value Type |
|---|---|---|
| kafka_konsumer_processed_messages_total | Total number of processed messages | Counter |
| kafka_konsumer_processed_batch_messages_total | Total number of processed batch messages | Counter |
| kafka_konsumer_unprocessed_messages_total | Total number of unprocessed messages | Counter |
| kafka_konsumer_unprocessed_batch_messages_total | Total number of unprocessed batch messages | Counter |
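Given the defaults in the configuration table above (`apiConfiguration.port`: 8090, `metricConfiguration.path`: /metrics), a minimal Prometheus scrape configuration for these metrics could look like the following sketch; the job name and target host are assumptions, not part of the library.

```yaml
# prometheus.yml fragment; job name and target host are placeholders.
scrape_configs:
  - job_name: kafka-konsumer        # assumed job name
    metrics_path: /metrics          # default metricConfiguration.path
    static_configs:
      - targets: ["localhost:8090"] # default apiConfiguration.port
```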