• Stars
    star
    101
  • Rank 327,129 (Top 7 %)
  • Language
    C
  • License
    Mozilla Public Li...
  • Created over 5 years ago
  • Updated over 3 years ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Dafka is a decentralized distributed streaming platform

GitHub release license

Dafka - Decentralized Distributed Streaming Platform

Build Status

Contents

Overview

Design

Implementation

Ownership and License

Using Dafka

Contributing

Overview

Scope and Goals

Dafka is a decentralize distributed streaming platform. What exactly does that mean?

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

Dafka is generally used for two broad classes of applications:

  • Building real-time streaming data pipelines that reliably get data between systems or applications
  • Building real-time streaming applications that transform or react to the streams of data

To understand how Dafka does these things, let's dive in and explore Dafka's capabilities from the bottom up.

First a few concepts:

  • Dafka is run as a cluster on one or more servers.
  • The Dafka cluster stores streams of records in categories called topics.
  • Each record consists of a arbitrary value.
  • Producers send record to the Cluster and directly to the Consumers.
  • Missed records are obtained either from the Producer or the Cluster.

In Dafka the communication between clients is done with a simple, high-performance, language and transport agnostic protocol. This protocol is versioned and maintains backwards compatibility with older version. We provide a C and Java client for Dafka.

Topics and Partitions

Dafka provides an abstraction for records called topic.

A topic is a name to which records are published. Topics in Dafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the records written to it.

Each Dafka topic consists of at least one partitions that looks like this:

1

Each partition is an ordered, immutable sequence of records that is continually appended to. The records in the partitions are each assigned a sequential id number called the offset that uniquely identifies each record within the partition.

The Dafka cluster durably persists all published records — whether or not they have been consumed.

2

Consumers maintain their own offset while reading records of a partition. In fact neither the Dafka Cluster nor the producers keep track of the consumers offset. This design allows Consumer to either reset their offset to an older offset and re-read records or set their offset to a newer offset and skip ahead.

In that way consumer have no influence on the cluster, the producer and other consumers. They simply can come and go as they please.

Stores

Partitions are distributed to the Dafka Cluster which consists of Dafka Stores. Each partition is replicated to each store for fault tolerance.

Producer

Producers publish records to a topic. Each producer creates its own partition that only it publishes to. Records are send directly to stores and consumers. When a producer goes offline its partition is still available to consumers from the Dafka stores.

Consumer

Consumers subscribe to a topic. Each consumer will receive records published to that topic from all partitions.

Tower

Each Dafka cluster has one or more towers. The towers are used to connect producer, consumers and stores to each other. At this point no traffic is proxied through the towers.

Guarantees

Dafka gives the following guarantees:

  • Records sent by a producer are appended in the stores in the same order they are sent.
  • Consumers will provide records of a partition to the user in the same order they are sent by the producer.

Design

We designed Dafka to be a drop-in replacement for Apache Kafka.

While Kafka makes it easy for consumers to come and go as they like, their consumer group feature which relies on finding consensus in a group of peers makes joining very expensive. It can take seconds before a consumer is ready to consume records. The same is true for producer. Dafka tries to avoid finding consensus and therefore intentionally avoids features like consumer groups in favor of higher throughput, lower latency as well as faster consumer and producer initialization.

This design section discusses the different message types of the Dafka protocol.

Producing and Storing

Producers published records using the RECORD message type. RECORD messages are send directly to all connected stores as well as all connected consumers. Once a producer published its first records it starts sending HEAD messages at a regular interval informing both stores and consumer about the last published records which gives stores and consumers a chance to figure out whether or not the missed one or more records.

3

Because producers publish records directly to consumers the presence of a store is not necessarily required. When a new consumer joins they can request the producers to supply all already published records. Therefore the producer must store all published records that are not stored by a configurable minimum number stores. To inform a producer about the successful storing of a records the stores send a ACK message to the producer.

4

Subscribing to topics

Consumer will only start listening for HEAD message once they subscribed for a topic. Whenever a new subscription created by a consumer it is not enough to listen to producers HEAD messages to catch up upon the current offset of their partition. For one there's a time penalty until producers HEAD intervals triggered and more severe a producer may already have disappeared. Hence consumers will send a GET-HEADS message to the stores to request the offset for each partition they stored for a topic.

5

As a response each stores will answer with DIRECT-HEAD messages each containing the offset for a partition.

6

Missed records

Consumer can discover missed records by either receiving HEAD messages or receiving a RECORD messages with a higher offset than they currently have for a certain partition. In order to fetch missed messages consumers send a FETCH message to all connected stores and the producer of that message to request the missed messages.

7

As a response to a FETCH message a store and/or producer may send all missed records that the consumer requested directly to the consumer with the DIRECT-RECORD.

8

Implementation

The implementation is documented in RFC 46/DAFKA

Ownership and License

The contributors are listed in AUTHORS. This project uses the MPL v2 license, see LICENSE.

Dafka uses the C4.1 (Collective Code Construction Contract) process for contributions.

Dafka uses the CLASS (C Language Style for Scalability) guide for code style.

To report an issue, use the Dafka issue tracker at github.com.

