
Chapter 5. Integrating Data Systems with Kafka Connect

Abstract

Goal: Connect data systems and react to data changes using Kafka Connect and Debezium.
Objectives
  • Create a Kafka Connect cluster.

  • Create a Kafka Connect connector.

  • Apply Single Message Transformations (SMT) with Kafka Connect.

  • Capture change event data with Debezium.

Sections
  • Creating a Connect Cluster (and Guided Exercise)

  • Creating Connectors (and Guided Exercise)

  • Transforming Messages (and Guided Exercise)

  • Capturing Change Event Data with Debezium (and Guided Exercise)

Lab

Integrating Data Systems with Kafka Connect

Creating a Connect Cluster

Objectives

After completing this section, you should be able to create a Kafka Connect cluster.

Defining Kafka Connect

Kafka Connect is a tool for reliably streaming data between Apache Kafka and external systems. It is both scalable and flexible. You can use Kafka Connect to integrate Kafka with any system, such as databases, key-value stores, search indexes and file systems.

Figure 5.1: High-level Kafka Connect workflow

Without Kafka Connect, developers must build their own applications to integrate Kafka with an external system. For each integration point, there must be a custom application that performs the data transfer between the external systems. These custom applications do not share a common contract unless one is explicitly implemented, and implementing a contract that provides reusability and flexibility is not easy. The custom integration applications also create a risk of reliability and scalability issues because of possible defects.

Benefits of Kafka Connect

Kafka Connect, as a ready-to-use mechanism, has many benefits over the do-it-yourself (DIY) approach, including the following:

Data Centric Approach

Data is at the center of Kafka Connect. Kafka Connect lets you focus only on the data format and on where to push or pull the data. You do not have to implement producers, consumers, or type conversions yourself to build the integration logic.

Scalability and Flexibility

Kafka Connect is flexible because it can integrate with virtually any external system, such as databases, search indexes, and file systems. With Kafka Connect's distributed mode, you can also scale the Kafka Connect workers.

Reusability and Extensibility

Kafka Connect provides ready-to-use, reusable connectors. It also includes an API to build new connectors or extend existing ones for customization.

For many streaming data integration cases, Kafka Connect provides all the preceding benefits as a lightweight, Kafka-centric solution for integrating external systems with Apache Kafka.

Describing the Kafka Connect Structure

Kafka Connect consists of two main components: the Kafka Connect cluster and the Kafka Connect connectors.

A Kafka Connect cluster consists of one or more nodes called workers. This is very similar to a Kafka cluster because the workload can be distributed across the nodes.

Figure 5.2: Kafka Connect workers
Workers

A worker is a running process in a Kafka Connect cluster. This process is responsible for executing a set of components that transfer data from an external system to Kafka or vice versa. These components are called connectors and tasks.

Connectors

Kafka Connect connectors are ready-to-use prebuilt components. Connector instances are logical jobs that are responsible for streaming data between an external system and Kafka.

Tasks

Tasks are the smallest units in the data model of Kafka Connect. Connector instances are responsible for coordinating a set of tasks. These tasks are the actors that actually copy the data between external systems and Kafka.

Connectors are covered in more detail in the following section.

A Kafka Connect cluster without connectors does not do anything by itself; the process that runs on each Kafka Connect node just runs idly. The cluster's main responsibility is to manage the worker nodes and ensure that the connector workload is distributed evenly across tasks. To transfer data between Kafka and an external system, there must be actively running connector tasks.

Kafka Connect Working Modes

Kafka Connect has two working modes:

Standalone Mode

In standalone mode, a single Kafka Connect worker runs all the connectors and their tasks. Thus, this mode is not suitable for building a highly available, scalable system. It is more suitable as a local adapter that solves a minor integration problem.

In this mode, Kafka offsets and the Kafka Connect configuration are stored locally in files.
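Although AMQ Streams on RHOCP does not use standalone mode, the following sketch shows how a standalone worker is typically started from a Kafka distribution; the property file names are only illustrative:

# Start a single standalone worker: the first file configures the worker,
# and each following file configures one connector.
bin/connect-standalone.sh config/connect-standalone.properties my-file-source-connector.properties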

Figure 5.3: Kafka Connect running in standalone mode
Distributed Mode

In distributed mode, there are multiple workers to distribute the workload of connector tasks. Like a Kafka cluster, a Kafka Connect cluster has a leader, which is one of the worker nodes. The leader is responsible for coordinating and balancing the task distribution across all worker nodes, including itself.

Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. This helps to build a highly available system, which is important for most production environments.

Because the nodes are distributed, you must configure the Kafka Connect cluster to store its configuration and offsets in Kafka topics. In this way, all the workers can synchronize the configuration and offset data.

Figure 5.4: Kafka Connect running in distributed mode

In AMQ Streams on the Red Hat OpenShift Container Platform, Kafka Connect clusters always run in distributed mode. The worker nodes run in containers instead of on machines and scale with the help of pods. Each pod has one Kafka Connect container that runs one worker.

Because Kafka Connect runs in distributed mode, connector offsets, configurations, and statuses are all stored in Kafka topics.
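For example, a minimal sketch of scaling the worker count, assuming a KafkaConnect resource named my-connect-cluster and an illustrative replica count:

# Increase the number of Kafka Connect worker pods by patching the replicas value.
oc patch kafkaconnect my-connect-cluster --type merge -p '{"spec":{"replicas":3}}'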

Configuring the Kafka Connect Cluster

In AMQ Streams on the Red Hat OpenShift Container Platform (RHOCP), Kafka Connect clusters are configured by using the KafkaConnect custom resource. The following YAML definition is a basic example of a Kafka Connect cluster configuration with two replicas.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect 1
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true' 2
  name: my-connect-cluster 3
spec:
  bootstrapServers: 'my-cluster-kafka-bootstrap:9092' 4
  image: 'quay.io/redhattraining/connect-cluster:latest' 5
  config: 6
    config.storage.topic: my-connect-cluster-configs 7
    offset.storage.topic: my-connect-cluster-offsets 8
    status.storage.topic: my-connect-cluster-status 9
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  replicas: 2 10
  version: 2.8.0 11

1

The custom resource kind that the AMQ Streams operator handles.

2

Activates KafkaConnector resource scanning, so that you can create connectors by using custom resources instead of the REST API.

3

Kafka Connect cluster's name.

4

Kafka cluster bootstrap address that the Kafka Connect cluster connects to.

5

The registry address of a custom, prebuilt Kafka Connect image that the cluster runs in its containers.

6

Configurations for the Kafka Connect cluster.

7

The topic to store the connector and task configuration state in.

8

The topic to store the connector offset state in.

9

The topic to store status updates for connectors and tasks in.

10

Replica count for the Kafka Connect cluster.

11

Kafka version that is used in the Kafka Connect cluster.
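As an example of creating the cluster from the preceding resource, assuming it is saved in a file named connect-cluster.yaml and that the operator applies its default pod labels:

# Create the Kafka Connect cluster and list its worker pods.
oc apply -f connect-cluster.yaml
oc get pods -l strimzi.io/cluster=my-connect-cluster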

The KafkaConnect.spec.image value from the preceding example is a custom, prebuilt Kafka Connect cluster image. This is a manually built image that the Kafka Connect cluster runs in its containers. In most cases, you should create a custom image to include the required plug-ins for the connectors. Plug-ins are the .jar files that connectors use when they are active in a Kafka Connect cluster.
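After the cluster is running, you can check which plug-ins the image contains through the /connector-plugins path of the Kafka Connect REST API, which is described later in this section. The service name in this sketch is illustrative:

# List the connector plug-ins available in the running workers.
curl -H "Accept: application/json" http://my-connect-cluster-connect-api:8083/connector-plugins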

Manually building an image is not the only way to create a Kafka Connect cluster. To prepare a Kafka Connect cluster image, there are two options:

  • Manually build an image.

  • Use the AMQ Streams operator to build the image.

This lecture focuses on building the cluster image with the AMQ Streams operator.

Building a Cluster Image with AMQ Streams Operator

To make the AMQ Streams operator build the image, you must define the KafkaConnect.spec.build property instead of KafkaConnect.spec.image. The following YAML configuration is an example of a KafkaConnect cluster resource that is configured for building the image on RHOCP.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: my-connect-cluster
spec:
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  build: 1
    output: 2
      image: quay.io/redhattraining/connect-cluster:latest 3
      pushSecret: my-connect-cluster-push-secret 4
      type: docker 5
    plugins: 6
    - name: debezium-postgres-connector
      artifacts:
      - type: tgz 7
        url: https://a.plugin.url/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz 8
        sha512sum: 962a12151bdf9a5a30627...f035a0447cb820077af00c03 9
    - name: camel-elasticsearch-rest-kafka-connector
      artifacts:
      - type: tgz
        url: https://a.plugin.url/camel-elasticsearch-rest-kafka-connector-0.10.0-package.tar.gz
  config:
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  replicas: 2
  version: 2.8.0

1

Use the build property instead of the image property in this case.

2

The output property defines where the built image is pushed and its details.

3

The registry address for the custom Kafka Connect image that the AMQ Streams operator builds and pushes.

4

The name of the push secret used for pushing the image to the defined registry. You must create this push secret beforehand.

5

The output type can be either docker or imagestream. The docker type creates an image in an external container registry, whereas imagestream creates an RHOCP internal image that is accessed through an ImageStream.

6

List of connector plug-ins and their artifacts to add to the new container image. Each plug-in must be configured with at least one artifact.

7

Artifact type for the plug-in. This field can have tgz, zip and jar values.

8

URL from which the plug-in artifact is downloaded.

9

(Optional) SHA-512 checksum to verify the plug-in artifact.

When you apply a KafkaConnect resource that contains the build configuration, the operator first builds the image and pushes it to the registry address provided in the resource. The rest of the workflow is the same as with the manual build strategy: the operator creates the relevant worker pods that use the custom image.
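As a sketch of verifying the result, assuming the preceding resource is saved in a file named connect-cluster-build.yaml:

# Apply the resource, then watch the build pod and the worker pods start.
oc apply -f connect-cluster-build.yaml
oc get pods -w
# Inspect the resource status after the build finishes.
oc get kafkaconnect my-connect-cluster -o yaml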

Kafka Connect REST API

Apart from the custom resources that the AMQ Streams operator provides, Kafka Connect also offers a REST API. This REST API allows developers to get worker node information and manage connectors on the Kafka Connect cluster. In AMQ Streams on RHOCP, the API is available at connect-cluster-name-connect-api:8083. This address is exposed through a Service that the operator creates.

The top-level resource paths for the REST API are:

  • /

  • /connectors

  • /connector-plugins

/ is the root path of the REST API. It returns information about the worker node that serves the request: the version of the Kafka Connect worker, the Git commit ID of the source code, and the ID of the Kafka cluster.

The following is an example of a request and its response to a Kafka Connect cluster's REST API for its root path.

Request
GET / HTTP/1.1
Host: connect-cluster-name-connect-api
Accept: application/json
Response
HTTP/1.1 200 OK
Content-Type: application/json

{
  "version":"2.8.0.redhat-00005",
  "commit":"5568dc65ed44500b",
  "kafka_cluster_id":"pkAfTbvQTuqfo10Hlg_PxQ"
}

The REST API currently supports only application/json as the content type for both requests and responses. Requests should therefore specify the expected content type of the response in the Accept header. If the Accept header is specified, the Content-Type of the response is application/json as well.
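For example, a minimal sketch of sending the preceding request with curl from a pod that can reach the Kafka Connect service, assuming a cluster named my-connect-cluster:

# Query the root path; the Service name follows the <cluster-name>-connect-api pattern.
curl -H "Accept: application/json" http://my-connect-cluster-connect-api:8083/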

The next section, Creating Connectors, covers connectors and their use of the REST API in more detail.

 

References

Kafka Connect REST API

Further information about Kafka Connect cluster configuration is in the Kafka Connect/S2I cluster configuration section of the Using AMQ Streams on OpenShift document at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/using_amq_streams_on_openshift/index#assembly-kafka-connect-str

For more information, refer to the Deploy Kafka Connect chapter in the Deploying and Upgrading AMQ Streams on OpenShift document at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/deploying_and_upgrading_amq_streams_on_openshift/index#kafka-connect-str

Revision: ad482-1.8-cc2ae1c