• Stars
    star
    2,076
  • Rank 22,254 (Top 0.5 %)
  • Language
    Go
  • License
    MIT License
  • Created over 4 years ago
  • Updated 5 months ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

A swiss army knife CLI tool for interacting with Kafka, RabbitMQ and other messaging systems.

Brief Demo

Master build status Go Report Card slack

plumber is a CLI devtool for inspecting, piping, messaging and redirecting data in message systems like Kafka, RabbitMQ , GCP PubSub and many more. [1]

The tool enables you to:

  • Safely view the contents of your data streams
  • Write plain or encoded data to any system
  • Route data from one place to another
  • Decode protobuf/avro/thrift/JSON data in real-time
    • Support for both Deep and Shallow protobuf envelope types
    • Support for google.protobuf.Any fields
  • Relay data to the Streamdal platform
  • Ship change data capture events to Streamdal platform
  • Replay events into a message system on your local network
  • And many other features (for a full list: plumber -h)

[1] It's like curl for messaging systems.

Why do you need it?

Messaging systems are black boxes - gaining visibility into what is passing through them is an involved process that requires you to write brittle consumer code that you will eventually throw away.

plumber enables you to stop wasting time writing throw-away code - use it to look into your queues and data streams, use it to connect disparate systems together or use it for debugging your event driven systems.

Demo

Brief Demo

Install

Via brew

$ brew tap streamdal/public
$ brew install plumber

Manually

Plumber is a single binary, to install you simply need to download it, give it executable permissions and call it from your shell. Here's an example set of commands to do this:

$ curl -L -o plumber https://github.com/streamdal/plumber/releases/latest/download/plumber-darwin
$ chmod +x plumber
$ mv plumber /usr/local/bin/plumber

Usage

Write messages

❯ plumber write kafka --topics test --input foo
INFO[0000] Successfully wrote message to topic 'test'    backend=kafka
INFO[0000] Successfully wrote '1' message(s)             pkg=plumber

Read message(s)

❯ plumber read kafka --topics test
INFO[0000] Initializing (could take a minute or two) ...  backend=kafka

------------- [Count: 1 Received at: 2021-11-30T12:51:32-08:00] -------------------

+----------------------+------------------------------------------+
| Key                  |                                     NONE |
| topic                |                                     test |
| Offset               |                                        8 |
| Partition            |                                        0 |
| Header(s)            |                                     NONE |
+----------------------+------------------------------------------+

foo

NOTE: Add -f to perform a continuous read (like tail -f)

Write messages via pipe

Write multiple messages

NOTE: Multiple messages are separated by a newline.

$ cat mydata.txt
line1
line2
line3

$ cat mydata.txt | plumber write kafka --topics foo

INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go
INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go
INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go

Write each element of a JSON array as a message

$ cat mydata.json
[{"key": "value1"},{"key": "value2"}]

$ cat mydata.json | plumber write kafka --topics foo --input-as-json-array

INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go
INFO[0000] Successfully wrote message to topic 'foo'  pkg=kafka/write.go

Documentation

Getting Help

A full list of available flags can be displayed by using the --help flag after different parts of the command:

$ plumber --help
$ plumber read --help
$ plumber read kafka --help

Features

  • Encode & decode for multiple formats
    • Protobuf (Deep and Shallow envelope)
    • Avro
    • Thrift
    • Flatbuffer
    • GZip
    • JSON
    • JSONPB (protobuf serialized as JSON)
    • Base64
  • --continuous support (ie. tail -f)
  • Support for most messaging systems
  • Supports writing via string, file or pipe
  • Observe, relay and archive messaging data
  • Single-binary, zero-config, easy-install

Hmm, what is this Streamdal thing?

We are distributed system enthusiasts that started a company called Streamdal.

Our company focuses on solving data stream observability for complex systems and workflows. Our goal is to allow everyone to build asynchronous systems, without the fear of introducing too much complexity.

While working on our company, we built a tool for reading and writing messages from our messaging systems and realized that there is a serious lack of tooling in this space.

We wanted a swiss army knife type of tool for working with messaging systems (we use Kafka and RabbitMQ internally), so we created plumber.

Why the name plumber?

We consider ourselves "internet plumbers" of sort - so the name seemed to fit :)