Using Dafka

Building and Installing on Linux and macOS

To start with, you need at least these packages:

  • git-all -- git is how we share code with other people.
  • build-essential, libtool, pkg-config - the C compiler and related tools.
  • autotools-dev, autoconf, automake - the GNU autoconf makefile generators.
  • cmake - the CMake makefile generators (an alternative to autoconf).

Plus some others:

  • uuid-dev, libpcre3-dev - utility libraries.
  • valgrind - a useful tool for checking your code.
  • pkg-config - an optional useful tool to make building with dependencies easier.

Which we install like this (using the Debian-style apt-get package manager):

sudo apt-get update
sudo apt-get install -y \
    git-all build-essential libtool \
    pkg-config autotools-dev autoconf automake cmake \
    uuid-dev libpcre3-dev valgrind

# only execute this next line if interested in updating the man pages as well (adds to build time):
sudo apt-get install -y asciidoc

Here's how to build DAFKA from GitHub (building from packages is very similar, you don't clone a repo but unpack a tarball), including the libzmq (ZeroMQ core) library (NOTE: skip ldconfig on OSX):

git clone git://github.com/zeromq/libzmq.git
cd libzmq
./autogen.sh
# do not specify "--with-libsodium" if you prefer to use internal tweetnacl security implementation (recommended for development)
./configure --with-libsodium
make check
sudo make install
sudo ldconfig
cd ..

git clone git://github.com/zeromq/czmq.git
cd czmq
./autogen.sh && ./configure && make check
sudo make install
sudo ldconfig
cd ..

git clone git://github.com/zeromq/dafka.git
cd dafka
./autogen.sh && ./configure && make check
sudo make install
sudo ldconfig
cd ..

To verify everything got installed correctly run:

make check

Quickstart

If you are interested in getting started with Dafka follow the instructions below.

Step 1: Start the Dafka Tower Deamon

We'll start to use dafka in a simple single producer/single consumer scenario using the dafka_console_producer and dafka_console_consumer commandline utilities.

$ dafka_towerd

Note: The tower will open two sockets. One on port 5556 to get notified by joining peers and one on port 5557 to notify joined peers about joining peers.

Step 2: Write some events into a topic

Run the console producer to write a few events into the hello topic. Each line you enter will result in a separate event being written to the topic.

$ dafka_console_producer hello
A first event
A second event

You can stop the producer client with Ctrl-C at any time.

Note: If no configuration is provided the producer tries to connect to a tower on localhost.

Step 3: Read the events

Open another terminal session and run the console consumer to read the events you just created:

$ dafka_console_consumer hello

You can stop the consumer client with Ctrl-C at any time.

Because events are kept by the producer until at least one store acknowledges they're stored, they can be read as many times and by as many consumers as you want. You can easily verify this by opening yet another terminal session and re-running the previous command again.

Note: If no configuration is provided the consumer tries to connect to a tower on localhost.

Getting started

The following getting started will show you how to use the producer and consumer API.

First we construct a dafka producer with default configuration and then publish the message HELLO WORLD to the topic hello.

zconfig_t *config = zconfig_new ("root", NULL);
const char *topic = "hello";

dafka_producer_args_t producer_args =  { topic, config };
zactor_t *producer = zactor_new (dafka_producer, &producer_args);

dafka_producer_msg_t *msg = dafka_producer_msg_new ();
dafka_producer_msg_set_content (msg, "HELLO WORLD");
dafka_producer_msg_send (msg, producer);

dafka_producer_msg_destroy (&msg);
zactor_destroy (&producer);
zconfig_destroy (&config);

To consume this message we constuct a dafka consumer, let it subscribe to topic hello, receive the message and then print the content of received message.

zconfig_t *config = zconfig_new ("root", NULL);
const char *topic = "hello";

dafka_consumer_args_t args = { .config = config };
dafka_consumer_t *consumer = dafka_consumer_new (&args);
dafka_consumer_subscribe (consumer, topic);

dafka_consumer_msg_t *msg = dafka_consumer_msg_new ();
while (true) {
    rc = dafka_consumer_msg_recv (msg, consumer);
    if (rc == -1)
        break;      // Interrupted

    char *content_str = dafka_consumer_msg_strdup (msg);
    printf ("%s\n", content_str);
    zstr_free (&content_str);
}

dafka_consumer_msg_destroy (&msg);
dafka_consumer_destroy (&consumer);
zconfig_destroy (&config);

Linking with an Application

Include dafka.h in your application and link with libdafka. Here is a typical gcc link command:

gcc myapp.c -o myapp -ldafka -lczmq -lzmq

API v1 Summary

This is the API provided by Dafka v1.x, in alphabetical order.

dafka_consumer - Implements the dafka consumer protocol

dafka_consumer - Consumes message either directly from producers or from stores

TODO:

  • Prioritize DIRECT_RECORD messages over RECORD this will avoid discarding MSGs when catching up

