Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
Connectors integrate Kafka Connect with external technologies, such as databases and third-party services. The main responsibility of a connector is to stream data from an external system to Kafka, or from Kafka to an external system.
Kafka Connect has two types of connectors.
- Source connector
Streams data from an external system to Kafka.
For example, the Debezium MySQL connector reads the MySQL binlog and streams the changes to Kafka.
Figure 5.5: Source connector
- Sink connector
Streams data from Kafka to an external system.
For example, the Elasticsearch sink connector streams data from Kafka to Elasticsearch.
Figure 5.6: Sink connector
Connectors are components that implement the Kafka Connect API. When you use connectors, you use configuration files to describe the data you want to copy, and the Kafka Connect components do all the work for you.
The connector implementation is responsible for splitting the copying job into tasks. Those tasks are later distributed to Kafka Connect workers, which perform the copy.
Converters are another important component of Kafka Connect. Their main responsibility is to handle the serialization and deserialization of data. For example, a converter can take input from JDBC and convert it to JSON.
For maximum reusability, converters are a separate component from connectors. This means that you can use the same converter with both source and sink connectors.
Apache Kafka provides the following converters:
- org.apache.kafka.connect.json.JsonConverter
- org.apache.kafka.connect.storage.StringConverter
- org.apache.kafka.connect.converters.ByteArrayConverter
Note
The Red Hat Service Registry provides the Avro and JSON Schema converters to use with Kafka Connect.
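Converters are typically set in the Kafka Connect worker configuration and can be overridden per connector. The following is a minimal sketch of worker-level converter settings, using standard Kafka Connect property names; the particular converter choices are illustrative:

```properties
# Worker-level defaults: keys as plain strings, values as JSON without embedded schemas.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```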
To use connectors, you must create a connector instance in your Kafka Connect cluster. After creating the instance, you can configure, monitor and manage the running connector instance.
A connector's configuration depends on the external system that it integrates with. Connectors might have different purposes, but there are a few common options that you must configure in all your connectors:
- name: The unique name of the connector.
- connector.class: The Java class for the connector.
- tasks.max: Maximum number of tasks that the connector can create.
- key.converter (Optional): Setting that defines the key converter.
- value.converter (Optional): Setting that defines the value converter.
Sink connectors must also set one of the following options to control their input:
- topics: List of input topics for the connector.
- topics.regex: Regular expression of input topics for the connector.
The preceding configuration options are in the format of Java properties that Kafka Connect accepts natively in its API.
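As an illustration, here is a minimal sketch of a sink connector configuration in this properties format, using the built-in FileStreamSinkConnector; the file path, topic, and converter choices are assumptions for the example:

```properties
# Sink connector: reads records from a Kafka topic and appends them to a file.
name=my-sink-connector
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
topics=my-topic
file=/tmp/my-sink-file.txt
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```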
AMQ Streams on the Red Hat OpenShift Container Platform (RHOCP) provides two options to create and manage connectors: the KafkaConnector custom resource and the Kafka Connect REST API.
With the KafkaConnector custom resource, you define the desired state of a connector in a YAML file and deploy the resource to OpenShift to create the connector instances. You must deploy the KafkaConnector custom resources in the same namespace as your Kafka Connect cluster.
The following snippet shows how to define a connector instance.
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/opt/kafka/LICENSE"
    topic: my-topic
```

- name: Unique name assigned to the connector.
- strimzi.io/cluster: Name of the Kafka Connect cluster to link with.
- class: Fully qualified name or alias of the class that implements the connector logic.
- tasksMax: Maximum number of tasks that the connector can create.
- config: Connector configuration as key-value pairs. Each connector implementation might have different configuration options.
The preceding example defines a source connector that streams the content of the /opt/kafka/LICENSE file to the my-topic topic.
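For example, assuming the resource above is saved in a file named my-source-connector.yaml (an illustrative file name), you create the connector instance by applying the file to the namespace of your Kafka Connect cluster:

```bash
# Create or update the connector instance from the KafkaConnector resource.
# The namespace name is an assumption; use the namespace of your Kafka Connect cluster.
oc apply -f my-source-connector.yaml -n my-kafka-project
```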
To manage a running connector instance, you must update its corresponding KafkaConnector resource. For other actions, such as restarting connector instances or connector tasks, you must annotate the KafkaConnector custom resource.
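For example, the following sketch restarts the connector, and one of its tasks, by annotating the KafkaConnector resource; see the restart references at the end of this section for the complete procedures:

```bash
# Request a restart of the connector instance.
oc annotate kafkaconnector my-source-connector strimzi.io/restart=true

# Request a restart of a single connector task, identified by its task ID (0 in this sketch).
oc annotate kafkaconnector my-source-connector strimzi.io/restart-task=0
```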
With the Kafka Connect REST API, you define the desired state of a connector in a JSON object and make an HTTP request to create the connector instances. In Red Hat OpenShift, the Kafka Connect REST API is available on port 8083 of the Kafka Connect API service.
The following table is a non-comprehensive list of operations supported by the REST API.
Table 5.1. Kafka Connect REST API Operations
| Method | Endpoint | Description |
|---|---|---|
| GET | /connectors | Gets a list of active connectors. |
| POST | /connectors | Creates a new connector. The request body must be a JSON object with a name field, and a config object with the connector configuration parameters. |
| GET | /connectors/{name} | Gets the available information from a specific connector. |
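For example, a minimal sketch of listing the active connectors from a pod inside the OpenShift cluster. It assumes a Kafka Connect cluster named my-connect-cluster, whose REST API is exposed by the my-connect-cluster-connect-api service:

```bash
# List the active connector instances.
curl http://my-connect-cluster-connect-api:8083/connectors
```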
The following snippet shows the request payload that creates a connector instance.
```json
{
  "name": "my-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "2",
    "file": "/opt/kafka/LICENSE",
    "topic": "my-topic"
  }
}
```

- name: Unique name assigned to the connector.
- config: Connector configuration as key-value pairs. Each connector implementation might have different configuration options.
- connector.class: Fully qualified name or alias of the class that implements the connector logic.
- tasks.max: Maximum number of tasks that the connector can create.
The preceding example defines a source connector that streams the content of the /opt/kafka/LICENSE file to the my-topic topic.
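For example, assuming the preceding payload is saved in a file named my-source-connector.json and the Kafka Connect cluster is named my-connect-cluster (both names are illustrative), the following sketch creates the connector instance through the REST API:

```bash
# Create the connector instance by posting the JSON payload to the REST API.
curl -X POST \
  -H "Content-Type: application/json" \
  --data @my-source-connector.json \
  http://my-connect-cluster-connect-api:8083/connectors
```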
Consult the Kafka Connect REST API document in the references section for more information about the operations supported by the REST API.
Note
The Cluster Operator reverts any changes made through the Kafka Connect REST API when KafkaConnector resources are enabled.
You can enable and disable KafkaConnector resources by using the Boolean annotation strimzi.io/use-connector-resources in your KafkaConnect custom resource.
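For example, a minimal sketch of a KafkaConnect resource with KafkaConnector resources enabled; the cluster name is illustrative and the rest of the spec is omitted:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    # Enable management of connectors through KafkaConnector resources.
    strimzi.io/use-connector-resources: "true"
spec:
  # ... existing Kafka Connect configuration (replicas, bootstrapServers, and so on)
```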
A connector plug-in is a Java package containing an implementation that knows how to communicate with an external system. The AMQ Streams container images for Kafka Connect include two built-in file connectors.
- The FileStreamSourceConnector streams data to Kafka from a file.
- The FileStreamSinkConnector streams data from Kafka to a file.
To extend the connection capabilities of Kafka Connect, you must create a custom Kafka Connect image and add the connector classes that you need. There are different procedures to add your own connector classes to Kafka Connect images:
- Create a new container image by using AMQ Streams on RHOCP. For more information, see the section called “Creating a Connect Cluster”.
- Create a container image from the Kafka Connect base image by using a Dockerfile, as in the sketch that follows this list.
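A minimal Dockerfile sketch for the second option. The base image name and tag are assumptions for this sketch; check the Red Hat Ecosystem Catalog for the image that matches your AMQ Streams version. The /opt/kafka/plugins/ directory is where Kafka Connect loads connector plug-ins from:

```dockerfile
# Base image name and tag are assumptions; use the image for your AMQ Streams version.
FROM registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0

USER root:root
# Copy your connector plug-ins into the plug-in directory scanned by Kafka Connect.
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
```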
For the details of each procedure, and for more information about each strategy, consult the Extending Kafka Connect with connector plug-ins document in the references section.
Note
Instead of creating your own connectors, you can use the open source connectors available in the Debezium and Apache Camel projects.
References
For more information about adding your own connector classes to connector images, refer to the Extending Kafka Connect with connector plug-ins section in the Red Hat AMQ Guide at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/deploying_and_upgrading_amq_streams_on_openshift/index#using-kafka-connect-with-plug-ins-str
For more information about restarting a Kafka connector, refer to the Performing a restart of a Kafka connector section in the Red Hat AMQ Guide at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/deploying_and_upgrading_amq_streams_on_openshift/index#proc-manual-restart-connector-str
For more information about restarting a Kafka connector task, refer to the Performing a restart of a Kafka connector task section in the Red Hat AMQ Guide at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/deploying_and_upgrading_amq_streams_on_openshift/index#proc-manual-restart-connector-task-str