kafka-delta-ingest
The kafka-delta-ingest project aims to build a highly efficient daemon for streaming data through Apache Kafka into Delta Lake.
This project is currently highly experimental and evolving in tandem with the delta-rs bindings.
Features
-
Multiple worker processes per stream
-
Basic transformations within message
-
Statsd metric output
See the design doc for more details.
Example
The repository includes an example for trying out the application locally with some fake web request data.
The included docker-compose.yml contains kafka and localstack services you can run kafka-delta-ingest
against locally.
Starting Worker Processes
-
Launch test services -
docker-compose up setup
-
Compile:
cargo build
-
Run kafka-delta-ingest against the web_requests example topic and table (customize arguments as desired):
export AWS_ENDPOINT_URL=http://0.0.0.0:4566
export AWS_ACCESS_KEY_ID=test
export AWS_SECRET_ACCESS_KEY=test
RUST_LOG=debug cargo run ingest web_requests ./tests/data/web_requests \
--allowed_latency 60 \
--app_id web_requests \
--transform 'date: substr(meta.producer.timestamp, `0`, `10`)' \
'meta.kafka.offset: kafka.offset' \
'meta.kafka.partition: kafka.partition' \
'meta.kafka.topic: kafka.topic' \
--auto_offset_reset earliest
Notes:
-
The AWS_* environment variables are for S3 and are required by the delta-rs library.
-
Above, AWS_ENDPOINT_URL points to localstack.
-
The Kafka broker is assumed to be at localhost:9092; use -k to override.
-
To clean data from previous local runs, execute ./bin/clean-example-data.sh. You’ll need to do this if you destroy your Kafka container between runs, since your delta log directory will be out of sync with Kafka offsets.
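The date transform in the command above is what yields the partition value for each message. As a rough illustration only (this is not the project's implementation, and it assumes substr with `0` and `10` selects the first ten characters), the same slice can be reproduced in the shell against the timestamp from the example message:

```shell
# Timestamp copied from the example message's meta.producer.timestamp field.
ts="2021-03-24T15:06:17.321710+00:00"

# Keep characters 1-10 (cut is 1-based), mirroring substr(..., `0`, `10`).
partition_date="$(printf '%s' "$ts" | cut -c1-10)"

echo "date=$partition_date"
```

The resulting value, 2021-03-24, matches the date=2021-03-24 directory that the example writes under tests/data/web_requests.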
Kafka SSL
In case you have Kafka topics secured by SSL client certificates, you can specify these secrets as environment variables.
For the cert chain, include the PEM content as an environment variable named KAFKA_DELTA_INGEST_CERT.
For the cert private key, include the PEM content as an environment variable named KAFKA_DELTA_INGEST_KEY.
These will be set as the ssl.certificate.pem and ssl.key.pem Kafka settings respectively.
Make sure to provide the additional option:
-K security.protocol=SSL
when invoking the cli command as well.
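If the certificate and key live in PEM files, a small shell helper can load them into the two variables. This is a sketch, not part of the project: `load_kafka_ssl_env` and the file paths passed to it are hypothetical names.

```shell
# Hypothetical helper: read two PEM files and export their contents under the
# variable names kafka-delta-ingest looks for.
load_kafka_ssl_env() {
  cert_path="$1"   # PEM file holding the client certificate chain
  key_path="$2"    # PEM file holding the client private key
  if [ ! -r "$cert_path" ] || [ ! -r "$key_path" ]; then
    echo "cannot read $cert_path or $key_path" >&2
    return 1
  fi
  KAFKA_DELTA_INGEST_CERT="$(cat "$cert_path")"
  KAFKA_DELTA_INGEST_KEY="$(cat "$key_path")"
  export KAFKA_DELTA_INGEST_CERT KAFKA_DELTA_INGEST_KEY
}
```

For example, run `load_kafka_ssl_env ./client-cert.pem ./client-key.pem` in the same shell session before starting the ingest command with -K security.protocol=SSL.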
Example Data
A tarball containing 100K line-delimited JSON messages is included in tests/json/web_requests-100K.json.tar.gz. Running ./bin/extract-example-json.sh will unpack this to the expected location.
Pretty-printed example from the file
{
"meta": {
"producer": {
"timestamp": "2021-03-24T15:06:17.321710+00:00"
}
},
"method": "DELETE",
"session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
"status": 204,
"url": "http://www.youku.com",
"uuid": "831c6afa-375c-4988-b248-096f9ed101f8"
}
After extracting the example data, you’ll need to play this onto the web_requests topic of the local Kafka container.
Note: URLs sampled for the test data are sourced from Wikipedia’s list of most popular websites (https://en.wikipedia.org/wiki/List_of_most_popular_websites).
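One way to play the extracted file onto the topic is kcat's producer mode. This is a sketch rather than the project's own tooling: it assumes kcat is installed locally and the broker is at the default localhost:9092, and `produce_web_requests` is a hypothetical helper name. Pass it whatever path ./bin/extract-example-json.sh unpacked the data to.

```shell
# Hypothetical helper: send each line of a line-delimited JSON file as one
# Kafka message to the web_requests topic.
produce_web_requests() {
  json_file="$1"                  # path to the extracted example data
  broker="${2:-localhost:9092}"   # default broker assumed by this README
  # -P: producer mode, -l: read messages from the file, one per line
  kcat -P -b "$broker" -t web_requests -l "$json_file"
}
```

A second argument overrides the broker, for example `produce_web_requests <extracted file> otherhost:9092`.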
Inspect example output
-
List data files -
ls tests/data/web_requests/date=2021-03-24
-
List delta log files -
ls tests/data/web_requests/_delta_log
-
Show some parquet data (using parquet-tools) -
parquet-tools show tests/data/web_requests/date=2021-03-24/<some file written by your example>
Using Azure Event Hubs
Azure Event Hubs (with pricing tier "Standard" or higher) has a Kafka Surface that can be used with kafka-delta-ingest.
Azure Event Hubs doesn’t have a local emulator, so an actual Azure Event Hubs resource is required. As a result, there’s no need for the docker-compose application described above.
More info:
-
https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-migration-guide
-
https://docs.microsoft.com/en-us/azure/event-hubs/apache-kafka-troubleshooting-guide
-
https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka
-
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
Starting Worker Processes
-
Create an Azure Event Hubs Namespace and within it, an Event Hub (which corresponds to a Kafka topic).
-
Set these environment variables, which are required by the delta-rs library:
-
AZURE_STORAGE_ACCOUNT_NAME (just the storage account name, not the FQDN)
-
AZURE_STORAGE_ACCOUNT_KEY (just the key, not the connection string)
-
Create the _delta_log directory in the web_requests directory in Azure Storage and upload the first Delta transaction containing the schema to this directory.
-
In the docker command below, replace the following placeholders with your values:
-
AZURE_STORAGE_ACCOUNT_NAME (just the storage account name, not the FQDN)
-
AZURE_STORAGE_ACCOUNT_KEY (just the key, not the connection string)
-
FILESYSTEM_NAME (the file system in the storage account that holds the web_requests directory)
-
EVENTHUBS_NAMESPACE_NAME (just the namespace name, not the FQDN)
-
EVENTHUBS_KEY_NAME
-
EVENTHUBS_KEY
-
Build the docker image
docker build -t kdi:0.1 . -f Dockerfile-Debian
Notes:
-
If this takes a long time, make sure that docker has enough memory
-
Execute this docker command to run kafka-delta-ingest:
docker run -it --network=host ^
-e RUST_LOG="debug" ^
-e SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt ^
-e AZURE_STORAGE_ACCOUNT_NAME={AZURE_STORAGE_ACCOUNT_NAME} ^
-e "AZURE_STORAGE_ACCOUNT_KEY={AZURE_STORAGE_ACCOUNT_KEY}" ^
kdi:0.1 ^
ingest web_requests adls2://{AZURE_STORAGE_ACCOUNT_NAME}/{FILESYSTEM_NAME}/web_requests ^
--allowed_latency 5 ^
--kafka {EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 ^
--Kafka security.protocol=SASL_SSL ^
--Kafka sasl.mechanism=PLAIN ^
--Kafka sasl.username=$ConnectionString ^
--Kafka sasl.password=Endpoint=sb://{EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName={EVENTHUBS_KEY_NAME};SharedAccessKey={EVENTHUBS_KEY} ^
--Kafka socket.keepalive.enable=true ^
--Kafka metadata.max.age.ms=180000 ^
--Kafka heartbeat.interval.ms=3000 ^
--Kafka session.timeout.ms=30000 ^
--Kafka debug=broker,security,protocol ^
--app_id web_requests ^
--transform "date: substr(meta.producer.timestamp, `0`, `10`)" ^
--transform "meta.kafka.offset: kafka.offset" ^
--transform "meta.kafka.partition: kafka.partition" ^
--transform "meta.kafka.topic: kafka.topic" ^
--auto_offset_reset earliest
Notes:
-
In the docker command:
-
The sasl.username is the literal string $ConnectionString and not a placeholder.
-
The following --Kafka arguments are taken from here:
-
socket.keepalive.enable=true
-
metadata.max.age.ms=180000
-
heartbeat.interval.ms=3000
-
session.timeout.ms=30000
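The step above that uploads the first Delta transaction can be prepared locally before uploading. The sketch below writes a candidate 00000000000000000000.json containing a protocol action and a metaData action. It rests on assumptions: the field list is inferred from the example payload and the --transform arguments rather than copied from the project, the table id is a made-up placeholder, and `field` and `struct` are hypothetical helpers. The _delta_log directory of the local example table under tests/data/web_requests is likely the better source for the real schema.

```shell
# Sketch under stated assumptions: the field list is inferred from the example
# payload and the --transform arguments; the table id is a made-up placeholder.
log_dir="${LOG_DIR:-./web_requests/_delta_log}"
table_id="${TABLE_ID:-11111111-2222-3333-4444-555555555555}"
created_ms="$(( $(date +%s) * 1000 ))"

# schemaString is JSON embedded in a JSON string, so the helpers emit \" quotes.
field()  { printf '{\\"name\\":\\"%s\\",\\"type\\":%s,\\"nullable\\":true,\\"metadata\\":{}}' "$1" "$2"; }
struct() { printf '{\\"type\\":\\"struct\\",\\"fields\\":[%s]}' "$1"; }

str='\"string\"'; int='\"integer\"'; long='\"long\"'
producer="$(struct "$(field timestamp "$str")")"
kafka="$(struct "$(field offset "$long"),$(field partition "$int"),$(field topic "$str")")"
meta="$(struct "$(field producer "$producer"),$(field kafka "$kafka")")"
schema="$(struct "$(field meta "$meta"),$(field method "$str"),$(field session_id "$str"),$(field status "$int"),$(field url "$str"),$(field uuid "$str"),$(field date "$str")")"

mkdir -p "$log_dir"
commit="$log_dir/00000000000000000000.json"
# Action 1: protocol versions. Action 2: table metadata, partitioned by date.
printf '%s\n' '{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}' > "$commit"
printf '{"metaData":{"id":"%s","format":{"provider":"parquet","options":{}},"schemaString":"%s","partitionColumns":["date"],"configuration":{},"createdTime":%s}}\n' \
  "$table_id" "$schema" "$created_ms" >> "$commit"
```

The generated file can then be uploaded to web_requests/_delta_log in the storage account with whichever tool you normally use, such as the Azure Portal.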
Sending data to Event Hubs
On Windows, Service Bus Explorer can be used to send data to Event Hubs.
The following payload should be sent for the web_requests Delta table:
{
"status": 200,
"session_id": "7c28bcf9-be26-4d0b-931a-3374ab4bb458",
"method": "GET",
"meta": {
"producer": {
"timestamp": "2021-03-24T15:06:17.321710+00:00"
}
},
"uuid": "831c6afa-375c-4988-b248-096f9ed101f8",
"url": "http://www.example.com"
}
Verifying data from Event Hub using kcat
kcat can be run on Windows via docker using this command, which will print the last message (-o -1).
Make sure to first replace the following placeholders:
-
EVENTHUBS_NAMESPACE_NAME (just the namespace name, not the FQDN)
-
EVENTHUBS_KEY_NAME
-
EVENTHUBS_KEY
docker run -it --network=host edenhill/kcat:1.7.1 -C -o -1 -b {EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093 -t web_requests -X security.protocol=SASL_SSL -X sasl.mechanism=PLAIN -X sasl.username=$ConnectionString -X sasl.password=Endpoint=sb://{EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName={EVENTHUBS_KEY_NAME};SharedAccessKey={EVENTHUBS_KEY} -X socket.keepalive.enable=true -X metadata.max.age.ms=180000 -X heartbeat.interval.ms=3000 -X session.timeout.ms=30000
Notes:
-
The following configuration settings in the command above are taken from here:
-X socket.keepalive.enable=true -X metadata.max.age.ms=180000 -X heartbeat.interval.ms=3000 -X session.timeout.ms=30000
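The docker command and the kcat command build the same SASL values from the placeholders. The commands above are written for Windows; in a POSIX shell the values need quoting, because an unquoted `;` ends the command and an unquoted `$ConnectionString` would normally expand to an empty string. A minimal sketch, with made-up placeholder values:

```shell
# Placeholder values (hypothetical); substitute your own.
EVENTHUBS_NAMESPACE_NAME="my-namespace"
EVENTHUBS_KEY_NAME="my-key-name"
EVENTHUBS_KEY="my-key"

# Broker address in the same form used by the kcat command above.
KAFKA_BROKER="${EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net:9093"

# Single quotes keep the literal string $ConnectionString from being expanded.
SASL_USERNAME='$ConnectionString'

# Double quotes protect the semicolons while still expanding the variables.
SASL_PASSWORD="Endpoint=sb://${EVENTHUBS_NAMESPACE_NAME}.servicebus.windows.net/;SharedAccessKeyName=${EVENTHUBS_KEY_NAME};SharedAccessKey=${EVENTHUBS_KEY}"

echo "$KAFKA_BROKER"
```

The variables can then be passed quoted, for example `-X "sasl.username=$SASL_USERNAME" -X "sasl.password=$SASL_PASSWORD"` for kcat.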
Writing to S3
When writing to S3, you may experience an error like source: StorageError { source: S3Generic("dynamodb locking is not enabled") }.
A locking mechanism is needed to prevent unsafe concurrent writes to a Delta Lake directory, and DynamoDB is one option for this. To use DynamoDB, set the AWS_S3_LOCKING_PROVIDER variable to dynamodb and create a table named delta_rs_lock_table in DynamoDB. An example DynamoDB table creation snippet using the aws CLI follows; customize it for your environment’s needs (e.g. read/write capacity modes):
aws dynamodb create-table --table-name delta_rs_lock_table \
--attribute-definitions \
AttributeName=key,AttributeType=S \
--key-schema \
AttributeName=key,KeyType=HASH \
--provisioned-throughput \
ReadCapacityUnits=10,WriteCapacityUnits=10
For more information, see DynamoDB lock.
Verifying data in Azure Storage
Use the Azure Portal to browse the file system:
-
Data files:
web_requests/date=2021-03-24
-
Delta log files:
web_requests/_delta_log
Developing
Make sure the docker-compose setup has been run, then execute cargo test to run unit and integration tests.