This is the class interface:

    //  This is a stable class, and may not change except for emergencies. It
    //  is provided in stable builds.
    //  Creates a new dafka consumer client that runs in its own background thread.
    //
    //  The args parameter consists of configuration and record sink.
    //
    //  If a record sink is provided this socket will be used the send the consumer
    //  messages to.
    //
    //  The configuration argument takes settings for both the consumer and the
    //  beacon, see below.
    //
    //  Consumer configuration:
    //    * consumer/offset/reset = earliest|latest (default: latest)
    //    * consumer/high_watermark (default: 1.000.000)
    //    * consumer/verbose = 0|1 (default: 0 -> false)
    //
    //  Beacon configuration:
    //    * beacon/interval (default: 1000) in ms
    //    * beacon/verbose = 0|1 (default: 0 -> false)
    //    * beacon/sub_address (default: tcp://127.0.0.1:5556)
    //    * beacon/pub_address (default: tcp://127.0.0.1:5557)
    DAFKA_EXPORT dafka_consumer_t *
        dafka_consumer_new (dafka_consumer_args_t *args);
    
    //  Destroys an instance of dafka consumer client by gracefully stopping its
    //  background thread.
    DAFKA_EXPORT void
        dafka_consumer_destroy (dafka_consumer_t **self_p);
    
    //  Subscribe to a given topic.
    DAFKA_EXPORT int
        dafka_consumer_subscribe (dafka_consumer_t *self, const char *subject);
    
    //  Unsubscribe from a topic currently subscribed to.
    DAFKA_EXPORT int
        dafka_consumer_unsubscribe (dafka_consumer_t *self, const char *subject);
    
    //  Returns the address of the consumer instance.
    DAFKA_EXPORT const char *
        dafka_consumer_address (dafka_consumer_t *self);
    
    //  Get the current subscription as list of strings.
    DAFKA_EXPORT zlist_t *
        dafka_consumer_subscription (dafka_consumer_t *self);
    
    //  Returns the internal record source socket.
    DAFKA_EXPORT zsock_t *
        dafka_consumer_record_source (dafka_consumer_t *self);
    
    //  Self test of this class.
    DAFKA_EXPORT void
        dafka_consumer_test (bool verbose);
    

Please add '@interface' section in './../src/dafka_consumer.c'.

