• Stars
    star
    1,968
  • Rank 23,561 (Top 0.5 %)
  • Language
    C
  • License
    MIT License
  • Created over 9 years ago
  • Updated over 1 year ago

Reviews

There are no reviews yet. Be the first to send feedback to the community and the maintainers!

Repository Details

Production-ready, stable Kafka client for PHP

PHP Kafka client - php-rdkafka

Join the chat at https://gitter.im/arnaud-lb/php-rdkafka

Supported librdkafka versions: >= 0.11 Supported Kafka versions: >= 0.8 Supported PHP versions: 7.x .. 8.x

PHP-rdkafka is a stable, production-ready, long term support, and fast Kafka client for PHP based on librdkafka.

It supports PHP 7, PHP 8, PHP 5 (in older versions), all librdkafka versions since 0.11, all Kafka versions since 0.8. This makes it easy to deploy the extension in production.

The goal of the extension is to be a low-level un-opinionated librdkafka binding focused on production and long term support.

The high level and low level consumers, producer, and metadata APIs are supported.

Documentation is available here.

Sponsors

Upstash

Upstash: Serverless Kafka

  • True Serverless Kafka with per-request-pricing
  • Managed Apache Kafka, works with all Kafka clients
  • Built-in REST API designed for serverless and edge functions

Start for free in 30 seconds!

php-rdkafka supports Ukraine. Proceeds from our generous sponsors are currently donated to the Support Ukraine collective.

Table of Contents

  1. Installation
  2. Examples
  3. Usage
  4. Documentation
  5. Credits
  6. License

Installation

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.setup.html

Examples

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/rdkafka.examples.html

Usage

Configuration parameters used below can be found in Librdkafka Configuration reference

Producing

Creating a producer

For producing, we first need to create a producer, and to add brokers (Kafka servers) to it:

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Producer($conf);
$rk->addBrokers("10.0.0.1:9092,10.0.0.2:9092");

Producing messages

Warning Make sure that your producer follows proper shutdown (see below) to not lose messages.

Next, we create a topic instance from the producer:

<?php

$topic = $rk->newTopic("test");

From there, we can produce as much messages as we want, using the produce method:

<?php

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

The first argument is the partition. RD_KAFKA_PARTITION_UA stands for unassigned, and lets librdkafka choose the partition.
The second argument are message flags and should be either 0
or RD_KAFKA_MSG_F_BLOCK to block produce on full queue. The message payload can be anything.

Proper shutdown

This should be done prior to destroying a producer instance
to make sure all queued and in-flight produce requests are completed
before terminating. Use a reasonable value for $timeout_ms.

Warning Not calling flush can lead to message loss!

$rk->flush($timeout_ms);

In case you don't care about sending messages that haven't been sent yet, you can use purge() before calling flush():

// Forget messages that are not fully sent yet
$rk->purge(RD_KAFKA_PURGE_F_QUEUE);

$rk->flush($timeout_ms);

High-level consuming

The RdKafka\KafkaConsumer class supports automatic partition assignment/revocation. See the example here.

Low-level consuming (legacy)

Note The low-level consumer is a legacy API, please prefer using the high-level consumer

We first need to create a low level consumer, and to add brokers (Kafka servers) to it:

<?php
$conf = new RdKafka\Conf();
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("10.0.0.1,10.0.0.2");

Next, create a topic instance by calling the newTopic() method, and start consuming on partition 0:

<?php

$topic = $rk->newTopic("test");

// The first argument is the partition to consume from.
// The second argument is the offset at which to start consumption. Valid values
// are: RD_KAFKA_OFFSET_BEGINNING, RD_KAFKA_OFFSET_END, RD_KAFKA_OFFSET_STORED.
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

Next, retrieve the consumed messages:

<?php

