Kafka input and output plugin for Fluentd

fluent-plugin-kafka, a plugin for Fluentd

A fluentd plugin to both consume and produce data for Apache Kafka.

Installation

Add this line to your application's Gemfile:

gem 'fluent-plugin-kafka'

And then execute:

$ bundle

Or install it yourself as:

$ gem install fluent-plugin-kafka --no-document

To use the ZooKeeper-related parameters, you also need to install the zookeeper gem. The zookeeper gem includes a native extension, so development tools such as ruby-devel, gcc and make are required.

Requirements

  • Ruby 2.1 or later
  • Input plugins work with kafka v0.9 or later
  • Output plugins work with kafka v0.8 or later

Usage

Common parameters

SSL authentication

  • ssl_ca_cert
  • ssl_client_cert
  • ssl_client_cert_key
  • ssl_client_cert_key_password
  • ssl_ca_certs_from_system

Set path to SSL related files. See Encryption and Authentication using SSL for more detail.

SASL authentication

with GSSAPI
  • principal
  • keytab

Set principal and path to keytab for SASL/GSSAPI authentication. See Authentication using SASL for more details.

with Plain/SCRAM
  • username
  • password
  • scram_mechanism
  • sasl_over_ssl

Set username, password, scram_mechanism and sasl_over_ssl for SASL/Plain or Scram authentication. See Authentication using SASL for more details.

Input plugin (@type 'kafka')

Consume events with a single consumer.

<source>
  @type kafka

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  topics <listening topics(separate with comma',')>
  format <input text type (text|json|ltsv|msgpack)> :default => json
  message_key <key (Optional, for text format only, default is message)>
  add_prefix <tag prefix (Optional)>
  add_suffix <tag suffix (Optional)>

  # Optionally, you can manage topic offset by using zookeeper
  offset_zookeeper    <zookeeper node list (<zookeeper1_host>:<zookeeper1_port>,<zookeeper2_host>:<zookeeper2_port>,..)>
  offset_zk_root_node <offset path in zookeeper> default => '/fluent-plugin-kafka'

  # ruby-kafka consumer options
  max_bytes     (integer) :default => nil (Use default of ruby-kafka)
  max_wait_time (integer) :default => nil (Use default of ruby-kafka)
  min_bytes     (integer) :default => nil (Use default of ruby-kafka)
</source>

Processing can also start from an assigned offset for specific topics and partitions.

<source>
  @type kafka

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  format <input text type (text|json|ltsv|msgpack)>
  <topic>
    topic     <listening topic>
    partition <listening partition: default=0>
    offset    <listening start offset: default=-1>
  </topic>
  <topic>
    topic     <listening topic>
    partition <listening partition: default=0>
    offset    <listening start offset: default=-1>
  </topic>
</source>

See also ruby-kafka README for more detailed documentation about ruby-kafka.

The consumed topic name is used as the event tag. For example, when the target topic name is app_event, the tag is app_event. To modify the tag, use the add_prefix or add_suffix parameters. With add_prefix kafka, the tag becomes kafka.app_event.
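
The tag rule above can be sketched in a few lines of Ruby. This is only an illustration: event_tag is a hypothetical helper, not part of the plugin's API, and the dot-joined form for add_suffix is an assumption mirrored from the documented add_prefix behavior.

```ruby
# Hypothetical helper that mirrors the documented tag rule.
# Assumption: add_suffix is joined with a dot, like add_prefix.
def event_tag(topic, add_prefix: nil, add_suffix: nil)
  tag = topic
  tag = "#{add_prefix}.#{tag}" if add_prefix
  tag = "#{tag}.#{add_suffix}" if add_suffix
  tag
end

puts event_tag("app_event")
puts event_tag("app_event", add_prefix: "kafka")
```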

Input plugin (@type 'kafka_group', supports kafka group)

Consume events using Kafka consumer group features.

<source>
  @type kafka_group

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,..
  consumer_group <consumer group name, must set>
  topics <listening topics(separate with comma',')>
  format <input text type (text|json|ltsv|msgpack)> :default => json
  message_key <key (Optional, for text format only, default is message)>
  kafka_message_key <key (Optional, If specified, set kafka's message key to this key)>
  add_headers <If true, add kafka's message headers to record>
  add_prefix <tag prefix (Optional)>
  add_suffix <tag suffix (Optional)>
  retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
  use_record_time (Deprecated. Use 'time_source record' instead.) <If true, replace event time with contents of 'time' field of fetched record>
  time_source <source for message timestamp (now|kafka|record)> :default => now
  time_format <string (Optional when use_record_time is used)>

  # ruby-kafka consumer options
  max_bytes               (integer) :default => 1048576
  max_wait_time           (integer) :default => nil (Use default of ruby-kafka)
  min_bytes               (integer) :default => nil (Use default of ruby-kafka)
  offset_commit_interval  (integer) :default => nil (Use default of ruby-kafka)
  offset_commit_threshold (integer) :default => nil (Use default of ruby-kafka)
  fetcher_max_queue_size  (integer) :default => nil (Use default of ruby-kafka)
  refresh_topic_interval  (integer) :default => nil (Use default of ruby-kafka)
  start_from_beginning    (bool)    :default => true
</source>

See also ruby-kafka README for more detailed documentation about ruby-kafka options.

topics supports regex pattern since v0.13.1. If you want to use regex pattern, use /pattern/ like /foo.*/.
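
As a rough illustration of the /pattern/ convention, the sketch below turns a comma-separated topics value into a list of strings and regular expressions. parse_topics and subscribed? are hypothetical helpers written for this example; how the plugin and ruby-kafka actually resolve patterns against cluster topics may differ.

```ruby
# Hypothetical helper: entries wrapped in slashes become Regexp objects,
# everything else stays a literal topic name.
def parse_topics(value)
  value.split(",").map do |topic|
    topic = topic.strip
    if topic.length > 2 && topic.start_with?("/") && topic.end_with?("/")
      Regexp.new(topic[1..-2])
    else
      topic
    end
  end
end

# Regexp#=== tests for a match, String#=== tests for equality,
# so one comparison covers both kinds of entry.
def subscribed?(patterns, topic_name)
  patterns.any? { |pattern| pattern === topic_name }
end

patterns = parse_topics("app_event, /foo.*/")
puts subscribed?(patterns, "foobar")
```

Note that the pattern is not anchored in this sketch, so /foo.*/ also matches a topic that merely contains foo.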

The consumed topic name is used as the event tag. For example, when the target topic name is app_event, the tag is app_event. To modify the tag, use the add_prefix or add_suffix parameter. With add_prefix kafka, the tag becomes kafka.app_event.

Input plugin (@type 'rdkafka_group', supports kafka consumer groups, uses rdkafka-ruby)

⚠️ The in_rdkafka_group consumer was not yet tested under heavy production load. Use it at your own risk!

With the introduction of the rdkafka-ruby based input plugin we hope to support Kafka brokers above version 2.1 where we saw compatibility issues when using the ruby-kafka based @kafka_group input type. The rdkafka-ruby lib wraps the highly performant and production ready librdkafka C lib.

<source>
  @type rdkafka_group
  topics <listening topics(separate with comma',')>
  format <input text type (text|json|ltsv|msgpack)> :default => json
  message_key <key (Optional, for text format only, default is message)>
  kafka_message_key <key (Optional, If specified, set kafka's message key to this key)>
  add_headers <If true, add kafka's message headers to record>
  add_prefix <tag prefix (Optional)>
  add_suffix <tag suffix (Optional)>
  retry_emit_limit <Wait retry_emit_limit x 1s when BufferQueueLimitError happens. The default is nil and it means waiting until BufferQueueLimitError is resolved>
  use_record_time (Deprecated. Use 'time_source record' instead.) <If true, replace event time with contents of 'time' field of fetched record>
  time_source <source for message timestamp (now|kafka|record)> :default => now
  time_format <string (Optional when use_record_time is used)>

  # kafka consumer options
  max_wait_time_ms 500
  max_batch_size 10000
  kafka_configs {
    "bootstrap.servers": "<broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>",
    "group.id": "<consumer group name>"
  }
</source>

See also rdkafka-ruby and librdkafka for more detailed documentation about Kafka consumer options.

The consumed topic name is used as the event tag. For example, when the target topic name is app_event, the tag is app_event. To modify the tag, use the add_prefix or add_suffix parameter. With add_prefix kafka, the tag becomes kafka.app_event.

Output plugin

This kafka2 plugin is for fluentd v1 or later. This plugin uses ruby-kafka producer for writing data. If ruby-kafka doesn't fit your kafka environment, check rdkafka2 plugin instead. This will be out_kafka plugin in the future.

<match app.**>
  @type kafka2

  brokers               <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly

  # Kafka topic. Placeholders are supported. Chunk keys are required in the buffer section in order for placeholders
  # to work.
  topic                 (string) :default => nil
  topic_key             (string) :default => 'topic'
  partition_key         (string) :default => 'partition'
  partition_key_key     (string) :default => 'partition_key'
  message_key_key       (string) :default => 'message_key'
  default_topic         (string) :default => nil
  default_partition_key (string) :default => nil
  record_key            (string) :default => nil
  default_message_key   (string) :default => nil
  exclude_topic_key     (bool)   :default => false
  exclude_partition_key (bool)   :default => false
  exclude_partition     (bool)   :default => false
  exclude_message_key   (bool)   :default => false
  get_kafka_client_log  (bool)   :default => false
  headers               (hash)   :default => {}
  headers_from_record   (hash)   :default => {}
  use_event_time        (bool)   :default => false
  use_default_for_unknown_topic (bool) :default => false
  discard_kafka_delivery_failed (bool) :default => false (No discard)
  partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'
  share_producer        (bool)   :default => false

  # If you intend to rely on AWS IAM auth to MSK with long lived credentials
  # https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html
  #
  # For AWS STS support, see status in
  # - https://github.com/zendesk/ruby-kafka/issues/944
  # - https://github.com/zendesk/ruby-kafka/pull/951
  sasl_aws_msk_iam_access_key_id (string) :default => nil
  sasl_aws_msk_iam_secret_key_id (string) :default => nil
  sasl_aws_msk_iam_aws_region    (string) :default => nil

  <format>
    @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  </format>

  # Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section
  <inject>
    tag_key tag
    time_key time
  </inject>

  # See fluentd document for buffer related parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section
  # Buffer chunk key should be same with topic_key. If value is not found in the record, default_topic is used.
  <buffer topic>
    flush_interval 10s
  </buffer>

  # ruby-kafka producer options
  idempotent        (bool)    :default => false
  sasl_over_ssl     (bool)    :default => true
  max_send_retries  (integer) :default => 1
  required_acks     (integer) :default => -1
  ack_timeout       (integer) :default => nil (Use default of ruby-kafka)
  compression_codec (string)  :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
</match>

The <formatter name> in <format> uses fluentd's formatter plugins. See formatter article.

Note: Java-based Kafka clients use murmur2 as the partitioner function by default. To get the same partitioning behavior with fluent-plugin-kafka, set partitioner_hash_function to murmur2 instead of crc32. Using the murmur2 hash partitioner function requires installing the digest-murmurhash gem.
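
To see why the hash function matters, here is a minimal sketch of key-based partitioning with crc32. It assumes the partition is chosen as the CRC32 of the key modulo the number of partitions; crc32_partition is a hypothetical helper, and the real logic lives inside ruby-kafka. A client that hashes the same key with murmur2 will generally pick a different partition.

```ruby
require "zlib"

# Hypothetical helper. Assumption: partition = CRC32(key) mod partition count.
def crc32_partition(partition_key, partition_count)
  Zlib.crc32(partition_key) % partition_count
end

# The same key always maps to the same partition for a fixed partition count.
puts crc32_partition("user-42", 12)
puts crc32_partition("user-42", 12)
```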

ruby-kafka sometimes returns Kafka::DeliveryFailed error without good information. In this case, get_kafka_client_log is useful for identifying the error cause. ruby-kafka's log is routed to fluentd log so you can see ruby-kafka's log in fluentd logs.

The following ruby-kafka producer options are supported.

  • max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
  • required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
  • ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
  • compression_codec - default: nil - The codec the producer uses to compress messages.
  • max_send_limit_bytes - default: nil - Maximum message size in bytes, used to avoid MessageSizeTooLarge. For example, if you set 1000000 (message.max.bytes in Kafka), messages larger than 1000000 bytes will be dropped.
  • discard_kafka_delivery_failed - default: false - discard the record where Kafka::DeliveryFailed occurred

For details about monitoring, see https://github.com/zendesk/ruby-kafka#monitoring

See also Kafka::Client for more detailed documentation about ruby-kafka.

This plugin also supports the "snappy" compression codec. Install the snappy gem before using snappy compression.

$ gem install snappy --no-document

The snappy gem uses a native extension, so you need to install several packages first. On Ubuntu, the development packages and the snappy library are needed.

$ sudo apt-get install build-essential autoconf automake libtool libsnappy-dev

On CentOS 7, the equivalent packages are also necessary.

$ sudo yum install gcc autoconf automake libtool snappy-devel

This plugin supports compression codec "lz4" also. Install extlz4 module before you use lz4 compression.

$ gem install extlz4 --no-document

This plugin supports compression codec "zstd" also. Install zstd-ruby module before you use zstd compression.

$ gem install zstd-ruby --no-document

Load balancing

By default, ruby-kafka assigns each message to a partition at random. Setting default_partition_key in the config file makes messages with the same partition key always go to the same partition. If the key named by partition_key_key exists in a message, this plugin uses its value as the partition key.

| default_partition_key | partition_key_key | behavior |
| --- | --- | --- |
| Not set | Not exists | All messages are assigned a partition at random |
| Set | Not exists | All messages are assigned to the specific partition |
| Not set | Exists | Messages which have partition_key_key record are assigned to the specific partition, others are assigned a partition at random |
| Set | Exists | Messages which have partition_key_key record are assigned to the specific partition with partition_key_key, others are assigned to the specific partition with default_partition_key |

If the key named by message_key_key exists in a message, this plugin publishes its value to Kafka as the message key, and consumers can read it. Setting default_message_key in the config file assigns the same message key to all messages. If message_key_key exists and partition_key_key is not set explicitly, message_key_key is used for partitioning.
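
The rules above can be condensed into a small Ruby sketch. partitioning_key is a hypothetical helper that uses the default key names partition_key and message_key. The precedence of default_partition_key over the message key is an assumption, since the text does not spell it out. A nil result stands for random assignment.

```ruby
# Hypothetical helper: returns the key used to choose a partition,
# or nil when the partition is chosen at random.
# Assumption: default_partition_key takes precedence over the message key.
def partitioning_key(record, default_partition_key: nil)
  partition_key = record["partition_key"] || default_partition_key
  partition_key || record["message_key"]
end

p partitioning_key({})
p partitioning_key({}, default_partition_key: "dc1")
p partitioning_key({ "partition_key" => "user-1" }, default_partition_key: "dc1")
p partitioning_key({ "message_key" => "order-9" })
```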

Headers

It is possible to set headers on Kafka messages. This only works for kafka2 and rdkafka2 output plugin.

The format is like key1:value1,key2:value2. For example:

<match app.**>
  @type kafka2
  [...]
  headers some_header_name:some_header_value
</match>
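
For intuition, the key1:value1,key2:value2 form corresponds to a plain hash. The sketch below shows one way such a string could be split. parse_headers is a hypothetical helper, not the parser Fluentd itself uses, and it raises an error on entries without a colon.

```ruby
# Hypothetical helper: split on commas, then on the first colon of each pair.
def parse_headers(value)
  value.split(",").map { |pair| pair.split(":", 2).map(&:strip) }.to_h
end

p parse_headers("key1:value1,key2:value2")
p parse_headers("some_header_name:some_header_value")
```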

You may set header values based on a value of a fluentd record field. For example, imagine a fluentd record like:

{"source": { "ip": "127.0.0.1" }, "payload": "hello world" }

And the following fluentd config:

<match app.**>
  @type kafka2
  [...]
  headers_from_record source_ip:$.source.ip
</match>

The Kafka message will have a header of source_ip=127.0.0.1.

The configuration format is jsonpath. It is described in https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor
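
The lookup can be approximated in plain Ruby for the simple dot-notation case. This is a simplified stand-in for Fluentd's record_accessor helper, not its implementation: dig_path and headers_from_record are hypothetical helpers, and bracket notation is not handled.

```ruby
# Hypothetical helper: resolve a "$.a.b" style path against a nested hash.
def dig_path(record, path)
  keys = path.sub(/\A\$\./, "").split(".")
  record.dig(*keys)
end

# Hypothetical helper: build the header hash from a header-name => path mapping.
def headers_from_record(record, mapping)
  mapping.each_with_object({}) do |(header, path), headers|
    headers[header] = dig_path(record, path)
  end
end

record = { "source" => { "ip" => "127.0.0.1" }, "payload" => "hello world" }
p headers_from_record(record, { "source_ip" => "$.source.ip" })
```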

Excluding fields

Fields can be excluded from output data. Only works for kafka2 and rdkafka2 output plugin.

Fields must be specified using an array of dot notation $., for example:

<match app.**>
  @type kafka2
  [...]
  exclude_fields $.source.ip,$.HTTP_FOO
</match>

This setting can be used to remove fields that are used by other settings.

For example, $.source.ip can be extracted with headers_from_record and then excluded from the message payload.

Using this setting to remove unused fields is discouraged. A filter plugin can be used for that purpose.
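
A rough Ruby sketch of what excluding a field means for a nested record follows. exclude_field! is a hypothetical helper that only understands simple dot notation, and the HTTP_FOO value is made up for the example. Whether the plugin leaves an empty parent hash behind, as this sketch does, is an assumption.

```ruby
# Hypothetical helper: delete the leaf named by a "$.a.b" style path.
# Paths whose parent is missing are ignored.
def exclude_field!(record, path)
  *parents, leaf = path.sub(/\A\$\./, "").split(".")
  container = parents.empty? ? record : record.dig(*parents)
  container.delete(leaf) if container.is_a?(Hash)
  record
end

record = { "source" => { "ip" => "127.0.0.1" }, "HTTP_FOO" => "bar", "payload" => "hello world" }
["$.source.ip", "$.HTTP_FOO"].each { |path| exclude_field!(record, path) }
p record
```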

Send only a sub field as a message payload

If record_key is provided, the plugin sends only a sub field given by that key. The configuration format is jsonpath.

e.g. When the following configuration and the incoming record are given:

configuration:

<match **>
  @type kafka2
  [...]
  record_key '$.data'
</match>

record:

{
    "specversion" : "1.0",
    "type" : "com.example.someevent",
    "id" : "C234-1234-1234",
    "time" : "2018-04-05T17:31:00Z",
    "datacontenttype" : "application/json",
    "data" : {
        "appinfoA" : "abc",
        "appinfoB" : 123,
        "appinfoC" : true
    },
    ...
}

Only the data field will be serialized by the formatter and sent to Kafka. The top-level data key will be removed.
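
The resulting payload can be previewed with a short Ruby sketch, assuming the json formatter and a simple dot-notation key. payload_for is a hypothetical helper, and the record is a shortened version of the one above.

```ruby
require "json"

# Hypothetical helper: pick the sub field named by record_key and serialize it as JSON.
def payload_for(record, record_key)
  keys = record_key.sub(/\A\$\./, "").split(".")
  JSON.generate(record.dig(*keys))
end

record = {
  "specversion" => "1.0",
  "type" => "com.example.someevent",
  "data" => { "appinfoA" => "abc", "appinfoB" => 123, "appinfoC" => true }
}
puts payload_for(record, "$.data")
```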

Buffered output plugin

This plugin uses ruby-kafka producer for writing data. This plugin is for v0.12. If you use v1, see kafka2. Support of fluentd v0.12 has ended. kafka_buffered will be an alias of kafka2 and will be removed in the future.

<match app.**>
  @type kafka_buffered

  # Brokers: you can choose either brokers or zookeeper. If you are not familiar with zookeeper, use brokers parameters.
  brokers             <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  zookeeper           <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
  zookeeper_path      <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka

  topic_key             (string) :default => 'topic'
  partition_key         (string) :default => 'partition'
  partition_key_key     (string) :default => 'partition_key'
  message_key_key       (string) :default => 'message_key'
  default_topic         (string) :default => nil
  default_partition_key (string) :default => nil
  default_message_key   (string) :default => nil
  exclude_topic_key     (bool)   :default => false
  exclude_partition_key (bool)   :default => false
  exclude_partition     (bool)   :default => false
  exclude_message_key   (bool)   :default => false
  output_data_type      (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  output_include_tag    (bool) :default => false
  output_include_time   (bool) :default => false
  get_kafka_client_log  (bool) :default => false
  use_event_time        (bool) :default => false
  partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'

  # See fluentd document for buffer related parameters: https://docs.fluentd.org/v/0.12/buffer

  # ruby-kafka producer options
  idempotent                   (bool)    :default => false
  sasl_over_ssl                (bool)    :default => true
  max_send_retries             (integer) :default => 1
  required_acks                (integer) :default => -1
  ack_timeout                  (integer) :default => nil (Use default of ruby-kafka)
  compression_codec            (string)  :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
  kafka_agg_max_bytes          (integer) :default => 4096
  kafka_agg_max_messages       (integer) :default => nil (No limit)
  max_send_limit_bytes         (integer) :default => nil (No drop)
  discard_kafka_delivery_failed   (bool) :default => false (No discard)
  monitoring_list              (array)   :default => []
</match>

kafka_buffered supports the following ruby-kafka parameters:

  • max_send_retries - default: 2 - Number of times to retry sending of messages to a leader.
  • required_acks - default: -1 - The number of acks required per request. If you need flush performance, set lower value, e.g. 1, 2.
  • ack_timeout - default: nil - How long the producer waits for acks. The unit is seconds.
  • compression_codec - default: nil - The codec the producer uses to compress messages.
  • max_send_limit_bytes - default: nil - Maximum message size in bytes, used to avoid MessageSizeTooLarge. For example, if you set 1000000 (message.max.bytes in Kafka), messages larger than 1000000 bytes will be dropped.
  • discard_kafka_delivery_failed - default: false - discard the record where Kafka::DeliveryFailed occurred
  • monitoring_list - default: [] - library to be used to monitor. statsd and datadog are supported

kafka_buffered has two additional parameters:

  • kafka_agg_max_bytes - default: 4096 - Maximum value of total message size to be included in one batch transmission.
  • kafka_agg_max_messages - default: nil - Maximum number of messages to include in one batch transmission.
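
How the two limits could interact is sketched below. This is an assumption-laden illustration rather than the plugin's code: build_batches is a hypothetical helper that closes a batch when adding the next message would exceed kafka_agg_max_bytes, or when the batch already holds kafka_agg_max_messages messages.

```ruby
# Hypothetical helper: group messages into batches under a byte limit
# and an optional message-count limit (nil means no count limit).
def build_batches(messages, max_bytes: 4096, max_messages: nil)
  batches = []
  current = []
  current_bytes = 0
  messages.each do |message|
    size = message.bytesize
    over_bytes = !current.empty? && current_bytes + size > max_bytes
    over_count = max_messages && current.size >= max_messages
    if over_bytes || over_count
      batches << current
      current = []
      current_bytes = 0
    end
    current << message
    current_bytes += size
  end
  batches << current unless current.empty?
  batches
end

p build_batches(["aaa", "bbb", "cc"], max_bytes: 6)
```

In this sketch a single message larger than the byte limit still forms a batch of its own.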

Note: Java-based Kafka clients use murmur2 as the partitioner function by default. To get the same partitioning behavior with fluent-plugin-kafka, set partitioner_hash_function to murmur2 instead of crc32. Using the murmur2 hash partitioner function requires installing the digest-murmurhash gem.

Non-buffered output plugin

This plugin uses ruby-kafka producer for writing data. For performance and reliability concerns, use kafka_buffered output instead. This is mainly for testing.

<match app.**>
  @type kafka

  # Brokers: you can choose either brokers or zookeeper.
  brokers        <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  zookeeper      <zookeeper_host>:<zookeeper_port> # Set brokers via Zookeeper
  zookeeper_path <broker path in zookeeper> :default => /brokers/ids # Set path in zookeeper for kafka

  default_topic         (string) :default => nil
  default_partition_key (string) :default => nil
  default_message_key   (string) :default => nil
  output_data_type      (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  output_include_tag    (bool) :default => false
  output_include_time   (bool) :default => false
  exclude_topic_key     (bool) :default => false
  exclude_partition_key (bool) :default => false
  partitioner_hash_function (enum) (crc32|murmur2) :default => 'crc32'

  # ruby-kafka producer options
  max_send_retries    (integer) :default => 1
  required_acks       (integer) :default => -1
  ack_timeout         (integer) :default => nil (Use default of ruby-kafka)
  compression_codec   (string)  :default => nil (No compression. Depends on ruby-kafka: https://github.com/zendesk/ruby-kafka#compression)
  max_buffer_size     (integer) :default => nil (Use default of ruby-kafka)
  max_buffer_bytesize (integer) :default => nil (Use default of ruby-kafka)
</match>

This plugin also supports ruby-kafka related parameters. See Buffered output plugin section.

Note: Java-based Kafka clients use murmur2 as the partitioner function by default. To get the same partitioning behavior with fluent-plugin-kafka, set partitioner_hash_function to murmur2 instead of crc32. Using the murmur2 hash partitioner function requires installing the digest-murmurhash gem.

rdkafka based output plugin

This plugin uses rdkafka instead of ruby-kafka for kafka client. You need to install rdkafka gem.

# rdkafka is a C extension library. You need to install development tools such as ruby-devel and gcc.
# for v0.12 or later
$ gem install rdkafka --no-document
# for v0.11 or earlier
$ gem install rdkafka -v 0.6.0 --no-document

rdkafka2 is for fluentd v1.0 or later.

<match app.**>
  @type rdkafka2

  brokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly

  topic_key             (string) :default => 'topic'
  default_topic         (string) :default => nil
  partition_key         (string) :default => 'partition'
  partition_key_key     (string) :default => 'partition_key'
  message_key_key       (string) :default => 'message_key'
  use_default_for_unknown_topic           (bool) :default => false
  use_default_for_unknown_partition_error (bool) :default => false
  default_partition_key (string) :default => nil
  default_message_key   (string) :default => nil
  exclude_topic_key     (bool) :default => false
  exclude_partition_key (bool) :default => false
  discard_kafka_delivery_failed (bool) :default => false (No discard)
  use_event_time        (bool) :default => false

  # same with kafka2
  headers               (hash) :default => {}
  headers_from_record   (hash) :default => {}
  record_key            (string) :default => nil

  <format>
    @type (json|ltsv|msgpack|attr:<record name>|<formatter name>) :default => json
  </format>

  # Optional. See https://docs.fluentd.org/v/1.0/configuration/inject-section
  <inject>
    tag_key tag
    time_key time
  </inject>

  # See fluentd document for buffer section parameters: https://docs.fluentd.org/v/1.0/configuration/buffer-section
  # Buffer chunk key should be same with topic_key. If value is not found in the record, default_topic is used.
  <buffer topic>
    flush_interval 10s
  </buffer>

  # You can set any rdkafka configuration via this parameter: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  rdkafka_options {
    "log_level" : 7
  }

  # rdkafka2 specific parameters

  # share kafka producer between flush threads. This is mainly for reducing kafka operations like kerberos
  share_producer (bool) :default => false
  # Timeout for polling message wait. If 0, no wait.
  rdkafka_delivery_handle_poll_timeout (integer) :default => 30
  # If the record size is larger than this value, such records are ignored. Default is no limit
  max_send_limit_bytes (integer) :default => nil
  # The maximum number of enqueueing bytes per second. It can reduce the
  # load of both Fluentd and Kafka when excessive messages are attempted
  # to send. Default is no limit.
  max_enqueue_bytes_per_second (integer) :default => nil
</match>

If you use v0.12, use rdkafka instead.

<match kafka.**>
  @type rdkafka

  default_topic kafka
  flush_interval 1s
  output_data_type json

  rdkafka_options {
    "log_level" : 7
  }
</match>

FAQ

Why can't fluent-plugin-kafka send data to my Kafka cluster?

This question comes up often. In most cases, the problem is caused by a version mismatch between ruby-kafka and the Kafka cluster. See the ruby-kafka README for more details: https://github.com/zendesk/ruby-kafka#compatibility

To avoid the problem, there are 2 approaches:

  • Upgrade your Kafka cluster to the latest version. This is the better option because recent versions are faster and more robust.
  • Downgrade ruby-kafka/fluent-plugin-kafka to work with your older Kafka.

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Added some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request
