Guided Exercise: Creating Connectors

In this exercise you will stream issues from a GitHub repository to Kafka, and then stream the issues from Kafka to the Elasticsearch engine.

Outcomes

You should be able to create source and sink connectors by using the KafkaConnector resource and the Kafka Connect REST API.

You can find the solution files for this exercise in the AD482-apps repository, within the connect-connectors/solutions directory.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The OpenShift CLI (oc).

  • A GitHub account.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start connect-connectors
...output omitted...

The lab command copies the exercise files from the AD482-apps/connect-connectors/apps directory in your local Git repository into the connect-connectors directory in your workspace.

Procedure 5.2. Instructions

  1. Navigate to the connect-connectors directory located in your workspace directory.

    (.venv) [user@host AD482]$ cd connect-connectors
    (.venv) [user@host connect-connectors]$
  2. Verify that the Kafka Connect cluster is running, and review the connector plug-ins available in the Kafka Connect instance.

    1. Use the oc get pods command to verify that the Kafka Connect cluster pod is running.

      (.venv) [user@host connect-connectors]$ oc get pods \
       --selector app.kubernetes.io/name=kafka-connect
      NAME                                    READY   STATUS      RESTARTS   AGE
      my-connect-cluster-connect-POD_SUFFIX   1/1     Running     0          3m47s
    2. Use the oc describe kafkaconnect command to verify the available connector plug-ins.

      (.venv) [user@host connect-connectors]$ oc describe \
       kafkaconnect my-connect-cluster
      ...output omitted...
      Status:
        ...output omitted...
        Connector Plugins:
          Class: org.apache...elasticsearchrest.CamelElasticsearchrestSinkConnector
          Type: sink
          Version: 0.10.0
          Class: org.apache.camel.kafkaconnector.github.CamelGithubSinkConnector
          Type: sink
          Version: 0.10.1
          Class: org.apache.camel.kafkaconnector.github.CamelGithubSourceConnector
          Type: source
          Version: 0.10.1
          ...output omitted...

      The preceding command lists all the connector plug-ins available in the running Kafka Connect instance. The list must contain a sink connector for Elasticsearch and a source connector for GitHub.
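
      The same information is available from the Kafka Connect REST API. As an optional cross-check, you can query the /connector-plugins endpoint from inside the pod; the following sketch reuses the pod name from the previous step:

      (.venv) [user@host connect-connectors]$ oc exec \
       my-connect-cluster-connect-POD_SUFFIX \
       -- curl -s http://localhost:8083/connector-plugins

      The response is a JSON array that lists the class, type, and version of each available plug-in.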

  3. Create a GitHub repository and an access token.

    1. Open a web browser, navigate to GitHub, and create an empty public repository named ad482-connectors.

    2. Navigate to https://github.com/settings/tokens.

    3. Click Generate new token, and enter your GitHub password if prompted. Enter ad482-connectors in the Note field, select the repo scope, and generate the token. After generating the token, copy the token value.

      Warning

      Be sure to copy the generated access token and store it in a safe place. GitHub does not display the token again after you leave the page.

  4. Create a GitHub source connector by using the KafkaConnector custom resource.

    1. Open the resources/github-source-connector.yaml file, and configure your GitHub user and access token.

      The file contents should look like the following:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        labels:
          strimzi.io/cluster: my-connect-cluster
        name: github-source-connector
      spec:
        class: org.apache.camel.kafkaconnector.github.CamelGithubSourceConnector 1
        config:
          camel.source.endpoint.oauthToken: YOUR_GITHUB_TOKEN 2
          camel.source.endpoint.repoName: ad482-connectors 3
          camel.source.endpoint.repoOwner: YOUR_GITHUB_USERNAME 4
          camel.source.path.branchName: main 5
          camel.source.path.type: event 6
          key.converter: org.apache.kafka.connect.json.JsonConverter 7
          value.converter: org.apache.kafka.connect.json.JsonConverter 8
          topics: github-events 9
        tasksMax: 1

      1

      Connector class that implements the integration with GitHub.

      2

      Specific configuration for the Camel source connector. It defines the access token to use.

      3

      Specific configuration for the Camel source connector. It defines the GitHub repository.

      4

      Specific configuration for the Camel source connector. It defines the repository user.

      5

      Specific configuration for the Camel source connector. It defines the GitHub branch name.

      6

      Specific configuration for the Camel source connector. It defines the path type.

      7

      Key converter.

      8

      Value converter.

      9

      Topic to which messages are sent.

    2. Use the oc create command to create the source connector.

      (.venv) [user@host connect-connectors]$ oc create \
       -f resources/github-source-connector.yaml
      kafkaconnector.kafka.strimzi.io/github-source-connector created
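
      Optionally, verify that the operator reconciled the connector. A quick status check follows; note that the exact output format can vary with the Strimzi version in use:

      (.venv) [user@host connect-connectors]$ oc get kafkaconnector \
       github-source-connector -o yaml
      ...output omitted...

      The Status section of the output should contain a condition of type Ready with the status True.
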
  5. Verify the correct configuration of the GitHub source connector.

    1. Use the oc exec command to start a consumer for the github-events topic.

      (.venv) [user@host connect-connectors]$ oc exec \
       -it my-cluster-kafka-0 \
       -c kafka \
       -- bin/kafka-console-consumer.sh \
       --bootstrap-server my-cluster-kafka-brokers:9092 \
       --topic github-events

      Leave the terminal window open.

    2. Open a browser tab and navigate to https://github.com/YOUR_GITHUB_USERNAME/ad482-connectors/issues/new.

    3. Type ad482-test in the Title field, and click Submit new issue.

    4. Return to the consumer running in your terminal to verify that the connector captured the GitHub event and produced it to the github-events topic.

      {"schema":{"type":"string","optional":false},"payload":"IssuesEvent"}

      The schema and payload envelope around the event name comes from the JsonConverter, which embeds schema information with each record by default.

    5. Stop the consumer.

  6. Create an Elasticsearch sink connector by using the Kafka Connect REST API.

    1. Use the oc patch command to allow the creation of connectors via the Kafka Connect REST API. Setting the strimzi.io/use-connector-resources annotation to false stops the operator from managing connectors through KafkaConnector resources, so the REST API accepts direct requests.

      (.venv) [user@host connect-connectors]$ oc patch \
       kafkaconnect/my-connect-cluster -p \
       '{"metadata":{"annotations":{"strimzi.io/use-connector-resources":"false"} } }' \
       --type merge
      kafkaconnect.kafka.strimzi.io/my-connect-cluster patched

      Important

      On Windows, run the preceding oc patch command in PowerShell as follows:

      (.venv) PS C:\Users\user\AD482\connect-connectors> oc patch `
       kafkaconnect/my-connect-cluster -p `
       '{"""metadata""":{"""annotations""":{"""strimzi.io/use-connector-resources""":"""false"""} } }' `
       --type merge
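
      To confirm that the annotation was applied, you can inspect the resource metadata. A quick check using a JSONPath query; note that the rendering of the annotations map varies by client version:

      (.venv) [user@host connect-connectors]$ oc get kafkaconnect \
       my-connect-cluster -o jsonpath='{.metadata.annotations}'

      The output should include the strimzi.io/use-connector-resources annotation with the value false.
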
    2. Use the oc get pods command to get the name of the Kafka Connect cluster pod.

      (.venv) [user@host connect-connectors]$ oc get pods \
       --selector app.kubernetes.io/name=kafka-connect
      NAME                                    READY   STATUS      RESTARTS   AGE
      my-connect-cluster-connect-POD_SUFFIX   1/1     Running     0          7m41s
    3. Use the oc rsh command to access the Kafka Connect cluster pod.

      (.venv) [user@host connect-connectors]$ oc rsh \
       my-connect-cluster-connect-POD_SUFFIX
    4. Use the curl command to create a sink connector.

      sh-4.2$ curl -X POST \
      http://localhost:8083/connectors \
      -H 'Content-Type: application/json' \
      -d '{ "name": "elasticsearch-sink-connector",
        "config":
          {
             "connector.class": "org.apache.camel.kafkaconnector.elasticsearchrest.​CamelElasticsearchrestSinkConnector", 1
             "tasks.max": "1",
             "topics": "github-events", 2
             "camel.sink.endpoint.hostAddresses": "elasticsearch-es-http:9200", 3
             "camel.sink.endpoint.indexName": "github_events", 4
             "camel.sink.endpoint.operation": "Index", 5
             "camel.sink.path.clusterName": "elasticsearch", 6
             "key.converter": "org.apache.kafka.connect.storage.StringConverter", 7
             "value.converter": "org.apache.kafka.connect.storage.StringConverter" 8
           }
      }'
      ...output omitted...

      1

      Connector class that implements the integration with Elasticsearch.

      2

      Topic from which the connector consumes messages.

      3

      Specific configuration for the Camel sink connector. It defines the endpoint to use.

      4

      Specific configuration for the Camel sink connector. It defines the Elasticsearch index name.

      5

      Specific configuration for the Camel sink connector. It defines the Elasticsearch operation type.

      6

      Specific configuration for the Camel sink connector. It defines the Elasticsearch cluster name.

      7

      Key converter.

      8

      Value converter.

    5. Use the curl command to get a list of the created connectors.

      sh-4.2$ curl -X GET localhost:8083/connectors
      ["elasticsearch-sink-connector","github-source-connector"]
    6. Exit from the Kafka Connect cluster pod.

  7. Verify the correct configuration of the Elasticsearch sink connector.

    1. Return to the GitHub tab in your browser, and create another issue.

    2. Use the oc get route command to get the URL of the github_events index.

      (.venv) [user@host connect-connectors]$ oc get route elasticsearch-es-http \
       -o jsonpath="{'http://'}{.spec.host}{'/github_events/_search\n'}"
      http://YOUR_ELASTICSEARCH_URL/github_events/_search
    3. Open a browser tab and navigate to the URL you just retrieved. Verify that the Elasticsearch response contains several elements with the payload field equal to IssuesEvent.

      {"took":57,"timed_out":false..."payload":"IssuesEvent"}},
      ...output omitted...
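
      You can also ask Elasticsearch how many documents the sink connector has indexed. A quick check with the standard _count API, using the same route host that you retrieved in the previous step:

      (.venv) [user@host connect-connectors]$ curl -s \
       http://YOUR_ELASTICSEARCH_URL/github_events/_count
      ...output omitted...

      The count value in the response should grow as you create more issues in the repository.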

Finish

Return to your workspace directory. Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish connect-connectors

This concludes the guided exercise.
