Guided Exercise: Implementing Stateful Event Processing

The example application for this exercise is a vehicle tracking system with the following features:

  • When a vehicle is registered or updated in the system, the application produces a VehicleRegistered event, which includes basic vehicle details, to the vehicles topic. In a real-world application, these events would be managed by another microservice. For simplicity, this guided exercise simulates this behavior with a Python producer script.

  • Each vehicle sends VehicleMoved events to the vehicle-movements topic every time it moves.

You must complete the provided Quarkus application to calculate movement statistics for every vehicle in the system.

You will find the solution files for this exercise in the AD482-apps repository, within the collaboration-stateful directory.

Outcomes

You should be able to perform stateful transformations on event streams.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start collaboration-stateful
...output omitted...

The lab command copies the exercise files from the AD482-apps/collaboration-stateful/apps directory in your local Git repository into the collaboration-stateful directory in your workspace. It also copies the .env-streams file from your workspace to the vehicles application directory as the .env file.

Procedure 4.2. Instructions

  1. Navigate to the collaboration-stateful directory.

    (.venv) [user@host AD482]$ cd collaboration-stateful
    (.venv) [user@host collaboration-stateful]$
  2. Examine the application code and the related resources.

    1. The scripts directory contains a script that sends VehicleRegistered events. In a real-world application, these events would be emitted every time a new vehicle is added to the system. The VehicleRegistered event contains information about the vehicle that is being registered (identifier, model, and type of vehicle).

    2. The vehicles directory contains the Java application source code. The com.redhat.vehicles.movement.generator package contains code to simulate the movement of vehicles registered in the system. The application sends VehicleMoved events to the vehicle-movements topic every three seconds.

    3. The com.redhat.vehicles.movement.tracker package contains the required classes to achieve the objectives of this exercise. The VehicleMovementTracker class defines the Kafka Streams topology that you must implement. In the topology, you must join every VehicleMoved event with its corresponding VehicleRegistered event by using the vehicle ID. This is necessary because the VehicleMoved event does not contain the basic vehicle information, such as the model and type.

    4. The results of the operations performed on the streams are stored in a state store. You use the VehicleMetricsResource class to read the state store and expose the results through an HTTP endpoint.

  3. Produce VehicleRegistered events to the vehicles topic.

    1. Run the Python script located at scripts/produce_vehicles.py to produce the events.

      (.venv) [user@host collaboration-stateful]$ python scripts/produce_vehicles.py
      - Sent VehicleRegistered event to 'vehicles' topic
      {'id': 1, 'model': 'Acme Urban Bike', 'type': 'bike'}
      
      - Sent VehicleRegistered event to 'vehicles' topic
      {'id': 2, 'model': 'Acme HitTheRoad Hybrid', 'type': 'car'}
      
      - Sent VehicleRegistered event to 'vehicles' topic
      {'id': 3, 'model': 'Acme SuperFly', 'type': 'drone'}
  4. Modify the code of the com.redhat.vehicles.movement.tracker package to read VehicleRegistered and VehicleMoved events.

    1. Edit the vehicles/src/main/java/com/redhat/vehicles/movement/tracker/VehicleMovementTracker.java file. Add the code to load the vehicles topic as a change log into a GlobalKTable.

      Use a GlobalKTable object to read the latest VehicleRegistered event for each vehicle, regardless of the partition that holds it. Creating the GlobalKTable before you start the stream processing ensures that every VehicleMoved event matches a vehicle from a VehicleRegistered event.

      ...content omitted...
      public Topology buildTopology() {
          ...content omitted...
      
          // TODO: Create GlobalKTable from "vehicles"
          GlobalKTable<Integer, Vehicle> vehiclesTable = builder.globalTable(
              "vehicles",
              Materialized
                  .<Integer, Vehicle, KeyValueStore<Bytes, byte[]>>as("vehicles-store")
                  .withKeySerde(intSerde)
                  .withValueSerde(vehicleSerde)
          );
      
          ...content omitted...

      Notice how the table is materialized in the vehicles-store store.
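
      To illustrate what reading a topic as a change log means, the following plain-Java sketch (no Kafka dependency) replays a list of registration events into a map, so that the latest event for each vehicle ID wins. The ChangelogSketch class and the sample data are hypothetical. They only imitate the table semantics and are not part of the exercise code.

      ```java
      import java.util.LinkedHashMap;
      import java.util.List;
      import java.util.Map;

      // Hypothetical stand-in for a table that is built from a change log topic.
      final class ChangelogSketch {

          // Replays (vehicleId, model) events in order. A later event for the
          // same ID overwrites the earlier one, as an upsert would.
          static Map<Integer, String> toTable(List<Map.Entry<Integer, String>> events) {
              Map<Integer, String> table = new LinkedHashMap<>();
              for (Map.Entry<Integer, String> event : events) {
                  table.put(event.getKey(), event.getValue());
              }
              return table;
          }

          public static void main(String[] args) {
              List<Map.Entry<Integer, String>> events = List.of(
                  Map.entry(1, "Acme Urban Bike"),
                  Map.entry(2, "Acme HitTheRoad Hybrid"),
                  Map.entry(1, "Acme Urban Bike v2") // update for vehicle 1
              );
              // Vehicle 1 appears once, with the model from its latest event.
              System.out.println(toTable(events));
          }
      }
      ```

      In the real topology, the GlobalKTable plays the role of this map, and Kafka Streams keeps it up to date as new events arrive in the vehicles topic.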

    2. Add the code to load the vehicle-movements topic as a KStream. By using a KStream object to read VehicleMoved events, you consider all movements for a vehicle.

      // TODO: Create KStream from "vehicle-movements"
      KStream<Integer, VehicleMoved> movementsStream = builder.stream(
          "vehicle-movements",
          Consumed.with(intSerde, vehicleMovedSerde)
      );
  5. Join the movementsStream stream with vehiclesTable to produce an enriched KStream of vehicle movements, in which every movement includes information about the vehicle.

    1. Add the code to join the stream with the table.

      // TODO: join
      KStream<Integer, VehicleStatus> vehicleStatusStream = movementsStream.join(
          vehiclesTable,                                 // (1)
          (vehicleId, vehicleMoved) -> vehicleId,        // (2)
          (vehicleMoved, vehicle) -> new VehicleStatus(  // (3)
                  vehicle,
                  vehicleMoved.latitude,
                  vehicleMoved.longitude,
                  vehicleMoved.elevation
          ));

      (1) The table to be joined with the stream.

      (2) A function to select the join key.

      (3) A function to perform the join.

      Each record of the resulting vehicleStatusStream contains the vehicle movement and the vehicle details.
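
      The join above is an inner join: a movement whose selected key has no entry in the table produces no output record. The following plain-Java sketch imitates that behavior without any Kafka dependency. The JoinSketch class, its simplified key selector (which receives only the value, not the record key), and the sample data are assumptions made for illustration.

      ```java
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Map;
      import java.util.function.BiFunction;
      import java.util.function.Function;

      // Hypothetical, Kafka-free imitation of an inner stream-to-table join.
      final class JoinSketch {

          static <M, K, V, R> List<R> join(
                  List<M> movements,
                  Map<K, V> table,
                  Function<M, K> keySelector,
                  BiFunction<M, V, R> joiner) {
              List<R> joined = new ArrayList<>();
              for (M movement : movements) {
                  V match = table.get(keySelector.apply(movement));
                  if (match != null) { // inner join: no match, no output
                      joined.add(joiner.apply(movement, match));
                  }
              }
              return joined;
          }

          public static void main(String[] args) {
              Map<Integer, String> vehicles = Map.of(1, "bike", 2, "car");
              // Each movement is {vehicleId, latitude}; vehicle 9 is not registered.
              List<double[]> movements = List.of(
                  new double[] {1, 41.02},
                  new double[] {9, 10.0},
                  new double[] {2, 38.35});
              List<String> statuses = join(
                  movements,
                  vehicles,
                  m -> (int) m[0],
                  (m, type) -> type + "@" + m[1]);
              // The movement for vehicle 9 is dropped.
              System.out.println(statuses);
          }
      }
      ```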

    2. Print the vehicleStatusStream records.

      // TODO: print the enriched stream
      vehicleStatusStream.foreach((vehicleId, vehiclePosition) -> {
          System.out.println(vehiclePosition);
      });
    3. Change to the vehicles directory and run the Quarkus app. You should see the enriched stream records, including vehicle details and position.

      (.venv) [user@host collaboration-stateful]$ cd vehicles
      (.venv) [user@host vehicles]$ ./mvnw clean package quarkus:dev
      ...output omitted...
       [VEHICLE STATUS]
        VEHICLE - Id: 2, Type: 'car', Model: 'Acme HitTheRoad Hybrid'
        Latitude: 38.35
        Longitude:  -1.09
        Elevation:  0.00 meters
  6. Group by vehicle ID and aggregate the enriched KStream to calculate the metrics for each vehicle. Materialize the table in a store called vehicle-metrics-store, so that you can query the table later.

    1. Add the code to group by key, which is the vehicle identifier. Then, aggregate each vehicle status and produce a VehicleMetrics aggregation for every vehicle.

      // TODO: group by, aggregate, and materialize
      vehicleStatusStream.groupByKey().aggregate(
          VehicleMetrics::new,                                             // (1)
          (vehicleId, vehicleStatus, vehicleMetrics) ->
                  vehicleMetrics.update(vehicleStatus),                    // (2)
          Materialized
              .<Integer, VehicleMetrics, KeyValueStore<Bytes, byte[]>>as(  // (3)
                  "vehicle-metrics-store"
              )
              .withKeySerde(intSerde)
              .withValueSerde(vehicleMetricsSerde)
          );

      (1) Initial value for each vehicle's metrics.

      (2) Aggregation function.

      (3) The resulting table is materialized to a store called vehicle-metrics-store.
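
      The aggregation keeps one accumulator per key: the initializer creates it the first time a key appears, and the aggregation function updates it for every later record. The following plain-Java sketch imitates this pattern without Kafka. The MetricsSketch class is hypothetical. Its field names follow the JSON output shown later in this exercise, but the update logic is an assumption and might differ from the VehicleMetrics class provided with the exercise.

      ```java
      import java.util.LinkedHashMap;
      import java.util.List;
      import java.util.Map;

      // Hypothetical metrics holder; the update logic is an assumption.
      final class MetricsSketch {
          int movementsReported;
          double initialLatitude;
          double finalLatitude;
          double latitudeDelta;

          MetricsSketch update(double latitude) {
              if (movementsReported == 0) {
                  initialLatitude = latitude; // first position seen for this key
              }
              movementsReported++;
              finalLatitude = latitude;
              latitudeDelta = finalLatitude - initialLatitude;
              return this;
          }

          // Imitates groupByKey().aggregate(initializer, aggregator): one
          // accumulator per key, created on first use and updated per record.
          static Map<Integer, MetricsSketch> aggregate(
                  List<Map.Entry<Integer, Double>> positions) {
              Map<Integer, MetricsSketch> store = new LinkedHashMap<>();
              for (Map.Entry<Integer, Double> position : positions) {
                  store.computeIfAbsent(position.getKey(), id -> new MetricsSketch())
                       .update(position.getValue());
              }
              return store;
          }

          public static void main(String[] args) {
              Map<Integer, MetricsSketch> store = aggregate(List.of(
                  Map.entry(1, 41.0), Map.entry(2, 38.0), Map.entry(1, 37.5)));
              MetricsSketch bike = store.get(1);
              System.out.println(
                  bike.movementsReported + " moves, delta " + bike.latitudeDelta);
          }
      }
      ```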

  7. By using Interactive Queries, query the metrics calculated in the previous step, which are stored in the vehicle-metrics-store store.

    1. Edit the src/main/java/com/redhat/vehicles/movement/tracker/VehicleMetricsResource.java file and implement the list method as follows:

      public List<VehicleMetrics> list() {
          List<VehicleMetrics> vehicleMetrics = new ArrayList<>();
      
          // TODO: query the store
          ReadOnlyKeyValueStore<Integer, VehicleMetrics> store = streams
              .store(StoreQueryParameters.fromNameAndType(
                  "vehicle-metrics-store",
                  QueryableStoreTypes.keyValueStore()
              ));
      
          store.all().forEachRemaining(row -> { vehicleMetrics.add(row.value); });
      
          return vehicleMetrics;
      }
    2. Make an HTTP GET request to http://localhost:8080/vehicle/metrics. You should see the current status of the vehicle metrics. The statistics, such as movements reported, delta latitude, or delta longitude, change over time for every vehicle because the generation of more VehicleMoved events triggers the stateful transformations.

      [
         {
            "vehicle":{
               "id":1,
               "type":"bike",
               "model":"Acme Urban Bike"
            },
            "movementsReported":1532,
            "initialLatitude":41.02850341796875,
            "initialLongitude":4.254831790924072,
            "finalLatitude":37.82231140136719,
            "finalLongitude":2.019352436065674,
            "latitudeDelta":-3.2061920166015625,
            "longitudeDelta":-2.2354793548583984
         },
      ...output omitted...
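
      You can make the request in this step with any HTTP client. As one option, the following sketch uses the HTTP client built into the JDK (Java 11 or later). The MetricsRequestSketch class name and the Accept header are assumptions made for illustration; only the URL comes from this exercise.

      ```java
      import java.net.URI;
      import java.net.http.HttpClient;
      import java.net.http.HttpRequest;
      import java.net.http.HttpResponse;

      // Hypothetical helper class, not part of the exercise sources.
      final class MetricsRequestSketch {

          static HttpRequest buildRequest() {
              return HttpRequest
                  .newBuilder(URI.create("http://localhost:8080/vehicle/metrics"))
                  .header("Accept", "application/json") // assumed content type
                  .GET()
                  .build();
          }

          public static void main(String[] args) {
              try {
                  HttpResponse<String> response = HttpClient.newHttpClient()
                      .send(buildRequest(), HttpResponse.BodyHandlers.ofString());
                  System.out.println(response.statusCode());
                  System.out.println(response.body());
              } catch (Exception e) {
                  // The Quarkus application must be running for the call to succeed.
                  System.out.println("Request failed: " + e);
              }
          }
      }
      ```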
    3. Stop the Quarkus application.

Finish

Return to your workspace directory and use the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host collaboration-stateful]$ lab finish collaboration-stateful

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c