Fluency

High throughput data ingestion logger to Fluentd and Fluent Bit (and AWS S3 and Treasure Data).

This document is for version 2. If you're looking for a document for version 1, see this.

Ingestion to Fluentd

Features

Install

Gradle

dependencies {
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-fluentd:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-fluentd</artifactId>
    <version>${fluency.version}</version>
</dependency>

Usage

Create Fluency instance

For single Fluentd
// Single Fluentd(localhost:24224 by default)
//   - TCP heartbeat (by default)
//   - Asynchronous flush (by default)
//   - Without ack response (by default)
//   - Flush attempt interval is 600ms (by default)
//   - Initial chunk buffer size is 1MB (by default)
//   - Threshold chunk buffer size to flush is 4MB (by default)
//   - Threshold chunk buffer retention time to flush is 1000 ms (by default)
//   - Max total buffer size is 512MB (by default)
//   - Use off heap memory for buffer pool (by default)
//   - Max retries of sending events is 8 (by default)
//   - Max wait until all buffers are flushed is 10 seconds (by default)
//   - Max wait until the flusher is terminated is 10 seconds (by default)
//   - Socket connection timeout is 5000 ms (by default)
//   - Socket read timeout is 5000 ms (by default)
Fluency fluency = new FluencyBuilderForFluentd().build();
For multiple Fluentd with failover
// Multiple Fluentd(localhost:24224, localhost:24225)
Fluency fluency = new FluencyBuilderForFluentd().build(
        Arrays.asList(
                new InetSocketAddress(24224),
                new InetSocketAddress(24225)));
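
The port-only `InetSocketAddress` constructor used above produces the wildcard address rather than a named host. Connecting to it usually reaches the local machine, but that behaviour depends on the operating system. The following JDK-only sketch shows the difference and one way to name the loopback interface explicitly; the class and method names are hypothetical and no Fluency classes are involved.

```java
import java.net.InetAddress;
import java.net.InetSocketAddress;

// Illustrative only: compares a port-only address with an explicit loopback address.
class SocketAddressSketch {
    /** Builds an address that names the loopback interface explicitly. */
    static InetSocketAddress loopback(int port) {
        return new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
    }

    public static void main(String[] args) {
        InetSocketAddress portOnly = new InetSocketAddress(24224);
        InetSocketAddress explicit = loopback(24224);
        // The port-only form carries the wildcard ("any local") address.
        System.out.println(portOnly.getAddress().isAnyLocalAddress()); // true
        // The explicit form carries a loopback address.
        System.out.println(explicit.getAddress().isLoopbackAddress()); // true
    }
}
```

Either form can be placed in the list passed to `build(...)`; the explicit form simply makes the intended destination unambiguous.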
Enable ACK response mode
// Single Fluentd(localhost:24224)
//   - With ack response
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setAckResponseMode(true);
Fluency fluency = builder.build();
Enable file backup mode

In this mode, Fluency backs up unsent in-memory buffers to files when closing and resends them on restart.

// Single Fluentd(localhost:24224)
//   - Backup directory is the temporary directory
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setFileBackupDir(System.getProperty("java.io.tmpdir"));
Fluency fluency = builder.build();
Buffer configuration

Fluency has several parameters that control when a buffer is flushed. The following diagram may help in understanding them.

[Figure: buffer-flush]
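
As a rough mental model of the two flush thresholds listed in the defaults (chunk size and chunk retention time), the sketch below treats a chunk as ready to flush once either threshold is reached. This is an assumption made for illustration, not Fluency's actual implementation: the class, method and parameter names are hypothetical, and the real library may use different comparisons or extra conditions.

```java
// Illustrative model only: NOT Fluency's internal logic.
class FlushDecisionSketch {
    /**
     * Returns true when a chunk has reached the size threshold
     * or has been retained for at least the time threshold.
     */
    static boolean shouldFlush(long chunkBytes, long chunkAgeMillis,
                               long retentionSizeBytes, long retentionTimeMillis) {
        return chunkBytes >= retentionSizeBytes || chunkAgeMillis >= retentionTimeMillis;
    }

    public static void main(String[] args) {
        long retentionSize = 4L * 1024 * 1024; // 4MB, the documented Fluentd default
        long retentionTime = 1000L;            // 1000 ms, the documented Fluentd default
        System.out.println(shouldFlush(1024, 200, retentionSize, retentionTime));          // false
        System.out.println(shouldFlush(retentionSize, 200, retentionSize, retentionTime)); // true
        System.out.println(shouldFlush(1024, 1000, retentionSize, retentionTime));         // true
    }
}
```

Under this model, a larger retention size favours throughput (fewer, bigger sends), while a shorter retention time bounds how long a small chunk can wait.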

For high throughput data ingestion with high latency
// Single Fluentd(xxx.xxx.xxx.xxx:24224)
//   - Initial chunk buffer size is 16MB
//   - Threshold chunk buffer size to flush is 64MB
//     Keep this value (BufferRetentionSize) between `Initial chunk buffer size` and `Max total buffer size`
//   - Max total buffer size = 1024MB
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setBufferChunkInitialSize(16 * 1024 * 1024);
builder.setBufferChunkRetentionSize(64 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
Fluency fluency = builder.build("xxx.xxx.xxx.xxx", 24224);
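
The `L` suffix in `1024 * 1024 * 1024L` matters once the total approaches 2GB, because `int` arithmetic wraps silently. The JDK-only sketch below (hypothetical class and method names, no Fluency classes) shows the difference when computing a size in bytes.

```java
// Illustrative only: plain JDK arithmetic.
class BufferSizeArithmetic {
    /** Computes a size in bytes with int arithmetic, which wraps past 2^31 - 1. */
    static int gibibytesAsInt(int gib) {
        return gib * 1024 * 1024 * 1024;
    }

    /** Computes the same size with long arithmetic, which does not wrap here. */
    static long gibibytesAsLong(int gib) {
        return gib * 1024L * 1024 * 1024;
    }

    public static void main(String[] args) {
        System.out.println(gibibytesAsInt(1));  // 1073741824
        System.out.println(gibibytesAsInt(2));  // -2147483648 (overflow)
        System.out.println(gibibytesAsLong(2)); // 2147483648
    }
}
```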
Socket configuration
// Single Fluentd(localhost:24224)
//   - Socket connection timeout is 15000 ms
//   - Socket read timeout is 10000 ms
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setConnectionTimeoutMilli(15000);
builder.setReadTimeoutMilli(10000);
Fluency fluency = builder.build();
Waits on close sequence
// Single Fluentd(localhost:24224)
//   - Max wait until all buffers are flushed is 30 seconds
//   - Max wait until the flusher is terminated is 40 seconds
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setWaitUntilBufferFlushed(30);
builder.setWaitUntilFlusherTerminated(40);
Fluency fluency = builder.build();
Register Jackson modules
// Single Fluentd(localhost:24224)
//   - SimpleModule that has FooSerializer is enabled
SimpleModule simpleModule = new SimpleModule();
simpleModule.addSerializer(Foo.class, new FooSerializer());
FluentdRecordFormatter.Config recordFormatterConfig =
	new FluentdRecordFormatter.Config();
recordFormatterConfig.setJacksonModules(
	Collections.singletonList(simpleModule));
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setRecordFormatter(new FluentdRecordFormatter(recordFormatterConfig));

Fluency fluency = builder.build();
Set a custom error handler
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setErrorHandler(ex -> {
  // Send a notification
});
Fluency fluency = builder.build();

    :

// If flushing events to Fluentd fails and all retries are exhausted, the error handler is called back.
fluency.emit("foo.bar", event);
Send requests over SSL/TLS
// Single Fluentd(localhost:24224)
//   - Enable SSL/TLS
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setSslEnabled(true);

// Or, provide your own SSLSocketFactory at runtime (replace with your own)
builder.setSslSocketFactory(SSLSocketFactory.getDefault());

Fluency fluency = builder.build();

If you want to use a custom truststore, specify the JKS file path using -Djavax.net.ssl.trustStore (and -Djavax.net.ssl.trustStorePassword if needed). You can create a custom truststore like this:

$ keytool -import -file server.crt -alias mytruststore -keystore truststore.jks

For server side configuration, see https://docs.fluentd.org/v1.0/articles/in_forward#how-to-enable-tls/ssl-encryption .

Mutual TLS

See this project.

Other configurations
// Multiple Fluentd(localhost:24224, localhost:24225)
//   - Flush attempt interval is 200ms
//   - Max retry of sending events is 12
//   - Use JVM heap memory for buffer pool
FluencyBuilderForFluentd builder = new FluencyBuilderForFluentd();
builder.setFlushAttemptIntervalMillis(200);
builder.setSenderMaxRetryCount(12);
builder.setJvmHeapBufferMode(true);
Fluency fluency = builder.build(
        Arrays.asList(
                new InetSocketAddress(24224),
                new InetSocketAddress(24225)));

Emit event

String tag = "foo_db.bar_tbl";
Map<String, Object> event = new HashMap<String, Object>();
event.put("name", "komamitsu");
event.put("age", 42);
event.put("rate", 3.14);
fluency.emit(tag, event);

If you want to use EventTime as a timestamp, call Fluency#emit with an EventTime object as follows:

int epochSeconds;
int nanoseconds;
    :
EventTime eventTime = EventTime.fromEpoch(epochSeconds, nanoseconds);

// You can also create an EventTime object like this
// EventTime eventTime = EventTime.fromEpochMilli(System.currentTimeMillis());

fluency.emit(tag, eventTime, event);
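
If the timestamp starts out as a `java.time.Instant`, the two values needed for an epoch-seconds-plus-nanoseconds pair can be read from it directly. This is a JDK-only sketch with hypothetical class and method names; how the values are then passed to `EventTime.fromEpoch` (including any narrowing to the parameter types it declares) is left as an assumption.

```java
import java.time.Instant;

// Illustrative only: splits an Instant into epoch seconds and nanosecond-of-second.
class EventTimeParts {
    /** Returns { epochSeconds, nanoseconds } for the given instant. */
    static long[] split(Instant instant) {
        return new long[] { instant.getEpochSecond(), instant.getNano() };
    }

    public static void main(String[] args) {
        long[] parts = split(Instant.ofEpochSecond(10, 500));
        System.out.println(parts[0]); // 10
        System.out.println(parts[1]); // 500
        // Assumed usage, following the snippet above:
        // EventTime eventTime = EventTime.fromEpoch(epochSeconds, nanoseconds);
    }
}
```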

Error handling

Fluency#emit keeps buffered data in memory even if a retriable exception occurs. When the buffer is full, however, the method throws org.komamitsu.fluency.BufferFullException. There are two ways to handle the exception.

a) Ignore the exception so that the main application isn't blocked
try {
    fluency.emit(tag, event);
}
catch (BufferFullException e) {
    // Just log the error and move forward
    logger.warn("Fluency's buffer is full", e);
}
b) Retry until the data is successfully buffered
// Consider capping the number of retries as well
while (true) {
	try {
	    fluency.emit(tag, event);
	    break;
	}
	catch (BufferFullException e) {
	    // Log the error, sleep and retry
	    logger.warn("Fluency's buffer is full. Retrying", e);
	    TimeUnit.SECONDS.sleep(5);
	}
}

Which to choose depends on how important the data is and how long the application can be blocked.
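
A middle ground between the two options is to retry a bounded number of times and then give up. The sketch below is JDK-only so that it runs on its own: `BufferFullException` here is a hypothetical stand-in for `org.komamitsu.fluency.BufferFullException`, and `Emitter` is a hypothetical wrapper around a call such as `fluency.emit(tag, event)`.

```java
import java.util.concurrent.TimeUnit;

// Illustrative only: bounded retry around an operation that may report a full buffer.
class BoundedRetrySketch {
    /** Hypothetical stand-in for org.komamitsu.fluency.BufferFullException. */
    static class BufferFullException extends Exception {
    }

    /** Hypothetical wrapper for a call such as fluency.emit(tag, event). */
    @FunctionalInterface
    interface Emitter {
        void emit() throws BufferFullException;
    }

    /** Tries up to maxAttempts times; returns true once the emit succeeds. */
    static boolean emitWithRetry(Emitter emitter, int maxAttempts, long sleepMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                emitter.emit();
                return true;
            } catch (BufferFullException e) {
                if (attempt == maxAttempts) {
                    break; // out of attempts; the caller decides what to do with the event
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(sleepMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                    return false;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice with a full buffer, then succeeds on the third attempt.
        boolean ok = emitWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new BufferFullException();
            }
        }, 5, 10L);
        System.out.println(ok + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```

When `emitWithRetry` returns false, the caller can log and drop the event (option a) or escalate, depending on how important the data is.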

Wait until buffered data is flushed and release resources

fluency.close();

Check how much memory Fluency has allocated

LOG.debug("Memory size allocated by Fluency is {}", fluency.getAllocatedBufferSize());

Check how much unsent data Fluency is buffering in memory

LOG.debug("Unsent data size buffered by Fluency in memory is {}", fluency.getBufferedDataSize());

Ingestion to Fluentd with extra features (Java 16 or later)

Features

  • UNIX domain socket support

Install

Gradle

dependencies {
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-fluentd:${fluency.version}"
    implementation "org.komamitsu:fluency-fluentd-ext:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-fluentd</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-fluentd-ext</artifactId>
    <version>${fluency.version}</version>
</dependency>

Usage

Create Fluency instance to communicate with Fluentd via UNIX domain socket

// Single Fluentd(UNIX socket path: /tmp/fluentd/ingest.socket)
//   - UNIX domain socket heartbeat (by default)
//   - Asynchronous flush (by default)
//   - Without ack response (by default)
//   - Flush attempt interval is 600ms (by default)
//   - Initial chunk buffer size is 1MB (by default)
//   - Threshold chunk buffer size to flush is 4MB (by default)
//   - Threshold chunk buffer retention time to flush is 1000 ms (by default)
//   - Max total buffer size is 512MB (by default)
//   - Use off heap memory for buffer pool (by default)
//   - Max retries of sending events is 8 (by default)
//   - Max wait until all buffers are flushed is 10 seconds (by default)
//   - Max wait until the flusher is terminated is 10 seconds (by default)
Fluency fluency = new FluencyExtBuilderForFluentd().build(Paths.get("/tmp/fluentd/ingest.socket"));

Ingestion to Treasure Data

Features

  • Asynchronous flush
  • Backup of buffered data on local disk
  • Automatic database/table creation

Install

Gradle

dependencies {
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-treasuredata:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-treasuredata</artifactId>
    <version>${fluency.version}</version>
</dependency>

Create Fluency instance

Default configuration
// Asynchronous flush (by default)
// Flush attempt interval is 600ms (by default)
// Initial chunk buffer size is 4MB (by default)
// Threshold chunk buffer size to flush is 64MB (by default)
// Threshold chunk buffer retention time to flush is 30000 ms (by default)
// Max total buffer size is 512MB (by default)
// Use off heap memory for buffer pool (by default)
// Max retries of sending events is 10 (by default)
// Max wait until all buffers are flushed is 10 seconds (by default)
// Max wait until the flusher is terminated is 10 seconds (by default)
Fluency fluency = new FluencyBuilderForTreasureData().build(yourApiKey);
Buffer configuration for high throughput data ingestion with high latency
// Initial chunk buffer size is 32MB
// Threshold chunk buffer size to flush is 256MB
// Threshold chunk buffer retention time to flush is 120 seconds
// Max total buffer size is 1024MB
// Sender's working buffer size 32KB
FluencyBuilderForTreasureData builder = new FluencyBuilderForTreasureData();
builder.setBufferChunkInitialSize(32 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
builder.setBufferChunkRetentionSize(256 * 1024 * 1024);
builder.setBufferChunkRetentionTimeMillis(120 * 1000);
builder.setSenderWorkBufSize(32 * 1024);
Fluency fluency = builder.build(yourApiKey);
Customize Treasure Data endpoint
Fluency fluency = new FluencyBuilderForTreasureData()
        .build(yourApiKey, tdEndpoint);
Other configurations

Most other usage is the same as for ingestion to Fluentd. See Ingestion to Fluentd > Usage above.

Ingestion to AWS S3

Features

  • Asynchronous flush
  • Backup of buffered data on local disk
  • Support for several formats
    • CSV
    • JSONL
    • MessagePack
  • GZIP compression
  • Customizable S3 bucket/key decision rule

Install

Gradle

dependencies {
    implementation "org.komamitsu:fluency-core:${fluency.version}"
    implementation "org.komamitsu:fluency-aws-s3:${fluency.version}"
}

Maven

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-core</artifactId>
    <version>${fluency.version}</version>
</dependency>

<dependency>
    <groupId>org.komamitsu</groupId>
    <artifactId>fluency-aws-s3</artifactId>
    <version>${fluency.version}</version>
</dependency>

Create Fluency instance

Default configuration for JSONL format
// Asynchronous flush (by default)
// Flush attempt interval is 600ms (by default)
// Initial chunk buffer size is 4MB (by default)
// Threshold chunk buffer size to flush is 64MB (by default)
// Threshold chunk buffer retention time to flush is 30000 ms (by default)
// Max total buffer size is 512MB (by default)
// Use off heap memory for buffer pool (by default)
// Sender's working buffer size 8KB (by default)
// Max retries of sending events is 10 (by default)
// Initial retry interval of sending events is 1000 ms (by default)
// Retry backoff factor of sending events is 2.0 (by default)
// Max retry interval of sending events is 30000 ms (by default)
// Max wait until all buffers are flushed is 10 seconds (by default)
// Max wait until the flusher is terminated is 10 seconds (by default)
// Destination S3 bucket is specified by Fluency#emit()'s "tag" parameter (by default)
// Destination S3 key format is "yyyy/MM/dd/HH/mm-ss-SSSSSS" (by default)
// Destination S3 key is decided as UTC (by default)
// GZIP compression is enabled (by default)
// File format is JSONL
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
Fluency fluency = builder.build();
Default configuration for MessagePack format
// File format is MessagePack
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.MESSAGE_PACK);
Fluency fluency = builder.build();
Default configuration for CSV format
// File format is CSV
// Expected columns are "time", "age", "name", "comment"
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.CSV);
builder.setFormatCsvColumnNames(Arrays.asList("time", "age", "name", "comment"));
Fluency fluency = builder.build();
AWS S3 configuration

fluency-aws-s3 follows the AWS default credential provider chain. If you want to specify credentials explicitly, use the following APIs.

// AWS S3 region is "us-east-1"
// AWS S3 endpoint is "https://another.s3.endpoi.nt"
// AWS access key id is "ABCDEFGHIJKLMNOPQRST"
// AWS secret access key is "ZaQ1XsW2CdE3VfR4BgT5NhY6"
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setAwsRegion("us-east-1");
builder.setAwsEndpoint("https://another.s3.endpoi.nt");
builder.setAwsAccessKeyId("ABCDEFGHIJKLMNOPQRST");
builder.setAwsSecretAccessKey("ZaQ1XsW2CdE3VfR4BgT5NhY6");
Fluency fluency = builder.build();
Disable compression
// GZIP compression is disabled
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setCompressionEnabled(false);
Fluency fluency = builder.build();
Change timezone used in S3 key decision rule
// Destination S3 key is decided as JST
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setS3KeyTimeZoneId(ZoneId.of("JST", ZoneId.SHORT_IDS));
Fluency fluency = builder.build();
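
To see what the time zone changes, the sketch below formats the same instant with the default key format string `yyyy/MM/dd/HH/mm-ss-SSSSSS` in UTC and in JST. It assumes the format string can be read as a `java.time.format.DateTimeFormatter` pattern, which is not confirmed here. The class and method names are hypothetical, and any prefix or file extension Fluency may add to the key is not modelled.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Illustrative only: JDK date formatting, no Fluency classes.
class S3KeyTimeZoneSketch {
    static final String KEY_PATTERN = "yyyy/MM/dd/HH/mm-ss-SSSSSS";

    /** Formats the instant with the key pattern in the given time zone. */
    static String formatKey(Instant time, ZoneId zone) {
        return DateTimeFormatter.ofPattern(KEY_PATTERN).withZone(zone).format(time);
    }

    public static void main(String[] args) {
        Instant time = Instant.parse("2020-01-01T00:00:00Z");
        System.out.println(formatKey(time, ZoneOffset.UTC));
        // 2020/01/01/00/00-00-000000
        System.out.println(formatKey(time, ZoneId.of("JST", ZoneId.SHORT_IDS)));
        // 2020/01/01/09/00-00-000000
    }
}
```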
Customize S3 destination decision rule
// Destination S3 bucket is "fixed-bucket-name"
// Destination S3 key format is UNIX epoch seconds rounded to 1 hour range
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setCustomS3DestinationDecider((tag, time) ->
    new S3DestinationDecider.S3Destination(
        "fixed-bucket-name",
        String.format("%s-%d", tag, time.getEpochSecond() / 3600)
));
Fluency fluency = builder.build();
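
The key expression in the decider above relies on integer division to group events into one-hour buckets. The JDK-only sketch below isolates that expression (hypothetical class and method names) so the rounding can be checked independently of Fluency.

```java
import java.time.Instant;

// Illustrative only: the same key expression as in the decider above.
class HourlyKeySketch {
    /** Builds "<tag>-<hours since the UNIX epoch>", truncating to the hour. */
    static String hourlyKey(String tag, Instant time) {
        return String.format("%s-%d", tag, time.getEpochSecond() / 3600);
    }

    public static void main(String[] args) {
        System.out.println(hourlyKey("foo", Instant.ofEpochSecond(3600))); // foo-1
        System.out.println(hourlyKey("foo", Instant.ofEpochSecond(7199))); // foo-1
        System.out.println(hourlyKey("foo", Instant.ofEpochSecond(7200))); // foo-2
    }
}
```

All events whose timestamps fall within the same hour therefore map to the same key for a given tag.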
Buffer configuration for high throughput data ingestion with high latency
// Initial chunk buffer size is 32MB
// Threshold chunk buffer size to flush is 256MB
// Threshold chunk buffer retention time to flush is 120 seconds
// Max total buffer size is 1024MB
// Sender's working buffer size 32KB
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setBufferChunkInitialSize(32 * 1024 * 1024);
builder.setMaxBufferSize(1024 * 1024 * 1024L);
builder.setBufferChunkRetentionSize(256 * 1024 * 1024);
builder.setBufferChunkRetentionTimeMillis(120 * 1000);
builder.setSenderWorkBufSize(32 * 1024);
Fluency fluency = builder.build();
Retry configuration
// Max retries of sending events is 16
// Initial retry interval of sending events is 500 ms
// Retry backoff factor of sending events is 1.5
// Max retry interval of sending events is 20000 ms
FluencyBuilderForAwsS3 builder = new FluencyBuilderForAwsS3();
builder.setFormatType(FluencyBuilderForAwsS3.FormatType.JSONL);
builder.setSenderRetryMax(16);
builder.setSenderRetryIntervalMillis(500);
builder.setSenderRetryFactor(1.5f);
builder.setSenderMaxRetryIntervalMillis(20000);
Fluency fluency = builder.build();
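
To get a feel for how the four retry parameters might interact, the sketch below computes a wait schedule under the assumption of plain exponential backoff capped at the maximum interval. Whether Fluency computes its waits exactly this way is an assumption, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: capped exponential backoff, NOT Fluency's internal code.
class RetryBackoffSketch {
    /** Returns the wait before each retry, in milliseconds. */
    static List<Long> intervals(int maxRetries, long initialMillis, double factor, long maxMillis) {
        List<Long> result = new ArrayList<>();
        double next = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            result.add(Math.min((long) next, maxMillis)); // cap each wait at the maximum
            next *= factor;                               // grow the uncapped wait
        }
        return result;
    }

    public static void main(String[] args) {
        // Documented defaults: initial 1000 ms, factor 2.0, max 30000 ms.
        System.out.println(intervals(7, 1000L, 2.0, 30000L));
        // [1000, 2000, 4000, 8000, 16000, 30000, 30000]
        // Custom values from the snippet above: initial 500 ms, factor 1.5, max 20000 ms.
        System.out.println(intervals(4, 500L, 1.5, 20000L));
        // [500, 750, 1125, 1687]
    }
}
```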
Other configurations

Most other usage is the same as for ingestion to Fluentd. See Ingestion to Fluentd > Usage above.
