Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
- Creating a Connect Cluster
- Guided Exercise: Creating a Connect Cluster
- Creating Connectors
- Guided Exercise: Creating Connectors
- Transforming Messages
- Guided Exercise: Transforming Messages
- Capturing Change Event Data with Debezium
- Guided Exercise: Capturing Change Event Data with Debezium
- Lab: Integrating Data Systems with Kafka Connect
- Summary
Abstract
| Goal | Connect data systems and react to data changes using Kafka Connect and Debezium. |
| Sections | Creating a Connect Cluster, Creating Connectors, Transforming Messages, and Capturing Change Event Data with Debezium, each with a guided exercise |
| Lab | Integrating Data Systems with Kafka Connect |
Kafka Connect is a tool for reliably streaming data between Apache Kafka and external systems. It is both scalable and flexible. You can use Kafka Connect to integrate Kafka with any system, such as databases, key-value stores, search indexes and file systems.
Without Kafka Connect, developers must build their own applications to integrate Kafka with external systems. Each integration point requires a custom application that transfers data between Kafka and the external system. These custom applications share no common contract unless one is explicitly designed, and implementing a contract that provides reusability and flexibility is not easy. Defects in the custom integration applications also create reliability and scalability risks.
Kafka Connect, as a ready-to-use mechanism, has many benefits over the do-it-yourself (DIY) approach, including the following:
- Data-centric Approach
Data is at the center of Kafka Connect. You focus only on the data formats and on where to pull data from or push data to. You do not have to implement producers, consumers, or type conversions to build the integration logic.
- Scalability and Flexibility
Kafka Connect is flexible because it can integrate with many kinds of external systems, such as databases, search indexes, and file systems. With Kafka Connect's distributed mode, you can also scale the number of Kafka Connect instances to match the workload.
- Reusability and Extensibility
Kafka Connect provides ready-to-use, reusable connectors. It also includes an API to build new connectors or to extend existing connectors for customization.
Because it provides all the preceding benefits, Kafka Connect is a lightweight, Kafka-centric solution for many streaming data integration cases.
Kafka Connect consists of two main components: the Kafka Connect cluster and the Kafka Connect connectors.
A Kafka Connect cluster consists of one or more nodes, called workers. This is very similar to a Kafka cluster, because the workload can be distributed across the nodes.
- Workers
A worker is a running process in a Kafka Connect cluster. This process is responsible for executing a set of components that transfer data from an external system to Kafka or vice versa. These components are called connectors and tasks.
- Connectors
Kafka Connect connectors are ready-to-use prebuilt components. Connector instances are logical jobs that are responsible for streaming data between an external system and Kafka.
- Tasks
Tasks are the smallest units in the data model of Kafka Connect. Connector instances are responsible for coordinating a set of tasks. These tasks are the actors that actually copy the data between external systems and Kafka.
Connectors are covered in more detail in the following section.
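As a preview of that section, the following sketch shows how the connector and task concepts map to a KafkaConnector custom resource in AMQ Streams. The connector class, file, and topic values are illustrative placeholders, not taken from this course environment.

```yaml
# Hypothetical connector declaration: one logical connector whose work
# the Kafka Connect workers split into at most two tasks.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: example-file-source              # illustrative name
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2                             # upper bound on the number of tasks
  config:
    file: /tmp/example.txt                # illustrative source file
    topic: example-topic                  # illustrative target topic
```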
A Kafka Connect cluster without connectors does nothing by itself; the processes that run on the worker nodes stay idle. The cluster's main responsibility is to manage the worker nodes and to distribute the connector tasks evenly across them. To transfer data between Kafka and an external system, there must be actively running connector tasks.
Kafka Connect has two working modes:
- Standalone Mode
In standalone mode, a single Kafka Connect worker runs all the connectors and their tasks. This mode is therefore not suitable for building a highly available, scalable system. It is better suited as a local adapter that solves a minor integration problem.
In this mode, connector offsets and the Kafka Connect configuration are stored in local files.
- Distributed Mode
In distributed mode, multiple workers share the workload of the connector tasks. Like a Kafka cluster, a Kafka Connect cluster has a leader, which is one of the worker nodes. The leader is responsible for coordinating and balancing the task distribution across all worker nodes, including itself.
Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. This helps you build a highly available system, which is important for most production environments.
Because the nodes are distributed, you must configure the Kafka Connect cluster to store its configuration and offsets in Kafka topics. In this way, all workers can share the same configuration and offset data. The configuration sketch after this list compares the storage settings of the two modes.
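To make the difference concrete, the following is a minimal sketch of a distributed worker configuration in the properties format that Kafka Connect workers read. The broker address, group ID, and topic names are illustrative placeholders, not values from this course environment.

```properties
# Distributed mode: workers that share the same group.id form one
# Kafka Connect cluster and store their state in Kafka topics.
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status

# Standalone mode has no worker group and keeps offsets in a local file
# instead, for example:
# offset.storage.file.filename=/tmp/connect.offsets
```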
In AMQ Streams on the Red Hat OpenShift Container Platform, Kafka Connect clusters always run in distributed mode. The worker nodes run in containers instead of on dedicated machines, and they scale by adding pods. Each pod has one Kafka Connect container with one worker in it.
Because Kafka Connect runs in distributed mode, connector offsets, configurations and status are all stored in Kafka topics.
In AMQ Streams on the Red Hat OpenShift Container Platform (RHOCP), you configure Kafka Connect clusters by using the KafkaConnect custom resource. The following YAML definition is a basic example of a Kafka Connect cluster configuration with two replicas.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: my-connect-cluster
spec:
  bootstrapServers: 'my-cluster-kafka-bootstrap:9092'
  image: 'quay.io/redhattraining/connect-cluster:latest'
  config:
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  replicas: 2
  version: 2.8.0
- kind: the KafkaConnect resource type that the AMQ Streams operator handles.
- strimzi.io/use-connector-resources: activates the use of KafkaConnector resources to manage connectors in this Kafka Connect cluster.
- name: the Kafka Connect cluster's name.
- bootstrapServers: the Kafka cluster bootstrap address that the Kafka Connect cluster connects to.
- image: the registry address of the custom Kafka Connect image that the worker containers run. In this example, the image is prebuilt and pushed to the registry manually.
- config: configurations for the Kafka Connect cluster.
- config.storage.topic: the topic that stores the connector and task configuration state.
- offset.storage.topic: the topic that stores the connector offset state.
- status.storage.topic: the topic that stores status updates for connectors and tasks.
- replicas: the replica count for the Kafka Connect cluster.
- version: the Kafka version that the Kafka Connect cluster uses.
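Assuming the preceding resource is saved in a file named connect-cluster.yaml (an illustrative file name), you could create the cluster and inspect the result with commands similar to the following sketch.

```bash
# Create the Kafka Connect cluster from the resource definition.
oc apply -f connect-cluster.yaml

# The operator reports readiness through the KafkaConnect resource status.
oc get kafkaconnect my-connect-cluster

# With replicas: 2, the operator creates two worker pods.
oc get pods -l strimzi.io/cluster=my-connect-cluster
```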
The KafkaConnect.spec.image value in the preceding resource definition refers to a custom, prebuilt Kafka Connect image.
This image is built manually, and the Kafka Connect cluster runs it in its containers.
In most cases, you should create a custom image to include the required plug-ins for the connectors.
Plug-ins contain the .jar files that connectors use when they are active in a Kafka Connect cluster.
Manually building an image is not the only way to create a Kafka Connect cluster. There are two options to prepare a Kafka Connect cluster image:
- Manually build an image.
- Use the AMQ Streams operator to build the image.
In this lecture, we will focus on building a cluster image with the AMQ Streams operator.
To make AMQ Streams build an image, you must define the KafkaConnect.spec.build value rather than the KafkaConnect.spec.image.
The following YAML configuration is an example of a KafkaConnect cluster resource that is configured for building the images on RHOCP.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: my-connect-cluster
spec:
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  build:
    output:
      image: quay.io/redhattraining/connect-cluster:latest
      pushSecret: my-connect-cluster-push-secret
      type: docker
    plugins:
      - name: debezium-postgres-connector
        artifacts:
          - type: tgz
            url: https://a.plugin.url/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz
            sha512sum: 962a12151bdf9a5a30627...f035a0447cb820077af00c03
      - name: camel-elasticsearch-rest-kafka-connector
        artifacts:
          - type: tgz
            url: https://a.plugin.url/camel-elasticsearch-rest-kafka-connector-0.10.0-package.tar.gz
  config:
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  replicas: 2
  version: 2.8.0
- build: defined instead of an image value, so that the AMQ Streams operator builds the Kafka Connect image.
- output: the output configuration for the newly built image.
- output.image: the registry address for the custom Kafka Connect image that the AMQ Streams operator builds and pushes.
- pushSecret: the name of the push secret for pushing the image to the defined registry. You must create this push secret beforehand.
- type: the output type, either docker or imagestream. The docker type pushes the image to an external registry, whereas imagestream pushes to an RHOCP internal image that is accessed through an ImageStream. A sketch of the imagestream variant follows this list.
- plugins: the list of connector plug-ins and their artifacts to add to the new container image. Each plug-in must be configured with at least one artifact.
- artifacts.type: the artifact type for the plug-in, for example tgz or jar.
- url: the URL from which the plug-in artifact is downloaded.
- sha512sum: (optional) the SHA-512 checksum used to verify the plug-in artifact.
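For reference, the following sketch shows how the build output could look with the imagestream type instead of docker. The ImageStream name and tag are illustrative; the ImageStream must already exist in the same namespace as the Kafka Connect cluster.

```yaml
# Alternative build output: push to an OpenShift internal ImageStream
# instead of an external registry. No push secret is needed in this case.
build:
  output:
    type: imagestream
    image: my-connect-cluster:latest   # ImageStream name and tag (illustrative)
```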
When you apply a KafkaConnect resource that contains the build configuration, the operator first builds the image and pushes it to the registry address provided in the resource. The rest of the workflow is the same as with the manually built image: the operator creates the worker pods that use the custom image.
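On RHOCP, you could watch this workflow with commands similar to the following sketch. The exact build and pod names depend on the cluster name and can differ from the ones shown here.

```bash
# The operator runs an OpenShift build to produce the connector image.
oc get builds

# Wait until the Kafka Connect cluster reports the Ready condition.
oc wait kafkaconnect/my-connect-cluster --for=condition=Ready --timeout=300s

# The worker pods now run the newly built image.
oc get pods -l strimzi.io/cluster=my-connect-cluster
```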
Apart from the custom resources that the AMQ Streams operator provides, Kafka Connect also offers a REST API. This REST API allows developers to get worker node information and to manage connectors on the Kafka Connect cluster. In AMQ Streams on RHOCP, the API is available at connect-cluster-name-connect-api:8083. This address is exposed through a Service that the operator creates.
The top-level resource paths for the REST API are:
- /
- /connectors
- /connector-plugins
The / root path returns information about the worker node that serves the REST request.
This information includes the version of the Kafka Connect worker, the git commit ID of the source code, and the Kafka Connect cluster ID.
The following is an example of a request and its response to a Kafka Connect cluster's REST API for its root path.
Request:

GET / HTTP/1.1
Host: my-connect-cluster-connect-api:8083

Response:

HTTP/1.1 200 OK
The REST API currently only supports application/json as the content type for both the request and the response.
Thus, requests should specify the expected content type of the response by using the Accept header.
Requests that include a body should also set the Content-Type header to application/json.
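For example, from a pod inside the cluster, you could query the API with commands similar to the following sketch. The response bodies shown in the comments illustrate the shape of the data only; they are not literal output.

```bash
# Query the root path: returns worker version, commit ID, and cluster ID.
curl -H "Accept: application/json" \
  http://my-connect-cluster-connect-api:8083/
# Example shape: {"version":"...","commit":"...","kafka_cluster_id":"..."}

# List the connectors that are currently deployed on the cluster.
curl -H "Accept: application/json" \
  http://my-connect-cluster-connect-api:8083/connectors
# Example shape: ["my-source-connector","my-sink-connector"]
```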
The next section, Creating Connectors, provides more detailed information about connectors and about using the REST API to manage them.
References
Further information about Kafka Connect cluster configuration is in the Kafka Connect/S2I cluster configuration section of the Using AMQ Streams on OpenShift document at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/using_amq_streams_on_openshift/index#assembly-kafka-connect-str
For more information, refer to the Deploy Kafka Connect chapter in the Deploying and Upgrading AMQ Streams on OpenShift document at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/deploying_and_upgrading_amq_streams_on_openshift/index#kafka-connect-str