Supported Messaging Systems

  • Kafka
  • RabbitMQ
  • RabbitMQ Streams
  • Google Cloud Platform PubSub
  • MQTT
  • Amazon Kinesis Streams
  • Amazon SQS
  • Amazon SNS (Publishing)
  • ActiveMQ (STOMP protocol)
  • Azure Service Bus
  • Azure Event Hub
  • NATS
  • NATS Streaming (Jetstream)
  • Redis-PubSub
  • Redis-Streams
  • Postgres CDC (Change Data Capture)
  • MongoDB CDC (Change Data Capture)
  • Apache Pulsar
  • NSQ
  • KubeMQ
  • Memphis - NEW!

NOTE: If your messaging tech is not supported - submit an issue and we'll do our best to make it happen!

Kafka

You need to ensure that you are using the same consumer group on all plumber instances.

RabbitMQ

Make sure that all instances of plumber are pointed to the same queue.

Note on boolean flags

In order to flip a boolean flag to false, prepend --no to the flag.

ie. --queue-declare is true by default. To make it false, use --no-queue-declare.

Tunnels

plumber can now act as a replay destination (tunnel). Tunnel mode allows you to run an instance of plumber, on your local network, which will then be available in the Streamdal platform as a replay destination.

This mitigates the need make firewall changes to replay messages from a Streamdal collection back to your message bus.

See https://docs.streamdal.com/what-are/what-are-destinations/plumber-as-a-destination for full documentation.

High Performance & High Availability

plumber comes with a "server" mode which will cause plumber to operate as a highly available cluster.

You can read more about "server mode" here.

Server mode examples can be found in docs/server.md

Acknowledgments

Huge shoutout to jhump and for his excellent protoreflect library, without which plumber would not be anywhere near as easy to implement. Thank you!

Release

To push a new plumber release:

  1. git tag v0.18.0 master
  2. git push origin v0.18.0
  3. Watch the github action
  4. New release should be automatically created under https://github.com/streamdal/plumber/releases/
  5. Update release to include any relevant info
  6. Update homebrew SHA and version references

Contribute

We love contributions! Prior to sending us a PR, open an issue to discuss what you intend to work on. When ready to open PR - add good tests and let's get this thing merged! For further guidance check out our contributing guide.

More Repositories

1

streamdal

Code-Native Data Privacy
Rust
572
star
2

rabbit

RabbitMQ wrapper for steadway/amqp that supports auto-reconnects
Go
45
star
3

go-template

Golang microservice template
Go
16
star
4

gophercon2021

Go
9
star
5

server

Go
9
star
6

natty

Batteries included NATS Jetstream Go client
Go
9
star
7

go-sdk

Go
8
star
8

console

UI for Streamdal Server
TypeScript
8
star
9

schema-publisher

Go
8
star
10

streamdal-examples

Demo SDK's
TypeScript
7
star
11

prisma-extension-streamdal

Add code native data pipelines to any prisma query operation
TypeScript
7
star
12

plumber-schemas

Protobuf schemas used by plumber and plumber-ui.
JavaScript
6
star
13

node-sdk

Node data quality library
TypeScript
5
star
14

kng

High throughput oriented kafka wrapper lib
Go
5
star
15

protos

Streamdal's Protobuf definitions and generated code
Rust
5
star
16

kafka-sink-connector

Sink connector that pulls records off of Kakfa topics and funnels them to our collector
Java
4
star
17

python-sdk

Python SDK
Python
4
star
18

wasm

Rust
3
star
19

njst

Distributed benchmark tool for NATS Jetstream
Go
3
star
20

docs

Streamdal Documentation
MDX
3
star
21

cli

Go
3
star
22

terraform-provider-batchsh

Terraform Provider for Batch.sh
Go
2
star
23

casbin-nats-kv-adapter

Go
1
star
24

wasm-detective

Rust
1
star
25

wasm-transform

Rust
1
star
26

kafka-demo

Accompanies youtube video
Python
1
star
27

thrifty

Library for decoding thrift wire format from binary to JSON using IDL definition
Go
1
star
28

terraform-provider-streamdal

Terraform provider for the Streamdal ecosystem
Go
1
star
29

homebrew-tap

Python
1
star
30

plumber-server-client

Demo app showing how to use plumber-schemas protobuf definitions to talk to a Plumber server instance
Go
1
star