Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise, you will capture change events with Debezium and use the outbox pattern in the application to prevent dual writes.
Outcomes
You should be able to configure Debezium connectors and use an outbox table so that the application's change events are captured and written to a Kafka topic.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The OpenShift CLI (oc).
The JDK installed.
From your workspace directory, activate the Python virtual environment, and use the lab command to start the scenario for this exercise.
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start connect-debezium
...output omitted...
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
The lab command copies the exercise files from the AD482-apps/connect-debezium/apps directory, which is in your local Git repository, into the connect-debezium directory, which is in your workspace.
Procedure 5.4. Instructions
Deploy the Debezium connector for PostgreSQL.
Navigate to the connect-debezium directory in your workspace.
Open the postgresql-debezium-connector.yaml file in the resources directory. You should see the following content.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: postgresql-debezium-connector
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    plugin.name: pgoutput
    database.hostname: postgresql
    database.port: 5432
    database.user: postgres
    database.password: postgres
    database.dbname: accountantsdb
    database.server.name: accountantsdb1
    schema.include.list: public
    table.include.list: public.outboxevent
    tombstones.on.delete: false
    transforms: EventRouter
    transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter
    transforms.EventRouter.table.fields.additional.placement: type:header:eventType
    transforms.EventRouter.route.topic.replacement: ${routedByValue}
  tasksMax: 1
In your terminal, create the postgresql-debezium-connector.yaml connector resource by running the following command.
(.venv) [user@host connect-debezium]$ oc create \
-f resources/postgresql-debezium-connector.yaml
kafkaconnector.kafka.strimzi.io/postgresql-debezium-connector created
Examine the connector by running the following command until you see that it is in the RUNNING state.
(.venv) [user@host resources]$ oc get kafkaconnectors -w \
-o=custom-columns=NAME:.metadata.name,STATE:.status.connectorStatus.connector.state
NAME STATE
postgresql-debezium-connector RUNNING
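The transforms.EventRouter.route.topic.replacement setting in the connector resource determines the destination topic of each outbox record. As a rough illustration only, and not Debezium's actual implementation, the following Java sketch assumes that the router substitutes the value of the routing column (aggregatetype by default in the Debezium outbox event router) into the replacement pattern. The TopicRoutingSketch class and the routeTopic method are hypothetical names, and the "accountant-event" value is an assumption based on the topic that this exercise consumes later.

```java
// Hypothetical sketch: mimics how a topic name could be derived from the
// route.topic.replacement pattern. This is not Debezium code.
final class TopicRoutingSketch {

    // Placeholder used in the connector configuration of this exercise.
    private static final String PLACEHOLDER = "${routedByValue}";

    // Substitutes the routing column value into the replacement pattern.
    static String routeTopic(String replacementPattern, String routedByValue) {
        return replacementPattern.replace(PLACEHOLDER, routedByValue);
    }

    public static void main(String[] args) {
        // Assumption: the outbox rows carry "accountant-event" in the routing column.
        System.out.println(routeTopic("${routedByValue}", "accountant-event"));
        // prints "accountant-event"

        // Debezium's documented default pattern adds a prefix to the topic name.
        System.out.println(routeTopic("outbox.event.${routedByValue}", "accountant-event"));
        // prints "outbox.event.accountant-event"
    }
}
```

With the pattern set to ${routedByValue} alone, the topic name under this assumption is exactly the routing column value, with no prefix.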
Examine the AccountAnts application and run it.
Open the AccountAnts application that is in the accountants directory. This application uses two PostgreSQL tables, accountants and outboxevent, to persist the accountants and their event data, respectively.
Open the com.redhat.training.service.AccountantsService class and examine the create and update methods.
The create method persists the accountant data to the accountants table, and the update method updates the accountant's status. Each method fires an event for its create or update action. These events are persisted temporarily in the outboxevent table, where Debezium captures them.
@Transactional
public Long create(Accountant accountant) {
    ...output omitted...
    accountant.persist();
    event.fire(AccountantCreatedEvent.of(accountant));
    ...output omitted...
}
@Transactional
public Accountant update(@PathParam Long id, Accountant accountant) {
    ...output omitted...
    entity.setStatus(accountant.getStatus());
    event.fire(AccountantUpdatedEvent.of(entity));
    ...output omitted...
}
Notice that both methods run in a transaction because of the @Transactional annotation. This ensures that each create or update call writes to the accountants and outboxevent tables in a single transaction. If an error occurs, the transaction is rolled back automatically, so if the application does not create or update any accountants data, then it does not create any events either.
Set up port forwarding between your local machine and the PostgreSQL database running on the OpenShift cluster. You need the PostgreSQL pod name for this.
(.venv) [user@host accountants]$ oc get pods -o name
NAME
...output omitted...
pod/postgresql-POD_SUFFIX
...output omitted...
In another terminal window, run the following command to activate port forwarding. This allows you to access the remote PostgreSQL instance locally.
[user@host]$ oc port-forward \
postgresql-POD_SUFFIX 5432:5432
Forwarding from 127.0.0.1:5432 -> 5432
Forwarding from [::1]:5432 -> 5432
Leave the terminal window open.
Warning
Make sure that port 5432 is not already in use on your local workstation. Otherwise, you cannot use it for port forwarding.
Open a new terminal window, and then compile and run the application that is in the accountants directory.
(.venv) [user@host accountants]$ ./mvnw package quarkus:dev
...output omitted...
2021-08-24 13:44:28,361 INFO [io.quarkus] (Quarkus Main Thread) accountants ... started in 5.615s. Listening on: http://localhost:8080
...output omitted...
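The transactional behavior of the create and update methods examined in this step can be illustrated with a small in-memory model. This is a minimal sketch under stated assumptions, not the application's code: the OutboxSketch class, its Tx helper, and the inTransaction method are hypothetical, and two plain lists stand in for the accountants and outboxevent tables. It shows only the property that matters for the outbox pattern: the data row and its event are committed together or not at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of the outbox pattern; not the application's code.
final class OutboxSketch {

    // Stand-ins for the accountants and outboxevent tables.
    final List<String> accountants = new ArrayList<>();
    final List<String> outboxEvents = new ArrayList<>();

    // Rows staged inside one transaction.
    static final class Tx {
        final List<String> accountantRows = new ArrayList<>();
        final List<String> eventRows = new ArrayList<>();
    }

    // Runs the work and applies all staged rows only if it completes normally.
    // Returns true on commit and false on rollback.
    boolean inTransaction(Consumer<Tx> work) {
        Tx tx = new Tx();
        try {
            work.accept(tx);
        } catch (RuntimeException e) {
            return false; // rollback: the staged rows are discarded
        }
        accountants.addAll(tx.accountantRows);
        outboxEvents.addAll(tx.eventRows);
        return true;
    }

    public static void main(String[] args) {
        OutboxSketch db = new OutboxSketch();

        // Successful create: the row and its event are committed together.
        db.inTransaction(tx -> {
            tx.accountantRows.add("Emily");
            tx.eventRows.add("AccountantCreatedEvent(Emily)");
        });

        // Failing create: the error discards both the row and the event.
        db.inTransaction(tx -> {
            tx.accountantRows.add("Joe");
            tx.eventRows.add("AccountantCreatedEvent(Joe)");
            throw new IllegalStateException("simulated failure");
        });

        System.out.println(db.accountants);  // prints [Emily]
        System.out.println(db.outboxEvents); // prints [AccountantCreatedEvent(Emily)]
    }
}
```

Because the event row is written to the same database as the data row, no separate write to Kafka can succeed or fail independently of the data change, which is how the outbox table avoids dual writes.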
Examine the change data capture for the accountant events.
In a new terminal window, run the Kafka console consumer for the accountant-event topic.
[user@host]$ oc exec -it my-cluster-kafka-0 -- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic accountant-event
Because you have not created any accountant events yet, the output should contain no messages. Leave the terminal window open and the console consumer running.
Open a browser tab and navigate to http://localhost:8080. You should see the web interface of the AccountAnts application. Click the create button five times to create five accountants, whose Name and SSN values are randomly generated.
In your terminal, the console consumer shows the outbox events created for the JOINED status of the new accountants.
{"schema":{"type":"string","optional":true},"payload":"{\"id\":1,\"userName\":\"Emily\",\"ssn\":947031975,\"status\":\"JOINED\"}"}
...output omitted...
{"schema":{"type":"string","optional":true},"payload":"{\"id\":5,\"userName\":\"Joe\",\"ssn\":195498058,\"status\":\"JOINED\"}"}
Update some accountants by changing their status. Set the second accountant to PROMOTED and the fourth one to QUIT. For each of them, click the corresponding button, select the relevant status from the Accountant Status drop-down list, and then click the button that saves the new status.
In your console consumer terminal, you can see the captured change events as follows.
{"schema":{"type":"string","optional":true},"payload":"{\"id\":2,\"userName\":\"Jacob\",\"ssn\":770007954,\"status\":\"PROMOTED\"}"}
{"schema":{"type":"string","optional":true},"payload":"{\"id\":4,\"userName\":\"Alfie\",\"ssn\":959716346,\"status\":\"QUIT\"}"}
Stop the AccountAnts application, the console consumer, and the port forwarding.
This concludes the guided exercise.