while (true) {
    // The first argument is the partition (again).
    // The second argument is the timeout.
    $msg = $topic->consume(0, 1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

Low-level consuming from multiple topics / partitions (legacy)

Note The low-level consumer is a legacy API, please prefer using the high-level consumer

Consuming from multiple topics and/or partitions can be done by telling librdkafka to forward all messages from these topics/partitions to an internal queue, and then consuming from this queue:

Creating the queue:

<?php
$queue = $rk->newQueue();

Adding topic partitions to the queue:

<?php

$topic1 = $rk->newTopic("topic1");
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$topic2 = $rk->newTopic("topic2");
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);

Next, retrieve the consumed messages from the queue:

<?php

while (true) {
    // The only argument is the timeout.
    $msg = $queue->consume(1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}

Using stored offsets

Broker (default)

librdkafka per default stores offsets on the broker.

File offsets (deprecated)

If you're using local file for offset storage, then by default the file is created in the current directory, with a name based on the topic and the partition. The directory can be changed by setting the offset.store.path configuration property.

Consumer settings

Low-level consumer: auto commit settings

To manually control the offset, set enable.auto.offset.store to false.
The settings auto.commit.interval.ms and auto.commit.enable will control
if the stored offsets will be auto committed to the broker and in which interval.

High-level consumer: auto commit settings

To manually control the offset, set enable.auto.commit to false.

High level consumer: max.poll.interval.ms

Maximum allowed time between calls to consume messages for high-level consumers.
If this interval is exceeded the consumer is considered failed and the group will
rebalance in order to reassign the partitions to another consumer group member.

Consumer group id (general)

group.id is responsible for setting your consumer group ID and it should be unique (and should not change). Kafka uses it to recognize applications and store offsets for them.

<?php

$topicConf = new RdKafka\TopicConf();
$topicConf->set("auto.commit.interval.ms", 1e3);

$topic = $rk->newTopic("test", $topicConf);

$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

Interesting configuration parameters

Librdkafka Configuration reference

queued.max.messages.kbytes

librdkafka will buffer up to 1GB of messages for each consumed partition by default. You can lower memory usage by reducing the value of the queued.max.messages.kbytes parameter on your consumers.

topic.metadata.refresh.sparse and topic.metadata.refresh.interval.ms

Each consumer and producer instance will fetch topics metadata at an interval defined by the topic.metadata.refresh.interval.ms parameter. Depending on your librdkafka version, the parameter defaults to 10 seconds, or 600 seconds.

librdkafka fetches the metadata for all topics of the cluster by default. Setting topic.metadata.refresh.sparse to the string "true" makes sure that librdkafka fetches only the topics he uses.

Setting topic.metadata.refresh.sparse to "true", and topic.metadata.refresh.interval.ms to 600 seconds (plus some jitter) can reduce the bandwidth a lot, depending on the number of consumers and topics.

internal.termination.signal

This setting allows librdkafka threads to terminate as soon as librdkafka is done with them. This effectively allows your PHP processes / requests to terminate quickly.

When enabling this, you have to mask the signal like this:

<?php
// once
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
// any time
$conf->set('internal.termination.signal', SIGIO);

socket.blocking.max.ms (librdkafka < 1.0.0)

Maximum time a broker socket operation may block. A lower value improves responsiveness at the expense of slightly higher CPU usage.

Reducing the value of this setting improves shutdown speed. The value defines the maximum time librdkafka will block in one iteration of a read loop. This also defines how often the main librdkafka thread will check for termination.

queue.buffering.max.ms

This defines the maximum and default time librdkafka will wait before sending a batch of messages. Reducing this setting to e.g. 1ms ensures that messages are sent ASAP, instead of being batched.

This has been seen to reduce the shutdown time of the rdkafka instance, and of the PHP process / request.

Performance / Low-latency settings

Here is a configuration optimized for low latency. This allows a PHP process / request to send messages ASAP and to terminate quickly.

<?php

$conf = new \RdKafka\Conf();
$conf->set('socket.timeout.ms', 50); // or socket.blocking.max.ms, depending on librdkafka version
if (function_exists('pcntl_sigprocmask')) {
    pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
    $conf->set('internal.termination.signal', SIGIO);
} else {
    $conf->set('queue.buffering.max.ms', 1);
}

$producer = new \RdKafka\Producer($conf);
$consumer = new \RdKafka\Consumer($conf);

It is advised to call poll at regular intervals to serve callbacks. In php-rdkafka:3.x
poll was also called during shutdown, so not calling it in regular intervals might
lead to a slightly longer shutdown. The example below polls until there are no more events in the queue:

$producer->produce(...);
while ($producer->getOutQLen() > 0) {
    $producer->poll(1);
}

Documentation

https://arnaud-lb.github.io/php-rdkafka-doc/phpdoc/book.rdkafka.html
The source of the documentation can be found here

Asking for Help

If the documentation is not enough, feel free to ask a questions on the php-rdkafka channels on Gitter or Google Groups.

Stubs

Because your IDE is not able to auto discover php-rdkadka api you can consider usage of external package providing a set of stubs for php-rdkafka classes, functions and constants: kwn/php-rdkafka-stubs

Contributing

If you would like to contribute, thank you :)

Before you start, please take a look at the CONTRIBUTING document to see how to get your changes merged in.

Credits

Documentation copied from librdkafka.

Authors: see contributors.

License

php-rdkafka is released under the MIT license.

More Repositories

1

php-memory-profiler

Memory profiler for PHP. Helps finding memory leaks in PHP scripts.
C
773
star
2

MtHaml

Multi target HAML (HAML for PHP, Twig, <your language here>)
PHP
363
star
3

vim-php-namespace

PHP namespace support for VIM. Types "use" statements for you
Vim Script
252
star
4

php-go

php-go allows to call Go code from PHP, with minimal code boilerplate
C
174
star
5

php-inotify

Inotify bindings for PHP 5, 7, and next
C
39
star
6

MtHamlBundle

Symfony2 HAML bundle
PHP
38
star
7

imagesize.js

Get the size of an image without reading or downloading it entirely
JavaScript
30
star
8

php-sema

A library for semantic analysis of PHP code
PHP
19
star
9

alb-oembed

Simple PHP oEmbed consumer library with discovery support
PHP
16
star
10

Zwig

Twig / Zend Framework adapter
PHP
16
star
11

TwigReflectionBundle

Displays what's in Twig
PHP
15
star
12

goresize

image resizing proxy written in golang
Go
12
star
13

xtrabackup-manager

xtrabackup-manager fork. Original code at https://code.google.com/p/xtrabackup-manager/
PHP
9
star
14

TwigShellBundle

Simple Twig Shell
PHP
8
star
15

Silex-MtHaml

HAML templating for Silex
PHP
8
star
16

fselectmenu

JavaScript
7
star
17

phpketama

Pure-PHP implementation of libketama, a consistent hashing library
PHP
6
star
18

php-rdkafka-doc

PHP
6
star
19

php-inotify-ffi

Pure-PHP inotify binding (FFI-based)
PHP
5
star
20

binsort

Binsort is a tool to sort files of fixed-length binary records
Go
5
star
21

AlbOpenIDServerBundle

OpenID Provider bundle
PHP
5
star
22

php-throttle

php module for throttling file upload speed
C
4
star
23

fdinfo

progress bar for quiet cli tools
C
4
star
24

OpenISETL

Scala/JVM implementation of the ISETL language
Scala
4
star
25

run

`nohup something >/dev/null 2>&1 &`; shortened
C
3
star
26

sfbootstrap

PHP
3
star
27

mediawiki-php-FastStringSearch

Github mirror of PHP extension FastStringSearch - our actual code is hosted with Gerrit (please see https://www.mediawiki.org/wiki/Developer_access for contributing)
C
2
star
28

zf

zf mirror
PHP
1
star
29

iterm-to-neovim

Converts Iterm2 color schemes to neovim g:terminal_color_x
Go
1
star
30

jQuery.event.queueHandler

Queues handlers to be ran after the current event's handlers have been ran
JavaScript
1
star
31

graphql-demo

A Symfony application demoing a simple GraphQL API
PHP
1
star