Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise, you will integrate a basic stream processing pipeline in a Quarkus application, by using the Kafka Streams library.
You will implement a simple application to process a stream of payments coming from the payments topic, process these payments with the Kafka Streams library, and write the result to a large-payments topic.
You will find the solution files in the AD482-apps repository, within the streams-interaction/solutions directory.
Outcomes
You should be able to add the Kafka Streams dependency to an application, implement a basic stream processing pipeline, and write the result to another topic.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start streams-interaction
...output omitted...
Copy the Bootstrap Server and Bootstrap Port values from the lab start script output.
You will use these values to configure Kafka Streams.
The lab command copies the exercise files from the AD482-apps/streams-interaction/apps/payments directory of your local Git repository into the streams-interaction directory at the root of your workspace.
Procedure 3.1. Instructions
Add the Kafka Streams dependency to your Quarkus app.
Change to the streams-interaction directory.
[user@host AD482]$ cd streams-interaction
[user@host streams-interaction]$
This directory contains the Quarkus application for this exercise.
Install the Kafka Streams Quarkus extension.
[user@host streams-interaction]$ ./mvnw quarkus:add-extension \
-Dextensions="kafka-streams"
...output omitted...
[INFO] [SUCCESS] ✅ Extension io.quarkus:quarkus-kafka-streams has been installed
...output omitted...
Note
In non-Quarkus projects, add the kafka-streams artifact ID, from the org.apache.kafka group, to your pom.xml file.
For further information, refer to the Libraries and Maven artifacts section of the Kafka Streams documentation.
Specify the required Kafka Streams configuration in the application.properties file.
By using your editor of choice, open the application.properties file, and configure the Kafka Streams properties as follows:
quarkus.kafka-streams.bootstrap-servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT
quarkus.kafka-streams.topics = payments,large-payments
kafka.security.protocol = SSL
kafka.ssl.truststore.location = ABSOLUTE_PATH_TO_WORKSPACE_FOLDER/truststore.jks
kafka.ssl.truststore.password = password
quarkus.kafka-streams.bootstrap-servers: The Kafka bootstrap server. Set your own Kafka bootstrap server, by specifying the host and the port.
quarkus.kafka-streams.topics: The topics used in the application. Specific to Quarkus, which waits for the topics to exist before executing the Kafka Streams processing pipeline. For this exercise, you will use payments and large-payments.
kafka.security.protocol: The security protocol to connect to Kafka. Leave it as SSL.
kafka.ssl.truststore.location: The location of the TrustStore file required to connect to Kafka. Replace the ABSOLUTE_PATH_TO_WORKSPACE_FOLDER with the path to your workspace directory.
kafka.ssl.truststore.password: The password of the TrustStore file required to connect to Kafka. Leave this value as password.
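The properties above use spaces around the equals sign and a comma-separated topic list. The following sketch shows how the standard java.util.Properties class reads that syntax. It is only an illustration: the StreamsConfigSketch class and its methods are hypothetical and not part of the exercise, and Quarkus loads the application.properties file through its own configuration mechanism, which this sketch does not reproduce.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the exercise sources.
class StreamsConfigSketch {

    // Placeholder values copied from the exercise; replace them with your own.
    static final String SAMPLE = String.join("\n",
        "quarkus.kafka-streams.bootstrap-servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT",
        "quarkus.kafka-streams.topics = payments,large-payments",
        "kafka.security.protocol = SSL");

    // Parses properties text the same way java.util.Properties reads a file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Splits the comma-separated topic list into individual topic names.
    static List<String> topics(Properties props) {
        String raw = props.getProperty("quarkus.kafka-streams.topics");
        return Arrays.asList(raw.split("\\s*,\\s*"));
    }

    public static void main(String[] args) {
        Properties props = parse(SAMPLE);
        System.out.println(props.getProperty("kafka.security.protocol")); // SSL
        System.out.println(topics(props)); // [payments, large-payments]
    }
}
```

The whitespace around the equals sign is not part of the key or the value, and the colon in the bootstrap server value is preserved because the key has already ended by that point.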
Implement a basic stream processor and verify that your application reads the payments stream.
Open the PaymentsStream class.
Inspect the source code. The PaymentsStream class includes the empty buildTopology method. You must implement the body of this method.
In the buildTopology method, create a StreamsBuilder object.
public Topology buildTopology() {
    // TODO: Create the stream from the "payments" topic
    StreamsBuilder builder = new StreamsBuilder();
    // TODO: use foreach to print each message
    // TODO: process the stream and send the result to the "large-payments" topic
    // TODO: return the topology
}
Use the builder to initialize an instance of KStream. Specify payments as the source topic of the stream by passing payments as the first parameter of the builder.stream method.
public Topology buildTopology() {
    // TODO: Create the stream from the "payments" topic
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Integer> stream = builder.stream(
        "payments", Consumed.with(keySerde, valueSerde)
    );
    // TODO: use foreach to print each message
    // TODO: process the stream and send the result to the "large-payments" topic
    // TODO: return the topology
}
Note that the Consumed.with method specifies how the application deserializes messages, by using SerDe (Serializer/Deserializer) objects.
Message keys, which are null in this case, can use the keySerde object, a String SerDe. Message values are integers, so they use the valueSerde object, which is an Integer SerDe.
Add the code to call the stream.foreach method and print each received payment.
public Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Integer> stream = builder.stream(
        "payments", Consumed.with(keySerde, valueSerde)
    );
    // TODO: use foreach to print each message
    stream.foreach(
        (key, value) -> System.out.println("Received payment: " + value)
    );
    // TODO: process the stream and send the result to the "large-payments" topic
    // TODO: return the topology
}
The foreach method of the stream object runs the provided function once for each payment.
Make the builder object build and return a topology, which contains the stream processing pipeline that you just defined.
public Topology buildTopology() {
    ...output omitted...
    // TODO: return the topology
    return builder.build();
}
Run the produce_integer_payments.py script to start producing payments. Pass the topic name, payments, as a parameter to the script.
(.venv) [user@host streams-interaction]$ python \
scripts/produce_integer_payments.py payments
...output omitted...
Payment sent: 1217 USD
Payment sent: 136 USD
Payment sent: 1446 USD
Leave the producer script running.
Open a new terminal window, navigate to the streams-interaction directory of your workspace, run the Quarkus application by using the ./mvnw clean package quarkus:dev command, and observe how the application receives the payment messages.
[user@host streams-interaction]$ ./mvnw clean package quarkus:dev
...output omitted...
Received payment: 1980
Received payment: 1995
Received payment: 1934
Leave this terminal window open. The Quarkus dev mode hot-reloads the application if you make changes to the source code.
Note
The Quarkus hot-reload feature might not work well if your editor auto saves changes. In this case, you can stop the application and run the ./mvnw clean package quarkus:dev command again.
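To make the role of the SerDe objects in the Consumed.with call more concrete, the following sketch shows what an Integer serializer and deserializer pair does with a payment value. It assumes that the valueSerde object is the Integer SerDe shipped with Kafka, which encodes each value as four bytes with the most significant byte first; the exercise does not show how valueSerde is created, so treat this as an assumption. The IntegerSerdeSketch class is hypothetical and uses only the JDK, not the Kafka classes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration; the exercise itself uses the SerDe objects
// that the PaymentsStream class already provides.
class IntegerSerdeSketch {

    // Serializer half: turns an int into 4 bytes, most significant byte first
    // (ByteBuffer is big-endian by default).
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Deserializer half: reads the same 4 bytes back into an int.
    static int deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] wire = serialize(1217);
        System.out.println(Arrays.toString(wire)); // [0, 0, 4, -63]
        System.out.println(deserialize(wire));     // 1217
    }
}
```

The producer and the stream processor must agree on this encoding. If the bytes in the payments topic were written in a different format, the deserialized amounts would not match the values that the producer script prints.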
Filter payments greater than 1000 and send them to the large-payments topic.
In the buildTopology method of the PaymentsStream class, add the code to filter large payments. The statement remains incomplete until you add the to call in the next step.
public Topology buildTopology() {
    ...output omitted...
    // TODO: process the stream and send the result to the "large-payments" topic
    stream
        .filter((key, amount) -> amount > 1000)
    return builder.build();
}
The filter method returns a new KStream object, which only includes payments greater than 1000.
Add the code to send the filtered stream to the large-payments topic.
public Topology buildTopology() {
    ...output omitted...
    // TODO: process the stream and send the result to the "large-payments" topic
    stream
        .filter((key, amount) -> amount > 1000)
        .to("large-payments", Produced.with(keySerde, valueSerde));
    return builder.build();
}
The to method sends the filtered stream to the large-payments topic, and uses the same SerDes as before to serialize the produced messages.
Wait until the Quarkus dev mode rebuilds the application after you save the file.
Note
Alternatively, you can stop the application and run the ./mvnw clean package quarkus:dev command again.
Switch to the terminal where the Quarkus application is running and verify that the application is still receiving payments.
...output omitted...
Received payment: 1974
Received payment: 1281
Received payment: 1269
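The application still prints every payment because the foreach call reads the unfiltered stream; only the filtered stream goes to the large-payments topic. The following sketch isolates the filter condition so that you can see which amounts pass it. It is a plain JDK approximation, not Kafka Streams code: the LargePaymentFilterSketch class is hypothetical, and java.util.function.BiPredicate stands in for the two-argument predicate that the KStream filter method accepts. The amounts 1000 and 1001 are added to the sample values from the producer output to show the boundary.

```java
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

// Hypothetical illustration of the filter condition, using only the JDK.
class LargePaymentFilterSketch {

    // Same condition as the lambda passed to stream.filter in the exercise.
    static final BiPredicate<String, Integer> IS_LARGE =
        (key, amount) -> amount > 1000;

    // Keeps only the amounts that satisfy the predicate.
    // Keys are null here, as they are in the payments topic.
    static List<Integer> largePayments(List<Integer> amounts) {
        return amounts.stream()
            .filter(amount -> IS_LARGE.test(null, amount))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> sample = List.of(1217, 136, 1446, 1000, 1001);
        System.out.println(largePayments(sample)); // [1217, 1446, 1001]
    }
}
```

Because the comparison is strict, a payment of exactly 1000 does not reach the large-payments topic.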
Run the provided consume_integer_payments.py script. This script reads events from the large-payments topic.
Open a new terminal window, navigate to your workspace directory, and activate the Python virtual environment.
[user@host ~]$ cd AD482
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Run the consume_integer_payments.py script to consume large payments. Pass the topic name, large-payments, as a parameter to the script.
Verify that the consumer receives large payments only.
(.venv) [user@host AD482]$ python \
streams-interaction/scripts/consume_integer_payments.py large-payments
Consuming messages...
Received large payment: 1220 USD
Received large payment: 1461 USD
Received large payment: 1384 USD
Stop the produce_integer_payments.py script, the Quarkus application, and the consume_integer_payments.py script.
This concludes the guided exercise.