  • Language
    C++
  • License
    MIT License
  • Created about 6 years ago
  • Updated 7 months ago

The Apache Kafka Client SDK

Introduction

  • Qbusbridge is a client SDK for pub-sub messaging systems. Supported systems include Kafka and Pulsar.

    Users can switch between pub-sub messaging systems by changing the configuration file. The default configuration targets Kafka; to switch to Pulsar, change the configuration to:

    mq.type=pulsar
    # Other configs for pulsar...

    See config for more details.

    TODO: English config docs are currently missing.

  • Qbusbridge-Kafka is built on librdkafka under the hood. It hides most of the usage details, which makes QBus simpler and easier to use than librdkafka. To produce and consume messages, users only need to call a few APIs, without having to understand much about Kafka.

  • The reliability of message producing, which is probably users' biggest concern, has been considerably improved.

Features

  • Multiple programming languages are supported, including C++, PHP, Python, and Golang, with consistent APIs.
  • The API surface is small, so it is easy to use.
  • For advanced users, librdkafka's configuration can be adjusted through the configuration file.
  • When writing data without keys, the SDK does its best to guarantee that messages are written successfully.
  • Two writing modes, synchronous and asynchronous, are supported.
  • When consuming messages, offsets can be committed automatically or manually, depending on configuration.
  • With php-fpm, the connection is kept alive so that messages can be produced continuously, avoiding the cost of re-creating connections.

Compiling

Ensure your system has g++ (>= 4.8.5), boost (>= 1.41), cmake (>= 3.1) and swig (>= 3.0.12) installed.

In addition, the qbus SDK links libstdc++ statically, so you must ensure that libstdc++.a exists. For CentOS users, run:

sudo yum install -y glibc-static libstdc++-static

Clone the repository together with its submodules:

git clone --recursive https://github.com/Qihoo360/qbusbridge.git

1. Install submodules

Run ./build_dependencies.sh.

It automatically downloads the submodules and installs them to cxx/thirdparts/local, where CMakeLists.txt looks for headers and libraries.

Afterwards, ./cxx/thirdparts/local contains:

include/
  librdkafka/
    rdkafka.h
  log4cplus/
    logger.h
lib/
  librdkafka.a
  liblog4cplus.a

2. Build SDK

C++

Navigate to the cxx directory and run ./build.sh. The following files will be generated:

include/
  qbus_consumer.h
  qbus_producer.h
lib/
  debug/libQBus.so
  release/libQBus.so

Although building the C++ SDK requires C++11 support, the built SDK can be used with an older g++. For example, you can build the qbus SDK with g++ 4.8.5 and use it with g++ 4.4.7.

Go

Navigate to the golang directory and run ./build.sh. The following files will be generated:

gopath/
  src/
    qbus/
      qbus.go
      libQBus_go.so

You can enable Go modules for the examples by running USE_GO_MOD=1 ./build.sh. The following files will then be generated:

examples/
  go.mod
  qbus/
    qbus.go
    go.mod
    libQBus_go.so

Python

Navigate to the python directory and run ./build.sh. The following files will be generated:

examples/
  qbus.py
  _qbus.so

PHP

Navigate to the php directory and run ./build.sh. The following files will be generated:

examples/
  qbus.php
  qbus.so

3. Build examples

C++

Navigate to the examples subdirectory and run ./build.sh [debug|release] to generate the executables. debug uses the libQBus.so in the lib/debug subdirectory; release uses the libQBus.so in the lib/release subdirectory. Run make clean to delete them.

If you want to build your own programs, see how the Makefile does it.

Go

Navigate to the examples subdirectory and run ./build.sh to generate the executables; run ./clean.sh to delete them.

Add the path of libQBus_go.so to the LD_LIBRARY_PATH environment variable, e.g.

export LD_LIBRARY_PATH=$PWD/gopath/src/qbus:$LD_LIBRARY_PATH

If you want to build your own programs, add the generated gopath directory to the GOPATH environment variable, or move the gopath/src/qbus directory to $GOPATH/src.

Python

Copy the generated qbus.py and _qbus.so to the directory of the Python scripts you want to run.

PHP

Edit php.ini and add extension=<module-path>, where <module-path> is the path of qbus.so.

Usage

Data Producing

  • When writing data without keys, the SDK does its best to deliver every single message. As long as at least one broker in the Kafka cluster is behaving normally, it will keep trying to resend.
  • Writing data only requires calling the produce interface. In asynchronous mode, the return value tells you whether the send queue is full.
  • In synchronous mode, the produce interface directly returns whether the current message was written successfully. This comes at the cost of some performance and extra CPU usage, so asynchronous mode is recommended.
  • The following C++ declarations make up the producer interface:
bool QbusProducer::init(const std::string& broker_list,
                        const std::string& log_path,
                        const std::string& config_path,
                        const std::string& topic_name);
bool QbusProducer::produce(const char* data,
                           size_t data_len,
                           const std::string& key);
void QbusProducer::uninit();
  • C++ SDK usage example:
#include <iostream>
#include <string>
#include "qbus_producer.h"

int main(int argc, const char* argv[]) {
    qbus::QbusProducer qbus_producer;
    // Arguments: broker list, log path, config path, topic name.
    if (!qbus_producer.init("127.0.0.1:9092",
                            "./log",
                            "./config",
                            "topic_test")) {
        std::cerr << "Failed to init" << std::endl;
        return 1;  // report the failure to the caller
    }

    int exit_code = 0;
    std::string msg("test\n");
    if (!qbus_producer.produce(msg.c_str(), msg.length(), "key")) {
        std::cerr << "Failed to produce" << std::endl;
        exit_code = 1;
    }

    // Shut the producer down before exiting.
    qbus_producer.uninit();

    return exit_code;
}
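
Since produce reports a full send queue through its return value in asynchronous mode, a caller may want to wait and retry instead of dropping the message. The following is only a minimal sketch of that idea, not part of the SDK: produceWithRetry and FakeProducer are hypothetical names, and FakeProducer merely stands in for qbus::QbusProducer (same produce signature) so that the sketch compiles without the SDK. It requires C++11.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <string>
#include <thread>

// Hypothetical stand-in for qbus::QbusProducer so the sketch compiles without
// the SDK. It rejects the first `fail_first` calls, imitating a full queue.
struct FakeProducer {
    int fail_first;
    int calls;
    explicit FakeProducer(int n) : fail_first(n), calls(0) {}
    bool produce(const char* /*data*/, std::size_t /*data_len*/,
                 const std::string& /*key*/) {
        return ++calls > fail_first;
    }
};

// Calls producer.produce() up to max_attempts times, sleeping between
// attempts and doubling the delay each time. Returns true as soon as one
// call succeeds, false if every attempt was rejected.
template <typename Producer>
bool produceWithRetry(Producer& producer, const std::string& msg,
                      const std::string& key, int max_attempts,
                      std::chrono::milliseconds delay) {
    assert(max_attempts > 0);
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (producer.produce(msg.c_str(), msg.length(), key)) return true;
        if (attempt < max_attempts) {
            std::this_thread::sleep_for(delay);
            delay *= 2;  // simple exponential backoff
        }
    }
    return false;
}
```

With the real SDK, the same template could be called with a qbus::QbusProducer in place of FakeProducer; whether retrying is appropriate depends on how quickly the queue drains in your deployment.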

Data Consuming

  • Consuming data only requires calling subscribeOne to subscribe to a topic (subscribing to multiple topics is also supported). The calling process is not blocked; every message is delivered to the user through the callback.
  • The SDK also supports committing offsets manually: users can commit the offset in their own code for a message delivered through the callback.
  • The following C++ declarations show the consumer interface:
bool QbusConsumer::init(const std::string& broker_list,
                        const std::string& log_path,
                        const std::string& config_path,
                        const QbusConsumerCallback& callback);
bool QbusConsumer::subscribeOne(const std::string& group, const std::string& topic);
bool QbusConsumer::subscribe(const std::string& group,
                             const std::vector<std::string>& topics);
bool QbusConsumer::start();
void QbusConsumer::stop();
bool QbusConsumer::pause(const std::vector<std::string>& topics);
bool QbusConsumer::resume(const std::vector<std::string>& topics);
  • C++ SDK usage example:
#include <unistd.h>  // sleep()

#include <iostream>
#include <string>
#include "qbus_consumer.h"

qbus::QbusConsumer qbus_consumer;

// Receives every consumed message; this one just prints it.
class MyCallback: public qbus::QbusConsumerCallback {
    public:
        virtual void deliveryMsg(const std::string& topic,
                    const char* msg,
                    const size_t msg_len) const {
            std::cout << "topic: " << topic << " | msg: " << std::string(msg, msg_len) << std::endl;
        }

};

int main(int argc, char* argv[]) {
    MyCallback my_callback;
    if (qbus_consumer.init("127.0.0.1:9092",
                    "log",
                    "config",
                    my_callback)) {
        if (qbus_consumer.subscribeOne("groupid_test", "topic_test")) {
            if (!qbus_consumer.start()) {
                std::cout << "Failed to start" << std::endl;
                return 1;
            }

            while (1) sleep(1);  // other operations can appear here

            qbus_consumer.stop();
        } else {
            std::cout << "Failed to subscribe" << std::endl;
        }
    } else {
        std::cout << "Failed to init" << std::endl;
    }
    return 0;
}
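
In the example above, the while (1) loop never exits, so qbus_consumer.stop() is never reached. One common way to make the shutdown path reachable is to leave the loop when SIGINT or SIGTERM arrives. The sketch below uses only the standard <csignal> facilities; installStopHandlers and stopRequested are hypothetical helper names, not SDK functions.

```cpp
#include <csignal>

// Written only by the signal handler. sig_atomic_t is the type the standard
// guarantees can be safely assigned from inside a handler.
static volatile std::sig_atomic_t g_stop_requested = 0;

extern "C" void onStopSignal(int /*signum*/) { g_stop_requested = 1; }

// Routes SIGINT (Ctrl-C) and SIGTERM to the handler above.
// Returns false if either handler could not be installed.
bool installStopHandlers() {
    return std::signal(SIGINT, onStopSignal) != SIG_ERR &&
           std::signal(SIGTERM, onStopSignal) != SIG_ERR;
}

// True once one of the signals has been received.
bool stopRequested() { return g_stop_requested != 0; }

// Intended use in main(), after qbus_consumer.start() succeeds:
//
//     installStopHandlers();
//     while (!stopRequested()) sleep(1);
//     qbus_consumer.stop();
```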

You can use the pause() and resume() methods to pause or resume consuming specific topics; see qbus_pause_resume_example.cc.
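
One possible use of pause() and resume() is backpressure: stop fetching while the application has too many unprocessed messages, and continue once the backlog has drained. The following is a rough sketch under that assumption, not the approach taken in qbus_pause_resume_example.cc. Backpressure and FakeConsumer are hypothetical names; FakeConsumer only mimics the pause/resume signatures listed above so the sketch compiles without the SDK.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in exposing only pause()/resume(); it records its state
// instead of talking to a broker.
struct FakeConsumer {
    bool paused;
    FakeConsumer() : paused(false) {}
    bool pause(const std::vector<std::string>&) { paused = true; return true; }
    bool resume(const std::vector<std::string>&) { paused = false; return true; }
};

// Pauses the topics once the backlog reaches high_water and resumes them once
// it drains to low_water. The gap between the two marks avoids rapid toggling.
template <typename Consumer>
class Backpressure {
public:
    Backpressure(Consumer& consumer, const std::vector<std::string>& topics,
                 std::size_t low_water, std::size_t high_water)
        : consumer_(consumer), topics_(topics),
          low_(low_water), high_(high_water), paused_(false) {
        assert(low_water <= high_water);
    }

    // Call with the current number of unprocessed messages.
    void update(std::size_t backlog) {
        if (!paused_ && backlog >= high_) {
            paused_ = consumer_.pause(topics_);    // stays false if pause fails
        } else if (paused_ && backlog <= low_) {
            paused_ = !consumer_.resume(topics_);  // stays true if resume fails
        }
    }

    bool paused() const { return paused_; }

private:
    Consumer& consumer_;
    std::vector<std::string> topics_;
    std::size_t low_, high_;
    bool paused_;
};
```

How the backlog is measured (for example, the size of a work queue filled by the callback) is left to the application.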

See the C, C++, Go, Python, and PHP examples for more usage.

Configuration

The configuration file is in INI format:

[global]

[topic]

[sdk]

See the rdkafka 1.0.x configuration for the [global] and [topic] settings, and the sdk configuration for the [sdk] settings.

Normally kafkabridge works with an empty configuration file, but if your broker version is lower than 0.10.0.0, you must specify the api.version-related configuration parameters; see broker version compatibility.

For example, for broker 0.9.0.1, the following configuration is necessary:

[global]
api.version.request=false
broker.version.fallback=0.9.0.1

The default configuration is now compatible with broker 0.9.0.1. Therefore, if a newer broker is used, api.version.request should be set to true. Otherwise, the older message protocol is used, which, for example, has no timestamp field.
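
The rule above can be captured in a small helper that generates the configuration text for a given broker version. This is only an illustrative sketch: makeConfig and isOlderThan0_10 are hypothetical names, and the only keys used are the two that appear in this section.

```cpp
#include <cstdio>
#include <string>

// Parses "a.b.c.d" (missing parts count as 0) and reports whether the version
// is older than 0.10.0.0, the threshold named in the text above.
// Unparseable input is read as 0.0.0.0 and therefore treated as old.
bool isOlderThan0_10(const std::string& version) {
    int v[4] = {0, 0, 0, 0};
    std::sscanf(version.c_str(), "%d.%d.%d.%d", &v[0], &v[1], &v[2], &v[3]);
    return v[0] == 0 && v[1] < 10;
}

// Builds the INI text: old brokers get the explicit fallback version, newer
// brokers get api.version.request=true. [topic] and [sdk] are left empty.
std::string makeConfig(const std::string& broker_version) {
    std::string ini = "[global]\n";
    if (isOlderThan0_10(broker_version)) {
        ini += "api.version.request=false\n";
        ini += "broker.version.fallback=" + broker_version + "\n";
    } else {
        ini += "api.version.request=true\n";
    }
    ini += "\n[topic]\n\n[sdk]\n";
    return ini;
}
```

The returned string can be written to the file whose path is passed as config_path to init.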

Contact

QQ group: 876834263
