
Guided Exercise: Processing Data Streams

In this exercise, you will integrate a basic stream processing pipeline into a Quarkus application by using the Kafka Streams library.

You will implement a simple application that reads a stream of payments from the payments topic, processes these payments with the Kafka Streams library, and writes the result to the large-payments topic.

You will find the solution files in the AD482-apps repository, within the streams-interaction/solutions directory.

Outcomes

You should be able to add the Kafka Streams dependency to an application, implement a basic stream processing pipeline, and write the result to another topic.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start streams-interaction
...output omitted...

Copy the Bootstrap Server and Bootstrap Port values from the lab start script output. You will use these values to configure Kafka Streams.

The lab command copies the exercise files from the AD482-apps/streams-interaction/apps/payments directory in your local Git repository into the streams-interaction directory at the root of your workspace.

Procedure 3.1. Instructions

  1. Add the Kafka Streams dependency to your Quarkus app.

    1. Change to the streams-interaction directory.

      [user@host AD482]$ cd streams-interaction
      [user@host streams-interaction]$

      This directory contains the Quarkus application for this exercise.

    2. Install the Kafka Streams Quarkus extension.

      [user@host streams-interaction]$ ./mvnw quarkus:add-extension \
       -Dextensions="kafka-streams"
      ...output omitted...
      [INFO] [SUCCESS] ✅  Extension io.quarkus:quarkus-kafka-streams has been installed
      ...output omitted...

      Note

      In non-Quarkus Maven projects, add the kafka-streams artifact from the org.apache.kafka group to your pom.xml file.

      For further information, refer to the Libraries and Maven artifacts section of the Kafka Streams documentation.

  2. Specify the required Kafka Streams configuration in the application.properties file.

    1. By using your editor of choice, open the application.properties file, and configure the Kafka Streams properties as follows:

      quarkus.kafka-streams.bootstrap-servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT 1
      quarkus.kafka-streams.topics = payments,large-payments 2
      kafka.security.protocol = SSL 3
      kafka.ssl.truststore.location = ABSOLUTE_PATH_TO_WORKSPACE_FOLDER/truststore.jks 4
      kafka.ssl.truststore.password = password 5

      1

      The Kafka bootstrap server. Replace the placeholders with the Bootstrap Server and Bootstrap Port values from the lab start output.

      2

      The topics that the application uses. This property is specific to Quarkus, which waits for these topics to exist before starting the Kafka Streams processing pipeline. For this exercise, use the payments and large-payments topics.

      3

      The security protocol to connect to Kafka. Leave it as SSL.

      4

      The location of the TrustStore file required to connect to Kafka. Replace the ABSOLUTE_PATH_TO_WORKSPACE_FOLDER with the path to your workspace directory.

      5

      The password of the TrustStore file required to connect to Kafka. Leave this value as password.
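      Outside Quarkus, a plain Kafka Streams application usually receives these settings as standard Kafka client properties. The following JDK-only sketch shows one possible mapping under that assumption. The StreamsConfigSketch class, its buildConfig method, and the payments-app application ID are hypothetical, and the arguments in main are the same placeholders used above. Plain Kafka Streams also requires an application.id value, which this exercise does not set explicitly.

```java
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper: expresses the exercise settings as standard Kafka
// client property keys, as a non-Quarkus Kafka Streams application might use them.
final class StreamsConfigSketch {

    static Properties buildConfig(String bootstrapServers, String workspacePath) {
        Properties props = new Properties();
        // Same host:port value as quarkus.kafka-streams.bootstrap-servers
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Required by plain Kafka Streams; "payments-app" is a hypothetical ID
        props.setProperty("application.id", "payments-app");
        // TLS settings matching the kafka.* entries in application.properties
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location",
                Paths.get(workspacePath, "truststore.jks").toString());
        props.setProperty("ssl.truststore.password", "password");
        return props;
    }

    public static void main(String[] args) {
        // Placeholders: replace them with your own values
        Properties props = buildConfig(
                "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT",
                "ABSOLUTE_PATH_TO_WORKSPACE_FOLDER");
        props.list(System.out);
    }
}
```

      In a real non-Quarkus application, these properties would then be passed to the KafkaStreams constructor together with the topology.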

  3. Implement a basic stream processor and verify that your application reads the payments stream.

    1. Open the PaymentsStream class.

      Inspect the source code. The PaymentsStream class includes the empty buildTopology method. You must implement the body of this method.

    2. In the buildTopology method, create a StreamsBuilder object.

      public Topology buildTopology() {
          // TODO: Create the stream from the "payments" topic
          StreamsBuilder builder = new StreamsBuilder();
      
          // TODO: use foreach to print each message
      
          // TODO: process the stream and send the result to the "large-payments" topic
      
          // TODO: return the topology
      }

    3. Use the builder to create a KStream instance. Pass payments as the first argument of the builder.stream method to make payments the source topic of the stream.

      public Topology buildTopology() {
          // TODO: Create the stream from the "payments" topic
          StreamsBuilder builder = new StreamsBuilder();
          KStream<String, Integer> stream = builder.stream(
              "payments", Consumed.with(keySerde, valueSerde)
          );
      
          // TODO: use foreach to print each message
      
          // TODO: process the stream and send the result to the "large-payments" topic
      
          // TODO: return the topology
      }

      The Consumed.with method specifies how the application deserializes messages, by using SerDe (serializer/deserializer) objects.

      Message keys, which are null in this case, use the keySerde object, a String SerDe. Message values are integers, so they use valueSerde, an Integer SerDe.
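      A SerDe pairs a serializer, which converts a value into bytes, with a deserializer, which converts those bytes back. Kafka's built-in Integer SerDe encodes each value as 4 big-endian bytes. Its String SerDe uses UTF-8 by default. A null key stays null in both directions. The following JDK-only sketch imitates those encodings to illustrate the round trip. The SerdeSketch class and its methods are hypothetical and are not part of the Kafka API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of what String and Integer SerDes do with bytes.
// This is NOT the Kafka Serdes API.
final class SerdeSketch {

    // Integer -> 4 bytes, most significant byte first (ByteBuffer is big-endian by default)
    static byte[] serializeInt(Integer value) {
        if (value == null) {
            return null; // null values stay null
        }
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    static Integer deserializeInt(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("Expected 4 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    // String <-> UTF-8 bytes; null keys (as in this exercise) stay null
    static byte[] serializeString(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] valueBytes = serializeInt(1217);
        System.out.println(Arrays.toString(valueBytes)); // prints [0, 0, 4, -63]
        System.out.println(deserializeInt(valueBytes));  // prints 1217
        System.out.println(serializeString(null) == null); // prints true
    }
}
```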

    4. Add the code to call the stream.foreach method and print each received payment.

      public Topology buildTopology() {
          StreamsBuilder builder = new StreamsBuilder();
          KStream<String, Integer> stream = builder.stream(
              "payments", Consumed.with(keySerde, valueSerde)
          );
      
          // TODO: use foreach to print each message
          stream.foreach(
              (key, value) -> System.out.println("Received payment: " + value)
          );
      
          // TODO: process the stream and send the result to the "large-payments" topic
      
          // TODO: return the topology
      }

      The foreach method of the stream object runs the provided function once for each payment.

    5. Call the builder.build method to build the topology, which contains the stream processing pipeline that you just defined, and return it.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: return the topology
          return builder.build();
      }

    6. Run the produce_integer_payments.py script to start producing payments. Pass the topic name, payments, as a parameter to the script.

      (.venv) [user@host streams-interaction]$ python \
       scripts/produce_integer_payments.py payments
      ...output omitted...
      
      	Payment sent: 1217 USD
      	Payment sent: 136 USD
      	Payment sent: 1446 USD

      Leave the producer script running.

    7. Open a new terminal window, navigate to the streams-interaction directory of your workspace, run the Quarkus application by using the ./mvnw clean package quarkus:dev command, and observe how the application receives the payment messages.

      [user@host streams-interaction]$ ./mvnw clean package quarkus:dev
      ...output omitted...
      Received payment: 1980
      Received payment: 1995
      Received payment: 1934

      Leave this terminal window open. The Quarkus dev mode hot-reloads the application if you make changes to the source code.

      Note

      The Quarkus hot-reload feature might not work well if your editor automatically saves changes. In that case, stop the application and run the ./mvnw clean package quarkus:dev command again.

  4. Filter payments greater than 1000 and send them to the large-payments topic.

    1. In the buildTopology method of the PaymentsStream class, add the code to filter large payments.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: process the stream and send the result to the "large-payments" topic
          stream
              .filter((key, amount) -> amount > 1000);
      
          return builder.build();
      }

      The filter method returns a new KStream object, which only includes payments greater than 1000.
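      The filter method keeps only the records for which the predicate returns true and leaves the original stream unchanged. The predicate uses a strict comparison, so a payment of exactly 1000 is not forwarded. As a rough analogy only, the following JDK sketch applies the same predicate to a fixed list of amounts with java.util.stream. It is not Kafka Streams code. The FilterSketch class is hypothetical, and the value 1000 in the sample is a hypothetical boundary case.

```java
import java.util.List;
import java.util.stream.Collectors;

// JDK-only analogy for KStream.filter; this is NOT the Kafka Streams API.
final class FilterSketch {

    static final int THRESHOLD = 1000;

    // Returns a new list with only the large payments. The input list is not
    // modified, much as KStream.filter returns a new KStream.
    static List<Integer> largePayments(List<Integer> amounts) {
        return amounts.stream()
                .filter(amount -> amount > THRESHOLD) // strictly greater: 1000 is dropped
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> amounts = List.of(1217, 136, 1446, 1000);
        System.out.println(largePayments(amounts)); // prints [1217, 1446]
        System.out.println(amounts);                // prints [1217, 136, 1446, 1000]
    }
}
```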

    2. Add the code to send the filtered stream to the large-payments topic.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: process the stream and send the result to the "large-payments" topic
          stream
              .filter((key, amount) -> amount > 1000)
              .to("large-payments", Produced.with(keySerde, valueSerde));
      
          return builder.build();
      }

      The to method sends the filtered stream to the large-payments topic. The Produced.with method uses the same SerDes as before to serialize the produced messages.

    3. Save the file and wait until Quarkus dev mode rebuilds the application.

      Note

      Alternatively, you can stop the application and run the ./mvnw clean package quarkus:dev command again.

    4. Switch to the terminal where the Quarkus application is running and verify that the application is still receiving payments.

      ...output omitted...
      Received payment: 1974
      Received payment: 1281
      Received payment: 1269
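      The application still prints every payment, not only the large ones. The foreach call and the filter call both read from the same payments stream, so each record flows into two independent branches of the topology. The following JDK-only simulation sketches that fan-out with plain collections instead of Kafka Streams. The TopologySketch class and its fields are hypothetical. The sample amounts reuse the values from the producer output above.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// JDK-only simulation of the topology's fan-out; this is NOT the Kafka Streams API.
final class TopologySketch {

    // Outputs of the two branches, kept in memory so they can be inspected
    final List<String> printed = new ArrayList<>();
    final List<Integer> largePaymentsTopic = new ArrayList<>();

    // Every source record is handed to both branches
    void process(Map.Entry<String, Integer> record) {
        Integer amount = record.getValue(); // keys are null in this exercise

        // Branch 1: foreach sees every payment
        printed.add("Received payment: " + amount);

        // Branch 2: filter + to forwards only large payments
        if (amount > 1000) {
            largePaymentsTopic.add(amount);
        }
    }

    public static void main(String[] args) {
        TopologySketch topology = new TopologySketch();
        List<Map.Entry<String, Integer>> records = List.of(
                new SimpleEntry<String, Integer>(null, 1217),
                new SimpleEntry<String, Integer>(null, 136),
                new SimpleEntry<String, Integer>(null, 1446));
        records.forEach(topology::process);

        topology.printed.forEach(System.out::println); // all three payments
        System.out.println("large-payments: " + topology.largePaymentsTopic); // prints large-payments: [1217, 1446]
    }
}
```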

  5. Run the provided consume_integer_payments.py script. This script reads events from the large-payments topic.

    1. Open a new terminal window, navigate to your workspace directory, and activate the Python virtual environment.

      [user@host ~]$ cd AD482
      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$

      Important

      On Windows, use the Activate.ps1 script to activate your Python virtual environment.

      PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

    2. Run the consume_integer_payments.py script to consume large payments. Pass the topic name, large-payments, as a parameter to the script.

      Verify that the consumer receives large payments only.

      (.venv) [user@host AD482]$ python \
       streams-interaction/scripts/consume_integer_payments.py large-payments
      Consuming messages...
      
      Received large payment: 1220 USD
      Received large payment: 1461 USD
      Received large payment: 1384 USD

  6. Stop the produce_integer_payments.py script, the Quarkus application, and the consume_integer_payments.py script.

Finish

Return to your workspace directory and use the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish streams-interaction

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c