This is the class self test code:

    zconfig_t *config = zconfig_new ("root", NULL);
    zconfig_put (config, "test/verbose", verbose ? "1" : "0");
    zconfig_put (config, "beacon/interval", "50");
    zconfig_put (config, "beacon/verbose", verbose ? "1" : "0");
    zconfig_put (config, "beacon/sub_address", "inproc://consumer-tower-sub");
    zconfig_put (config, "beacon/pub_address", "inproc://consumer-tower-pub");
    zconfig_put (config, "tower/verbose", verbose ? "1" : "0");
    zconfig_put (config, "tower/sub_address", "inproc://consumer-tower-sub");
    zconfig_put (config, "tower/pub_address", "inproc://consumer-tower-pub");
    zconfig_put (config, "consumer/verbose", verbose ? "1" : "0");
    zconfig_put (config, "producer/verbose", verbose ? "1" : "0");
    zconfig_put (config, "store/verbose", verbose ? "1" : "0");
    zconfig_put (config, "store/db", SELFTEST_DIR_RW "/storedb");
    
    zactor_t *tower = zactor_new (dafka_tower_actor, config);
    
    // --------------
    // Protocol Tests
    // --------------
    
    // Scenario: STORE-HELLO -> CONSUMER-HELLO without subscription
    //   Given a dafka consumer with no subscriptions
    //   When a STORE-HELLO command is sent by a store
    //   Then the consumer responds with CONSUMER-HELLO and 0 topics
    zconfig_put (config, "consumer/offset/reset", "earliest");
    
    zactor_t *test_peer = zactor_new (dafka_test_peer, config);
    assert (test_peer);
    
    //  GIVEN a dafka consumer with no subscription
    dafka_consumer_args_t consumer_args = { .config = config };
    dafka_consumer_t *consumer = dafka_consumer_new (&consumer_args);
    assert (consumer);
    zclock_sleep (250); // Make sure both peers are connected to each other
    
    zlist_t *subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 0);
    
    //  WHEN a STORE-HELLO command is send by a store
    dafka_test_peer_send_store_hello (test_peer, dafka_consumer_address (consumer));
    
    // THEN the consumer responds with CONSUMER-HELLO and 0 topics
    dafka_proto_t *msg = dafka_test_peer_recv (test_peer);
    assert_consumer_hello_msg (msg, 0);
    
    dafka_consumer_destroy (&consumer);
    zactor_destroy (&test_peer);
    
    // Scenario: STORE-HELLO -> CONSUMER-HELLO with subscription
    //   Given a dafka consumer with a subscription to topic "hello"
    //   When a STORE-HELLO command is sent by a store
    //   Then the consumer responds with CONSUMER-HELLO and 1 topic
    zconfig_put (config, "consumer/offset/reset", "earliest");
    
    test_peer = zactor_new (dafka_test_peer, config);
    assert (test_peer);
    
    //  GIVEN a dafka consumer with a subscription to topic "hello"
    consumer_args.record_sink = NULL;
    consumer = dafka_consumer_new (&consumer_args);
    assert (consumer);
    zclock_sleep (250); // Make sure both peers are connected to each other
    
    subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 0);
    
    t_subscribe_to_topic (consumer, "hello", test_peer, config);
    zclock_sleep (250);
    
    subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 1);
    
    //  WHEN a STORE-HELLO command is send by a store
    dafka_test_peer_send_store_hello (test_peer, dafka_consumer_address (consumer));
    
    // THEN the consumer responds with CONSUMER-HELLO and 1 topic
    msg = dafka_test_peer_recv (test_peer);
    assert_consumer_hello_msg (msg, 1);
    
    dafka_consumer_destroy (&consumer);
    zactor_destroy (&test_peer);
    
    // Scenario: First record for topic with offset reset earliest
    //   Given a dafka consumer subscribed to topic 'hello'
    //   When a RECORD message with sequence larger 0 is sent on topic 'hello'
    //   Then the consumer will send a FETCH message for the topic 'hello'
    //   When a RECORD message with sequence 0 and content 'CONTENT' is send on topic 'hello'
    //   Then a consumer_msg is sent to the user with topic 'hello' and content 'CONTENT'
    zconfig_put (config, "consumer/offset/reset", "earliest");
    
    test_peer = zactor_new (dafka_test_peer, config);
    assert (test_peer);
    
    //  GIVEN a dafka consumer subscribed to topic 'hello'
    consumer = dafka_consumer_new (&consumer_args);
    assert (consumer);
    zclock_sleep (250); //  Make sure both peers are connected to each other
    
    subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 0);
    
    t_subscribe_to_topic (consumer, "hello", test_peer, config);
    
    subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 1);
    
    //  WHEN a RECORD msg with sequence larger 0 is sent on topic 'hello'
    dafka_test_peer_send_record (test_peer, "hello", 1, "CONTENT");
    
    // THEN the consumer will send a FETCH msg for the topic 'hello'
    msg = dafka_test_peer_recv (test_peer);
    assert_fetch_msg (msg, "hello", 0);
    
    //  WHEN a RECORD msg with sequence 0 and content 'CONTENT' is send on topic'hello'
    dafka_test_peer_send_record (test_peer, "hello", 0, "CONTENT");
    
    //  THEN a consumer msg is sent to the user with topic 'hello' and content CONTENT'
    dafka_consumer_msg_t *c_msg = dafka_consumer_msg_new ();
    dafka_consumer_msg_recv (c_msg, consumer);
    assert_consumer_msg (c_msg, "hello", "CONTENT");
    
    dafka_consumer_msg_destroy (&c_msg);
    dafka_consumer_destroy (&consumer);
    zactor_destroy (&test_peer);
    
    
    // Scenario: First record for topic with offset reset latest
    //   Given a dafka consumer subscribed to topic 'hello'
    //   When a RECORD message with sequence 2 is sent on topic 'hello'
    //   Then a consumer_msg is sent to the user with topic 'hello' and content 'CONTENT'
    zconfig_put (config, "consumer/offset/reset", "latest");
    
    test_peer = zactor_new (dafka_test_peer, config);
    assert (test_peer);
    
    //  GIVEN a dafka consumer subscribed to topic 'hello'
    consumer = dafka_consumer_new (&consumer_args);
    assert (consumer);
    zclock_sleep (250); //  Make sure both peers are connected to each other
    
    subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 0);
    
    t_subscribe_to_topic (consumer, "hello", test_peer, config);
    
    subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 1);
    
    zclock_sleep (250); //  Wait until subscription is active
    
    //  WHEN a RECORD msg with sequence 2 is sent on topic 'hello'
    dafka_test_peer_send_record (test_peer, "hello", 2, "CONTENT");
    
    //  THEN a consumer msg is sent to the user with topic 'hello' and content'CONTENT'
    c_msg = dafka_consumer_msg_new ();
    dafka_consumer_msg_recv (c_msg, consumer);
    assert_consumer_msg (c_msg, "hello", "CONTENT");
    
    dafka_consumer_msg_destroy (&c_msg);
    dafka_consumer_destroy (&consumer);
    zactor_destroy (&test_peer);
    
    // ---------
    // API Tests
    // ---------
    
    // Test with Producer + Store and 'consumer.offset.reset = earliest'
    // ------------------------------------------------------------------
    zconfig_put (config, "consumer/offset/reset", "earliest");
    
    dafka_producer_args_t pub_args = {"hello", config};
    zactor_t *producer = zactor_new (dafka_producer, &pub_args);
    assert (producer);
    
    zactor_t *store = zactor_new (dafka_store_actor, config);
    assert (store);
    
    consumer = dafka_consumer_new (&consumer_args);
    assert (consumer);
    zclock_sleep (250);
    
    dafka_producer_msg_t *p_msg = dafka_producer_msg_new ();
    dafka_producer_msg_set_content_str (p_msg, "HELLO MATE");
    int rc = dafka_producer_msg_send (p_msg, producer);
    assert (rc == 0);
    zclock_sleep (100);  // Make sure message is published before consumer subscribes
    
    subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 0);
    
    rc = dafka_consumer_subscribe (consumer, "hello");
    assert (rc == 0);
    zclock_sleep (250);  // Make sure subscription is active before sending the next message
    
    subscription = dafka_consumer_subscription (consumer);
    assert (zlist_size (subscription) == 1);
    
    // This message is discarded but triggers a FETCH from the store
    dafka_producer_msg_set_content_str (p_msg, "HELLO ATEM");
    rc = dafka_producer_msg_send (p_msg, producer);
    assert (rc == 0);
    
    // Make sure the first two messages have been received from the store and the consumer is now up to date
    zclock_sleep (100);
    
    dafka_producer_msg_set_content_str (p_msg, "HELLO TEMA");
    rc = dafka_producer_msg_send (p_msg, producer);
    assert (rc == 0);
    
    // Receive the first message from the STORE
    c_msg = dafka_consumer_msg_new ();
    dafka_consumer_msg_recv (c_msg, consumer);
    assert_consumer_msg (c_msg, "hello", "HELLO MATE");
    
    // Receive the second message from the STORE as the original has been discarded
    dafka_consumer_msg_recv (c_msg, consumer);
    assert_consumer_msg (c_msg, "hello", "HELLO ATEM");
    
    // Receive the third message from the PUBLISHER
    dafka_consumer_msg_recv (c_msg, consumer);
    assert_consumer_msg (c_msg, "hello", "HELLO TEMA");
    
    rc = dafka_consumer_unsubscribe (consumer, "hello");
    assert (rc == 0);
    
    dafka_producer_msg_destroy (&p_msg);
    dafka_consumer_msg_destroy (&c_msg);
    zactor_destroy (&producer);
    zactor_destroy (&store);
    dafka_consumer_destroy (&consumer);
    
    // Test with Producer + Store and consumer.offset.reset = latest
    // --------------------------------------------------------------
    zconfig_put (config, "consumer/offset/reset", "latest");
    
    producer = zactor_new (dafka_producer, &pub_args);
    assert (producer);
    
    consumer = dafka_consumer_new (&consumer_args);
    assert (consumer);
    zclock_sleep (250);
    
    //  This message is missed by the consumer and later ignored because the
    //  offset reset is set to latest.
    p_msg = dafka_producer_msg_new ();
    dafka_producer_msg_set_content_str (p_msg, "HELLO MATE");
    rc = dafka_producer_msg_send (p_msg, producer);
    assert (rc == 0);
    zclock_sleep (100);  // Make sure message is published before consumer subscribes
    
    rc = dafka_consumer_subscribe (consumer, "hello");
    assert (rc == 0);
    zclock_sleep (250);  // Make sure subscription is active before sending the next message
    
    dafka_producer_msg_set_content_str (p_msg, "HELLO ATEM");
    rc = dafka_producer_msg_send (p_msg, producer);
    assert (rc == 0);
    
    // Receive the second message from the PRODUCER
    c_msg = dafka_consumer_msg_new ();
    dafka_consumer_msg_recv (c_msg, consumer);
    assert_consumer_msg (c_msg, "hello", "HELLO ATEM");
    
    // We have to create a store in-order to ack all publisher messages and allow the publisher to terminate
    store = zactor_new (dafka_store_actor, config);
    assert (store);
    
    
    dafka_producer_msg_destroy (&p_msg);
    dafka_consumer_msg_destroy (&c_msg);
    zactor_destroy (&tower);
    zactor_destroy (&producer);
    zactor_destroy (&store);
    dafka_consumer_destroy (&consumer);
    zconfig_destroy (&config);

