Introduction
This project contains common transformations for every day use cases with Kafka Connect.
Installation
Confluent Hub
The following command can be used to install the plugin directly from the Confluent Hub using the Confluent Hub Client.
confluent-hub install jcustenborder/kafka-connect-transform-common:latest
Manually
The zip file that is deployed to the Confluent Hub is available under
target/components/packages/
. You can manually extract this zip file which includes all dependencies. All the dependencies
that are required to deploy the plugin are under target/kafka-connect-target
as well. Make sure that you include all the dependencies that are required
to run the plugin.
- Create a directory under the
plugin.path
on your Connect worker. - Copy all of the dependencies under the newly created subdirectory.
- Restart the Connect worker.
Transformations
BytesToString
Key
com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.BytesToString$Value
Configuration
General
charset
The charset to use when creating the output string.
Importance: HIGH
Type: STRING
Default Value: UTF-8
fields
The fields to transform.
Importance: HIGH
Type: LIST
ChangeCase
Key
com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ChangeCase$Value
Configuration
General
from
The format to move from
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN
, LOWER_UNDERSCORE
, LOWER_CAMEL
, UPPER_CAMEL
, UPPER_UNDERSCORE
to
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN
, LOWER_UNDERSCORE
, LOWER_CAMEL
, UPPER_CAMEL
, UPPER_UNDERSCORE
ChangeTopicCase
com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase
This transformation is used to change the case of a topic.
Tip
This transformation will convert a topic name like 'TOPIC_NAME' to topicName
, or topic_name
.
Configuration
General
from
The format of the incoming topic name. LOWER_CAMEL
= Java variable naming convention, e.g., "lowerCamel". LOWER_HYPHEN
= Hyphenated variable naming convention, e.g., "lower-hyphen". LOWER_UNDERSCORE
= C++ variable naming convention, e.g., "lower_underscore". UPPER_CAMEL
= Java and C++ class naming convention, e.g., "UpperCamel". UPPER_UNDERSCORE
= Java and C++ constant naming convention, e.g., "UPPER_UNDERSCORE".
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN
, LOWER_UNDERSCORE
, LOWER_CAMEL
, UPPER_CAMEL
, UPPER_UNDERSCORE
to
The format of the outgoing topic name. LOWER_CAMEL
= Java variable naming convention, e.g., "lowerCamel". LOWER_HYPHEN
= Hyphenated variable naming convention, e.g., "lower-hyphen". LOWER_UNDERSCORE
= C++ variable naming convention, e.g., "lower_underscore". UPPER_CAMEL
= Java and C++ class naming convention, e.g., "UpperCamel". UPPER_UNDERSCORE
= Java and C++ constant naming convention, e.g., "UPPER_UNDERSCORE".
Importance: HIGH
Type: STRING
Validator: Matches: LOWER_HYPHEN
, LOWER_UNDERSCORE
, LOWER_CAMEL
, UPPER_CAMEL
, UPPER_UNDERSCORE
ExtractNestedField
Key
com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ExtractNestedField$Value
Configuration
General
input.inner.field.name
The field on the child struct containing the field to be extracted. For example if you wanted the extract address.state
you would use state
.
Importance: HIGH
Type: STRING
input.outer.field.name
The field on the parent struct containing the child struct. For example if you wanted the extract address.state
you would use address
.
Importance: HIGH
Type: STRING
output.field.name
The field to place the extracted value into.
Importance: HIGH
Type: STRING
ExtractTimestamp
Key
com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ExtractTimestamp$Value
This transformation is used to use a field from the input data to override the timestamp for the record.
Configuration
General
field.name
The field to pull the timestamp from. This must be an int64 or a timestamp.
Importance: HIGH
Type: STRING
HeaderToField
Key
com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.HeaderToField$Value
Configuration
General
header.mappings
The mapping of the header to the field in the message.
Importance: HIGH
Type: LIST
NormalizeSchema
Key
com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value
This transformation is used to convert older schema versions to the latest schema version. This works by keying all of the schemas that are coming into the transformation by their schema name and comparing the version() of the schema. The latest version of a schema will be used. Schemas are discovered as the flow through the transformation. The latest version of a schema is what is used.
Configuration
PatternFilter
Key
com.github.jcustenborder.kafka.connect.transform.common.PatternFilter$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.PatternFilter$Value
Configuration
General
pattern
The regex to test the message with.
Importance: HIGH
Type: STRING
Validator: com.github.jcustenborder.kafka.connect.utils.config.validators.PatternValidator@4170ee0f
fields
The fields to transform.
Importance: HIGH
Type: LIST
PatternRename
Key
com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.PatternRename$Value
Configuration
General
field.pattern
Importance: HIGH
Type: STRING
field.replacement
Importance: HIGH
Type: STRING
field.pattern.flags
Importance: LOW
Type: LIST
Default Value: [CASE_INSENSITIVE]
Validator: [UNICODE_CHARACTER_CLASS, CANON_EQ, UNICODE_CASE, DOTALL, LITERAL, MULTILINE, COMMENTS, CASE_INSENSITIVE, UNIX_LINES]
SchemaNameToTopic
Key
com.github.jcustenborder.kafka.connect.transform.common.SchemaNameToTopic$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SchemaNameToTopic$Value
This transformation is used to take the name from the schema for the key or value and replace the topic with this value.
Configuration
SetMaximumPrecision
Key
com.github.jcustenborder.kafka.connect.transform.common.SetMaximumPrecision$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SetMaximumPrecision$Value
This transformation is used to ensure that all decimal fields in a struct are below the maximum precision specified.
Note
The Confluent AvroConverter uses a default precision of 64 which can be too large for some database systems.
Configuration
General
precision.max
The maximum precision allowed.
Importance: HIGH
Type: INT
Validator: [1,...,64]
SetNull
Key
com.github.jcustenborder.kafka.connect.transform.common.SetNull$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.SetNull$Value
Configuration
TimestampNow
com.github.jcustenborder.kafka.connect.transform.common.TimestampNow
This transformation is used to override the timestamp of the incoming record to the time the record is being processed.
Configuration
TimestampNowField
Key
com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.TimestampNowField$Value
This transformation is used to set a field with the current timestamp of the system running the transformation.
Configuration
General
fields
The field(s) that will be inserted with the timestamp of the system.
Importance: HIGH
Type: LIST
ToJSON
Key
com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ToJSON$Value
Configuration
General
output.schema.type
The connect schema type to output the converted JSON as.
Importance: MEDIUM
Type: STRING
Default Value: STRING
Validator: [STRING, BYTES]
schemas.enable
Flag to determine if the JSON data should include the schema.
Importance: MEDIUM
Type: BOOLEAN
ToLong
Key
com.github.jcustenborder.kafka.connect.transform.common.ToLong$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.ToLong$Value
Configuration
General
fields
The fields to transform.
Importance: HIGH
Type: LIST
TopicNameToField
Key
com.github.jcustenborder.kafka.connect.transform.common.TopicNameToField$Key
Value
com.github.jcustenborder.kafka.connect.transform.common.TopicNameToField$Value
Configuration
General
field
The field to insert the topic name.
Importance: HIGH
Type: STRING
Development
Building the source
mvn clean package
Contributions
Contributions are always welcomed! Before you start any development please create an issue and
start a discussion. Create a pull request against your newly created issue and we're happy to see
if we can merge your pull request. First and foremost any time you're adding code to the code base
you need to include test coverage. Make sure that you run mvn clean package
before submitting your
pull to ensure that all of the tests, checkstyle rules, and the package can be successfully built.