Guided Exercise: Using Basic Kafka Streams Objects

In this exercise, you will use the Kafka Streams DSL for stream processing. You will apply different transformations to a stream of data, processing data records, converting between KStreams and KTables, and writing the results back to Kafka.

The example application for this exercise reads a stream of position messages sent by a set of vehicles, which continuously report their position to the vehicle-positions topic. Each message is a JSON object containing vehicle ID, latitude, longitude, and elevation values.
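
A representative message looks like the following (the values here are illustrative, not taken from the exercise):

{"vehicleId": 5, "latitude": 41.2266, "longitude": 4.3986, "elevation": 0.035}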

You will find the solution files for this exercise in the AD482-apps repository, within the streams-data/solutions directory.

Outcomes

You should be able to use KTables, KStreams, and serdes to process data streams.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment, and use the lab command to start the scenario for this exercise.

[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start streams-data
...output omitted...

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Make sure you copy the Bootstrap Server and Bootstrap Port values from the lab command output. You will use these values to configure Kafka Streams.

The lab command copies the exercise files from the AD482-apps/streams-data/apps/vehicles directory in your local Git repository into the streams-data directory in your workspace.

Procedure 3.2. Instructions

  1. Configure your Kafka Streams application.

    1. Change to the streams-data directory.

      (.venv) [user@host AD482]$ cd streams-data
      (.venv) [user@host streams-data]$

      This directory contains the Quarkus application for this exercise.

    2. By using your editor of choice, open the src/main/resources/application.properties file, and configure the Kafka Streams properties as follows:

      # TODO: add configuration values
      quarkus.kafka-streams.bootstrap-servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT
      quarkus.kafka-streams.topics = vehicle-positions,vehicle-feet-elevations

      Note that the rest of the Kafka-specific configuration values are in the .env file, which the lab command copied into your workspace.

  2. Implement the code to read a stream of vehicle positions from the vehicle-positions topic. Each record in the vehicle-positions topic consists of a null key and a JSON value.

    1. Inspect the source code of the VehiclePosition class. This class defines the format and structure of the messages that you receive from the vehicle-positions topic.
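
      The class is essentially a plain data holder that Jackson maps to and from JSON. A minimal sketch of its likely shape, assuming public fields that mirror the JSON attributes (the file in the repository is the authoritative definition):

      // Sketch only: field names assumed to match the JSON message attributes.
      public class VehiclePosition {
          public int vehicleId;
          public double latitude;
          public double longitude;
          public double elevation; // reported in meters
      }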

    2. Edit the VehiclePositionsStream class. Add the code to create a SerDe (Serializer/Deserializer) for VehiclePosition objects.

      public Topology buildTopology() {
          StreamsBuilder builder = new StreamsBuilder();
      
          // TODO: create serde to deserialize VehiclePosition messages
          ObjectMapperSerde<VehiclePosition> vehiclePositionSerde = new ObjectMapperSerde<>(
              VehiclePosition.class
          );
      
          ...output omitted...
      }

      The ObjectMapperSerde class serializes and deserializes JSON messages by using the Jackson library. Notice that the constructor parameter specifies the class to deserialize into, which is VehiclePosition in this case.
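
      Conceptually, a Jackson-based serde can be hand-rolled with the standard Serdes.serdeFrom factory. The following is a simplified sketch of the idea, not the actual Quarkus implementation (it assumes the relevant Jackson and Kafka serialization classes are imported):

      // Simplified sketch: wrap Jackson serialization in a Kafka Serde.
      ObjectMapper mapper = new ObjectMapper();
      Serde<VehiclePosition> sketchSerde = Serdes.serdeFrom(
          (topic, position) -> {
              try {
                  return mapper.writeValueAsBytes(position); // object -> JSON bytes
              } catch (JsonProcessingException e) {
                  throw new RuntimeException(e);
              }
          },
          (topic, bytes) -> {
              try {
                  return mapper.readValue(bytes, VehiclePosition.class); // JSON bytes -> object
              } catch (IOException e) {
                  throw new RuntimeException(e);
              }
          }
      );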

    3. Add the code to create a KStream from the vehicle-positions topic.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: Create the stream from the "vehicle-positions" topic
          KStream<String, VehiclePosition> stream = builder.stream(
              "vehicle-positions", Consumed.with(stringSerde, vehiclePositionSerde)
          );
      
          ...output omitted...
      }

      Note the use of a string SerDe to deserialize null keys and the ObjectMapperSerde instance to deserialize VehiclePosition values.
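
      The stringSerde instance comes from the exercise scaffold. If you needed to declare it yourself, the built-in factory would look like this (an assumption about the scaffold, not code you must add):

      // Kafka's built-in string serde; it handles the null keys in this topic.
      Serde<String> stringSerde = Serdes.String();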

    4. Add the code to print each incoming vehicle position.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: print stream values
          stream.foreach((key, value) ->
              System.out.println("Received vehicle position: " + value)
          );
      
          ...output omitted...
      }

    5. Switch back to the terminal and run the application.

      (.venv) [user@host streams-data]$ ./mvnw clean package quarkus:dev
      ...output omitted...
      2021-07-16 15:46:59,182 INFO  [org.apa.kaf.cli.con.int.ConsumerCoordinator] ...output omitted... Setting offset for partition vehicle-positions-0 to the committed offset ...output omitted...

      Leave this terminal window open. The Quarkus dev mode hot-reloads the application if you make changes to the source code.

      Note

      The Quarkus hot-reload feature might not work well if your editor automatically saves changes. In this case, you can stop the application and run the ./mvnw clean package quarkus:dev command again.

    6. Open a new terminal window, make sure you are in your workspace directory, activate the Python virtual environment, and run the python streams-data/scripts/produce_vehicle_positions.py command.

      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$ python streams-data/scripts/produce_vehicle_positions.py
      ...output omitted...
      - Generated vehicle position to 'vehicle-positions'
      {'elevation': 0.03501904353249241,
       'latitude': 41.2265693535002,
       'longitude': 4.398626580777269,
       'vehicleId': 5}
      
      - Generated vehicle position to 'vehicle-positions'
      {'elevation': 0.2595584138525602,
       'latitude': 39.816763660245876,
       'longitude': 3.9794423934152765,
       'vehicleId': 9}

      The script produces random records to the vehicle-positions topic.

      Leave this terminal window open.

  3. Use the map transformation to create a new KStream instance. Map the stream of vehicle positions, in which each record includes the elevation in meters, to a new stream of elevations in feet. Send the new stream to the vehicle-feet-elevations topic.

    1. Edit the VehiclePositionsStream class. Add the code to map the original stream to a stream of elevations in feet.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: map positions to elevations in feet
          // and send the stream to "vehicle-feet-elevations" topic
          stream
              .map((key, value) -> {
                  Double feet = value.elevation * 3.28084;
                  return KeyValue.pair(value.vehicleId, feet);
              })
      
          ...output omitted...
      }

      The map function passes each record of the original stream to the provided lambda expression, and writes the record that the lambda returns to a new KStream instance. In this case, the records of the new stream use the vehicle ID as the key, and the elevation in feet as the value. For example, a position with an elevation of 0.035 meters becomes a record whose value is approximately 0.115 feet (0.035 × 3.28084).

    2. Add the code to write the new stream to the vehicle-feet-elevations topic.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: map positions to elevations in feet
          // and send the stream to "vehicle-feet-elevations" topic
          stream
              .map((key, value) -> {
                  Double feet = value.elevation * 3.28084;
                  return KeyValue.pair(value.vehicleId, feet);
              })
              .to(
                    "vehicle-feet-elevations",
                    Produced.with(Serdes.Integer(), Serdes.Double())
               );
      
          ...output omitted...
      }

      Note how the application produces integer keys and double values for each record sent to the vehicle-feet-elevations topic.

      After your changes, the Quarkus application should hot-reload automatically.

      Note

      Alternatively, you can stop the application and run the ./mvnw clean package quarkus:dev command again.

  4. Consume the messages from the vehicle-feet-elevations topic and verify that the messages include the vehicle ID and the elevation in feet.

    In a new terminal window, run the oc exec command to execute the built-in kafka-console-consumer.sh from inside the kafka container in the cluster.

    [user@host ~]$ oc exec \
     -it my-cluster-kafka-0 \
     -c kafka -n RHT_OCP4_DEV_USER-kafka-cluster \
     -- bin/kafka-console-consumer.sh \
     --bootstrap-server my-cluster-kafka-brokers:9092 \
     --topic vehicle-feet-elevations \
     --key-deserializer org.apache.kafka.common.serialization.IntegerDeserializer \
     --value-deserializer org.apache.kafka.common.serialization.DoubleDeserializer \
     --property print.key=true --property key.separator=" - "
    ...output omitted...
    7 - 2.411428440449955
    9 - 2.8870592230148593
    0 - 9.113734181610747

    Replace RHT_OCP4_DEV_USER with your OpenShift username. You can get this value from the Lab Environment tab in the ROL web interface.

    You should see the records in the vehicleId - elevationInFeet format.

    Terminate the consumer and close this terminal window.

  5. Count the number of reported positions by vehicle.

    Transform the KStream object into a KGroupedStream object by using the groupBy method. Next, use the KGroupedStream#count method to create a KTable containing the number of positions by vehicle.

    1. Edit the VehiclePositionsStream class. Add the code to group positions by vehicle ID.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: group positions by vehicleId
          KGroupedStream<Integer, VehiclePosition> positionsByVehicle = stream
              .groupBy(
                  (key, value) -> value.vehicleId,
                  Grouped.with(Serdes.Integer(), vehiclePositionSerde)
              );
      
          ...output omitted...
      }

      Notice the first parameter of the groupBy call. This lambda expression returns the key to group by. In this case, the key is vehicleId.

      The second parameter specifies the serdes to use after the data is grouped. The Integer serde is used as the key serde, because the vehicle ID is an integer.

    2. Add the code to count how many positions each vehicle has reported.

      public Topology buildTopology() {
          ...output omitted...
      
          KGroupedStream<Integer, VehiclePosition> positionsByVehicle = stream
              .groupBy(
                  (key, value) -> value.vehicleId,
                  Grouped.with(Serdes.Integer(), vehiclePositionSerde)
              );
      
          // TODO: count positions by vehicle
          KTable<Integer, Long> countsByVehicle = positionsByVehicle.count();
      
          ...output omitted...
      }

      Note how the count operation generates a new KTable.
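
      Conceptually, the KTable keeps one row per key and updates that row as new records arrive. For example, the table contents might evolve as follows (illustrative values):

      position from vehicle 5 arrives -> countsByVehicle: { 5: 1 }
      position from vehicle 9 arrives -> countsByVehicle: { 5: 1, 9: 1 }
      position from vehicle 5 arrives -> countsByVehicle: { 5: 2, 9: 1 }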

    3. Add the code to print the count values.

      To print the values of the countsByVehicle table, first convert it to a KStream, and then use the foreach method of the stream to print each record.

      public Topology buildTopology() {
          ...output omitted...
      
          // TODO: print the count values
          countsByVehicle
              .toStream()
              .foreach((vehicleId, count) ->
                  System.out.println(
                      "Vehicle: " + vehicleId + " Positions count: " + count + "\n"
                  )
              );
      
          ...output omitted...
      }

    4. After the application hot-reloads, verify that the application displays the counts.

      ...output omitted...
      Vehicle: 7 Positions count: 17
      
      Vehicle: 1 Positions count: 10
      ...output omitted...

      Note

      You might have to wait up to 30 seconds to see the first count message.

      This is because of Kafka Streams record caches and memory management policies. Kafka Streams uses the record cache to compact multiple records together before writing them to the internal store of a KTable.

      Refer to Kafka Streams Memory Management for more information.
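
      If you want to see the counts sooner while developing, you can reduce the record cache size and the commit interval. The following application.properties sketch assumes the Quarkus convention of passing kafka-streams.* properties through to Kafka Streams; treat it as a development-only tweak:

      # Assumed development-only tuning: flush aggregation results quickly
      kafka-streams.cache.max.bytes.buffering=0
      kafka-streams.commit.interval.ms=1000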

    5. Stop the Quarkus application and the produce_vehicle_positions.py script.

Finish

Return to your workspace directory and use the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish streams-data

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c