Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this lab, you will implement a small part of the business logic of a company called WaterLeak Inc. WaterLeak Inc. has a system for creating repair requests for any water leak incident. When a repair request is created, it is assigned to an available plumber and an incident record is created in the archive.
The system has three applications:
The Repair Request Service enables creating repair requests and saves them into its own PostgreSQL database. Each created record in the database triggers an outbox event that a Debezium Kafka Connect connector captures and sends to a Kafka topic (a sketch of such an outbox event follows this list).

The Plumber Service is responsible for plumber-specific actions. When a repair request is created, an available plumber is selected, which triggers an event by marking the plumber as Assigned in the service's PostgreSQL database.

The Incidents Archive Service keeps records of all the incidents that the Repair Request Service creates, and provides an interface for searching incidents. For this reason, it uses Elasticsearch as a data store. A Kafka Connect connector receives the repair request data from a Kafka topic and indexes it in Elasticsearch.
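The following is a minimal, hypothetical sketch of what such an outbox event might look like on the Repair Request Service side. It assumes the service uses the Debezium Quarkus outbox extension and its io.debezium.outbox.quarkus.ExportedEvent interface; the class name, field names, and payload shape are illustrative only and do not come from the exercise code.

import java.time.Instant;

import com.fasterxml.jackson.databind.JsonNode;

import io.debezium.outbox.quarkus.ExportedEvent;

// Hypothetical outbox event: when the service fires it, the extension writes a
// row to the outbox table, and the Debezium connector captures that row and
// publishes it to Kafka.
public class ExampleRequestCreated implements ExportedEvent<String, JsonNode> {

    private final Long requestId;
    private final JsonNode payload;
    private final Instant timestamp = Instant.now();

    public ExampleRequestCreated(Long requestId, JsonNode payload) {
        this.requestId = requestId;
        this.payload = payload;
    }

    @Override
    public String getAggregateId() {
        // Used by the outbox event router as the Kafka record key
        return String.valueOf(requestId);
    }

    @Override
    public String getAggregateType() {
        // Used by the outbox event router to select the destination topic
        return "repair-request";
    }

    @Override
    public String getType() {
        // Describes what happened to the aggregate
        return "RequestCreated";
    }

    @Override
    public Instant getTimestamp() {
        return timestamp;
    }

    @Override
    public JsonNode getPayload() {
        // The event data that downstream services consume
        return payload;
    }
}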
Outcomes
You should be able to create a Kafka Connect cluster, create a Debezium connector that captures data from a PostgreSQL database and writes it to a Kafka cluster, and create a connector that consumes the data and saves it to Elasticsearch. You should also be able to implement the Outbox pattern and change data capture (CDC) by using Debezium.
You will find the solution files for this exercise in the AD482-apps repository, within the connect-integration/solutions directory.
Notice that you might need to replace some strings in the YAML and Java files.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
The OpenShift CLI (oc).
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/bin/Activate.ps1

Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start connect-integration
The lab command copies the exercise files from the AD482-apps/connect-integration/apps directory in your local Git repository into the connect-integration directory in your workspace.
Procedure 5.5. Instructions
In your workspace, move to the connect-integration directory. Create a Kafka Connect cluster that conforms to the following specifications:

Table 5.3. Cluster Specifications
Name my-connect-cluster
Bootstrap servers my-cluster-kafka-bootstrap:9092
Image quay.io/redhattraining/ad482-ch05s09-connect-cluster:latest
Replica count 1
Add the following configurations to the Kafka Connect cluster specification:
Table 5.4. Cluster Configurations
key.converter org.apache.kafka.connect.storage.StringConverter
value.converter org.apache.kafka.connect.storage.StringConverter
config.storage.topic my-connect-cluster-configs
offset.storage.topic my-connect-cluster-offsets
status.storage.topic my-connect-cluster-status
config.storage.replication.factor 1
offset.storage.replication.factor 1
status.storage.replication.factor 1
The Kafka Connect cluster should use connector resources for managing connectors, so set the strimzi.io/use-connector-resources annotation to true.

By using your editor of choice, open the resources/my-connect-cluster.yaml file, and apply the given values in the preceding tables. The file content should look like the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: my-connect-cluster
spec:
  bootstrapServers: 'my-cluster-kafka-bootstrap:9092'
  image: 'quay.io/redhattraining/ad482-ch05s09-connect-cluster:latest'
  config:
    config.storage.topic: my-connect-cluster-configs
    offset.storage.topic: my-connect-cluster-offsets
    status.storage.topic: my-connect-cluster-status
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
  replicas: 1
  version: 2.8.0

Use the oc command to create the cluster in your project.

(.venv) [user@host connect-integration]$ oc create -f \
resources/my-connect-cluster.yaml
kafkaconnect.kafka.strimzi.io/my-connect-cluster created
Create a PostgreSQL Debezium connector that conforms to the following specifications:
Table 5.5. PostgreSQL Debezium Connector Specifications
Name repair-requests-connector
Connector class io.debezium.connector.postgresql.PostgresConnector
Connect cluster my-connect-cluster
Max tasks 1
Add the following configurations to the PostgreSQL Debezium connector specification:
Table 5.6. PostgreSQL Debezium Connector Configurations
plugin.name pgoutput
database.hostname postgresql-repair-requests
database.port 5432
database.user postgres
database.password postgres
database.dbname waterleakdb
database.server.name waterleakdb1
schema.include.list public
table.include.list public.outboxevent
tombstones.on.delete false
Finally, add the following transformations to the PostgreSQL Debezium connector configuration:
Table 5.7. PostgreSQL Debezium Connector Transformations
transforms EventRouter
transforms.EventRouter.type io.debezium.transforms.outbox.EventRouter
transforms.EventRouter.table.fields.additional.placement type:header:eventType
transforms.EventRouter.route.topic.replacement ${routedByValue}.events

Open the resources/repair-requests-connector.yaml file, and apply the given values in the preceding tables. The file content should look like the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: repair-requests-connector
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    plugin.name: pgoutput
    database.hostname: postgresql-repair-requests
    database.port: 5432
    database.user: postgres
    database.password: postgres
    database.dbname: waterleakdb
    database.server.name: waterleakdb1
    schema.include.list: public
    table.include.list: public.outboxevent
    tombstones.on.delete: false
    transforms: EventRouter
    transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter
    transforms.EventRouter.table.fields.additional.placement: type:header:eventType
    transforms.EventRouter.route.topic.replacement: ${routedByValue}.events
  tasksMax: 1

Use the oc command to create the connector.

(.venv) [user@host connect-integration]$ oc create -f \
resources/repair-requests-connector.yaml
kafkaconnector.kafka.strimzi.io/repair-requests-connector created
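Optionally, to see the records that the EventRouter transform routes to the repair-request.events topic, you could run a small standalone consumer such as the following sketch. It is not part of the exercise files; it assumes the kafka-clients library is on your classpath and that the my-cluster-kafka-bootstrap:9092 address is reachable from where you run it (for example, from a pod inside the cluster), so adjust the bootstrap address if needed.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RepairRequestEventsViewer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Adjust the bootstrap address to one that is reachable from where you run this class.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-cluster-kafka-bootstrap:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "repair-request-events-viewer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // The EventRouter transform routes outbox records to ${routedByValue}.events,
            // which resolves to repair-request.events for the repair request aggregate.
            consumer.subscribe(List.of("repair-request.events"));

            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    // The eventType header comes from the table.fields.additional.placement setting.
                    Header eventType = record.headers().lastHeader("eventType");
                    System.out.printf("key=%s eventType=%s payload=%s%n",
                            record.key(),
                            eventType == null ? "n/a" : new String(eventType.value()),
                            record.value());
                }
            }
        }
    }
}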
Create a Camel Elasticsearch REST Sink connector that conforms to the following specifications:
Table 5.8. Camel Elasticsearch REST Sink Connector Specifications
Name incidents-connector
Connector class org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector
Connect cluster my-connect-cluster
Max tasks 1
Add the following configurations to the Camel Elasticsearch REST Sink connector specification:
Table 5.9. Camel Elasticsearch REST Sink Connector Configurations
camel.sink.endpoint.hostAddresses elasticsearch-es-http:9200
camel.sink.endpoint.operation Index
camel.sink.path.clusterName elasticsearch
camel.sink.endpoint.indexName incidents
key.converter org.apache.kafka.connect.json.JsonConverter
value.converter org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable false
value.converter.schemas.enable false
topics repair-request.events
errors.tolerance all
Open the resources/incidents-connector.yaml file, and apply the given values in the preceding tables. The file content should look like the following:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: incidents-connector
spec:
  class: org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector
  config:
    camel.sink.endpoint.hostAddresses: 'elasticsearch-es-http:9200'
    camel.sink.endpoint.operation: Index
    camel.sink.path.clusterName: elasticsearch
    camel.sink.endpoint.indexName: incidents
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    topics: repair-request.events
    errors.tolerance: all
  tasksMax: 1

Use the oc command to create the connector.

(.venv) [user@host connect-integration]$ oc create -f \
resources/incidents-connector.yaml
kafkaconnector.kafka.strimzi.io/incidents-connector created
Move to the repair-request-service directory, which contains the source code of the Repair Request Service application. In the com.redhat.training.event.RequestCreatedEvent class, implement the of method that initializes a RequestCreatedEvent object from a RepairRequest object. Use the ObjectMapper instance mapper to create an ObjectNode.

The ObjectNode asJson should contain the following keys:

id
requesterName
requestDate
status
plumberId

Open the RequestCreatedEvent class and initialize the ObjectNode asJson with the provided keys in the of method.

// TODO: Implement the method that initializes a RequestCreatedEvent from a RepairRequest object.
public static RequestCreatedEvent of(RepairRequest repairRequest) {
    ObjectNode asJson = mapper.createObjectNode()
        .put("id", repairRequest.getId())
        .put("requesterName", repairRequest.getRequesterName())
        .put("requestDate", repairRequest.getRequestDate().toString())
        .put("status", repairRequest.getStatus().toString())
        .put("plumberId", repairRequest.getPlumberId());

    return new RequestCreatedEvent(repairRequest.getId(), asJson);
}
In the com.redhat.training.service.RepairRequestService class, find the create method that persists the Plumber and RepairRequest data into the PostgreSQL database. Fire a RequestCreatedEvent event that contains the repair request data after it is persisted. Notice that the create method is already transactional, so in case of a failure while persisting into the database, the event is not created.
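The following is a minimal sketch of the change, assuming the event is fired through CDI (javax.enterprise.event.Event) and that RequestCreatedEvent implements the Debezium outbox extension's ExportedEvent interface. The surrounding class structure and the comment about the existing persistence logic are illustrative and might differ from the actual class; only the event.fire call is the part to add.

import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;

import io.debezium.outbox.quarkus.ExportedEvent;

public class RepairRequestServiceSketch {

    @Inject
    Event<ExportedEvent<?, ?>> event;

    @Transactional
    public void create(RepairRequest repairRequest) {
        // ...the existing logic that assigns an available plumber and persists
        // the Plumber and RepairRequest entities stays unchanged...

        // Fire the outbox event after the repair request data is persisted.
        // Because create is transactional, the outbox record is written only
        // if the database transaction commits.
        event.fire(RequestCreatedEvent.of(repairRequest));
    }
}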
Run the Repair Request Service, Plumber Service, and Incidents Archive Service applications to examine how the data is shared across the services via events. Follow these steps to test the WaterLeak Inc. repair request application process.

Set up port forwarding for the Repair Request Service and Plumber Service between your local machine and the PostgreSQL databases running on the OpenShift cluster. Open two new terminal windows to run the following commands, one in each terminal, and leave the main terminal open for other tasks.

In the first terminal window, run the following commands.

[user@host ~]$ oc get pods -o name
...output omitted...
postgresql-repair-requests-POD_SUFFIX_1
postgresql-plumbers-POD_SUFFIX_2
...output omitted...
[user@host ~]$ oc port-forward \
postgresql-repair-requests-POD_SUFFIX_1 5432:5432
Forwarding from 127.0.0.1:5432 -> 5432
Forwarding from [::1]:5432 -> 5432

In the second terminal window, run the following command. Use the pod name that you received from the previous terminal session.

[user@host ~]$ oc port-forward \
postgresql-plumbers-POD_SUFFIX_2 5433:5432
Forwarding from 127.0.0.1:5433 -> 5432
Forwarding from [::1]:5433 -> 5432

Leave the port forwarding running in the terminals.
In a new terminal window, navigate to the repair-request-service directory in your workspace to compile and run the Repair Request Service.

[user@host repair-request-service]$ ./mvnw package quarkus:dev
...output omitted...
2021-09-24 10:58:05,031 INFO [io.quarkus] (Quarkus Main Thread) repair-request-service ... started in 6.790s. Listening on: http://localhost:8080
...output omitted...

Note that the application runs on http://localhost:8080. Leave the application running in this terminal.
In a new terminal window, navigate to the plumber-service directory in your workspace to compile and run the Plumber Service.

[user@host plumber-service]$ ./mvnw package quarkus:dev
...output omitted...
2021-09-24 11:00:46,064 INFO [io.quarkus] (Quarkus Main Thread) plumber-service ... started in 5.472s. Listening on: http://localhost:8081
...output omitted...

Note that the application runs on http://localhost:8081. Leave the application running in this terminal.

Run the following command to get the Elasticsearch route address.
(.venv) [user@host connect-integration]$ oc get route elasticsearch-es-http \
-o jsonpath="{.spec.host}{'\n'}"
YOUR_ELASTICSEARCH_ROUTE_PREFIX.apps.cluster.example.com

Open the src/main/resources/application.properties file in the incidents-archive-service directory in your workspace. Replace ELASTICSEARCH_ROUTE_URL with your Elasticsearch route address.

quarkus.http.port=8082
quarkus.elasticsearch.hosts=ELASTICSEARCH_ROUTE_URL:80

In a new terminal window, navigate to the incidents-archive-service directory to compile and run the Incidents Archive Service.

[user@host incidents-archive-service]$ ./mvnw package quarkus:dev
...output omitted...
2021-09-24 11:05:34,849 INFO [io.quarkus] (Quarkus Main Thread) incidents-archive-service ... started in 2.071s. Listening on: http://localhost:8082
...output omitted...

Note that the application runs on http://localhost:8082. Leave the application running in this terminal.
Open a browser tab and navigate to http://localhost:8080 for the Repair Request Service application. Create two repair requests. For the first repair request, enter Pablo as the requester name, and for the second request, enter the name Jaime. You should see the IDs of the assigned plumbers for each request.

Open a new browser tab and navigate to http://localhost:8081 for the Plumber Service application. You should see the first two plumbers' state as ASSIGNED. If you do not see it, refresh the page a few times until you see the assigned plumbers in the list.

Open a new browser tab and navigate to http://localhost:8082 for the Incidents Archive Service application. Use the search interface to see the created repair request incident records.
This concludes the lab.