Transforming Messages

Objectives

After completing this section, you should be able to apply Single Message Transformations (SMT) with Kafka Connect.

Defining Single Message Transformations

Kafka Connect provides a pluggable architecture composed of several components that you can use to build your integration pipelines. Single Message Transformations (SMTs) are one of those components: an interface inside Kafka Connect for applying stateless transformations to the individual messages of your integration pipeline.

With source connectors, Kafka Connect applies the transformations after the connector reads a record from the external system and before writing the record to Kafka.

Figure 5.7: SMT on a pipeline that streams data to Kafka

With sink connectors, Kafka Connect applies the transformations after reading a record from Kafka and before passing the record to the sink connector.

Figure 5.8: SMT on a pipeline that streams data from Kafka

A transformation is a function that receives a Connect Record as input, and returns a modified Connect Record. Kafka Connect provides a number of transformations. In addition, you can implement custom transformations.

Use Cases of Single Message Transformations

With SMTs, you can tailor the data stream that passes through your integration pipeline by using only configuration files. Combined with the large number of transformations included with Kafka Connect, this gives you a lightweight set of tools for integrating with external systems.

Some common use cases of SMTs include:

  • Converting between different data formats.

  • Removing records from the integration pipeline.

  • Routing records to different topics based on their content.

  • Applying stateless transformations to the record content.

Configuring the Kafka Connectors for SMTs

The KafkaConnector custom resource allows you to create and manage connector instances for Kafka Connect. In each connector instance, you can define any number of Single Message Transformations.

All the transformations you want to apply in a connector must be included in the spec.config field.

  • The spec.config.transforms field lists the transformation aliases and the order in which you want to apply them.

  • The spec.config.transforms.$alias.type field specifies the fully qualified class name for the transformation implementation.

  • The spec.config.transforms.$alias.$transformationSpecificConfig field specifies the configuration properties for the transformation.

The following snippet shows how to add an SMT to a connector instance.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  ...output omitted...
spec:
  config:
    transforms: createKey  # (1)
    transforms.createKey.type: org.apache.kafka.connect.transforms.ValueToKey  # (2)
    transforms.createKey.fields: productId  # (3)
    ...output omitted...

(1) Sets createKey as the alias for the transformation.

(2) Sets the fully qualified class name of the transformation.

(3) Defines the configuration properties for the transformation.

In the preceding example, the connector applies the ValueToKey transformation to each record that it processes. After the transformation, the record key contains the productId field from the record value.

Applying Transformations

Kafka Connect includes the following data and routing transformations.

Table 5.2. Single Message Transformations

Transformation      Description
InsertField         Adds a field to the record by using static data or record metadata.
ReplaceField        Filters or renames fields of the record.
MaskField           Replaces a field in the record with a custom or a null value.
ValueToKey          Replaces the record key with a new key formed from a subset of fields of the record value.
HoistField          Wraps the entire event as a single field.
ExtractField        Extracts a field from a complex structure (Struct or Map), and replaces the key or value with the extracted field.
SetSchemaMetadata   Modifies the schema name or version of the record.
TimestampRouter     Updates the topic of a record based on the original topic and the record time stamp.
RegexRouter         Updates the topic of a record based on a regular expression and a replacement string.
Filter              Removes records from the integration pipeline by using a predicate.
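
As an illustration of how a data transformation and a routing transformation from the table might be configured together, the following fragment is a minimal sketch rather than a configuration taken from a real deployment. The addSource and addPrefix aliases, the source field name, and the inventory-db and store- values are hypothetical. The class names and the static.field, static.value, regex, and replacement properties are the ones that the built-in InsertField and RegexRouter transformations accept.

```yaml
spec:
  config:
    # Apply addSource first, then addPrefix (both aliases are hypothetical).
    transforms: addSource, addPrefix
    # InsertField$Value adds a field to the record value.
    transforms.addSource.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.addSource.static.field: source
    transforms.addSource.static.value: inventory-db
    # RegexRouter rewrites the topic name when the whole name matches the regex.
    transforms.addPrefix.type: org.apache.kafka.connect.transforms.RegexRouter
    transforms.addPrefix.regex: "(.*)"
    transforms.addPrefix.replacement: "store-$1"
```

With this sketch, a record bound for a topic named stock would gain a source field in its value and would be routed to the store-stock topic instead.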

For more fine-grained control over the transformations, you can use predicates. A predicate is a condition that you add to the connector configuration to control whether a transformation is applied to a given record.
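
The following fragment sketches how a predicate might be attached to a transformation, under the assumption that the pipeline should discard tombstone records (records with a null value). The dropTombstones and isTombstone aliases are hypothetical. Predicates are declared under the predicates key in the same way that transformations are declared under transforms, and a transformation refers to a predicate by its alias through the predicate property.

```yaml
spec:
  config:
    transforms: dropTombstones
    # Filter drops every record for which the referenced predicate is true.
    transforms.dropTombstones.type: org.apache.kafka.connect.transforms.Filter
    transforms.dropTombstones.predicate: isTombstone
    # Declare the predicate alias and its implementation class.
    predicates: isTombstone
    predicates.isTombstone.type: org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```

Setting transforms.dropTombstones.negate to true would invert the condition, so that the Filter transformation drops every record that is not a tombstone.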

Chaining Transformations

Developers often need to apply multiple SMTs to each record.

To support this, Kafka Connect lets you chain transformations: you can define multiple SMTs in the same connector, and Kafka Connect applies them in the order in which their aliases are listed in the configuration.

For example, assume you have a connector that gathers the current stock levels of the products of your store, and the connector receives a record with the following values.

Record key            Record value
null                  {"id":213, "productId": 998, "stock": 0}

Also assume that the requirements of your integration define that the records must use the productId value as the record key.

To meet that requirement, you first apply a ValueToKey transformation. This transformation copies the productId field from the record value into the record key. After the transformation, the record looks like the following:

Record key            Record value
{"productId": 998}    {"id":213, "productId": 998, "stock": 0}

Finally, you apply the ExtractField transformation to the record key. This transformation extracts the 998 value from the productId field and replaces the entire record key with that value. The resulting record has the product ID as its key.

Record key            Record value
998                   {"id":213, "productId": 998, "stock": 0}

The following snippet shows how to implement the chained transformations from the example within a KafkaConnector.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  ...output omitted...
spec:
  config:
    transforms: createKey, extractValue  # (1)
    transforms.createKey.type: org.apache.kafka.connect.transforms.ValueToKey  # (2)
    transforms.createKey.fields: productId  # (3)
    transforms.extractValue.type: org.apache.kafka.connect.transforms.ExtractField$Key  # (4)
    transforms.extractValue.field: productId  # (5)
    ...output omitted...

(1) List of transformation aliases in the order in which they are applied. The connector first applies the transformation with the createKey alias, and then the one with the extractValue alias.

(2) Fully qualified class name of the createKey transformation.

(3) Configuration properties for the createKey transformation.

(4) Fully qualified class name of the extractValue transformation.

(5) Configuration properties for the extractValue transformation.
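
A chain can also be made conditional. The following fragment is a sketch that assumes the connector handles several topics and that only records from topics whose names start with stock- should be rekeyed. The isStockTopic alias and the stock-.* pattern are hypothetical. TopicNameMatches is one of the predicates included with Kafka Connect, and its pattern property is a regular expression that must match the whole topic name.

```yaml
spec:
  config:
    transforms: createKey, extractValue
    transforms.createKey.type: org.apache.kafka.connect.transforms.ValueToKey
    transforms.createKey.fields: productId
    # Apply createKey only when the isStockTopic predicate is true.
    transforms.createKey.predicate: isStockTopic
    transforms.extractValue.type: org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.extractValue.field: productId
    # Guard the second step with the same predicate so both steps run together.
    transforms.extractValue.predicate: isStockTopic
    predicates: isStockTopic
    predicates.isStockTopic.type: org.apache.kafka.connect.transforms.predicates.TopicNameMatches
    predicates.isStockTopic.pattern: "stock-.*"
```

Records from topics that do not match the pattern pass through both steps unchanged, so they keep their original key.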

Revision: ad482-1.8-cc2ae1c