kt - a Kafka tool that likes JSON
Some reasons why you might be interested:
- Consume messages on specific partitions between specific offsets.
- Display topic information (e.g., with partition offset and leader info).
- Modify consumer group offsets (e.g., resetting or manually setting offsets per topic and per partition).
- JSON output for easy consumption with tools like kp or jq.
- JSON input to facilitate automation via tools like jsonify.
- Configure brokers, topic, Kafka version and authentication via the environment variables KT_BROKERS, KT_TOPIC, KT_KAFKA_VERSION and KT_AUTH.
- Fast start up time.
- No buffering of output.
- Binary keys and payloads can be passed and presented in base64 or hex encoding.
- Support for TLS authentication.
- Basic cluster admin functions: Create & delete topics.
I'm not using kt actively myself anymore, so if you think it's lacking some feature - please let me know by creating an issue!
Examples
Read details about topics that match a regex
$ kt topic -filter news -partitions
{
"name": "actor-news",
"partitions": [
{
"id": 0,
"oldest": 0,
"newest": 0
}
]
}
Produce messages
$ echo 'Alice wins Oscar' | kt produce -topic actor-news -literal
{
"count": 1,
"partition": 0,
"startOffset": 0
}
$ echo 'Bob wins Oscar' | kt produce -topic actor-news -literal
{
"count": 1,
"partition": 0,
"startOffset": 0
}
$ for i in {6..9} ; do echo Bourne sequel $i in production. | kt produce -topic actor-news -literal ;done
{
"count": 1,
"partition": 0,
"startOffset": 1
}
{
"count": 1,
"partition": 0,
"startOffset": 2
}
{
"count": 1,
"partition": 0,
"startOffset": 3
}
{
"count": 1,
"partition": 0,
"startOffset": 4
}
Or pass in JSON object to control key, value and partition
$ echo '{"value": "Terminator terminated", "key": "Arni", "partition": 0}' | kt produce -topic actor-news
{
"count": 1,
"partition": 0,
"startOffset": 5
}
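When messages come from a script, it can help to build the JSON input line in one place. The following is a minimal sketch, not part of kt itself: the helper names json_escape and kt_message are made up for illustration, and the escaping only covers backslashes and double quotes (it assumes the inputs contain no newlines or control characters).

```shell
# Hypothetical helpers, not shipped with kt.

# Escape backslashes and double quotes so the text can sit inside a JSON string.
# Assumption: the input contains no newlines or other control characters.
json_escape() {
  printf '%s' "$1" | sed -e 's/\\/\\\\/g' -e 's/"/\\"/g'
}

# Print one JSON message with the fields used above.
# $1 value, $2 key, $3 partition (integer)
kt_message() {
  printf '{"value": "%s", "key": "%s", "partition": %d}\n' \
    "$(json_escape "$1")" "$(json_escape "$2")" "$3"
}
```

With these definitions, kt_message 'Terminator terminated' Arni 0 | kt produce -topic actor-news should send the same message as the command above.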
Read messages at specific offsets on specific partitions
$ kt consume -topic actor-news -offsets 0=1:2
{
"partition": 0,
"offset": 1,
"key": "",
"value": "Bourne sequel 6 in production.",
"timestamp": "1970-01-01T00:59:59.999+01:00"
}
{
"partition": 0,
"offset": 2,
"key": "",
"value": "Bourne sequel 7 in production.",
"timestamp": "1970-01-01T00:59:59.999+01:00"
}
Follow a topic, starting relative to newest offset
$ kt consume -topic actor-news -offsets all=newest-1:
{
"partition": 0,
"offset": 4,
"key": "",
"value": "Bourne sequel 9 in production.",
"timestamp": "1970-01-01T00:59:59.999+01:00"
}
{
"partition": 0,
"offset": 5,
"key": "Arni",
"value": "Terminator terminated",
"timestamp": "1970-01-01T00:59:59.999+01:00"
}
^Creceived interrupt - shutting down
shutting down partition consumer for partition 0
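Judging only from the two examples above, the -offsets argument appears to have the shape PARTITION=START:END, where an empty END keeps the consumer following the topic. The sketch below composes such an argument; the function name kt_offsets is hypothetical and the reading of the syntax is an assumption drawn from these examples, so check kt consume -help for the authoritative description.

```shell
# Hypothetical helper, not shipped with kt.
# Compose an -offsets argument of the form PARTITION=START:END.
# $1 partition id or "all", $2 start, $3 optional end (left empty when omitted)
kt_offsets() {
  printf '%s=%s:%s\n' "$1" "$2" "${3:-}"
}
```

For example, kt consume -topic actor-news -offsets "$(kt_offsets 0 1 2)" would be equivalent to the first command above, and kt_offsets all newest-1 yields the argument used in the second.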
View offsets for a given consumer group
$ kt group -group enews -topic actor-news -partitions 0
found 1 groups
found 1 topics
{
"name": "enews",
"topic": "actor-news",
"offsets": [
{
"partition": 0,
"offset": 6,
"lag": 0
}
]
}
Change consumer group offset
$ kt group -group enews -topic actor-news -partitions 0 -reset 1
found 1 groups
found 1 topics
{
"name": "enews",
"topic": "actor-news",
"offsets": [
{
"partition": 0,
"offset": 1,
"lag": 5
}
]
}
$ kt group -group enews -topic actor-news -partitions 0
found 1 groups
found 1 topics
{
"name": "enews",
"topic": "actor-news",
"offsets": [
{
"partition": 0,
"offset": 1,
"lag": 5
}
]
}
Create and delete a topic
$ kt admin -createtopic morenews -topicdetail <(jsonify =NumPartitions 1 =ReplicationFactor 1)
$ kt topic -filter news
{
"name": "morenews"
}
$ kt admin -deletetopic morenews
$ kt topic -filter news
Change broker address via environment variable
$ export KT_BROKERS=brokers.kafka:9092
$ kt <command> <option>
Installation
You can download kt via the Releases section.
Alternatively, install it with the go tool, for example:
$ go install github.com/fgeller/kt/v14@latest
Or via Homebrew on OSX:
$ brew tap fgeller/tap
$ brew install kt
Docker
@Paxa maintains an image to run kt in a Docker environment - thanks!
For more information: https://github.com/Paxa/kt
Usage:
$ kt -help
kt is a tool for Kafka.
Usage:
kt command [arguments]
The commands are:
consume    consume messages.
produce    produce messages.
topic      topic information.
group      consumer group information and modification.
admin      basic cluster administration.
Use "kt [command] -help" for information about the command.
Authentication:
Authentication with Kafka can be configured via a JSON file.
You can set the file name via an "-auth" flag to each command or
set it via the environment variable KT_AUTH.
Authentication / Encryption
Authentication configuration is possible via a JSON file. You indicate the mode
of authentication you need and provide additional information as required for
your mode. You pass the path to your configuration file via the -auth flag to
each command individually, or set it via the environment variable KT_AUTH.
TLS
Required fields:
- mode: This needs to be set to TLS
- client-certificate: Path to your certificate
- client-certificate-key: Path to your certificate key
- ca-certificate: Path to your CA certificate
Example of an authentication configuration that is used for the system tests:
{
"mode": "TLS",
"client-certificate": "test-secrets/kt-test.crt",
"client-certificate-key": "test-secrets/kt-test.key",
"ca-certificate": "test-secrets/snakeoil-ca-1.crt"
}
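If the certificate paths differ between environments, the configuration file can be generated rather than written by hand. This is a minimal sketch under stated assumptions: the function name write_kt_tls_auth is hypothetical, all paths are supplied by the caller, and the paths are assumed to contain no double quotes or backslashes since no JSON escaping is done.

```shell
# Hypothetical helper, not shipped with kt.
# Write a TLS auth file with the four required fields and point KT_AUTH at it.
# $1 output file, $2 client certificate, $3 client certificate key, $4 CA certificate
# Assumption: paths contain no double quotes or backslashes.
write_kt_tls_auth() {
  cat > "$1" <<EOF
{
  "mode": "TLS",
  "client-certificate": "$2",
  "client-certificate-key": "$3",
  "ca-certificate": "$4"
}
EOF
  export KT_AUTH="$1"
}
```

After calling it in the current shell, subsequent kt commands should pick up the file through KT_AUTH without needing the -auth flag.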
TLS one-way
Required fields:
- mode: This needs to be set to TLS-1way
Example:
{
"mode": "TLS-1way"
}
Other modes
Please create an issue with details for the mode that you need.