  • Language
    Go
  • License
    MIT License
  • Created over 9 years ago
  • Updated over 6 years ago

Repository Details

Connect UNIX pipes and message queues

pipecat

Pipecat allows you to scale any program supporting the FACK contract using traditional UNIX pipes and AMQP. Think of it as netcat but with message acknowledgments. It is the successor of redis-pipe.

# Publish sequence of numbers to a job queue.
seq 1 1000 | pipecat publish numbers

# Multiply each number with 10 and store results in a different queue.
pipecat consume numbers --autoack | xargs -n 1 expr 10 '*' | pipecat publish results

# Aggregate the results and calculate the sum
pipecat consume results --autoack --non-blocking \
  | python -uc 'import sys; print(sum(map(int, sys.stdin)))'

If you are into streams and UNIX pipes, check out my Haskell-based awk and sed alternative, hwk.

Support

Pipecat supports a local mode and all AMQP 0.9.1 message brokers.

Install

You can download a single binary for Linux, OSX or Windows.

OSX

wget -O pipecat https://github.com/lukasmartinelli/pipecat/releases/download/v0.3/pipecat_darwin_amd64
chmod +x pipecat

./pipecat --help

Linux

wget -O pipecat https://github.com/lukasmartinelli/pipecat/releases/download/v0.3/pipecat_linux_amd64
chmod +x pipecat

./pipecat --help

Install from Source

go get github.com/lukasmartinelli/pipecat

If you are using Windows or 32-bit architectures you need to download the appropriate binary yourself.

Using pipecat

pipecat connects message queues and UNIX pipes. The need arose when I started building messaging support into utilities to make them scalable. I wanted to leave my programs the way they were, without heavy dependencies, and still be able to scale the process reliably.

In this example we will calculate the sum of a sequence of numbers.

Connect the broker

Specify the AMQP_URI env var to connect to the message broker.

export AMQP_URI=amqp://user:pass@host:5672/vhost
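AMQP_URI packs the credentials, host, port and virtual host into a single string. To illustrate that structure, the following Python sketch splits such a URI with the standard library. It is not how pipecat parses the variable (pipecat is written in Go), the helper name parse_amqp_uri is hypothetical, and falling back to the standard ports 5672/5671 when none is given is an assumption that matches common AMQP clients.

```python
from urllib.parse import urlsplit, unquote


def parse_amqp_uri(uri):
    """Split an AMQP URI into connection parameters (hypothetical helper)."""
    parts = urlsplit(uri)
    if parts.scheme not in ("amqp", "amqps"):
        raise ValueError("not an AMQP URI: " + uri)
    return {
        # urlsplit does not percent-decode credentials, so decode them here.
        "user": unquote(parts.username or ""),
        "password": unquote(parts.password or ""),
        "host": parts.hostname,
        # Assumed fallback: 5672 for amqp, 5671 for amqp over TLS (amqps).
        "port": parts.port or (5671 if parts.scheme == "amqps" else 5672),
        # The path after the leading "/" names the virtual host ("%2f" encodes "/").
        # An empty string means the URI does not name a vhost explicitly.
        "vhost": unquote(parts.path[1:]),
    }


print(parse_amqp_uri("amqp://user:pass@host:5672/vhost"))
# {'user': 'user', 'password': 'pass', 'host': 'host', 'port': 5672, 'vhost': 'vhost'}
```

How a client treats an empty vhost (as in the guest URI used with Docker below) is client-specific; this sketch simply reports it as an empty string.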

Create the queue

Let's create a new queue named numbers and publish a sequence of numbers from 1 to 1000.

seq 1 1000 | pipecat publish numbers

Process input

Multiply the input sequence by a factor of 10 and publish the results to a separate results queue. This step can run on multiple hosts. We acknowledge all received messages automatically with --autoack.

pipecat consume numbers --autoack | xargs -n 1 expr 10 '*' | pipecat publish results

Aggregate results

Now let's sum up all the numbers. Because we want the command to end after receiving all numbers, we specify the --non-blocking mode, which closes the connection if no messages have been received after a timeout.

pipecat consume results --autoack --non-blocking | python -uc 'import sys; print(sum(map(int, sys.stdin)))'
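As a sanity check, the value the aggregation step should print can be computed directly. This Python sketch assumes every number from 1 to 1000 passed through both queues exactly once; the helper name expected_total is hypothetical. It sums the multiplied sequence and cross-checks the result against the closed form 10 · n(n + 1) / 2.

```python
def expected_total(n=1000, factor=10):
    """Sum of factor * k for k = 1..n, i.e. what the reduce step should print."""
    return sum(factor * k for k in range(1, n + 1))


# Closed form: factor * n * (n + 1) / 2
assert expected_total() == 10 * 1000 * 1001 // 2
print(expected_total())  # 5005000
```

A different number from the pipeline means messages were lost, duplicated, or still in flight when the --non-blocking timeout ended the consumer.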

Local RabbitMQ with Docker

If you do not have an AMQP broker at hand, you can run RabbitMQ in a Docker container, expose the ports and connect to it.

docker run -d -p 5672:5672 --hostname pipecat-rabbit --name pipecat-rabbit rabbitmq:3

Now connect to localhost with the default guest login.

export AMQP_URI=amqp://guest:guest@localhost:5672/
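Before running the pipeline it can help to confirm that the container is actually accepting connections. The Python sketch below only checks that a TCP connection to the host and port in AMQP_URI succeeds; it does not perform the AMQP handshake or verify the credentials. The helper name broker_reachable is hypothetical, and defaulting to localhost and port 5672 is an assumption.

```python
import os
import socket
from urllib.parse import urlsplit


def broker_reachable(uri, timeout=2.0):
    """Return True if a TCP connection to the URI's host and port succeeds."""
    parts = urlsplit(uri)
    host = parts.hostname or "localhost"  # assumed default host
    port = parts.port or 5672  # standard AMQP port
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, DNS failures and timeouts
        return False


if __name__ == "__main__":
    uri = os.environ.get("AMQP_URI", "amqp://guest:guest@localhost:5672/")
    print("reachable" if broker_reachable(uri) else "not reachable")
```

A successful check only shows that something listens on the port; wrong credentials will still fail once pipecat connects.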

Publish messages to Exchange

If you are using existing message queue infrastructure, you can also publish messages to an exchange, with the first parameter used as the routing key. Thanks to @kennon for the implementation.

seq 1 1000 | pipecat publish --exchange "my_exchange" --no-create-queue my.routing.key

The AMQP_EXCHANGE environment variable can also be used:

export AMQP_EXCHANGE=my_exchange

Make it failsafe

We have already written a small, concise and very scalable set of programs, and we can now run the multiplication step (multiply.py below) on many servers.

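However, if the server dies while multiply.py is running, any input lines it has already received but not yet finished processing are lost, because --autoack acknowledges messages as soon as they are delivered.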

If your program must survive such failures, implement the FACK contract, demonstrated below with the multiply.py sample.

FACK Contract

Any program that reads input from stdin and writes to stdout should accept an environment variable FACK containing a file name. If the operation performed on a single line from stdin succeeds, that line should be written to FACK.

Implement the contract

Implementing the contract is straightforward.

  1. Support the optional FACK environment variable containing a file name
  2. Write each received input line to this file once the operation on it has succeeded

Python Example

Below is a Python example, multiply.py, which multiplies the sequence of numbers as above but also writes each input line to stdack once it has been processed successfully.

import sys
import os

with open(os.getenv('FACK', os.devnull), 'w') as stdack:  # Works even if FACK is not set
    for line in sys.stdin:
        num = int(line.strip())
        result = num * 10
        sys.stdout.write('{}\n'.format(result))
        sys.stdout.flush()  # Emit the result before acknowledging the input line
        stdack.write(line)  # Ack the processed line
        stdack.flush()  # Make sure the ack does not get lost in the buffer
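The contract only asks for acknowledgments of lines whose operation succeeded. The variant below is a sketch, not part of pipecat: it wraps the per-line work in a hypothetical process helper, skips lines it cannot parse without acknowledging them, and writes each result to stdout before acking the input. It reads FACK the same way as multiply.py.

```python
import os
import sys


def process(lines, out, ack, op):
    """Apply op to each line; ack only the lines where op succeeded."""
    for line in lines:
        try:
            result = op(line)
        except ValueError:
            # Failed lines are never written to FACK, so they stay unacknowledged.
            sys.stderr.write("skipping {!r}\n".format(line))
            continue
        out.write("{}\n".format(result))
        out.flush()  # Emit the result before acknowledging its input line
        ack.write(line)
        ack.flush()  # Make sure the ack does not linger in a buffer


def multiply(line):
    """The same operation as multiply.py: parse an integer and multiply by 10."""
    return int(line.strip()) * 10


if __name__ == "__main__":
    with open(os.getenv("FACK", os.devnull), "w") as stdack:
        process(sys.stdin, sys.stdout, stdack, multiply)
```

It can be run in place of multiply.py, e.g. `FACK=ack python -u multiply_safe.py`, where multiply_safe.py is a hypothetical file name.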

Use named pipes for ACKs

Now your program can no longer lose messages with pipecat: you feed the FACK output back into pipecat through a named pipe, and only then does pipecat acknowledge the messages on the message queue.

Fill the queue again.

seq 1 1000 | pipecat publish numbers

And use a named pipe to funnel the acknowledged input lines back into pipecat.

mkfifo ack
cat ack | pipecat consume numbers \
| FACK=ack python -u multiply.py \
| pipecat publish results
rm ack
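If the named-pipe wiring is unfamiliar, the following self-contained Python sketch reproduces it in miniature on a POSIX system. It creates a FIFO with os.mkfifo, lets a writer thread play the role of multiply.py writing acknowledged lines into it, and reads them back the way `cat ack` feeds them to pipecat. No broker or pipecat is involved; the helper name roundtrip_acks is hypothetical.

```python
import os
import tempfile
import threading


def roundtrip_acks(lines):
    """Send lines through a named pipe and return what the reader receives."""
    with tempfile.TemporaryDirectory() as tmp:
        fifo = os.path.join(tmp, "ack")
        os.mkfifo(fifo)  # equivalent of `mkfifo ack`

        def writer():
            # Opening a FIFO for writing blocks until a reader opens it.
            with open(fifo, "w") as ack:
                for line in lines:
                    ack.write(line)
                    ack.flush()

        t = threading.Thread(target=writer)
        t.start()
        # The reader sees end-of-file once the writer closes its end.
        with open(fifo) as ack:
            received = ack.readlines()
        t.join()
        return received  # the FIFO is removed with the temp dir, like `rm ack`


print(roundtrip_acks(["1\n", "2\n", "3\n"]))  # ['1\n', '2\n', '3\n']
```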

Consume all messages to reduce them to a single result. In the reduce step we need to autoack all received messages, because we can't possibly hold the entire result set in memory until the operation has completed.

pipecat consume results --autoack --non-blocking | python -uc 'import sys; print(sum(map(int, sys.stdin)))'

With a few lines of additional code that depend only on the standard library, you can now make any program in any language scalable using message queues, without any dependencies and without changing its behavior one bit.

Usage Examples

Create local Queue Backup

pipecat consume results --autoack --non-blocking > results_backup.json
cat results_backup.json | pipecat publish results

Cross Compile Release

We use gox to create distributable binaries for Windows, OSX and Linux.

docker run --rm -v "$(pwd)":/usr/src/pipecat -w /usr/src/pipecat tcnksm/gox:1.4.2-light
