
Guided Exercise: Capturing Change Event Data with Debezium

In this exercise, you capture change events with Debezium by applying the outbox pattern in the application, which prevents dual writes.
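A dual write happens when an application updates its database and publishes to Kafka as two independent operations. The following minimal Java sketch shows how the two stores can diverge when the second write fails. The class and method names (DualWriteSketch, createAccountant, publish) are hypothetical, and in-memory lists stand in for a database table and a Kafka topic:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a dual write: the database write and the Kafka
// publish are two independent operations with no shared transaction.
public class DualWriteSketch {

    // In-memory stand-ins for the accountants table and a Kafka topic.
    static final List<String> accountantsTable = new ArrayList<>();
    static final List<String> kafkaTopic = new ArrayList<>();

    // Simulates publishing to a broker that may be unavailable.
    static void publish(String event, boolean brokerAvailable) {
        if (!brokerAvailable) {
            throw new IllegalStateException("broker unavailable");
        }
        kafkaTopic.add(event);
    }

    static void createAccountant(String name, boolean brokerAvailable) {
        accountantsTable.add(name);                  // write 1: stored in the database
        publish(name + " JOINED", brokerAvailable);  // write 2: can fail on its own
    }

    public static void main(String[] args) {
        createAccountant("Emily", true);
        try {
            createAccountant("Joe", false);
        } catch (IllegalStateException e) {
            System.out.println("Publish failed: " + e.getMessage());
        }
        // The database now contains an accountant that has no matching event.
        System.out.println(accountantsTable.size() + " accountant rows, "
                + kafkaTopic.size() + " events");
    }
}
```

Running main reports two accountant rows but only one event, because nothing undoes the first write when the second one fails.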

Outcomes

You should be able to configure Debezium connectors and use an outbox table in an application to capture change event data and write it to a Kafka topic.
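An outbox table avoids the dual-write problem because the application writes the business data and the event row in the same database transaction, and Debezium then publishes only committed event rows. The following in-memory Java sketch illustrates the atomicity part only. The OutboxSketch class and its staged-copy commit are hypothetical stand-ins for a real database transaction:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch of the transactional outbox pattern.
public class OutboxSketch {

    // In-memory stand-ins for the accountants and outboxevent tables.
    static final List<String> accountants = new ArrayList<>();
    static final List<String> outboxevent = new ArrayList<>();

    // Mimics a transactional create method: the work runs against staged
    // copies, and the copies replace the tables only if nothing fails.
    static void create(String name, boolean failBeforeCommit) {
        List<String> stagedAccountants = new ArrayList<>(accountants);
        List<String> stagedOutbox = new ArrayList<>(outboxevent);

        stagedAccountants.add(name);          // comparable to persisting the accountant
        stagedOutbox.add(name + " JOINED");   // comparable to firing the event

        if (failBeforeCommit) {
            // Rollback: the staged copies are discarded, so neither table changes.
            throw new IllegalStateException("simulated failure, rolled back");
        }
        // Commit: both writes become visible together.
        accountants.clear();
        accountants.addAll(stagedAccountants);
        outboxevent.clear();
        outboxevent.addAll(stagedOutbox);
    }

    public static void main(String[] args) {
        create("Emily", false);
        try {
            create("Joe", true);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // Every outbox row has a matching accountant row, so a relay such as
        // Debezium never publishes an event for data that was not committed.
        System.out.println(accountants.size() + " accountant rows, "
                + outboxevent.size() + " outbox rows");
    }
}
```

After main runs, both lists hold exactly one entry: the failed create left no accountant row and no outbox row behind.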

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The OpenShift CLI (oc).

  • The JDK installed.

From your workspace directory, activate the Python virtual environment, and use the lab command to start the scenario for this exercise.

[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start connect-debezium
...output omitted...

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

The lab command copies the exercise files from the AD482-apps/connect-debezium/apps directory in your local Git repository into the connect-debezium directory in your workspace.

Procedure 5.4. Instructions

  1. Deploy the Debezium connector for PostgreSQL.

    1. Navigate to the connect-debezium directory in your workspace.

    2. Open the postgresql-debezium-connector.yaml file in the resources directory. The file contains the following connector definition.

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      metadata:
        labels:
          strimzi.io/cluster: my-connect-cluster
        name: postgresql-debezium-connector
      spec:
        class: io.debezium.connector.postgresql.PostgresConnector
        config:
          plugin.name: pgoutput
          database.hostname: postgresql
          database.port: 5432
          database.user: postgres
          database.password: postgres
          database.dbname: accountantsdb
          database.server.name: accountantsdb1
          schema.include.list: public
          table.include.list: public.outboxevent
          tombstones.on.delete: false
          transforms: EventRouter
          transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter
          transforms.EventRouter.table.fields.additional.placement: type:header:eventType
          transforms.EventRouter.route.topic.replacement: ${routedByValue}
        tasksMax: 1

    3. In your terminal, create the connector resource from the postgresql-debezium-connector.yaml file by running the following command.

      (.venv) [user@host connect-debezium]$ oc create \
       -f resources/postgresql-debezium-connector.yaml
      kafkaconnector.kafka.strimzi.io/postgresql-debezium-connector created

    4. Watch the connector status by running the following command until the connector reaches the RUNNING state. Press Ctrl+C to stop watching.

      (.venv) [user@host connect-debezium]$ oc get kafkaconnectors -w \
       -o=custom-columns=NAME:.metadata.name,STATE:.status.connectorStatus.connector.state
      NAME                            STATE
      postgresql-debezium-connector   RUNNING

  2. Examine the AccountAnts application and run it.

    1. Open the AccountAnts application in the accountants directory. This application uses two PostgreSQL tables: accountants, which stores the accountant data, and outboxevent, which stores the accountant events.

    2. Open the com.redhat.training.service.AccountantsService class to examine the create and update methods.

      The create method persists a new accountant in the accountants table, and the update method changes the status of an existing accountant. Each method fires an event for its action: AccountantCreatedEvent or AccountantUpdatedEvent. The application persists these events temporarily in the outboxevent table, and Debezium captures them from there.

          @Transactional
          public Long create(Accountant accountant) {
      ...output omitted...
              accountant.persist();
      
              event.fire(AccountantCreatedEvent.of(accountant));
      ...output omitted...
          }
      
          @Transactional
          public Accountant update(@PathParam Long id, Accountant accountant) {
      ...output omitted...
              entity.setStatus(accountant.getStatus());
      
              event.fire(AccountantUpdatedEvent.of(entity));
      ...output omitted...
          }

      Notice that both methods are annotated with @Transactional. As a result, each create or update call writes to the accountants and outboxevent tables in a single transaction. If an error occurs, the transaction rolls back automatically, so if the application fails to create or update the accountant data, it does not create an event either.

    3. Set up port forwarding between your local machine and the PostgreSQL database that runs on the OpenShift cluster. First, get the name of the PostgreSQL pod.

      (.venv) [user@host accountants]$ oc get pods -o name
      ...output omitted...
      pod/postgresql-POD_SUFFIX
      ...output omitted...

      In another terminal window, run the following command to start the port forwarding. This makes the remote PostgreSQL instance reachable at localhost:5432.

      [user@host]$ oc port-forward \
       postgresql-POD_SUFFIX 5432:5432
      Forwarding from 127.0.0.1:5432 -> 5432
      Forwarding from [::1]:5432 -> 5432

      Leave the terminal window open.

      Warning

      Make sure that port 5432 is not already in use on your local workstation, for example by a local PostgreSQL server. Otherwise, the port forwarding cannot use port 5432.

    4. Open a new terminal window, navigate to the accountants directory, and then compile and run the application.

      (.venv) [user@host accountants]$ ./mvnw package quarkus:dev
      ...output omitted...
      2021-08-24 13:44:28,361 INFO  [io.quarkus] (Quarkus Main Thread) accountants ... started in 5.615s. Listening on: http://localhost:8080
      ...output omitted...

  3. Examine the change events that Debezium captures for the accountants.

    1. In a new terminal window, run the Kafka console consumer for the accountant-event topic.

      [user@host]$ oc exec -it my-cluster-kafka-0 -- bin/kafka-console-consumer.sh \
       --bootstrap-server my-cluster-kafka-brokers:9092 \
       --topic accountant-event

      Because you have not created any accountants yet, the consumer prints no messages. Leave the terminal window open and the console consumer running.

    2. Open a browser tab and navigate to http://localhost:8080. You should see the web interface of the AccountAnts application. Click CREATE five times to create five accountants with randomly generated Name and SSN values.

    3. In the console consumer terminal, verify that the consumer prints one outbox event with the JOINED status for each newly created accountant.

      {"schema":{"type":"string","optional":true},"payload":"{\"id\":1,\"userName\":\"Emily\",\"ssn\":947031975,\"status\":\"JOINED\"}"}
      ...output omitted...
      {"schema":{"type":"string","optional":true},"payload":"{\"id\":5,\"userName\":\"Joe\",\"ssn\":195498058,\"status\":\"JOINED\"}"}

    4. Update the status of two accountants: set the second accountant to PROMOTED and the fourth accountant to QUIT. For each accountant, click EDIT STATUS, select the new status from the Accountant Status drop-down list, and then click UPDATE to save the change.

    5. In the console consumer terminal, verify that the consumer prints the captured change events, similar to the following.

      {"schema":{"type":"string","optional":true},"payload":"{\"id\":2,\"userName\":\"Jacob\",\"ssn\":770007954,\"status\":\"PROMOTED\"}"}
      {"schema":{"type":"string","optional":true},"payload":"{\"id\":4,\"userName\":\"Alfie\",\"ssn\":959716346,\"status\":\"QUIT\"}"}

      Stop the AccountAnts application, the console consumer, and the port forwarding by pressing Ctrl+C in their terminals.
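The connector configuration from step 1 uses the Debezium outbox EventRouter transformation to turn each outboxevent row into a Kafka record. The following Java sketch is a simplified model of that routing, not Debezium's implementation. It assumes the default EventRouter column names (aggregatetype, aggregateid, type, payload). The row values in main are hypothetical, including the aggregatetype value accountant-event and the type value AccountantCreated:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the routing that the Debezium outbox
// EventRouter transformation performs with the connector configuration.
// It is not Debezium's implementation.
public class EventRouterSketch {

    // One row of the outboxevent table, using the default column names.
    static final class OutboxRow {
        final String aggregateType;
        final String aggregateId;
        final String type;
        final String payload;

        OutboxRow(String aggregateType, String aggregateId, String type, String payload) {
            this.aggregateType = aggregateType;
            this.aggregateId = aggregateId;
            this.type = type;
            this.payload = payload;
        }
    }

    // The Kafka record that the transformation emits for a row.
    static final class RoutedRecord {
        final String topic;
        final String key;
        final Map<String, String> headers;
        final String value;

        RoutedRecord(String topic, String key, Map<String, String> headers, String value) {
            this.topic = topic;
            this.key = key;
            this.headers = headers;
            this.value = value;
        }
    }

    // Value of transforms.EventRouter.route.topic.replacement in the connector.
    static final String TOPIC_REPLACEMENT = "${routedByValue}";

    static RoutedRecord route(OutboxRow row) {
        // The topic name comes from the route-by field, aggregatetype by default.
        String topic = TOPIC_REPLACEMENT.replace("${routedByValue}", row.aggregateType);

        // table.fields.additional.placement: type:header:eventType copies the
        // type column into a record header named eventType.
        Map<String, String> headers = new HashMap<>();
        headers.put("eventType", row.type);

        // By default, aggregateid becomes the record key and payload the value.
        return new RoutedRecord(topic, row.aggregateId, headers, row.payload);
    }

    public static void main(String[] args) {
        // Hypothetical row: "accountant-event" and "AccountantCreated" are assumed values.
        OutboxRow row = new OutboxRow("accountant-event", "1", "AccountantCreated",
                "{\"id\":1,\"userName\":\"Emily\",\"ssn\":947031975,\"status\":\"JOINED\"}");
        RoutedRecord record = route(row);
        System.out.println("topic=" + record.topic + " key=" + record.key
                + " headers=" + record.headers + " value=" + record.value);
    }
}
```

Because the connector sets route.topic.replacement to ${routedByValue}, the topic name is the aggregatetype value itself. With the EventRouter default, outbox.event.${routedByValue}, the events would instead go to a topic whose name starts with outbox.event.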

Finish

Return to your workspace directory and use the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish connect-debezium

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c