Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will perform Single Message Transformations on the data generated by the AccountAnts application.
The application stores accountant users in a PostgreSQL database, and uses Kafka Connect for moving data from the database to Elasticsearch.
The application uses two Kafka Connectors:
A source connector that extracts data from a PostgreSQL database.
A sink connector that stores the extracted data into an Elasticsearch search engine.
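The records that flow through this pipeline are small JSON-like structures. Based on the Elasticsearch documents shown later in this exercise, the value of one record in the Kafka topic between the two connectors looks similar to the following sketch (the field values are illustrative):

{"id": 1, "username": "Emily", "ssn": 914113680}

The Single Message Transformations that you apply in this exercise operate on records of this shape, one record at a time, as they pass through the connector.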
Outcomes
You should be able to apply Single Message Transformations in the source connector.
You can find the solution files for this exercise in the AD482-apps repository, within the connect-transformation/solutions directory.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The OpenShift CLI (oc) installed.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start connect-transformation
...output omitted...
The lab command copies the exercise files from the AD482-apps/connect-transformation/apps directory in your local Git repository into the connect-transformation directory in your workspace.
Procedure 5.3. Instructions
Navigate to the connect-transformation directory located in your workspace directory.

(.venv) [user@host AD482]$ cd connect-transformation
(.venv) [user@host connect-transformation]$

Verify that the Kafka Connect cluster is in the READY state, and that the connectors are in the RUNNING state.

Use the oc get kafkaconnects command to verify that the Kafka Connect cluster is in the READY state.

(.venv) [user@host connect-transformation]$ oc get kafkaconnects
NAME                 DESIRED REPLICAS   READY
my-connect-cluster   1                  True

Use the oc get kafkaconnectors command to verify that the sink and source connectors are in the RUNNING state.

(.venv) [user@host connect-transformation]$ oc get kafkaconnectors \
-o=custom-columns=NAME:.metadata.name,STATE:.status.connectorStatus.connector.state
NAME                           STATE
elasticsearch-sink-connector   RUNNING
jdbc-source-connector          RUNNING
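If either connector is not in the RUNNING state, one way to troubleshoot is to inspect the full custom resource. The status section of the resource typically reports the connector state, the state of its tasks, and any error trace that Kafka Connect returns; the exact output depends on the failure, so treat the following as a sketch:

(.venv) [user@host connect-transformation]$ oc get kafkaconnector \
jdbc-source-connector -o yaml
...output omitted...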
Run the AccountAnts application, and create an accountant.

Use the oc get pods command to get the name of the pod that runs the PostgreSQL database.

(.venv) [user@host connect-transformation]$ oc get pods \
--selector name=postgresql
NAME                    READY   STATUS    RESTARTS   AGE
postgresql-POD_SUFFIX   1/1     Running   0          104m

Copy the pod name for later use.

Use the oc port-forward command to redirect all communication requests from local port 5432 to port 5432 of the PostgreSQL pod.

(.venv) [user@host connect-transformation]$ oc port-forward \
postgresql-POD_SUFFIX 5432:5432
Forwarding from 127.0.0.1:5432 -> 5432

Leave this terminal window open.
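If you want to confirm that the forwarding works before you continue, and the PostgreSQL client tools are installed on your workstation, a check such as the following should report that the forwarded server accepts connections. The pg_isready tool is an assumption here; it is not required by this exercise:

[user@host ~]$ pg_isready -h 127.0.0.1 -p 5432
127.0.0.1:5432 - accepting connections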
Warning

Make sure port 5432 is not in use on your local workstation.

Open a new terminal window, navigate to the connect-transformation/accountants directory of your workspace, and run the example application.

[user@host accountants]$ ./mvnw clean package quarkus:dev
...output omitted...

Open a web browser, and navigate to http://localhost:8080. Click to create an accountant.

On each click, the application creates an accountant with a randomly generated Name and SSN.
Verify that the data received on the Elasticsearch sink connector instance contains the SSN for the created accountant.
Open a new terminal window, navigate to the exercise directory, and use the oc get route command to get the URL of the Elasticsearch instance.

(.venv) [user@host connect-transformation]$ oc get route elasticsearch-es-http \
-o jsonpath="{'http://'}{.spec.host}{'/accountants/_search\n'}"
http://YOUR_ELASTICSEARCH_URL/accountants/_search

Open a web browser, and navigate to the URL that you just retrieved. Verify that the Elasticsearch response contains all the details of the accountant that you just created.

{...output omitted... {"id":1,"username":"Emily","ssn":914113680}}]}}

Notice that the created accountant has an SSN value assigned.
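As an alternative to the browser, you can query the same endpoint from the command line. The following sketch assumes that curl and jq are available on your workstation; jq only extracts the stored documents from the Elasticsearch response:

(.venv) [user@host connect-transformation]$ curl -s \
http://YOUR_ELASTICSEARCH_URL/accountants/_search | jq '.hits.hits[]._source'
{
  "id": 1,
  "username": "Emily",
  "ssn": 914113680
}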
Transform the ssn field into a zero value by using the MaskField transformation. This transformation converts the ssn value into 0 before the data reaches Elasticsearch.

Using your editor of choice, open the connect-transformation/resources/jdbc-source-connector.yaml file. The file contains the source connector configuration.

Add the MaskField transformation to the source connector.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
...output omitted...
  config:
    ...output omitted...
    # TODO: Add Single Message Transformations
    transforms: Mask
    transforms.Mask.type: org.apache.kafka.connect.transforms.MaskField$Value
    transforms.Mask.fields: ssn
  tasksMax: 1

Use the oc apply command to apply the new connector configuration.

[user@host connect-transformation]$ oc apply \
-f resources/jdbc-source-connector.yaml
kafkaconnector.kafka.strimzi.io/jdbc-source-connector configured

Note

The changes to the Kafka Connector instances might take some time to be available.
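The MaskField$Value transformation replaces each field listed in the fields property with an empty value for its type, which is 0 for numeric fields such as ssn; a MaskField$Key variant exists for masking fields in the record key. The fields property accepts a comma-separated list, so a configuration such as the following sketch would mask several fields at once. Masking username here is only an illustration, not part of this exercise:

transforms: Mask
transforms.Mask.type: org.apache.kafka.connect.transforms.MaskField$Value
transforms.Mask.fields: ssn,username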
Verify that the transformation applied to the source connector masks the values of the ssn field.

Return to the AccountAnts tab in your browser, and then click to generate a new accountant.

Return to the Elasticsearch tab in your browser, and then reload the page. The new record available in Elasticsearch has the ssn field transformed into 0.

{...output omitted... {"id":2,"username":"Jacob","ssn":0}}]}}
Add a chained transformation to the source connector that adds a new field named data-source. Set the value of the new field to database.

Add the InsertField transformation to the source connector.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
...output omitted...
  config:
    ...output omitted...
    # TODO: Add Single Message Transformations
    transforms: Mask,Insert
    transforms.Mask.type: org.apache.kafka.connect.transforms.MaskField$Value
    transforms.Mask.fields: ssn
    transforms.Insert.type: org.apache.kafka.connect.transforms.InsertField$Value
    transforms.Insert.static.field: data-source
    transforms.Insert.static.value: database
  tasksMax: 1

Use the oc apply command to apply the new connector configuration.

[user@host connect-transformation]$ oc apply \
-f resources/jdbc-source-connector.yaml
kafkaconnector.kafka.strimzi.io/jdbc-source-connector configured

Note

The changes to the Kafka Connector instances might take some time to be available.
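Kafka Connect applies chained transformations in the order in which they appear in the transforms list, so each record is masked first and then extended with the new field. The following sketch, with illustrative values, shows how one record value evolves through the chain:

{"id":4,"username":"Freddie","ssn":914113680}                    original record
{"id":4,"username":"Freddie","ssn":0}                            after Mask
{"id":4,"username":"Freddie","ssn":0,"data-source":"database"}   after Insert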
Verify that the chained transformations applied to the source connector mask the values of the ssn field, and add the data-source field.

Return to the AccountAnts tab in your browser, and then click to generate a new accountant.

Return to the Elasticsearch tab in your browser, and then reload the page. The new record available in Elasticsearch has the ssn field transformed into 0, and the new data-source field.

{...output omitted... {"data-source":"database", "id":4,"username":"Freddie","ssn":0}}]}}

Stop the application and the port forwarding. Press Ctrl+C in the terminal running the application, and in the terminal running the oc port-forward command.
This concludes the guided exercise.