AtomicKafka is a lightweight npm package that simplifies establishing bidirectional, real-time data streaming with Apache Kafka in your web app.
Website | Library | Demo Apps | Featured on Medium
Table of Contents
- Features
- Benefits of AtomicKafka
- Getting Started
- Contribute
- Maintainers
- Built With
- License
Features
- Websocket connections between the client and the server that accept user-defined event strings and callbacks
- Broker initialization and connection to Apache Kafka
- Consumer and Producer classes are predefined to be as modular as possible
- Consumer functions accept user-defined callback functions to support lightweight stream processing
- React Hook that throttles the websocket event listener with a time interval to maintain client performance
- Supports multiple Kafka streams
Benefits of AtomicKafka
Getting Started
1. Initialize Kafka cluster
AtomicKafka currently supports running Apache Kafka clusters either using a Docker image or by connecting to Confluent Cloud.
Docker:
- Download this .yml and run the following command in your terminal:
docker-compose up -d
Confluent Cloud:
- Follow the steps on Confluent Cloud to create a free account. Obtain the API_ACCESS_KEY, API_ACCESS_SECRET, and BOOTSTRAP_SERVER values.
2. Configure .env file
Include the following lines in your .env file, depending on your Kafka environment. Set the PORT variable to the port where the AtomicKafkaServer will be initialized in step 4.
- Docker .env config: (API_KEY and API_SECRET are intentionally left blank)
PORT=<USER_DEFINED>
API_KEY=
API_SECRET=
KAFKA_BOOTSTRAP_SERVER=localhost:9092
- Confluent Cloud .env config: (PORT intentionally left blank)
PORT=
API_KEY=<API_ACCESS_KEY>
API_SECRET=<API_ACCESS_SECRET>
KAFKA_BOOTSTRAP_SERVER=<BOOTSTRAP_SERVER>
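AtomicKafka does not prescribe how these variables reach process.env; one common approach (shown here only as an assumption, not something bundled with the package) is the dotenv package:

// load the .env file before creating the server in step 4
// (dotenv is an assumed extra dependency, not part of AtomicKafka)
require('dotenv').config();

console.log(process.env.PORT);                   // user-defined server port
console.log(process.env.KAFKA_BOOTSTRAP_SERVER); // e.g. localhost:9092 for Docker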
3. Install AtomicKafka
$ npm install atomic-kafka
4. Create Server Instance
Initialize a server instance of your choice (HTTP, Node.js, etc.). The example below uses a Node.js Express server.
ATTENTION: a Server instance must be created for every remote AtomicKafkaClient.
- Initialize and configure expressApp according to desired specifications.
- Require in AtomicKafkaServer.
- Define a server that listens on the user-defined PORT environment variable.
- Initialize an AtomicKafkaServer instance aks by passing in the server.
/* initialize and configure the Node.js expressApp to your specifications, then add the
   following (assumes the .env from step 2 is loaded, e.g. via require('dotenv').config()) */
const express = require('express');
const AtomicKafkaServer = require('atomic-kafka/server');

const expressApp = express();

// listen on the user-defined PORT environment variable
const server = expressApp.listen(process.env.PORT, () => {
  console.log(`Listening on port ${process.env.PORT}`);
});

// pass the server to AtomicKafkaServer
const aks = new AtomicKafkaServer(server);
5A. Create the Consumer and enable the built-in websocket on the server
- Initialize a newConsumer on the aks instance and pass in the group_ID_string.
- Enable the built-in websocket by invoking socketConsume and passing in the group_ID_string, an event_string, and the topic_string.
/* AKS_Consumer_Init */
aks.newConsumer('group_ID');
aks.socketConsume('group_ID', 'consumeMessageEvent', 'topic');
5B. Create the Producer and enable the built-in websocket on the server
- Initialize a newProducer on the aks instance and pass in the topic_string.
- Enable the built-in websocket by invoking globalProduce and passing in an event_string and the topic_string.
/* AKS_Producer_Init */
aks.newProducer('topic');
aks.globalProduce('produceMessageEvent', 'topic');
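Steps 4 and 5 together form a complete minimal server. The sketch below simply combines the calls shown above into one file; the topic, group, and event names are placeholders to replace with your own:

// server.js: minimal AtomicKafkaServer sketch combining steps 4 and 5
// (topic, group, and event names are placeholders)
const express = require('express');
const AtomicKafkaServer = require('atomic-kafka/server');

const expressApp = express();
const server = expressApp.listen(process.env.PORT, () => {
  console.log(`Listening on port ${process.env.PORT}`);
});

const aks = new AtomicKafkaServer(server);

// consumer side: join consumer group 'group_ID' and forward messages from 'topic'
// to clients listening for the 'consumeMessageEvent' websocket event
aks.newConsumer('group_ID');
aks.socketConsume('group_ID', 'consumeMessageEvent', 'topic');

// producer side: relay 'produceMessageEvent' payloads from clients into 'topic'
aks.newProducer('topic');
aks.globalProduce('produceMessageEvent', 'topic');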
6A. JavaScript - Import Client Interface (React & Hooks)
/* in your React.jsx Component */
import AtomicKafkaClient from 'atomic-kafka/client';
6B. TypeScript - Import Client Interface (React & Hooks)
/* in your TypeScript React Component */
declare function require(name:string);
const AtomicKafkaClient = require('atomic-kafka/client').default;
7A. Create and implement Consumer client component (JS & TS)
- Initialize akc as an AtomicKafkaClient, passing in the URI_STRING of the host running the AtomicKafkaServer instance.
- Define a callback to process message payload through the React state management tool of your choice.
- Implement useInterval to consume from the Kafka cluster on an interval.
- Inside useInterval, invoke the consumer function on the akc instance with a user-defined websocket event_string and the previously defined callback, and pass the interval_delay in milliseconds as useInterval's second argument.
function ConsumerComponent() {
  const akc = new AtomicKafkaClient('ATOMIC_KAFKA_SERVER_URI_STRING');

  const callback = (payload) => {
    /* user-provided data stream processing function definition
       that effects state change */
  }

  /* Throttles message consumption. Interval in milliseconds,
     can be any number */
  akc.useInterval(() => akc.consumer('consumeMessageEvent', callback), 4000);
}
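As a concrete illustration, here is one way the callback might drive React state with useState; the payload handling and rendering below are placeholder choices, not part of the AtomicKafka API:

import React, { useState } from 'react';
import AtomicKafkaClient from 'atomic-kafka/client';

function ConsumerComponent() {
  const akc = new AtomicKafkaClient('ATOMIC_KAFKA_SERVER_URI_STRING');

  // local state holding every payload received so far
  const [messages, setMessages] = useState([]);

  // callback passed to the consumer: append each payload to state
  const callback = (payload) => setMessages((prev) => [...prev, payload]);

  // throttle consumption to once every 4 seconds
  akc.useInterval(() => akc.consumer('consumeMessageEvent', callback), 4000);

  return (
    <ul>
      {messages.map((msg, i) => (
        <li key={i}>{JSON.stringify(msg)}</li>
      ))}
    </ul>
  );
}

export default ConsumerComponent;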
7B. Create and implement Producer client component (JS & TS)
- Initialize akc as an AtomicKafkaClient, passing in the URI_STRING of the host running the AtomicKafkaServer instance.
- Generate a payload formatted as an arbitrarily-nested JSON object. The example below defines a payload, but it can be generated at any point in the client according to the user's specification.
- Invoke the producer function. Pass in the websocket event_string and the payload.
function ProducerComponent() {
  const akc = new AtomicKafkaClient('ATOMIC_KAFKA_SERVER_URI_STRING');

  const payload = {
    /* Data to be sent to the cluster. Arbitrarily-nested JSON format.
       Can be defined anywhere in the app. */
  }

  akc.producer('produceMessageEvent', payload);
}
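For instance, the payload could be produced in response to a button click; the click handler and payload contents below are illustrative assumptions, not requirements of the library:

import React from 'react';
import AtomicKafkaClient from 'atomic-kafka/client';

function ProducerComponent() {
  const akc = new AtomicKafkaClient('ATOMIC_KAFKA_SERVER_URI_STRING');

  // send an arbitrarily-nested JSON payload when the user clicks the button
  const sendMessage = () => {
    const payload = { user: 'demo', sentAt: Date.now(), data: { clicks: 1 } };
    akc.producer('produceMessageEvent', payload);
  };

  return <button onClick={sendMessage}>Send to Kafka</button>;
}

export default ProducerComponent;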
Contribute
We want this open-source project to continue to improve. If you would like to make a contribution to AtomicKafka, please fork this repo, add your changes to a well-named feature branch, and make a pull request. We look forward to your input! And if you want to support AtomicKafka, please give this repo a star.
Maintainers
Nikhil Massand
Built With
License
This product is released under the MIT License.