Creating Connectors

Objectives

After completing this section, you should be able to create a Kafka Connect connector.

Defining Connectors

Connectors integrate Kafka Connect with external technologies, such as databases and third-party services. The main responsibility of connectors is to stream data from an external system to Kafka, or from Kafka to an external system.

Kafka Connect has two types of connectors.

Source connector

Streams data from an external system to Kafka.

For example, the Debezium MySQL connector reads the MySQL binary log (binlog) and streams the resulting change events to Kafka.

Figure 5.5: Source connector

Sink connector

Streams data from Kafka to an external system.

For example, the Elasticsearch sink connector streams data from Kafka topics to Elasticsearch.

Figure 5.6: Sink connector

Connectors are components that implement the Kafka Connect API. When you use connectors, you describe the data that you want to copy in configuration files, and the Kafka Connect components do the work of moving it.

The connector implementation is responsible for splitting the copying job into tasks. Kafka Connect then distributes those tasks to workers, which perform the actual copying.

Defining Converters

Converters are another important component of Kafka Connect. Their main responsibility is to handle the serialization and deserialization of data. For example, a converter can take the records that a JDBC source connector produces and serialize them as JSON.

For maximum reusability, converters are a separate component from connectors. This means that you can use the same converter with a source connector and with a sink connector.

Apache Kafka provides the following converters:

  • org.apache.kafka.connect.json.JsonConverter

  • org.apache.kafka.connect.storage.StringConverter

  • org.apache.kafka.connect.converters.ByteArrayConverter
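You typically set converters at the worker level, and optionally override them per connector. The following snippet is a minimal sketch of how you might configure JSON converters in a KafkaConnect custom resource; the cluster name, the bootstrapServers address, and the schemas.enable options are illustrative assumptions.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  bootstrapServers: my-cluster-kafka-bootstrap:9092  # assumed bootstrap address
  config:
    # Serialize record keys and values as JSON
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    # Do not embed a schema in each JSON message (assumes schemaless data)
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false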

Note

The Red Hat Service Registry provides the Avro and JSON Schema converters to use with Kafka Connect.

Creating and Managing Connectors

To use connectors, you must create a connector instance in your Kafka Connect cluster. After creating the instance, you can configure, monitor, and manage the running connector instance.

A connector's configuration depends on the external system that it integrates with. Connectors might serve different purposes, but there are a few common options that you must configure in all of them:

  • name: The unique name of the connector.

  • connector.class: The Java class for the connector.

  • tasks.max: Maximum number of tasks that the connector can create.

  • key.converter (Optional): Setting that defines the key converter.

  • value.converter (Optional): Setting that defines the value converter.

Sink connectors must also set one of the following options to control their input:

  • topics: List of input topics for the connector.

  • topics.regex: Regular expression of input topics for the connector.

The preceding configuration options are Java properties that the Kafka Connect API accepts natively.
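As an illustration, the following sketch shows a sink connector configuration in Java properties format, using the built-in FileStreamSinkConnector class. The connector name, output file, topic, and converter choices are assumptions for the example.

# Unique connector name (assumed for this example)
name=my-sink-connector
# Class that implements the connector logic
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
# Maximum number of tasks that the connector can create
tasks.max=1
# Input topics; sink connectors require topics or topics.regex
topics=my-topic
# Optional converter overrides for this connector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
# FileStreamSinkConnector-specific option: the output file
file=/tmp/my-sink.txt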

AMQ Streams on the Red Hat OpenShift Container Platform (RHOCP) provides two options to create and manage connectors: the KafkaConnector custom resource, and the Kafka Connect REST API.

Using the KafkaConnector Resource

With the KafkaConnector custom resource, you define the desired state of a connector in a YAML file and deploy the resource to OpenShift to create the connector instances. You must deploy the KafkaConnector custom resources in the same namespace as your Kafka Connect cluster.

The following snippet shows how to define a connector instance.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: my-source-connector 1
  labels:
    strimzi.io/cluster: my-connect-cluster 2
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector 3
  tasksMax: 2 4
  config: 5
    file: "/opt/kafka/LICENSE"
    topic: my-topic

1. Unique name assigned to the connector.

2. Name of the Kafka Connect cluster to link with.

3. Fully qualified name or alias of the class that implements the connector logic.

4. Maximum number of tasks that the connector can create.

5. Connector configuration as key-value pairs. Each connector implementation might have different configuration options.

The preceding example defines a source connector that streams the content of the /opt/kafka/LICENSE file to the my-topic topic.
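Assuming you save the preceding resource in a file, for example a hypothetical my-source-connector.yaml, you can create the connector instance with the OpenShift CLI:

oc apply -f my-source-connector.yaml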

To manage a running connector instance, you must update its corresponding KafkaConnector resource. For other actions such as restarting connector instances or connector tasks, you must annotate the KafkaConnector custom resource.
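For example, the AMQ Streams documentation listed in the references describes restart annotations. The following sketch shows how you might restart the earlier connector instance, and one of its tasks, with the OpenShift CLI; the task ID 0 is an assumption:

# Restart the whole connector instance
oc annotate KafkaConnector my-source-connector strimzi.io/restart=true

# Restart only task 0 of the connector
oc annotate KafkaConnector my-source-connector strimzi.io/restart-task=0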

Using the Kafka Connect REST API

With the Kafka Connect REST API, you define the desired state of a connector in a JSON object and make an HTTP request to create the connector instances. In Red Hat OpenShift, the Kafka Connect REST API is available on port 8083 of the Kafka Connect API service.

The following table is a non-comprehensive list of operations supported by the REST API.

Table 5.1. Kafka Connect REST API Operations

Method   Endpoint             Description
GET      /connectors          Gets a list of active connectors.
POST     /connectors          Creates a new connector. The request body must be a JSON object with a name field, and a config object with the connector configuration parameters.
GET      /connectors/{name}   Gets the available information from a specific connector.
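For example, assuming a hypothetical Kafka Connect API service named my-connect-cluster-connect-api, you could list the active connectors with a request like the following sketch:

curl http://my-connect-cluster-connect-api:8083/connectors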

The following snippet shows the request payload that creates a connector instance.

{
  "name": "my-source-connector", 1
  "config": { 2
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector", 3
    "tasks.max": "2", 4
    "file": "/opt/kafka/LICENSE",
    "topic": "my-topic"
  }
}

1. Unique name assigned to the connector.

2. Connector configuration as key-value pairs. Each connector implementation might have different configuration options.

3. Fully qualified name or alias of the class that implements the connector logic.

4. Maximum number of tasks that the connector can create.

The preceding example defines a source connector that streams the content of the /opt/kafka/LICENSE file to the my-topic topic.
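For example, assuming the preceding payload is saved in a hypothetical my-source-connector.json file, and that the Kafka Connect API service is named my-connect-cluster-connect-api (an assumption), you could create the connector with a request like the following sketch:

curl -X POST \
  -H "Content-Type: application/json" \
  -d @my-source-connector.json \
  http://my-connect-cluster-connect-api:8083/connectors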

Consult the Kafka Connect REST API document in the references section for more information about the operations supported by the REST API.

Note

The Cluster Operator reverts any changes made through the Kafka Connect REST API when KafkaConnector resources are enabled. You can enable or disable KafkaConnector resources by using the Boolean annotation strimzi.io/use-connector-resources in your KafkaConnect custom resource.
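For example, the following sketch enables KafkaConnector resources for a Kafka Connect cluster; the cluster name is an assumption, and the rest of the specification is elided:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    # Manage connectors through KafkaConnector resources instead of the REST API
    strimzi.io/use-connector-resources: "true"
spec:
  # ...rest of the Kafka Connect configuration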

Installing Connector Plug-ins

A connector plug-in is a set of Java archives that contains the implementation required to communicate with an external system. The AMQ Streams container images for Kafka Connect include two built-in file connectors.

  • The FileStreamSourceConnector streams data to Kafka from a file.

  • The FileStreamSinkConnector streams data from Kafka to a file.

To extend the connection capabilities of Kafka Connect, you must create a custom Kafka Connect image and add the connector classes that you need. There are different procedures to add your own connector classes to Kafka Connect images, such as building a new container image from the Kafka Connect base image, or using OpenShift builds with Source-to-Image (S2I).

For the details of each procedure, and more information about each strategy, consult the Extending Kafka Connect with connector plug-ins document in the references section.
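As an illustration of the base-image approach, the following Dockerfile sketch copies plug-in JARs into the Kafka Connect plug-in path. The base image tag and the my-plugins directory are assumptions; check the referenced documentation for the image that matches your AMQ Streams version.

FROM registry.redhat.io/amq7/amq-streams-kafka-28-rhel8:1.8.0
USER root:root
# Copy the connector plug-in JARs into the Kafka Connect plug-in path
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001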

Note

Instead of creating your own connectors, you can use the open source connectors available in the Debezium and Apache Camel projects.

 

References

For more information about adding your own connector classes to connector images, refer to the Extending Kafka Connect with connector plug-ins section in the Red Hat AMQ Guide at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/deploying_and_upgrading_amq_streams_on_openshift/index#using-kafka-connect-with-plug-ins-str

For more information about restarting a Kafka connector, refer to the Performing a restart of a Kafka connector section in the Red Hat AMQ Guide at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/deploying_and_upgrading_amq_streams_on_openshift/index#proc-manual-restart-connector-str

For more information about restarting a Kafka connector task, refer to the Performing a restart of a Kafka connector task section in the Red Hat AMQ Guide at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q3/html-single/deploying_and_upgrading_amq_streams_on_openshift/index#proc-manual-restart-connector-task-str

Configuring Connectors

Kafka Connect REST API

Apache Camel Kafka Connector

Debezium Connectors
