Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
The example application for this exercise is a vehicle tracking system with the following features:
When a vehicle is registered or updated in the system, the application produces a VehicleRegistered event, including basic vehicle details, in the vehicles topic. In a real-world application, these events would be managed by another microservice. For simplicity, this guided exercise simulates this behavior with a Python producer script.
Each vehicle sends VehicleMoved events to the vehicle-movements topic every time it moves.
You must complete the provided Quarkus application to find the movement statistics for every vehicle in the system.
You will find the solution files for this exercise in the AD482-apps repository, within the collaboration-stateful directory.
Outcomes
You should be able to perform stateful transformations on event streams.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start collaboration-stateful
...output omitted...
The lab command copies the exercise files from the AD482-apps/collaboration-stateful/apps directory in your local Git repository into the collaboration-stateful directory in your workspace.
It also copies the .env-streams file from your workspace to the vehicles application directory as the .env file.
Procedure 4.2. Instructions
Navigate to the collaboration-stateful directory.
(.venv) [user@host AD482]$ cd collaboration-stateful
(.venv) [user@host collaboration-stateful]$
Examine the application code and the related resources.
The scripts directory contains a script that sends VehicleRegistered events. In a real-world application, these events would be emitted every time a new vehicle is added to the system. The VehicleRegistered event contains information about the vehicle that is being registered (identifier, model, and type of vehicle).
The vehicles directory contains the Java application source code. The com.redhat.vehicles.movement.generator package contains code to simulate the movement of vehicles registered in the system. The application sends VehicleMoved events to the vehicle-movements topic every three seconds.
The com.redhat.vehicles.movement.tracker package contains the required classes to achieve the objectives of this exercise. The VehicleMovementTracker class defines the Kafka Streams topology that you must implement. In the topology, you must join every VehicleMoved event with its corresponding VehicleRegistered event by using the vehicle ID. This is necessary because the VehicleMoved event does not contain the basic information of the vehicle, such as its model and type.
The results of the operations performed on the streams are stored in a state store. You use the VehicleMetricsResource class to read the state store and expose the results through an HTTP endpoint.
Produce VehicleRegistered events to the vehicles topic.
Run the Python script located at scripts/produce_vehicles.py to produce the events.
(.venv) [user@host collaboration-stateful]$ python scripts/produce_vehicles.py
- Sent VehicleRegistered event to 'vehicles' topic
    {'id': 1, 'model': 'Acme Urban Bike', 'type': 'bike'}
- Sent VehicleRegistered event to 'vehicles' topic
    {'id': 2, 'model': 'Acme HitTheRoad Hybrid', 'type': 'car'}
- Sent VehicleRegistered event to 'vehicles' topic
    {'id': 3, 'model': 'Acme SuperFly', 'type': 'drone'}
Modify the code of the com.redhat.vehicles.movement.tracker package to read VehicleRegistered and VehicleMoved events.
Edit the vehicles/src/main/java/com/redhat/vehicles/movement/tracker/VehicleMovementTracker.java file. Add the code to load the vehicles topic as a change log into a GlobalKTable.
Use a GlobalKTable object to read the latest VehicleRegistered events from any partition. Creating the GlobalKTable before you start the stream processing ensures that every VehicleMoved event matches a vehicle from a VehicleRegistered event.
    ...content omitted...
    public Topology buildTopology() {
        ...content omitted...
        // TODO: Create GlobalKTable from "vehicles"
        GlobalKTable<Integer, Vehicle> vehiclesTable = builder.globalTable(
            "vehicles",
            Materialized
                .<Integer, Vehicle, KeyValueStore<Bytes, byte[]>>as("vehicles-store")
                .withKeySerde(intSerde)
                .withValueSerde(vehicleSerde)
        );
        ...content omitted...
Notice how the table is materialized in the vehicles-store store.
Add the code to load the vehicle-movements topic as a KStream. By using a KStream object to read VehicleMoved events, you consider all movements for a vehicle.
    // TODO: Create KStream from "vehicle-movements"
    KStream<Integer, VehicleMoved> movementsStream = builder.stream(
        "vehicle-movements",
        Consumed.with(intSerde, vehicleMovedSerde)
    );
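The difference between reading a topic as a table and reading it as a stream can be pictured without Kafka at all. The following is a minimal plain-Java sketch, not Kafka Streams code: the Event class, the asTable and asStream helpers, and the "updated" record are hypothetical names and data invented for illustration. It only shows why a change-log view keeps the latest value per key while a stream view keeps every record.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TableVersusStream {

    // Minimal stand-in for a keyed Kafka record (hypothetical helper type).
    static final class Event {
        final int key;
        final String value;

        Event(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Table view: replay the topic as a change log, so a later record
    // for the same key overwrites the earlier one.
    static Map<Integer, String> asTable(List<Event> topic) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (Event event : topic) {
            table.put(event.key, event.value);
        }
        return table;
    }

    // Stream view: every record counts, even when keys repeat.
    static List<String> asStream(List<Event> topic) {
        List<String> values = new ArrayList<>();
        for (Event event : topic) {
            values.add(event.value);
        }
        return values;
    }

    public static void main(String[] args) {
        // The third record is a made-up update to vehicle 1.
        List<Event> topic = List.of(
            new Event(1, "Acme Urban Bike"),
            new Event(2, "Acme HitTheRoad Hybrid"),
            new Event(1, "Acme Urban Bike (updated)"));

        System.out.println("table:  " + asTable(topic));
        System.out.println("stream: " + asStream(topic));
    }
}
```

In this sketch the table ends up with two entries and the stream with three, which mirrors why the exercise reads vehicle registrations as a table and vehicle movements as a stream.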
Join the movementsStream stream with vehiclesTable to produce an enriched KStream of vehicle movements, in which every movement includes information about the vehicle.
Add the code to join the stream with the table.
    // TODO: join
    KStream<Integer, VehicleStatus> vehicleStatusStream = movementsStream.join(
        vehiclesTable,
        (vehicleId, vehicleMoved) -> vehicleId,
        (vehicleMoved, vehicle) -> new VehicleStatus(
            vehicle,
            vehicleMoved.latitude,
            vehicleMoved.longitude,
            vehicleMoved.elevation
        )
    );
The join method takes three arguments, in order:
The table to be joined with the stream.
A function to select the join key.
A function to perform the join.
Each record of the resulting vehicleStatusStream contains the vehicle movement and the vehicle details.
Print the vehicleStatusStream records.
    // TODO: print the enriched stream
    vehicleStatusStream.foreach((vehicleId, vehiclePosition) -> {
        System.out.println(vehiclePosition);
    });
Change to the vehicles directory and run the Quarkus app. You should see the enriched stream records, including vehicle details and position.
(.venv) [user@host collaboration-stateful]$ cd vehicles
(.venv) [user@host vehicles]$ ./mvnw clean package quarkus:dev
...output omitted...
[VEHICLE STATUS]
    VEHICLE - Id: 2, Type: 'car', Model: 'Acme HitTheRoad Hybrid'
    Latitude: 38.35 Longitude: -1.09 Elevation: 0.00 meters
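The stream-table join used above to build vehicleStatusStream can be pictured as a per-record lookup. The following is a minimal plain-Java sketch of that idea, not the Kafka Streams implementation: the Rec class, the join helper, and the unregistered vehicle ID 9 are hypothetical and exist only for illustration. It assumes inner-join behavior, in which a movement with no matching vehicle in the table produces no output.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class StreamTableJoinSketch {

    // Minimal stand-in for a keyed stream record (hypothetical helper type).
    static final class Rec<V> {
        final int key;
        final V value;

        Rec(int key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Inner join of a stream against a lookup table:
    // 1. keySelector derives the table key from each stream record,
    // 2. the table is probed with that key,
    // 3. joiner merges the stream value with the table value.
    // Stream records without a match in the table are dropped.
    static <V, T, R> List<Rec<R>> join(
            List<Rec<V>> stream,
            Map<Integer, T> table,
            BiFunction<Integer, V, Integer> keySelector,
            BiFunction<V, T, R> joiner) {
        List<Rec<R>> joined = new ArrayList<>();
        for (Rec<V> item : stream) {
            Integer tableKey = keySelector.apply(item.key, item.value);
            T match = table.get(tableKey);
            if (match != null) {
                joined.add(new Rec<>(item.key, joiner.apply(item.value, match)));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> vehicles = Map.of(2, "Acme HitTheRoad Hybrid");
        List<Rec<double[]>> movements = List.of(
            new Rec<>(2, new double[] {38.35, -1.09}),
            new Rec<>(9, new double[] {0.0, 0.0})); // 9 is a made-up, unregistered ID

        List<Rec<String>> statuses = join(
            movements,
            vehicles,
            (vehicleId, moved) -> vehicleId,
            (moved, model) -> model + " at " + moved[0] + ", " + moved[1]);

        for (Rec<String> status : statuses) {
            System.out.println(status.key + " -> " + status.value);
        }
    }
}
```

The two lambdas passed to the helper play the same roles as the key selector and the joiner in the exercise code. Only the movement for vehicle 2 survives the join, because vehicle 9 has no entry in the table.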
Group by vehicle ID and aggregate the enriched KStream to calculate the metrics for each vehicle. Materialize the table in a store called vehicle-metrics-store, to query the table later.
Add the code to group by key, which is the vehicle identifier. Then, aggregate each vehicle status and produce a VehicleMetrics aggregation for every vehicle.
    // TODO: group by, aggregate, and materialize
    vehicleStatusStream.groupByKey().aggregate(
        VehicleMetrics::new,
        (vehicleId, vehicleStatus, vehicleMetrics) -> vehicleMetrics.update(vehicleStatus),
        Materialized
            .<Integer, VehicleMetrics, KeyValueStore<Bytes, byte[]>>as("vehicle-metrics-store")
            .withKeySerde(intSerde)
            .withValueSerde(vehicleMetricsSerde)
    );
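The group-and-aggregate step keeps one running VehicleMetrics value per vehicle and folds each new status into it. The following is a minimal plain-Java sketch of that pattern, not the exercise's actual classes: the Metrics class, its update logic, the aggregate helper, and the sample latitudes are assumptions made for illustration. The field names follow the ones that the metrics endpoint reports, and only latitude is tracked to keep the sketch short. Longitude would presumably work the same way.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AggregateSketch {

    // Hypothetical stand-in for the exercise's VehicleMetrics class.
    static final class Metrics {
        int movementsReported;
        double initialLatitude;
        double finalLatitude;
        double latitudeDelta;

        // Folds one more position into the running aggregate and returns it.
        Metrics update(double latitude) {
            if (movementsReported == 0) {
                initialLatitude = latitude; // first position seen for this vehicle
            }
            movementsReported++;
            finalLatitude = latitude;
            latitudeDelta = finalLatitude - initialLatitude;
            return this;
        }
    }

    // Plays the role of groupByKey().aggregate(...): one aggregate per key,
    // created by the initializer on the first record for that key and then
    // replaced by the aggregator's result on every record.
    static Map<Integer, Metrics> aggregate(int[] vehicleIds, double[] latitudes) {
        Map<Integer, Metrics> store = new LinkedHashMap<>();
        for (int i = 0; i < vehicleIds.length; i++) {
            Metrics current = store.computeIfAbsent(vehicleIds[i], id -> new Metrics());
            store.put(vehicleIds[i], current.update(latitudes[i]));
        }
        return store;
    }

    public static void main(String[] args) {
        // Made-up movements: vehicle 1 moves twice, vehicle 2 once.
        int[] ids = {1, 2, 1};
        double[] latitudes = {41.0, 38.0, 37.5};

        aggregate(ids, latitudes).forEach((id, metrics) ->
            System.out.println(id + ": movements=" + metrics.movementsReported
                + ", latitudeDelta=" + metrics.latitudeDelta));
    }
}
```

The map in this sketch stands in for the materialized state store: it always holds the latest aggregate for each vehicle ID, which is what a later query against the store would read.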
By using Interactive Queries, query the metrics calculated in the previous step, which are stored in the vehicle-metrics-store store.
Edit the src/main/java/com/redhat/vehicles/movement/tracker/VehicleMetricsResource.java file and implement the list method as follows:
    public List<VehicleMetrics> list() {
        List<VehicleMetrics> vehicleMetrics = new ArrayList<>();
        // TODO: query the store
        ReadOnlyKeyValueStore<Integer, VehicleMetrics> store = streams
            .store(StoreQueryParameters.fromNameAndType(
                "vehicle-metrics-store",
                QueryableStoreTypes.keyValueStore()
            ));
        store.all().forEachRemaining(row -> {
            vehicleMetrics.add(row.value);
        });
        return vehicleMetrics;
    }
Make an HTTP GET request to http://localhost:8080/vehicle/metrics. You should see the current status of the vehicle metrics. The statistics, such as movements reported, delta latitude, or delta longitude, change over time for every vehicle because the generation of more VehicleMoved events triggers the stateful transformations.
    [
      {
        "vehicle": {
          "id": 1,
          "type": "bike",
          "model": "Acme Urban Bike"
        },
        "movementsReported": 1532,
        "initialLatitude": 41.02850341796875,
        "initialLongitude": 4.254831790924072,
        "finalLatitude": 37.82231140136719,
        "finalLongitude": 2.019352436065674,
        "latitudeDelta": -3.2061920166015625,
        "longitudeDelta": -2.2354793548583984
      },
      ...output omitted...
Stop the Quarkus application.
This concludes the guided exercise.