Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will test an event-driven application. The application to test is a vehicle tracking system with the following features:
The application handles the registration of new vehicles in the system. When a vehicle is registered, the application produces a new VehicleRegistered event that includes basic vehicle details.
The application is ready to process VehicleMoved events. Assume that external systems produce these events.
A Kafka Streams topology processes the VehicleRegistered and VehicleMoved event streams and writes the results into Kafka Streams stores.
Outcomes
You should be able to unit test event-driven business logic and Kafka Streams topologies.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment and use the lab command to start the scenario for this exercise.
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start troubleshooting-test
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
The lab command copies the exercise files from the AD482-apps/troubleshooting-test/apps/vehicles directory, which is in your local Git repository, into the troubleshooting-test directory at the root of your workspace.
Procedure 6.4. Instructions
Navigate to the troubleshooting-test directory in your workspace and examine the application code.
The inventory.VehicleInventory class generates a VehicleRegistered event when a user sends a POST request to the /vehicle endpoint. Note that the VehicleInventory class does not have references to Kafka.
The inventory.VehicleResource class mediates between the Kafka client code and the VehicleInventory business logic. This class emits the events returned by the VehicleInventory class.
The movement.tracker.VehicleMovementTracker class defines a Kafka Streams topology to process VehicleRegistered and VehicleMoved events. The topology saves registered vehicles in the vehicles-store store, and writes the results of processing movements to the vehicle-metrics-store store.
The test directory contains the test files. You will implement the tests in this directory.
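The separation described above, where the business logic returns events and a thin resource layer emits them, can be sketched in plain Java. This is only an illustration: the class bodies, the Consumer-based emitter, and the DecouplingSketch wrapper are assumptions and not the exercise's actual code, which uses a Kafka client in the resource class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins: the names mirror the exercise, the bodies are assumed.
class DecouplingSketch {

    static class Vehicle {
        final int id;
        final String type;
        final String model;

        Vehicle(int id, String type, String model) {
            this.id = id;
            this.type = type;
            this.model = model;
        }
    }

    static class VehicleRegistered {
        final int id;
        final String type;
        final String model;

        VehicleRegistered(int id, String type, String model) {
            this.id = id;
            this.type = type;
            this.model = model;
        }
    }

    // Business logic: no messaging dependencies, it only returns the event.
    static class VehicleInventory {
        VehicleRegistered register(Vehicle vehicle) {
            return new VehicleRegistered(vehicle.id, vehicle.type, vehicle.model);
        }
    }

    // Mediator: receives the emitter as a dependency, so a test can pass a fake.
    static class VehicleResource {
        private final VehicleInventory inventory;
        private final Consumer<VehicleRegistered> emitter;

        VehicleResource(VehicleInventory inventory, Consumer<VehicleRegistered> emitter) {
            this.inventory = inventory;
            this.emitter = emitter;
        }

        VehicleRegistered create(Vehicle vehicle) {
            VehicleRegistered event = inventory.register(vehicle);
            emitter.accept(event);
            return event;
        }
    }

    public static void main(String[] args) {
        // A list stands in for the messaging client; no broker is needed.
        List<VehicleRegistered> emitted = new ArrayList<>();
        VehicleResource resource = new VehicleResource(new VehicleInventory(), emitted::add);
        resource.create(new Vehicle(1, "car", "Test Car"));
        System.out.println("Events emitted: " + emitted.size());
    }
}
```

Because the inventory class never touches the emitter, its tests need no broker, mock producer, or test driver.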
Unit test the VehicleInventory#register method. The register method accepts an instance of Vehicle as a parameter, and registers the vehicle.
Acceptance criteria:
The method should return a VehicleRegistered event if the vehicle type is not empty.
Otherwise, the method should throw an InvalidVehicleException exception.
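The acceptance criteria above can be read as a small method with no Kafka dependencies. The following is a minimal sketch under stated assumptions: the RegisterSketch wrapper, the class bodies, the exact exception message, and the null check are hypothetical and are not taken from the exercise code.

```java
// Hypothetical sketch of the behavior that the acceptance criteria describe.
class RegisterSketch {

    // Assumed to be a checked exception.
    static class InvalidVehicleException extends Exception {
        InvalidVehicleException(String message) {
            super(message);
        }
    }

    static class Vehicle {
        final int id;
        final String type;
        final String model;

        Vehicle(int id, String type, String model) {
            this.id = id;
            this.type = type;
            this.model = model;
        }
    }

    static class VehicleRegistered {
        final int id;
        final String type;
        final String model;

        VehicleRegistered(int id, String type, String model) {
            this.id = id;
            this.type = type;
            this.model = model;
        }
    }

    static VehicleRegistered register(Vehicle vehicle) throws InvalidVehicleException {
        // Reject an empty type; treating null as empty is an assumption.
        if (vehicle.type == null || vehicle.type.isEmpty()) {
            throw new InvalidVehicleException("Invalid vehicle: type must not be empty");
        }
        return new VehicleRegistered(vehicle.id, vehicle.type, vehicle.model);
    }

    public static void main(String[] args) {
        try {
            System.out.println(register(new Vehicle(2, "car", "Test Car")).type);
            register(new Vehicle(2, "", "Test Car"));
        } catch (InvalidVehicleException e) {
            System.out.println(e.getMessage());
        }
    }
}
```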
Edit the TestVehicleInventory class. Create a test case to verify that the register method throws an exception if the given vehicle type is an empty string.
// TODO: implement exception test case
@Test
public void testRegisterThrowsExceptionIfTypeIsEmpty() {
    // Given
    Vehicle vehicle = new Vehicle(2, "", "Test Car");

    // When
    Exception exception = assertThrows(InvalidVehicleException.class, () -> {
        vehicleManager.register(vehicle);
    });

    // Then
    assertThat(exception.getMessage(), containsString("Invalid vehicle"));
}
Run the tests by executing the ./mvnw test command. One test should pass.
(.venv) [user@host troubleshooting-test]$ ./mvnw test
...output omitted...
[INFO] Results:
[INFO]
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
...output omitted...
In the same file, implement a test case to verify that the method returns a VehicleRegistered event if the vehicle type is not empty.
// TODO: implement VehicleRegistered test case
@Test
public void testRegisterReturnsVehicleRegistered() throws InvalidVehicleException {
    // Given
    Vehicle vehicle = new Vehicle(2, "car", "Test Car");

    // When
    VehicleRegistered event = vehicleManager.register(vehicle);

    // Then
    assertEquals(2, event.id);
    assertEquals("car", event.type);
    assertEquals("Test Car", event.model);
}
Run the tests with the ./mvnw test command. Two tests should pass.
Unit test the Kafka Streams topology specified by the VehicleMovementTracker class.
Edit the MovementTrackerTopologyTest class. In the setup method, create the TopologyTestDriver with the topology from the VehicleMovementTracker class. The test driver runs the tests on this topology.
// TODO: pass the VehicleMovementTracker topology to the test driver
tracker = new VehicleMovementTracker();
testDriver = new TopologyTestDriver(tracker.buildTopology());
In the same setup method, create the input test topics by specifying the topic names and the required serializers.
// TODO: Create test input topics
vehicleRegisteredSerde = new ObjectMapperSerde<>(VehicleRegistered.class);
vehicleMovedSerde = new ObjectMapperSerde<>(VehicleMoved.class);

vehicleRegisteredTopic = testDriver.createInputTopic(
    "vehicle-registered",
    new IntegerSerializer(),
    vehicleRegisteredSerde.serializer()
);

vehicleMovedTopic = testDriver.createInputTopic(
    "vehicle-moved",
    new IntegerSerializer(),
    vehicleMovedSerde.serializer()
);
In the same method, create the test stores for vehicles and metrics.
// TODO: Create test stores
vehiclesStore = testDriver.getKeyValueStore("vehicles-store");
vehicleMetricsStore = testDriver.getKeyValueStore("vehicle-metrics-store");
Implement a test to verify that the topology reads the VehicleRegistered event stream as a changelog and writes the result into the vehicles store.
// TODO: test VehicleRegistered events create vehicles in the store
@Test
public void testTopologySendsVehiclesToStore() throws JsonProcessingException {
    // Given
    VehicleRegistered event = new VehicleRegistered(12, "bike", "super bike");

    // When
    vehicleRegisteredTopic.pipeInput(event.id, event);

    // Then
    Vehicle vehicleInStore = vehiclesStore.get(12);
    assertEquals(12, vehicleInStore.id);
}
Run the tests with the ./mvnw test command. Three tests should pass.
Test that the result of joining and aggregating VehicleRegistered and VehicleMoved events is sent to the vehicle metrics store. Validate that the metrics gathered from the store contain the number of movements reported.
// TODO: test VehicleMetrics counts VehicleMoved events for each vehicle
@Test
public void testVehicleMovementsCountEqualsEventsCount() throws JsonProcessingException {
    // Given
    Vehicle vehicle = new Vehicle(14, "car", "test");
    vehiclesStore.put(vehicle.id, vehicle);

    VehicleMoved event1 = new VehicleMoved(14, 0, 0, 5);
    VehicleMoved event2 = new VehicleMoved(14, 0, 0, 10);

    // When
    vehicleMovedTopic.pipeInput(event1.vehicleId, event1);
    vehicleMovedTopic.pipeInput(event2.vehicleId, event2);

    // Then
    VehicleMetrics metrics = vehicleMetricsStore.get(14);
    assertEquals(2, metrics.movementsReported);
}
Run the tests with the ./mvnw test command. Four tests should pass.
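To make the two store assertions above easier to reason about, the following plain-Java model mimics what the tests expect from the topology: the vehicles store keeps the latest value for each key, as a changelog does, and the metrics store counts movements for vehicles that are present in the vehicles store. This is a conceptual sketch with hypothetical names (TopologyModel, onVehicleRegistered, onVehicleMoved). It does not use the Kafka Streams API, and it assumes that movements for unknown vehicles are dropped, which the exercise text does not state.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model only: plain maps stand in for the Kafka Streams stores.
class TopologyModel {

    // Latest vehicle model per vehicle ID. A new event for the same key overwrites
    // the previous value, which is how a changelog is interpreted.
    final Map<Integer, String> vehiclesStore = new HashMap<>();

    // Number of movements reported per vehicle ID.
    final Map<Integer, Integer> vehicleMetricsStore = new HashMap<>();

    void onVehicleRegistered(int id, String model) {
        vehiclesStore.put(id, model);
    }

    void onVehicleMoved(int vehicleId) {
        // Assumption: a movement without a matching vehicle produces no metric.
        if (vehiclesStore.containsKey(vehicleId)) {
            vehicleMetricsStore.merge(vehicleId, 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        TopologyModel model = new TopologyModel();
        model.onVehicleRegistered(14, "test");
        model.onVehicleMoved(14);
        model.onVehicleMoved(14);
        model.onVehicleMoved(99); // never registered, so not counted
        System.out.println(model.vehicleMetricsStore);
    }
}
```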
Create a new unit test for the VehicleMovementTracker class. Test that the topology produces an enriched movements stream and sends the result to the vehicle-status topic. Verify that the test fails, and then add the necessary code in the VehicleMovementTracker class to make the test pass.
Edit the MovementTrackerTopologyTest class. In the setup method, initialize the test output topic, called vehicle-status.
// TODO: Create test output topic
vehicleStatusSerde = new ObjectMapperSerde<>(VehicleStatus.class);

vehicleStatusTopic = testDriver.createOutputTopic(
    "vehicle-status",
    new IntegerDeserializer(),
    vehicleStatusSerde.deserializer()
);
Note that, in contrast to test input topics, which require serializers, test output topics require deserializers.
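The asymmetry noted above follows from the direction of the data: a test input topic turns Java objects into bytes before handing them to the topology, and a test output topic turns the bytes that the topology produced back into objects. The sketch below illustrates that round trip for an integer key by using only the JDK. The KeyCodecSketch class and its method names are hypothetical. The 4-byte big-endian layout matches how Kafka's IntegerSerializer encodes values, but the exercise uses the Kafka classes directly.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the serialize/deserialize round trip for a key.
class KeyCodecSketch {

    // Input side: object to bytes, which is the job of a serializer.
    static byte[] serialize(int key) {
        // ByteBuffer writes big-endian by default.
        return ByteBuffer.allocate(Integer.BYTES).putInt(key).array();
    }

    // Output side: bytes to object, which is the job of a deserializer.
    static int deserialize(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] wireFormat = serialize(111);
        System.out.println("Bytes on the wire: " + wireFormat.length);
        System.out.println("Key read back: " + deserialize(wireFormat));
    }
}
```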
In the same file, implement a test to validate that, for each VehicleMoved event, the topology creates a new VehicleStatus value, which includes the movement and the vehicle details. Verify that the topology writes the new value to the vehicle-status topic.
// TODO: test VehicleMoved and VehicleRegistered events are joined and sent to output topic
@Test
public void testVehicleMovementsStreamIsEnriched() throws JsonProcessingException {
    // Given
    VehicleRegistered vehicleRegistered = new VehicleRegistered(
        111, "car", "test car"
    );
    VehicleMoved vehicleMoved = new VehicleMoved(111, 0, 0, 5);

    // When
    vehicleRegisteredTopic.pipeInput(vehicleRegistered.id, vehicleRegistered);
    vehicleMovedTopic.pipeInput(vehicleMoved.vehicleId, vehicleMoved);

    // Then
    TestRecord<Integer, VehicleStatus> record = vehicleStatusTopic.readRecord();
    assertEquals(111, record.getKey());
    assertEquals(111, record.getValue().vehicle.id);
    assertEquals(5, record.getValue().elevation);
}
Run the tests with the ./mvnw test command. The last test should fail.
(.venv) [user@host troubleshooting-test]$ ./mvnw test
...output omitted...
[ERROR] Errors:
[ERROR]   MovementTrackerTopologyTest.testVehicleMovementsStreamIsEnriched:126 » NoSuchElement
[INFO]
[ERROR] Tests run: 5, Failures: 0, Errors: 1, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
...output omitted...
Edit the VehicleMovementTracker class. Add the code to make the test pass.
// TODO: materialize vehicleStatusStream to the "vehicle-status" topic
vehicleStatusStream.to(
    "vehicle-status",
    Produced.with(intSerde, vehicleStatusSerde)
);
Run the tests with the ./mvnw test command. Five tests should pass.
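The NoSuchElement error in the failing run is consistent with reading from an output topic that never received a record: the enriched stream was computed, but nothing wrote it to the vehicle-status topic. The following plain-Java model illustrates that behavior with a queue standing in for the topic. The SinkSketch class, the process method, and the sinkEnabled flag are hypothetical and only mimic the effect of adding the to call; they are not part of Kafka Streams.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

// Conceptual model only: a queue stands in for the "vehicle-status" topic.
class SinkSketch {

    final Queue<String> vehicleStatusTopic = new ArrayDeque<>();
    final boolean sinkEnabled;

    SinkSketch(boolean sinkEnabled) {
        this.sinkEnabled = sinkEnabled;
    }

    void process(int vehicleId, int elevation) {
        // The enriched value is always computed...
        String status = "vehicle=" + vehicleId + ",elevation=" + elevation;
        // ...but it only reaches the topic when a sink is attached.
        if (sinkEnabled) {
            vehicleStatusTopic.add(status);
        }
    }

    String readRecord() {
        // Queue.remove() throws NoSuchElementException when the queue is empty.
        return vehicleStatusTopic.remove();
    }

    public static void main(String[] args) {
        SinkSketch withoutSink = new SinkSketch(false);
        withoutSink.process(111, 5);
        try {
            withoutSink.readRecord();
        } catch (NoSuchElementException e) {
            System.out.println("No record available without a sink");
        }

        SinkSketch withSink = new SinkSketch(true);
        withSink.process(111, 5);
        System.out.println(withSink.readRecord());
    }
}
```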
This concludes the guided exercise.