
Guided Exercise: Transforming Messages

In this exercise, you will apply Single Message Transformations to the data generated by the AccountAnts application. The application stores accountants in a PostgreSQL database, and uses Kafka Connect to move data from the database to Elasticsearch.

The application uses two Kafka Connectors:

  • A source connector that extracts data from a PostgreSQL database.

  • A sink connector that stores the extracted data into an Elasticsearch search engine.

Outcomes

You should be able to apply Single Message Transformations in the source connector.

You can find the solution files for this exercise in the AD482-apps repository, within the connect-transformation/solutions directory.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The OpenShift CLI (oc) installed.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start connect-transformation
...output omitted...

The lab command copies the exercise files from the AD482-apps/connect-transformation/apps directory in your local Git repository into the connect-transformation directory in your workspace.

Procedure 5.3. Instructions

  1. Navigate to the connect-transformation directory located in your workspace directory.

    (.venv) [user@host AD482]$ cd connect-transformation
    (.venv) [user@host connect-transformation]$
  2. Verify that the Kafka Connect cluster is in the READY state, and the connectors are in the RUNNING state.

    1. Use the oc get kafkaconnects command to verify that the Kafka Connect cluster is in the READY state.

      (.venv) [user@host connect-transformation]$ oc get kafkaconnects
      NAME                 DESIRED REPLICAS   READY
      my-connect-cluster   1                  True
    2. Use the oc get kafkaconnectors command to verify that the sink and source connectors are in the RUNNING state.

      (.venv) [user@host connect-transformation]$ oc get kafkaconnectors \
      -o=custom-columns=NAME:.metadata.name,STATE:.status.connectorStatus.connector.state
      NAME                           STATE
      elasticsearch-sink-connector   RUNNING
      jdbc-source-connector          RUNNING
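If you prefer to script this check, the following Python sketch reads the same status field as the custom-columns expression above (.status.connectorStatus.connector.state). It assumes that the oc CLI is logged in to the exercise project; fetch_kafkaconnectors, connector_states, and all_running are hypothetical helpers, not part of the course grading scripts.

```python
import json
import subprocess

# Hypothetical helpers; they are not part of the AD482 grading scripts.


def fetch_kafkaconnectors() -> dict:
    """Run `oc get kafkaconnectors -o json` and return the parsed List object.

    Assumes the oc CLI is installed and logged in to the exercise project.
    """
    result = subprocess.run(
        ["oc", "get", "kafkaconnectors", "-o", "json"],
        check=True, capture_output=True, text=True,
    )
    return json.loads(result.stdout)


def connector_states(connectors: dict) -> dict[str, str]:
    """Map each KafkaConnector name to .status.connectorStatus.connector.state."""
    states = {}
    for item in connectors.get("items", []):
        connector = (
            item.get("status", {})
            .get("connectorStatus", {})
            .get("connector", {})
        )
        # A connector that has not reported a status yet is shown as UNKNOWN.
        states[item["metadata"]["name"]] = connector.get("state", "UNKNOWN")
    return states


def all_running(states: dict[str, str]) -> bool:
    """True only if at least one connector exists and every one is RUNNING."""
    return bool(states) and all(s == "RUNNING" for s in states.values())
```

For example, all_running(connector_states(fetch_kafkaconnectors())) should return True when both connectors report the RUNNING state.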
  3. Run the AccountAnts application, and create an accountant.

    1. Use the oc get pods command to get the name of the pod running the PostgreSQL database.

      (.venv) [user@host connect-transformation]$ oc get pods \
       --selector name=postgresql
      NAME                    READY   STATUS    RESTARTS   AGE
      postgresql-POD_SUFFIX   1/1     Running   0          104m

      Copy the pod name for later use.

    2. Use the oc port-forward command to forward your local port 5432 to port 5432 of the PostgreSQL pod.

      (.venv) [user@host connect-transformation]$ oc port-forward \
       postgresql-POD_SUFFIX 5432:5432
      Forwarding from 127.0.0.1:5432 -> 5432

      Leave this terminal window open.

      Warning

      Make sure port 5432 is not in use on your local workstation.

    3. Open a new terminal window, navigate to the connect-transformation/accountants directory of your workspace, and run the example application.

      [user@host accountants]$ ./mvnw clean package quarkus:dev
      ...output omitted...
    4. Open a web browser, and navigate to http://localhost:8080. Click CREATE to create an accountant.

      Each click creates an accountant with a randomly generated name and SSN.
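The warning about port 5432 earlier in this step matters because oc port-forward cannot bind a local port that another process already uses. If the port-forward command fails, a check such as the following Python sketch can tell you whether something, for example a local PostgreSQL server, is already listening on that port. The port_in_use function is a hypothetical helper, not part of the course tooling.

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if some process already accepts TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1.0)  # avoid hanging if the host does not answer
        # connect_ex returns 0 on a successful connection, an error code otherwise.
        return sock.connect_ex((host, port)) == 0


if __name__ == "__main__":
    print("Port 5432 in use:", port_in_use(5432))
```

This sketch checks only the IPv4 loopback address; a True result means that you must stop the other process, or forward a different local port, before running oc port-forward.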

  4. Verify that the data that the Elasticsearch sink connector stores in Elasticsearch contains the SSN of the created accountant.

    1. Open a new terminal window, navigate to the exercise directory, and use the oc get route command to get the URL of the Elasticsearch instance.

      (.venv) [user@host connect-transformation]$ oc get route elasticsearch-es-http \
       -o jsonpath="{'http://'}{.spec.host}{'/accountants/_search\n'}"
      http://YOUR_ELASTICSEARCH_URL/accountants/_search
    2. Open a web browser, and navigate to the URL that you just retrieved. Verify that the Elasticsearch response contains all the details of the accountant that you just created.

      {...output omitted...
      {"id":1,"username":"Emily","ssn":914113680}}]}}

      Notice that the created accountant has an SSN value assigned.
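To inspect the same response without a browser, the following Python sketch fetches the search URL and returns only the stored documents, which Elasticsearch nests under hits.hits[]._source in a search response. It assumes that the route is reachable over plain HTTP, as the URL printed by the oc get route command suggests, and that the default search size of 10 hits is enough for this exercise; search_hits and fetch_accountants are hypothetical helpers.

```python
import json
import urllib.request


def search_hits(response: dict) -> list[dict]:
    """Return the _source document of every hit in an Elasticsearch _search response."""
    return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])]


def fetch_accountants(search_url: str) -> list[dict]:
    """GET search_url (the URL printed by oc get route) and return the documents.

    Assumes plain HTTP without authentication, as in this exercise's route.
    """
    with urllib.request.urlopen(search_url, timeout=10) as resp:
        return search_hits(json.load(resp))
```

For example, fetch_accountants("http://YOUR_ELASTICSEARCH_URL/accountants/_search") returns a list of dictionaries with the id, username, and ssn keys shown in the browser output.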

  5. Mask the ssn field by using the MaskField transformation. For a numeric field such as ssn, this transformation replaces the value with 0 before the record is written to Kafka, and therefore before it reaches Elasticsearch.

    1. Using your editor of choice, open the connect-transformation/resources/jdbc-source-connector.yaml file. The file contains the source connector configuration.

    2. Add the MaskField transformation to the source connector.

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      ...output omitted...
        config:
          ...output omitted...
          # TODO: Add Single Message Transformations
          transforms: Mask
          transforms.Mask.type: org.apache.kafka.connect.transforms.MaskField$Value
          transforms.Mask.fields: ssn
        tasksMax: 1
    3. Use the oc apply command to apply the new connector configuration.

      [user@host connect-transformation]$ oc apply \
       -f resources/jdbc-source-connector.yaml
      kafkaconnector.kafka.strimzi.io/jdbc-source-connector configured

      Note

      Changes to KafkaConnector resources might take some time to take effect.
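The following Python sketch models what MaskField$Value does to a schemaless record value in this exercise. It is a simplified illustration, not the Kafka Connect implementation: the real transformation is a Java class that also handles records with schemas, supports more field types, and accepts a custom replacement value. The null_equivalent and mask_value names are hypothetical.

```python
from typing import Any

# Hypothetical model of MaskField$Value; the real SMT is implemented in Java.


def null_equivalent(value: Any) -> Any:
    """Return the empty value of the same kind, as MaskField does by default."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return False
    if isinstance(value, int):
        return 0
    if isinstance(value, float):
        return 0.0
    if isinstance(value, str):
        return ""
    if isinstance(value, list):
        return []
    if isinstance(value, dict):
        return {}
    raise TypeError(f"no null-equivalent modeled for {type(value).__name__}")


def mask_value(record_value: dict, fields: list[str]) -> dict:
    """Return a copy of record_value with each listed field masked.

    Simplification: fields missing from the record are left out.
    """
    masked = dict(record_value)  # do not modify the original record
    for name in fields:
        if name in masked:
            masked[name] = null_equivalent(masked[name])
    return masked
```

For example, mask_value({"id": 2, "username": "Jacob", "ssn": 914113680}, ["ssn"]) returns {"id": 2, "username": "Jacob", "ssn": 0}, which has the same shape as the masked Elasticsearch document in the next step.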

  6. Verify that the transformation applied to the source connector masks the values of the ssn field.

    1. Return to the AccountAnts tab in your browser, and then click CREATE to generate a new accountant.

    2. Return to the Elasticsearch tab in your browser, and then reload the page. In the new record, the value of the ssn field is 0.

      {...output omitted...
      {"id":2,"username":"Jacob","ssn":0}}]}}
  7. Add a chained transformation to the source connector that adds a new field named data-source. Set the value of the new field to database.

    1. Add the InsertField transformation to the source connector.

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaConnector
      ...output omitted...
        config:
          ...output omitted...
          # TODO: Add Single Message Transformations
          transforms: Mask,Insert
          transforms.Mask.type: org.apache.kafka.connect.transforms.MaskField$Value
          transforms.Mask.fields: ssn
          transforms.Insert.type: org.apache.kafka.connect.transforms.InsertField$Value
          transforms.Insert.static.field: data-source
          transforms.Insert.static.value: database
        tasksMax: 1
    2. Use the oc apply command to apply the new connector configuration.

      [user@host connect-transformation]$ oc apply \
       -f resources/jdbc-source-connector.yaml
      kafkaconnector.kafka.strimzi.io/jdbc-source-connector configured

      Note

      Changes to KafkaConnector resources might take some time to take effect.
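Kafka Connect applies the transformations in the order listed in the transforms property, and each transformation receives the output of the previous one. The following Python sketch models that behavior for the Mask,Insert configuration in this step. The build_chain, apply_chain, mask_field, and insert_static names are hypothetical, and the mask model is simplified; the real transformations run as Java classes inside the connector.

```python
from typing import Callable

Record = dict
Transform = Callable[[Record], Record]

# Hypothetical models of the two SMT classes used in this exercise.


def mask_field(fields: str) -> Transform:
    names = [name.strip() for name in fields.split(",")]

    def apply(record: Record) -> Record:
        out = dict(record)
        for name in names:
            value = out.get(name)
            # Simplified: only booleans, numbers, and strings are masked.
            if isinstance(value, bool):
                out[name] = False
            elif isinstance(value, (int, float)):
                out[name] = type(value)(0)  # 0 or 0.0
            elif isinstance(value, str):
                out[name] = ""
        return out

    return apply


def insert_static(field: str, value: str) -> Transform:
    def apply(record: Record) -> Record:
        return {**record, field: value}

    return apply


# Maps each "type" value from the connector config to a transform factory.
FACTORIES: dict[str, Callable[[dict], Transform]] = {
    "org.apache.kafka.connect.transforms.MaskField$Value":
        lambda cfg: mask_field(cfg["fields"]),
    "org.apache.kafka.connect.transforms.InsertField$Value":
        lambda cfg: insert_static(cfg["static.field"], cfg["static.value"]),
}


def build_chain(config: dict[str, str]) -> list[Transform]:
    """Create one transform per alias, in the order given by "transforms"."""
    chain = []
    for alias in config["transforms"].split(","):
        prefix = f"transforms.{alias.strip()}."
        # Collect this alias's settings, for example "type" and "fields" for Mask.
        cfg = {key[len(prefix):]: val for key, val in config.items()
               if key.startswith(prefix)}
        chain.append(FACTORIES[cfg["type"]](cfg))
    return chain


def apply_chain(chain: list[Transform], record: Record) -> Record:
    """Feed the output of each transform into the next one."""
    for transform in chain:
        record = transform(record)
    return record


# The same keys and values as the config section of jdbc-source-connector.yaml.
CONNECTOR_CONFIG = {
    "transforms": "Mask,Insert",
    "transforms.Mask.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.Mask.fields": "ssn",
    "transforms.Insert.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.Insert.static.field": "data-source",
    "transforms.Insert.static.value": "database",
}
```

For example, apply_chain(build_chain(CONNECTOR_CONFIG), {"id": 4, "username": "Freddie", "ssn": 914113680}) returns a record with "ssn": 0 and "data-source": "database", matching the document that you check in the next step.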

  8. Verify that the chained transformations applied to the source connector mask the values of the ssn field, and add the data-source field.

    1. Return to the AccountAnts tab in your browser, and then click CREATE to generate a new accountant.

    2. Return to the Elasticsearch tab in your browser, and then reload the page. In the new record, the value of the ssn field is 0, and the record contains the new data-source field.

      {...output omitted...
      {"data-source":"database", "id":4,"username":"Freddie","ssn":0}}]}}
    3. Stop the application and the port forwarding by pressing Ctrl+C in their terminal windows.

Finish

Return to your workspace directory, and run the lab command to complete this exercise. This step is important to ensure that resources from this exercise do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish connect-transformation

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c