dafka_producer - Implements the dafka producer protocol

dafka_producer - Publishes messages to one partion of one topic

Please add '@discuss' section in './../src/dafka_producer.c'.

This is the class interface:

    //  This is a stable class, and may not change except for emergencies. It
    //  is provided in stable builds.
    //
    DAFKA_EXPORT void
        dafka_producer (zsock_t *pipe, void *args);
    
    //
    DAFKA_EXPORT const char *
        dafka_producer_address (zactor_t *self);
    
    //  Self test of this class.
    DAFKA_EXPORT void
        dafka_producer_test (bool verbose);
    

Please add '@interface' section in './../src/dafka_producer.c'.

This is the class self test code:

    //  Simple create/destroy test
    zconfig_t *config = zconfig_new ("root", NULL);
    zconfig_put (config, "beacon/verbose", verbose ? "1" : "0");
    zconfig_put (config, "beacon/sub_address","inproc://producer-tower-sub");
    zconfig_put (config, "beacon/pub_address","inproc://producer-tower-pub");
    zconfig_put (config, "tower/verbose", verbose ? "1" : "0");
    zconfig_put (config, "tower/sub_address","inproc://producer-tower-sub");
    zconfig_put (config, "tower/pub_address","inproc://producer-tower-pub");
    zconfig_put (config, "producer/verbose", verbose ? "1" : "0");
    zconfig_put (config, "store/verbose", verbose ? "1" : "0");
    zconfig_put (config, "store/db", SELFTEST_DIR_RW "/storedb");
    
    zactor_t *tower = zactor_new (dafka_tower_actor, config);
    
    dafka_producer_args_t args = {"dummy", config};
    zactor_t *producer = zactor_new (dafka_producer, &args);
    assert (producer);
    
    zactor_destroy (&producer);
    zactor_destroy (&tower);
    zconfig_destroy (&config);

