Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will stream issues from a GitHub repository to Kafka, and then stream the issues from Kafka to the Elasticsearch engine.
Outcomes
You should be able to create source and sink connectors by using the KafkaConnector resource and the Kafka Connect REST API.
You can find the solution files for this exercise in the AD482-apps repository, within the connect-connectors/solutions directory.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The OpenShift CLI (oc).
A GitHub account.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start connect-connectors
...output omitted...
The lab command copies the exercise files from the AD482-apps/connect-connectors/apps directory in your local Git repository into the connect-connectors directory in your workspace.
Procedure 5.2. Instructions
Navigate to the connect-connectors directory located in your workspace directory.

(.venv) [user@host AD482]$ cd connect-connectors
(.venv) [user@host connect-connectors]$

Verify that the Kafka Connect cluster and the available connector plug-ins are running in the Kafka Connect instance.

Use the oc get pods command to verify that the Kafka Connect cluster pod is running.

(.venv) [user@host connect-connectors]$ oc get pods \
--selector app.kubernetes.io/name=kafka-connect
NAME                                    READY   STATUS    RESTARTS   AGE
my-connect-cluster-connect-POD_SUFFIX   1/1     Running   0          3m47s

Use the oc describe kafkaconnect command to verify the available connector plug-ins.

(.venv) [user@host connect-connectors]$ oc describe \
kafkaconnect my-connect-cluster
...output omitted...
Status:
  ...output omitted...
  Connector Plugins:
    Class:    org.apache...elasticsearchrest.CamelElasticsearchrestSinkConnector
    Type:     sink
    Version:  0.10.0
    Class:    org.apache.camel.kafkaconnector.github.CamelGithubSinkConnector
    Type:     sink
    Version:  0.10.1
    Class:    org.apache.camel.kafkaconnector.github.CamelGithubSourceConnector
    Type:     source
    Version:  0.10.1
...output omitted...

The preceding command lists all the connector plug-ins available in the running Kafka Connect instance. The list must contain a sink connector for Elasticsearch, and a source connector for GitHub.
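The same plug-in inventory is also exposed by the Kafka Connect REST API through its GET /connector-plugins endpoint. As a sketch, the check below parses an illustrative response (the sample entries mirror the plug-ins above, but are not captured from a live cluster) and confirms that the two connector classes this exercise needs are present:

```python
import json

# Illustrative shape of a GET /connector-plugins response from the
# Kafka Connect REST API; the entries mirror the oc describe output above.
response = json.loads("""[
  {"class": "org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector",
   "type": "sink", "version": "0.10.0"},
  {"class": "org.apache.camel.kafkaconnector.github.CamelGithubSourceConnector",
   "type": "source", "version": "0.10.1"}
]""")

# The exercise needs a GitHub source connector and an Elasticsearch sink connector.
has_github_source = any(
    p["type"] == "source" and "github" in p["class"] for p in response
)
has_elastic_sink = any(
    p["type"] == "sink" and "elasticsearch" in p["class"] for p in response
)
print(has_github_source, has_elastic_sink)
```

A check like this is handy when scripting lab verification instead of reading the oc describe output by eye.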
Create a GitHub repository and an access token.
Open a web browser, navigate to GitHub, and create an empty public repository named ad482-connectors.

Navigate to https://github.com/settings/tokens.

Enter your GitHub password when prompted. Create a new token with the repo scope selected, and enter ad482-connectors in the Note field. After generating the token, copy the token value.

Warning

Be sure to copy the generated access token and store it in a place that you remember.
Create a GitHub source connector by using the KafkaConnector custom resource.

Open the resources/github-source-connector.yaml file, and configure your GitHub user and access token. The file contents should look like the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: github-source-connector
spec:
  class: org.apache.camel.kafkaconnector.github.CamelGithubSourceConnector
  config:
    camel.source.endpoint.oauthToken: YOUR_GITHUB_TOKEN
    camel.source.endpoint.repoName: ad482-connectors
    camel.source.endpoint.repoOwner: YOUR_GITHUB_USERNAME
    camel.source.path.branchName: main
    camel.source.path.type: event
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    topics: github-events
  tasksMax: 1
Connector class that implements the integration with GitHub.
Specific configuration for the Camel source connector. It defines the access token to use.
Specific configuration for the Camel source connector. It defines the GitHub repository.
Specific configuration for the Camel source connector. It defines the repository user.
Specific configuration for the Camel source connector. It defines the GitHub branch name.
Specific configuration for the Camel source connector. It defines the path type.
Key converter.
Value converter.
Topic to which messages are sent.
Use the oc create command to create the source connector.

(.venv) [user@host connect-connectors]$ oc create \
-f resources/github-source-connector.yaml
kafkaconnector.kafka.strimzi.io/github-source-connector created
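Once the resource exists, the connector state is reported both in the KafkaConnector resource status and by the Kafka Connect REST API at GET /connectors/<name>/status. The sketch below interprets an assumed sample of that status document (worker IDs are placeholders, not values from the lab):

```python
import json

# Assumed sample of GET /connectors/github-source-connector/status from
# the Kafka Connect REST API; worker_id values are placeholders.
status = json.loads("""{
  "name": "github-source-connector",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "10.0.0.1:8083"}]
}""")

# A connector is healthy only if the connector itself and every task run.
healthy = (
    status["connector"]["state"] == "RUNNING"
    and all(task["state"] == "RUNNING" for task in status["tasks"])
)
print(healthy)
```

Checking the tasks as well as the connector matters: a connector can report RUNNING while an individual task has FAILED.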
Verify the correct configuration of the GitHub source connector.
Use the oc exec command to start a consumer for the github-events topic.

(.venv) [user@host connect-connectors]$ oc exec \
-it my-cluster-kafka-0 \
-c kafka \
-- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic github-events

Leave the terminal window open.

Open a browser tab and navigate to https://github.com/YOUR_GITHUB_USERNAME/ad482-connectors/issues/new.

Type ad482-test in the field, and create the issue.

Return to the consumer running in your terminal to verify the consumption of the GitHub event by the connector instance.

{"schema":{"type":"string","optional":false},"payload":"IssuesEvent"}

Stop the consumer.
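The record printed by the consumer uses the envelope produced by org.apache.kafka.connect.json.JsonConverter with schemas enabled: a schema section describing the value type, plus the payload itself. A minimal sketch of unwrapping it:

```python
import json

# Envelope written by org.apache.kafka.connect.json.JsonConverter:
# "schema" describes the value type, "payload" carries the actual value.
record = '{"schema":{"type":"string","optional":false},"payload":"IssuesEvent"}'
envelope = json.loads(record)

event_type = envelope["payload"]
print(event_type)  # IssuesEvent
```

This is why the sink connector later in the exercise can use StringConverter: what matters to each connector is only how the value is serialized on its side of the topic.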
Create an Elasticsearch sink connector by using the Kafka Connect REST API.
Use the oc patch command to allow the creation of connectors via the Kafka Connect REST API.

(.venv) [user@host connect-connectors]$ oc patch \
kafkaconnect/my-connect-cluster -p \
'{"metadata":{"annotations":{"strimzi.io/use-connector-resources":"false"} } }' \
--type merge
kafkaconnect.kafka.strimzi.io/my-connect-cluster patched

Important

Windows users must replace the preceding oc patch command with the following PowerShell command:

(.venv) PS C:\Users\user\AD482\connect-connectors> oc patch `
kafkaconnect/my-connect-cluster -p `
'{"""metadata""":{"""annotations""":{"""strimzi.io/use-connector-resources""":"""false"""} } }' `
--type merge

Use the oc get pods command to get the name of the Kafka Connect cluster pod.

(.venv) [user@host connect-connectors]$ oc get pods \
--selector app.kubernetes.io/name=kafka-connect
NAME                                    READY   STATUS    RESTARTS   AGE
my-connect-cluster-connect-POD_SUFFIX   1/1     Running   0          7m41s

Use the oc rsh command to access the Kafka Connect cluster pod.

(.venv) [user@host connect-connectors]$ oc rsh \
my-connect-cluster-connect-POD_SUFFIX

Use the curl command to create a sink connector.

sh-4.2$ curl -X POST \
http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-d '{
  "name": "elasticsearch-sink-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector",
    "tasks.max": "1",
    "topics": "github-events",
    "camel.sink.endpoint.hostAddresses": "elasticsearch-es-http:9200",
    "camel.sink.endpoint.indexName": "github_events",
    "camel.sink.endpoint.operation": "Index",
    "camel.sink.path.clusterName": "elasticsearch",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}'
...output omitted...

Connector class that implements the integration with Elasticsearch.
Topic to which messages are sent.
Specific configuration for the Camel sink connector. It defines the endpoint to use.
Specific configuration for the Camel sink connector. It defines the Elasticsearch index name.
Specific configuration for the Camel sink connector. It defines the Elasticsearch operation type.
Specific configuration for the Camel sink connector. It defines the Elasticsearch cluster name.
Key converter.
Value converter.
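The curl call can also be issued programmatically against the same POST /connectors endpoint. The sketch below builds an identical request body; the URL assumes you are inside the Kafka Connect pod, and the request is only sent when the SEND_REQUEST environment variable is set, so building the payload can be tested offline:

```python
import json
import os
import urllib.request

# Same payload as the curl command in the exercise.
connector = {
    "name": "elasticsearch-sink-connector",
    "config": {
        "connector.class": "org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector",
        "tasks.max": "1",
        "topics": "github-events",
        "camel.sink.endpoint.hostAddresses": "elasticsearch-es-http:9200",
        "camel.sink.endpoint.indexName": "github_events",
        "camel.sink.endpoint.operation": "Index",
        "camel.sink.path.clusterName": "elasticsearch",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    },
}
body = json.dumps(connector).encode("utf-8")

request = urllib.request.Request(
    "http://localhost:8083/connectors",  # reachable from inside the pod
    data=body,
    headers={"Content-Type": "application/json"},
    method="POST",
)

# Guarded so the sketch does not need a live cluster to run.
if os.environ.get("SEND_REQUEST"):
    with urllib.request.urlopen(request) as response:
        print(response.status)
```

Kafka Connect responds with 201 Created and echoes the connector configuration when the request succeeds.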
Use the curl command to get a list of the created connectors.

sh-4.2$ curl -X GET localhost:8083/connectors
["elasticsearch-sink-connector","github-source-connector"]

Exit from the Kafka Connect cluster pod.
Verify the correct configuration of the Elasticsearch sink connector.
Return to the GitHub tab in your browser, and create another issue.
Use the oc get route command to get the URL of the github_events index.

(.venv) [user@host connect-connectors]$ oc get route elasticsearch-es-http \
-o jsonpath="{'http://'}{.spec.host}{'/github_events/_search\n'}"
http://YOUR_ELASTICSEARCH_URL/github_events/_search

Open a browser tab and navigate to the URL you just retrieved. Verify that the Elasticsearch response contains several elements with the payload field equal to IssuesEvent.

{"took":57,"timed_out":false..."payload":"IssuesEvent"}}, ...output omitted...
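An Elasticsearch _search response nests the matching documents under hits.hits, each carrying the indexed record in its _source field. The sketch below extracts the event payloads from a trimmed-down, illustrative response (real responses include extra metadata such as shard counts and scores):

```python
import json

# Trimmed, illustrative _search response; only the fields used below
# are kept, and the document contents are assumed.
response = json.loads("""{
  "took": 57,
  "timed_out": false,
  "hits": {
    "hits": [
      {"_source": {"payload": "IssuesEvent"}},
      {"_source": {"payload": "IssuesEvent"}}
    ]
  }
}""")

# Each hit's indexed document lives under _source.
payloads = [hit["_source"]["payload"] for hit in response["hits"]["hits"]]
print(payloads)
```

Seeing one IssuesEvent payload per GitHub issue you created confirms the full pipeline: GitHub source connector, github-events topic, Elasticsearch sink connector.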
This concludes the guided exercise.