Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
After completing this section, you should be able to apply Single Message Transformations (SMT) with Kafka Connect.
Kafka Connect provides a pluggable architecture composed of several components that you can use to build your integration pipelines. The Single Message Transformation (SMT) interface in Kafka Connect applies stateless transformations to the individual messages of your integration pipeline.
When using source connectors, the transformations occur before writing to Kafka.
When using sink connectors, the transformations occur after reading from Kafka and before the records are passed to the sink connector.
A transformation is a function that receives a Connect Record as input, and returns a modified Connect Record. Kafka Connect provides a number of transformations. In addition, you can implement custom transformations.
With SMTs, you can tailor the data stream that passes through your integration pipeline by using only configuration files. This, combined with the large number of transformations included with Kafka Connect, creates a lightweight foundation of tools that you can use to integrate with external systems.
Some common use cases of SMTs include:
- Converting between different data formats.
- Removing records from the integration pipeline.
- Routing records to different topics based on their content.
- Applying stateless transformations to the record content.
The KafkaConnector custom resource allows you to create and manage connector instances for Kafka Connect.
In each one of the connector instances you can define any number of Single Message Transformations.
All the transformations you want to apply in a connector must be included in the spec.config field.
- The spec.config.transforms field lists the transformation aliases and the order in which you want to apply them.
- The spec.config.transforms.$alias.type field specifies the fully qualified class name for the transformation implementation.
- The spec.config.transforms.$alias.$transformationSpecificConfig field specifies the configuration properties for the transformation.
The following snippet shows how to add an SMT to a connector instance.

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      ...output omitted...
    spec:
      config:
        transforms: createKey
        transforms.createKey.type: org.apache.kafka.connect.transforms.ValueToKey
        transforms.createKey.fields: productId
        ...output omitted...

- The transforms property sets createKey as the alias of the transformation.
- The transforms.createKey.type property sets the fully qualified class name of the transformation.
- The transforms.createKey.fields property defines the configuration properties for the transformation.

In the preceding example, the connector applies the ValueToKey transformation to each record that it processes.
After the transformation, the record has the productId field as the record key.
Kafka Connect includes the following data and routing transformations.
Table 5.2. Single Message Transformations
| Transformation | Description |
|---|---|
| InsertField | Adds a field to the record by using static data or record metadata. |
| ReplaceField | Filters or renames fields of the record. |
| MaskField | Replaces a field in the record with a custom or a null value. |
| ValueToKey | Replaces the record key with a new key formed from a subset of fields of the record value. |
| HoistField | Wraps the entire event as a single field. |
| ExtractField | Extracts a field from a complex structure (Struct or Map), and replaces the key or value with the extracted field. |
| SetSchemaMetadata | Modifies the schema name or version of the record. |
| TimestampRouter | Updates the topic of a record based on the original topic and the record time stamp. |
| RegexRouter | Updates the topic of a record based on a regular expression, and a replacement string. |
| Filter | Removes records from the integration pipeline by using a predicate. |
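
To illustrate one of the routing transformations from the table, the following fragment is a minimal sketch of a RegexRouter configuration inside the spec.config field of a KafkaConnector. The addPrefix alias, the regular expression, and the store- prefix are hypothetical values chosen for this illustration; regex and replacement are the configuration properties that RegexRouter expects.

```yaml
spec:
  config:
    # Hypothetical alias for the transformation
    transforms: addPrefix
    transforms.addPrefix.type: org.apache.kafka.connect.transforms.RegexRouter
    # Assumed pattern: capture the whole original topic name
    transforms.addPrefix.regex: "(.*)"
    # Assumed replacement: prepend a prefix to the captured topic name
    transforms.addPrefix.replacement: "store-$1"
```

With these assumed values, a record destined for a topic named stock would be routed to a topic named store-stock.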
For more fine-grained control over the transformations, you can use predicates. Predicates are conditions that you add to the connector configuration to control whether a transformation is applied to a given record.
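
As a sketch of how a predicate can gate a transformation, the following fragment pairs the Filter transformation with the RecordIsTombstone predicate that ships with Kafka Connect, so that only tombstone records (records with a null value) are removed. The dropTombstones and isTombstone aliases are hypothetical names chosen for this illustration.

```yaml
spec:
  config:
    # Hypothetical alias for the Filter transformation
    transforms: dropTombstones
    transforms.dropTombstones.type: org.apache.kafka.connect.transforms.Filter
    # Apply the transformation only when the named predicate matches
    transforms.dropTombstones.predicate: isTombstone
    # Hypothetical alias for the predicate
    predicates: isTombstone
    predicates.isTombstone.type: org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```

Setting the transformation's negate property to true inverts the condition, so that the transformation applies only to records that do not match the predicate.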
Developers often need to apply multiple SMTs to each record.
For this purpose, Kafka Connect supports chained transformations: you can define multiple SMTs in the same integration pipeline. The transformations are applied in the same order in which they are defined in the configuration.
For example, assume you have a connector that gathers the current stock levels of the products of your store, and the connector receives a record with the following values.
| Record key | Record value |
|---|---|
| null | {"id":213, "productId": 998, "stock": 0} |
Also assume that the requirements of your integration define that the records must use the productId value as the record key.
To accomplish that requirement, you first apply a ValueToKey transformation.
This transformation copies the productId field into the record key.
After the transformation, the record looks like the following:
| Record key | Record value |
|---|---|
| {"productId": 998} | {"id":213, "productId": 998, "stock": 0} |
Finally, you apply the ExtractField transformation on the record key.
This transformation extracts the 998 value and replaces the entire record key with this value.
The resulting record has the product ID as its record key.
| Record key | Record value |
|---|---|
| 998 | {"id":213, "productId": 998, "stock": 0} |
The following snippet shows how to implement the chained transformations from the example within a KafkaConnector.
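
    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaConnector
    metadata:
      ...output omitted...
    spec:
      config:
        transforms: createKey, extractValue
        transforms.createKey.type: org.apache.kafka.connect.transforms.ValueToKey
        transforms.createKey.fields: productId
        transforms.extractValue.type: org.apache.kafka.connect.transforms.ExtractField$Key
        transforms.extractValue.field: productId
        ...output omitted...

- The transforms property lists the transformation aliases in the order they are used. The connector first applies the transformation with the createKey alias, and then the transformation with the extractValue alias.
- The transforms.createKey.type property sets the fully qualified class name of the createKey transformation.
- The transforms.createKey.fields property defines the configuration properties for the createKey transformation.
- The transforms.extractValue.type property sets the fully qualified class name of the extractValue transformation.
- The transforms.extractValue.field property defines the configuration properties for the extractValue transformation.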