Lab: Integrating Data Systems with Kafka Connect

In this lab, you will implement a small part of the business logic of a company called WaterLeak Inc. WaterLeak Inc. has a system for creating repair requests for any water leak incident. When a repair request is created, it is assigned to an available plumber and an incident record is created in the archive.

The system has three applications:

  • The Repair Request Service enables creating repair requests and saves them into its own PostgreSQL database. Each created record in the database triggers an outbox event that a Debezium Kafka Connect connector captures and sends to a Kafka topic.

  • The Plumber Service is a service application responsible for plumber-specific actions. When a repair request is created, the service selects an available plumber and marks that plumber as Assigned in its own PostgreSQL database, which triggers an event.

  • The Incidents Archive Service keeps records of all the incidents that the Repair Request Service creates and provides an interface for searching incidents, so it uses Elasticsearch as its data store. A Kafka Connect connector receives the repair request data from a Kafka topic and indexes it in Elasticsearch.

Outcomes

You should be able to create a Kafka Connect cluster, create a Debezium connector that receives data from a PostgreSQL database and writes it to a Kafka cluster, and create a connector that consumes the data and saves it to Elasticsearch. You should also be able to implement the Outbox pattern and change data capture (CDC) by using Debezium.

You will find the solution files for this exercise in the AD482-apps repository, within the connect-integration/solutions directory. Notice that you might need to replace some strings in the YAML and Java files.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

  • The OpenShift CLI (oc).

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> .\.venv\Scripts\Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start connect-integration

The lab command copies the exercise files from the AD482-apps/connect-integration/apps directory in your local Git repository into the connect-integration directory in your workspace.

Procedure 5.5. Instructions

  1. In your workspace, move to the connect-integration directory. Create a Kafka Connect cluster that conforms to the following specifications:

    Table 5.3. Cluster Specifications

    Name               my-connect-cluster
    Bootstrap servers  my-cluster-kafka-bootstrap:9092
    Image              quay.io/redhattraining/ad482-ch05s09-connect-cluster:latest
    Replica count      1

    Add the following configurations to the Kafka Connect cluster specification:

    Table 5.4. Cluster Configurations

    key.converter                      org.apache.kafka.connect.storage.StringConverter
    value.converter                    org.apache.kafka.connect.storage.StringConverter
    config.storage.topic               my-connect-cluster-configs
    offset.storage.topic               my-connect-cluster-offsets
    status.storage.topic               my-connect-cluster-status
    config.storage.replication.factor  1
    offset.storage.replication.factor  1
    status.storage.replication.factor  1

    The Kafka Connect cluster should use connector resources for managing connectors, so set the strimzi.io/use-connector-resources annotation to true.

    1. By using your editor of choice, open the resources/my-connect-cluster.yaml file, and apply the given values in the preceding tables.

      The file content should look like the following:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnect
      metadata:
        annotations:
          strimzi.io/use-connector-resources: 'true'
        name: my-connect-cluster
      spec:
        bootstrapServers: 'my-cluster-kafka-bootstrap:9092'
        image: 'quay.io/redhattraining/ad482-ch05s09-connect-cluster:latest'
        config:
          config.storage.topic: my-connect-cluster-configs
          offset.storage.topic: my-connect-cluster-offsets
          status.storage.topic: my-connect-cluster-status
          key.converter: org.apache.kafka.connect.storage.StringConverter
          value.converter: org.apache.kafka.connect.storage.StringConverter
          config.storage.replication.factor: 1
          offset.storage.replication.factor: 1
          status.storage.replication.factor: 1
        replicas: 1
        version: 2.8.0
    2. Use the oc command to create the cluster in your project.

      (.venv) [user@host connect-integration]$ oc create -f \
      resources/my-connect-cluster.yaml
      kafkaconnect.kafka.strimzi.io/my-connect-cluster created
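      Optionally, verify that the Kafka Connect cluster is ready before continuing. The following commands are only a sketch: the reported columns and conditions depend on your Strimzi version, and the label selector assumes the labels that Strimzi normally adds to the Connect pods.

      (.venv) [user@host connect-integration]$ oc get kafkaconnect my-connect-cluster
      ...output omitted...
      (.venv) [user@host connect-integration]$ oc get pods -l strimzi.io/cluster=my-connect-cluster
      ...output omitted...

      Wait until the Connect pod is in the Running state before creating connectors.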
  2. Create a PostgreSQL Debezium connector that conforms to the following specifications:

    Table 5.5. PostgreSQL Debezium Connector Specifications

    Name             repair-requests-connector
    Connector class  io.debezium.connector.postgresql.PostgresConnector
    Connect cluster  my-connect-cluster
    Max tasks        1

    Add the following configurations to the PostgreSQL Debezium connector specification:

    Table 5.6. PostgreSQL Debezium Connector Configurations

    plugin.name           pgoutput
    database.hostname     postgresql-repair-requests
    database.port         5432
    database.user         postgres
    database.password     postgres
    database.dbname       waterleakdb
    database.server.name  waterleakdb1
    schema.include.list   public
    table.include.list    public.outboxevent
    tombstones.on.delete  false

    Finally, add the following transformations to the PostgreSQL Debezium connector configuration:

    Table 5.7. PostgreSQL Debezium Connector Transformations

    transforms                                                 EventRouter
    transforms.EventRouter.type                                io.debezium.transforms.outbox.EventRouter
    transforms.EventRouter.table.fields.additional.placement   type:header:eventType
    transforms.EventRouter.route.topic.replacement             ${routedByValue}.events

    1. Open the resources/repair-requests-connector.yaml file, and apply the given values in the preceding tables.

      The file content should look like the following:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        labels:
          strimzi.io/cluster: my-connect-cluster
        name: repair-requests-connector
      spec:
        class: io.debezium.connector.postgresql.PostgresConnector
        config:
          plugin.name: pgoutput
          database.hostname: postgresql-repair-requests
          database.port: 5432
          database.user: postgres
          database.password: postgres
          database.dbname: waterleakdb
          database.server.name: waterleakdb1
          schema.include.list: public
          table.include.list: public.outboxevent
          tombstones.on.delete: false
          transforms: EventRouter
          transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter
          transforms.EventRouter.table.fields.additional.placement: type:header:eventType
          transforms.EventRouter.route.topic.replacement: ${routedByValue}.events
        tasksMax: 1
    2. Use the oc command to create the connector.

      (.venv) [user@host connect-integration]$ oc create -f \
      resources/repair-requests-connector.yaml
      kafkaconnector.kafka.strimzi.io/repair-requests-connector created
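      Optionally, inspect the connector resource to confirm that the Kafka Connect cluster accepted its configuration. This is only a sketch; the exact status fields depend on your Strimzi version.

      (.venv) [user@host connect-integration]$ oc describe kafkaconnector repair-requests-connector
      ...output omitted...

      Look for a Ready condition with the status True in the resource status.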
  3. Create a Camel Elasticsearch REST Sink connector that conforms to the following specifications:

    Table 5.8. Camel Elasticsearch REST Sink Connector Specifications

    Name             incidents-connector
    Connector class  org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector
    Connect cluster  my-connect-cluster
    Max tasks        1

    Add the following configurations to the Camel Elasticsearch REST Sink connector specification:

    Table 5.9. Camel Elasticsearch REST Sink Connector Configurations

    camel.sink.endpoint.hostAddresses  elasticsearch-es-http:9200
    camel.sink.endpoint.operation      Index
    camel.sink.path.clusterName        elasticsearch
    camel.sink.endpoint.indexName      incidents
    key.converter                      org.apache.kafka.connect.json.JsonConverter
    value.converter                    org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable       false
    value.converter.schemas.enable     false
    topics                             repair-request.events
    errors.tolerance                   all

    1. Open the resources/incidents-connector.yaml file, and apply the given values in the preceding tables.

      The file content should look like the following:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        labels:
          strimzi.io/cluster: my-connect-cluster
        name: incidents-connector
      spec:
        class: org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector
        config:
          camel.sink.endpoint.hostAddresses: 'elasticsearch-es-http:9200'
          camel.sink.endpoint.operation: Index
          camel.sink.path.clusterName: elasticsearch
          camel.sink.endpoint.indexName: incidents
          key.converter: org.apache.kafka.connect.json.JsonConverter
          value.converter: org.apache.kafka.connect.json.JsonConverter
          key.converter.schemas.enable: false
          value.converter.schemas.enable: false
          topics: repair-request.events
          errors.tolerance: all
        tasksMax: 1
    2. Use the oc command to create the connector.

      (.venv) [user@host connect-integration]$ oc create -f \
      resources/incidents-connector.yaml
      kafkaconnector.kafka.strimzi.io/incidents-connector created
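      Optionally, list the connector resources to confirm that both connectors are associated with the my-connect-cluster Kafka Connect cluster.

      (.venv) [user@host connect-integration]$ oc get kafkaconnectors
      ...output omitted...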
  4. Move to the repair-request-service directory, which contains the source code of the Repair Request Service application. In the com.redhat.training.event.RequestCreatedEvent class, implement the of method that initializes a RequestCreatedEvent object from a RepairRequest object. Use the ObjectMapper instance mapper to create an ObjectNode.

    The asJson ObjectNode should contain the following keys:

    • id

    • requesterName

    • requestDate

    • status

    • plumberId

    1. Open the RequestCreatedEvent class and initialize the ObjectNode asJson with the provided keys in the of method.

      // TODO: Implement the method that initializes a RequestCreatedEvent from a RepairRequest object.
      public static RequestCreatedEvent of(RepairRequest repairRequest) {
          ObjectNode asJson = mapper.createObjectNode()
                  .put("id", repairRequest.getId())
                  .put("requesterName", repairRequest.getRequesterName())
                  .put("requestDate", repairRequest.getRequestDate().toString())
                  .put("status", repairRequest.getStatus().toString())
                  .put("plumberId", repairRequest.getPlumberId());
          return new RequestCreatedEvent(repairRequest.getId(), asJson);
      }
  5. In the com.redhat.training.service.RepairRequestService class, find the create method that persists the Plumber and RepairRequest data into the PostgreSQL database. After the repair request is persisted, fire a RequestCreatedEvent that contains the repair request data. Notice that the create method is already transactional, so if persisting to the database fails, the event is not fired.

    1. Open the RepairRequestService class and, in the create method, call event.fire(...) with the initialized RequestCreatedEvent.

      // TODO: Fire a RequestCreatedEvent event
      event.fire(RequestCreatedEvent.of(repairRequest));
  6. Run the Repair Request Service, Plumber Service, and Incidents Archive Service applications to examine how data is shared across the services via events. Follow these steps to test the WaterLeak Inc. repair request process. After completing the steps, you can optionally verify the event flow from the command line, as shown after the last sub-step.

    1. Set up port forwarding between your local machine and the PostgreSQL databases for the Repair Request Service and the Plumber Service, which run on the OpenShift cluster. Open two new terminal windows, one for each port-forwarding command, and leave the main terminal open for other tasks. In the first terminal window, run the following commands.

      [user@host ~]$ oc get pods -o name
      NAME
      ...output omitted...
      postgresql-repair-requests-POD_SUFFIX_1
      postgresql-plumbers-POD_SUFFIX_2
      ...output omitted...
      [user@host ~]$ oc port-forward \
       postgresql-repair-requests-POD_SUFFIX_1 5432:5432
      Forwarding from 127.0.0.1:5432 -> 5432
      Forwarding from [::1]:5432 -> 5432

      In the second terminal window, run the following command. Use the pod name that you received from the previous terminal session.

      [user@host ~]$ oc port-forward \
       postgresql-plumbers-POD_SUFFIX_2 5433:5432
      Forwarding from 127.0.0.1:5433 -> 5432
      Forwarding from [::1]:5433 -> 5432

      Leave the port forwarding running in the terminals.
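      Optionally, if a psql client is installed on your machine, you can use the forwarded port to inspect the Repair Request Service database. This step is not required for the lab; the database name and credentials are the ones listed in Table 5.6.

      [user@host ~]$ psql -h localhost -p 5432 -U postgres -d waterleakdb -c '\dt'
      ...output omitted...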

    2. In a new terminal window, navigate to the repair-request-service directory in your workspace to compile and run the Repair Request Service.

      [user@host repair-request-service]$ ./mvnw package quarkus:dev
      ...output omitted...
      2021-09-24 10:58:05,031 INFO  [io.quarkus] (Quarkus Main Thread) repair-request-service ... started in 6.790s. Listening on: http://localhost:8080
      ...output omitted...

      Note that the application runs on http://localhost:8080. Leave the application running in this terminal.

    3. In a new terminal window, navigate to the plumber-service directory in your workspace to compile and run the Plumber Service.

      [user@host plumber-service]$ ./mvnw package quarkus:dev
      ...output omitted...
      2021-09-24 11:00:46,064 INFO  [io.quarkus] (Quarkus Main Thread) plumber-service ... started in 5.472s. Listening on: http://localhost:8081
      ...output omitted...

      Note that the application runs on http://localhost:8081. Leave the application running in this terminal.

    4. Run the following command to get the Elasticsearch route address.

      (.venv) [user@host connect-integration]$ oc get route elasticsearch-es-http \
       -o jsonpath="{.spec.host}{'\n'}"
      YOUR_ELASTICSEARCH_ROUTE_PREFIX.apps.cluster.example.com
    5. Open the src/main/resources/application.properties file in the incidents-archive-service directory in your workspace. Replace the ELASTICSEARCH_ROUTE_URL placeholder with your Elasticsearch route address.

      quarkus.http.port=8082
      
      quarkus.elasticsearch.hosts = ELASTICSEARCH_ROUTE_URL:80
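      Optionally, verify that the Elasticsearch route responds before starting the service. This sketch assumes that the lab exposes Elasticsearch over plain HTTP without authentication, as the preceding configuration implies; replace ELASTICSEARCH_ROUTE_URL with your route address.

      [user@host ~]$ curl "http://ELASTICSEARCH_ROUTE_URL:80"
      ...output omitted...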
    6. In a new terminal window, navigate to the incidents-archive-service directory to compile and run the Incidents Archive Service.

      [user@host incidents-archive-service]$ ./mvnw package quarkus:dev
      ...output omitted...
      2021-09-24 11:05:34,849 INFO  [io.quarkus] (Quarkus Main Thread) incidents-archive-service ... started in 2.071s. Listening on: http://localhost:8082
      ...output omitted...

      Note that the application runs on http://localhost:8082. Leave the application running in this terminal.

    7. Open a browser tab and navigate to http://localhost:8080 for the Repair Request Service application. Create two repair requests. For the first repair request, enter Pablo as the requester name, and for the second request, enter the name Jaime. You should see the IDs of the assigned plumbers for each request.

    8. Open a new browser tab and navigate to http://localhost:8081 for the Plumber Service application. You should see the first two plumbers' state as ASSIGNED. If you do not see it, refresh the page a few times until you see the assigned plumbers in the list.

    9. Open a new browser tab and navigate to http://localhost:8082 for the Incidents Archive Service application. Click Search to see the created repair request incident records.
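      You can optionally verify the event flow from the command line. The following commands are only a sketch: the broker pod name (my-cluster-kafka-0) and the relative path to the Kafka tools follow the usual Strimzi conventions for a cluster named my-cluster, and might differ in your environment. Press Ctrl+C to stop the consumer.

      [user@host ~]$ oc exec -it my-cluster-kafka-0 -- \
       bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
       --topic repair-request.events --from-beginning
      ...output omitted...

      Similarly, assuming the Elasticsearch route accepts unauthenticated HTTP requests as in the earlier step, you can query the incidents index directly.

      [user@host ~]$ curl "http://ELASTICSEARCH_ROUTE_URL:80/incidents/_search?pretty"
      ...output omitted...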

Evaluation

Go back to your workspace. Run the lab command to grade your work. Correct any reported failures and rerun the command until successful.

(.venv) [user@host AD482]$ lab grade connect-integration

Terminate all the running applications.

Finish

Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish connect-integration

This concludes the lab.

Revision: ad482-1.8-cc2ae1c