dafka_store - no title found

dafka_store -

Please add '@discuss' section in './../src/dafka_store.c'.

This is the class interface:

    //  Create new dafka_store actor instance.
    //  @TODO: Describe the purpose of this actor!
    //
    //      zactor_t *dafka_store = zactor_new (dafka_store, NULL);
    //
    //  Destroy dafka_store instance.
    //
    //      zactor_destroy (&dafka_store);
    //
    //  Start dafka_store actor.
    //
    //      zstr_sendx (dafka_store, "START", NULL);
    //
    //  Stop dafka_store actor.
    //
    //      zstr_sendx (dafka_store, "STOP", NULL);
    //
    //  This is the dafka_store constructor as a zactor_fn;
    DAFKA_EXPORT void
        dafka_store_actor (zsock_t *pipe, void *args);
    
    //  Self test of this actor
    DAFKA_EXPORT void
        dafka_store_test (bool verbose);

Please add '@interface' section in './../src/dafka_store.c'.

This is the class self test code:

    // ----------------------------------------------------
    //  Cleanup old test artifacts
    // ----------------------------------------------------
    if (zsys_file_exists (SELFTEST_DIR_RW "/storedb")) {
        zdir_t *store_dir = zdir_new (SELFTEST_DIR_RW "/storedb", NULL);
        zdir_remove (store_dir, true);
        zdir_destroy (&store_dir);
    }
    
    //  Simple create/destroy test
    zconfig_t *config = zconfig_new ("root", NULL);
    zconfig_put (config, "beacon/verbose", verbose ? "1" : "0");
    zconfig_put (config, "beacon/sub_address", "inproc://store-tower-sub");
    zconfig_put (config, "beacon/pub_address", "inproc://store-tower-pub");
    zconfig_put (config, "tower/verbose", verbose ? "1" : "0");
    zconfig_put (config, "tower/sub_address", "inproc://store-tower-sub");
    zconfig_put (config, "tower/pub_address", "inproc://store-tower-pub");
    zconfig_put (config, "store/verbose", verbose ? "1" : "0");
    zconfig_put (config, "consumer/verbose", verbose ? "1" : "0");
    zconfig_put (config, "producer/verbose", verbose ? "1" : "0");
    zconfig_put (config, "store/db", SELFTEST_DIR_RW "/storedb");
    zconfig_put (config, "consumer/offset/reset", "earliest");
    
    // Creating the store
    zactor_t *tower = zactor_new (dafka_tower_actor, config);
    
    // Creating the publisher
    dafka_producer_args_t args = {"TEST", config};
    zactor_t *producer = zactor_new (dafka_producer, &args);
    
    // Producing before the store is alive, in order to test fetching between producer and store
    dafka_producer_msg_t *p_msg = dafka_producer_msg_new ();
    dafka_producer_msg_set_content_str (p_msg, "1");
    dafka_producer_msg_send (p_msg, producer);
    
    dafka_producer_msg_set_content_str (p_msg, "2");
    dafka_producer_msg_send (p_msg, producer);
    
    // Starting the store
    zactor_t *store = zactor_new (dafka_store_actor, config);
    zclock_sleep (100);
    
    // Producing another message
    dafka_producer_msg_set_content_str (p_msg, "3");
    dafka_producer_msg_send (p_msg, producer);
    zclock_sleep (100);
    
    // Killing the producer, to make sure the HEADs are coming from the store
    zactor_destroy (&producer);
    
    // Starting a consumer and check that consumer recv all 3 messages
    dafka_consumer_args_t consumer_args = { .config = config };
    dafka_consumer_t *consumer = dafka_consumer_new (&consumer_args);
    dafka_consumer_subscribe (consumer, "TEST");
    
    dafka_consumer_msg_t *c_msg = dafka_consumer_msg_new ();
    dafka_consumer_msg_recv (c_msg, consumer);
    assert (dafka_consumer_msg_streq (c_msg, "1"));
    
    dafka_consumer_msg_recv (c_msg, consumer);
    assert (dafka_consumer_msg_streq (c_msg, "2"));
    
    dafka_consumer_msg_recv (c_msg, consumer);
    assert (dafka_consumer_msg_streq (c_msg, "3"));
    
    dafka_consumer_msg_destroy (&c_msg);
    dafka_consumer_destroy (&consumer);
    dafka_producer_msg_destroy (&p_msg);
    zactor_destroy (&store);
    zactor_destroy (&tower);
    zconfig_destroy (&config);
    
    // ----------------------------------------------------
    //  Cleanup test artifacts
    // ----------------------------------------------------
    zdir_t *store_dir = zdir_new (SELFTEST_DIR_RW "/storedb", NULL);
    zdir_remove (store_dir, true);
    zdir_destroy (&store_dir);

dafka_tower - no title found

dafka_tower -

Please add '@discuss' section in './../src/dafka_tower.c'.

This is the class interface:

    //  Create new dafka_tower actor instance.
    //  @TODO: Describe the purpose of this actor!
    //
    //      zactor_t *dafka_tower = zactor_new (dafka_tower, NULL);
    //
    //  Destroy dafka_tower instance.
    //
    //      zactor_destroy (&dafka_tower);
    //
    //  Start dafka_tower actor.
    //
    //      zstr_sendx (dafka_tower, "START", NULL);
    //
    //  Stop dafka_tower actor.
    //
    //      zstr_sendx (dafka_tower, "STOP", NULL);
    //
    //  This is the dafka_tower constructor as a zactor_fn;
    DAFKA_EXPORT void
        dafka_tower_actor (zsock_t *pipe, void *args);
    
    //  Self test of this actor
    DAFKA_EXPORT void
        dafka_tower_test (bool verbose);

Please add '@interface' section in './../src/dafka_tower.c'.

This is the class self test code:

    //  Simple create/destroy test
    /*
    zactor_t *dafka_tower = zactor_new (dafka_tower_actor, NULL);
    assert (dafka_tower);
    
    zactor_destroy (&dafka_tower);
    */

Contributing

Documentation

Man pages are generated from the class header and source files via the doc/mkman tool, and similar functionality in the gitdown tool (http://github.com/imatix/gitdown). The header file for a class must wrap its interface as follows (example is from include/zclock.h):

//  @interface
//  Sleep for a number of milliseconds
void
    zclock_sleep (int msecs);

//  Return current system clock as milliseconds
int64_t
    zclock_time (void);

//  Self test of this class
int
    zclock_test (Bool verbose);
//  @end

The source file for a class must provide documentation as follows:

/*
@header
...short explanation of class...
@discuss
...longer discussion of how it works...
@end
*/

The source file for a class then provides the self test example as follows:

//  @selftest
int64_t start = zclock_time ();
zclock_sleep (10);
assert ((zclock_time () - start) >= 10);
//  @end

The template for man pages is in doc/mkman.

Development

Dafka is developed through a test-driven process that guarantees no memory violations or leaks in the code:

  • Modify a class or method.
  • Update the test method for that class.
  • Run the 'selftest' script, which uses the Valgrind memcheck tool or AdressSanatizer.
  • Repeat until perfect.

To run the 'selftest' script with autotools:

make check
make memcheck

To run the 'selftest' script with cmake:

make test or ctest
ctest -T memcheck

Hints to Contributors

Don't include system headers in source files. The right place for these is dafka_prelude.h. If you need to check against configured libraries and/or headers, include platform.h in the source before including dafka.h.

Do read your code after you write it and ask, "Can I make this simpler?" We do use a nice minimalist and yet readable style. Learn it, adopt it, use it.

Before opening a pull request read our contribution guidelines. Thanks!

Code Generation

We generate scripts for build systems like autotools, cmake and others as well as class skeletons, class headers, the selftest runner, bindings to higher level languages and more using zproject. Generated files will have a header and footer telling you that this file was generated. To re-generate those files it is recommended to use the latest zeromqorg/zproject docker image.

Docker

  • Clone libzmq into the same directory as dafka.
  • Clone czmq into the same directory as dafka.

Next always download the latest image:

# Make sure
docker pull zeromqorg/zproject:latest

Then run the following command:

# Shell and Powershell
docker run -v ${PWD}/..:/workspace -e BUILD_DIR=/workspace/czmq zeromqorg/zproject

# Windows CMD
docker run -v %cd%/..:/workspace -e BUILD_DIR=/workspace/czmq zeromqorg/zproject

This Document

This documentation was generated from dafka/README.txt using Gitdown

More Repositories

1

libzmq

ZeroMQ core engine in C++, implements ZMTP/3.1
C++
8,996
star
2

pyzmq

PyZMQ: Python bindings for zeromq
Python
3,480
star
3

netmq

A 100% native C# implementation of ZeroMQ for .NET
C#
2,822
star
4

jeromq

Pure Java ZeroMQ
Java
2,288
star
5

cppzmq

Header-only C++ binding for libzmq
C++
1,737
star
6

zeromq.js

⚡ Node.js bindings to the ØMQ library
TypeScript
1,371
star
7

czmq

High-level C binding for ØMQ
C
1,110
star
8

zmq.rs

A native implementation of ØMQ in Rust
Rust
1,015
star
9

zyre

Zyre - an open-source framework for proximity-based peer-to-peer applications
C
843
star
10

jzmq

Java binding for ZeroMQ
Java
584
star
11

goczmq

goczmq is a golang wrapper for CZMQ.
Go
552
star
12

php-zmq

ZeroMQ for PHP
C
543
star
13

zeromq4-x

ØMQ 4.x stable release branch - bug fixes only
C++
446
star
14

zmqpp

0mq 'highlevel' C++ bindings
C++
418
star
15

zeromq2-x

ØMQ/2.x distribution
C++
365
star
16

malamute

The ZeroMQ Enterprise Messaging Broker
C
311
star
17

gomq

Pure Go Implementation of a Subset of ZeroMQ
Go
290
star
18

clrzmq

CLR (.NET & Mono) binding for 0MQ
C#
272
star
19

filemq

FileMQ is a publish-subscribe file service based on 0MQ
Java
271
star
20

rbzmq

Ruby binding for 0MQ
C
248
star
21

clrzmq4

ZeroMQ C# namespace (.NET and mono, Windows, Linux and MacOSX, x86 and amd64)
C#
235
star
22

zproto

A protocol framework for ZeroMQ
C
228
star
23

zeromq3-x

ØMQ/3.2 release branch - bug fixes only
C++
228
star
24

JSMQ

Javascript client for ZeroMQ/NetMQ
JavaScript
190
star
25

chumak

Pure Erlang implementation of ZeroMQ Message Transport Protocol.
Erlang
189
star
26

erlzmq2

Erlang binding for 0MQ (v2)
C
165
star
27

libcurve

An encryption and authentication library for ZeroMQ applications
C
161
star
28

zproject

CLASS Project Generator
Shell
142
star
29

lzmq

Lua binding to ZeroMQ
Lua
133
star
30

gitdown

Turn github into your publishing platform
Perl 6
130
star
31

zeromq4-1

ZeroMQ 4.1.x stable release branch - bug fixes only
C++
125
star
32

pyre

Python port of Zyre
Python
116
star
33

exzmq

ZeroMQ for Elixir
Elixir
115
star
34

fszmq

An F# binding for the ZeroMQ distributed computing library. For more information, please visit:
F#
112
star
35

majordomo

Majordomo Project
C
111
star
36

cljzmq

Clojure bindings for ØMQ
Clojure
105
star
37

rfc

ZeroMQ RFC project
C
104
star
38

gyre

Golang port of Zyre
Go
87
star
39

zwssock

ZeroMQ WebSocket library for CZMQ
C
85
star
40

jszmq

Javascript port of zeromq
TypeScript
76
star
41

libzmtp

Minimal ZMTP implementation in C
C
53
star
42

ingescape

Model-based framework for broker-free distributed software environments. Any language, any OS, web, cloud.
C
52
star
43

zbroker

Elastic pipes
C
50
star
44

czmqpp

C++ wrapper for czmq. Aims to be minimal, simple and consistent.
C++
43
star
45

zeromq.org

ZeroMQ Website
HTML
32
star
46

cookbook

ZeroMQ Cookbook
Python
32
star
47

pyczmq

Python CZMQ bindings
Python
31
star
48

zebra

REST/HTTP to XRAP gateway
C++
26
star
49

zmtpdump

ZeroMQ Transport Protocol packet analyzer
Roff
25
star
50

jeromq-jms

JeroMQ JMS
Java
24
star
51

jzmq-api

A Java ØMQ API for abstracting the various implementations of ZeroMQ Message Transport Protocol
Java
24
star
52

zmq-jni

Simple High Performance JNI Wrapper for ØMQ
Java
22
star
53

perlzmq

version agnostic Perl bindings for zeromq
Perl
22
star
54

zmtp

Stuff related to the ZMTP protocol
C
18
star
55

contiki-zmtp

ZMTP for Contiki OS
C
16
star
56

jyre

Java implementation of ZRE protocol
Java
14
star
57

zccp

ZeroMQ Command & Control Protocol
Go
14
star
58

f77_zmq

Fortran binding for ZeroMQ
C
13
star
59

ztools

Tools for ØMQ auto-builds and API site
Perl
12
star
60

zeps

ZeroMQ Enterprise Publish-Subscribe (ZEPS) **DEPRECATED**
C
12
star
61

mruby-zmq

mruby bindings for libzmq (v4)
C
10
star
62

zeromq2-0

Packaging project for ØMQ/2.0 series
C++
9
star
63

zkernel

z kernel
C
9
star
64

nimczmq

Nim ( http://nim-lang.org/ ) bindings for CZMQ
Nim
6
star
65

curvezmq-java

Java
5
star
66

gozyre

Go bindings for zeromq libzyre - an open-source framework for proximity-based peer-to-peer applications
Go
5
star
67

issues

Issue test cases
C
4
star
68

zdiscgo

CZMQ service discovery zactor with support for Go plugins.
C
4
star
69

jdafka

Dafka protocol implementation in Java
Java
4
star
70

zlabs

Labs project for experimenting with CZMQ
Shell
4
star
71

zeromq-buildbot

A buildbot based regression tester for Zeromq
Python
3
star
72

azmq1-0

v1.0 release of azmq
C++
3
star
73

lyre

Lua port of Zyre
Lua
3
star
74

jeromq3-x

Java
3
star
75

jzmq3-x

Java
2
star
76

zmtp-java

Java
2
star
77

libzmq-relicense

This repo contains information regarding re-licensing libzmq
Python
2
star
78

clrzmq2

Old repository path for clrzmq
2
star
79

libzmq-fuzz-corpora

fuzzers corpus files for libzmq are stored in binary format in this repository
1
star
80

zeromq-download-redirect

HTML
1
star