Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this lab, you will troubleshoot a wind turbine data streaming system.
The system includes the following applications:
A wind turbine data producer application that writes wind turbine power measurements to Kafka.
A Kafka Streams energy meter application, which processes the measurements and displays the power data in a front-end web application.
Outcomes
You should be able to:
Configure retries and idempotence.
Handle out-of-order events when aggregating streaming data.
Implement unit tests in Kafka Streams.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment, and use the lab command to start the scenario for this exercise.
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start troubleshooting-apps
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
The lab command copies the exercise files from the AD482-apps/troubleshooting-apps/apps directory in your local Git repository into the troubleshooting-apps directory at the root of your workspace.
Procedure 6.5. Instructions
Start the applications for this exercise. Run each of them in a separate terminal window.
Energy meter back end: From the troubleshooting-apps/energy-meter directory, run the ./mvnw quarkus:dev command. The Quarkus development server makes the application available at http://localhost:8080. The logs show that the application starts the Kafka Streams engine, but the application does not receive any messages.
Turbine data producer: From the troubleshooting-apps/turbine-data-producer directory, run the ./mvnw compile exec:java "-Dexec.mainClass=com.redhat.energy.TurbineDataProducerApp" command. The logs show disconnection warnings caused by an incorrect configuration.
Energy meter front end: From the troubleshooting-apps directory, run the python scripts/serve-frontend.py command. Make sure that the Python virtual environment is activated before running this script. A simple Python web server makes the front-end application available at http://localhost:8081. The web dashboard lists wind turbines, but the power production values display 0.
Navigate to the energy-meter directory and run the energy meter back-end application.
[user@host AD482]$ cd troubleshooting-apps/energy-meter
[user@host energy-meter]$ ./mvnw quarkus:dev
...output omitted...
2021-09-23 15:49:29,149 INFO [org.apa.kaf.str.KafkaStreams] (energy-meter-troubleshooting-...omitted...State transition from REBALANCING to RUNNING
Leave this terminal window open. Quarkus dev mode hot-reloads the application if you make changes to the source code.
Note
The Quarkus hot-reload feature might not work well if your editor saves changes automatically. In this case, stop the application and run the ./mvnw quarkus:dev command again.
Open a new terminal window, navigate to the troubleshooting-apps/turbine-data-producer directory, and run the turbine data producer.
[user@host AD482]$ cd troubleshooting-apps/turbine-data-producer
[user@host turbine-data-producer]$ ./mvnw compile exec:java \
  "-Dexec.mainClass=com.redhat.energy.TurbineDataProducerApp"
...output omitted...
[kafka-producer-network-thread | producer-2] WARN org.apache...NetworkClient - [Producer clientId=producer-2] Bootstrap broker ...output omitted...disconnected
The logs display disconnection warnings from the server when the producer tries to write data to Kafka.
Leave this terminal window open.
Open a new terminal window and make sure you are in your workspace directory. Activate the Python virtual environment, navigate to the troubleshooting-apps directory, and run the front-end application.
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ cd troubleshooting-apps
(.venv) [user@host troubleshooting-apps]$ python scripts/serve-frontend.py
...output omitted...
Open your web browser and navigate to http://localhost:8081. Verify that the dashboard displays power production values of 0 MWatts.
Leave this terminal window and the browser tab open.
Fix the turbine power data production problem.
The turbine data producer simulates wind turbines by sending power measurements to Kafka topics. Because of a restrictive network timeout configuration, the produced measurements do not reach the Kafka broker.
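The settings involved in this fix correspond to documented producer configuration keys: ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG is request.timeout.ms, and ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG is enable.idempotence. The following JDK-only sketch is not part of the lab code, and its class and method names are hypothetical. It collects an idempotent producer configuration in a java.util.Properties object and checks constraints that the Kafka documentation states: idempotence requires acks=all, retries greater than 0, and max.in.flight.requests.per.connection of at most 5, and delivery.timeout.ms should be at least linger.ms plus request.timeout.ms. Passing these checks says nothing about whether a timeout is long enough for a particular network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical helper: collects producer settings under the string keys that
 * the ProducerConfig constants resolve to, and checks documented constraints.
 * Uses only the JDK; it does not create a KafkaProducer.
 */
public class ProducerConfigSketch {

    public static Properties idempotentProducerProps() {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", "30000");   // REQUEST_TIMEOUT_MS_CONFIG, value used in this lab
        props.setProperty("enable.idempotence", "true");    // ENABLE_IDEMPOTENCE_CONFIG
        props.setProperty("acks", "all");                   // required by idempotence
        props.setProperty("retries", "2147483647");         // must be > 0 with idempotence
        props.setProperty("max.in.flight.requests.per.connection", "5"); // at most 5 with idempotence
        props.setProperty("delivery.timeout.ms", "120000"); // bound on reporting success or failure, retries included
        props.setProperty("linger.ms", "0");
        return props;
    }

    /** Returns the violated constraints; an empty list means all checks passed. */
    public static List<String> check(Properties props) {
        List<String> problems = new ArrayList<>();
        long requestTimeout = Long.parseLong(props.getProperty("request.timeout.ms"));
        long deliveryTimeout = Long.parseLong(props.getProperty("delivery.timeout.ms"));
        long linger = Long.parseLong(props.getProperty("linger.ms"));
        if (deliveryTimeout < linger + requestTimeout) {
            problems.add("delivery.timeout.ms should be >= linger.ms + request.timeout.ms");
        }
        if (Boolean.parseBoolean(props.getProperty("enable.idempotence"))) {
            String acks = props.getProperty("acks");
            if (!"all".equals(acks) && !"-1".equals(acks)) {
                problems.add("enable.idempotence requires acks=all");
            }
            if (Integer.parseInt(props.getProperty("retries")) <= 0) {
                problems.add("enable.idempotence requires retries > 0");
            }
            if (Integer.parseInt(props.getProperty("max.in.flight.requests.per.connection")) > 5) {
                problems.add("enable.idempotence requires max.in.flight.requests.per.connection <= 5");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties props = idempotentProducerProps();
        System.out.println(check(props)); // prints []
        props.setProperty("acks", "1");
        System.out.println(check(props)); // prints [enable.idempotence requires acks=all]
    }
}
```

The string keys are used here only so that the sketch compiles without the Kafka client library; in the lab, you pass the ProducerConfig constants to props.put.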
Edit the TurbineDataProducerApp class of the turbine-data-producer application to adjust the request timeout configuration and enable idempotence. Then, verify that the energy meter application receives 10 measurements in each 10-second time window.
Edit the TurbineDataProducerApp class. In the setProducerConfig method, enable idempotence and increase the value of the REQUEST_TIMEOUT_MS_CONFIG property.
// TODO: fix delivery issues
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Restart the turbine data producer. The application should not display errors.
[user@host turbine-data-producer]$ ./mvnw compile exec:java "-Dexec.mainClass=com.redhat.energy.TurbineDataProducerApp"
...output omitted...
Produced { turbineId: 1, power: 1875333 }
Produced { turbineId: 2, power: 1767793 }
Navigate back to the energy meter front-end browser tab. Verify that the Reported values (last 10 seconds) field displays a value of 10 received measurements in the last 10 seconds.
Important
You can experiment with different timeout configurations.
Note that the appropriate timeout depends on the state of the cluster and the network delay. If the timeout is too low, then few retries succeed and the Reported values (last 10 seconds) field displays values below 10. If the retries cause duplication, then the Reported values (last 10 seconds) field displays values higher than 10. A timeout of 30 seconds, however, should be sufficient.
Handle out-of-order events.
The company needs to process each power measurement by using the time when the turbine collects it. Because of problems in the internal I/O of the turbines, some messages are delayed by approximately 11 seconds before the producer writes the record to Kafka.
You must perform the following steps:
Edit the TurbineDataProducerApp class to use the wind turbine timestamps. To activate this feature, use the emitter.withTurbineTimestamps() method. Verify that the count shown in the front end is lower than 10.
Adjust the Kafka Streams topology of the energy meter back end to handle late events.
Edit the TurbineDataProducerApp class of the turbine data producer. Activate the wind turbine timestamps feature.
// TODO: Activate the turbine timestamps feature
emitter.withTurbineTimestamps();
Restart the turbine data producer. The output should display the timestamp when the turbine produces each record.
[user@host turbine-data-producer]$ ./mvnw compile exec:java "-Dexec.mainClass=com.redhat.energy.TurbineDataProducerApp"
...output omitted...
Produced { turbineId: 2, power: 1813523 } - Timestamp: 1632474226000
Produced { turbineId: 3, power: 1814831 } - Timestamp: 1632474216000 - DELAYED
Navigate back to the energy meter front-end browser tab. Verify that the Reported values (last 10 seconds) field displays a count lower than 10. You can also check the logs of the energy-meter back end.
MAP - Turbine: 3 | 1612075 Watts -> 1.612075 MWatts
COUNT - Turbine: 1 | Count:7
COUNT - Turbine: 2 | Count:7
COUNT - Turbine: 3 | Count:7
Edit the StreamTopologyBuilder class of the energy meter back end. Set a grace period of 12 seconds.
wattsValuesStream
    .groupByKey()
    .windowedBy(
        TimeWindows
            .of(Duration.ofSeconds(10))
            .advanceBy(Duration.ofSeconds(10))
            // TODO: adjust grace period
            .grace(Duration.ofSeconds(12))
    )
    ...output omitted...
Return to your browser tab and refresh the page. Check the back-end application logs and wait until the Kafka Streams engine switches to the RUNNING state.
Note
If you experience problems while restarting the application, then stop the Quarkus application and run the ./mvnw quarkus:dev command again.
2021-09-14 14:39:51,336 INFO [org.apa.kaf.str.KafkaStreams] (energy-meter-streams-2571d5e9-9883-4d7b-9f21-8f30ad65c2b2-StreamThread-1) stream-client [energy-meter-streams-2571d5e9-9883-4d7b-9f21-8f30ad65c2b2] State transition from REBALANCING to RUNNING
In the browser tab, verify that the Reported values (last 10 seconds) field is 10.
Implement a unit test to verify that the conversion from Watts to MWatts is correct.
The energy meter back end reads power measurements in Watts from the turbine-generated-watts topic. The topology converts the values from Watts to MWatts. In the conversion, the topology generates a stream of MWattsMeasurement objects and materializes the new stream to the turbine-generated-mwatts topic.
You must write a unit test that, given a Watts measurement in the turbine-generated-watts topic, verifies that the topology writes the corresponding MWatts value in the turbine-generated-mwatts topic.
Note
1 MWatt = 1,000,000 Watts.
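The arithmetic that the test checks is a division by 1,000,000. The following JDK-only sketch has hypothetical class and method names; the lab does not show how StreamTopologyBuilder performs the conversion. It contrasts floating-point division with integer division, which is the kind of bug such a test catches: in int arithmetic, 2500000 / 1000000 yields 2, not 2.5.

```java
/** Hypothetical helpers for the Watts-to-MWatts arithmetic (1 MWatt = 1,000,000 Watts). */
public class MegawattConversionSketch {
    static final double WATTS_PER_MEGAWATT = 1_000_000.0;

    /** Correct: floating-point division keeps the fractional part. */
    public static double toMegawatts(int watts) {
        return watts / WATTS_PER_MEGAWATT;
    }

    /** Buggy on purpose: int / int truncates before the result is widened to double. */
    public static double toMegawattsIntegerDivision(int watts) {
        return watts / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println(toMegawatts(2_500_000));                // prints 2.5
        System.out.println(toMegawattsIntegerDivision(2_500_000)); // prints 2.0
    }
}
```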
Edit the StreamTopologyBuilderTest class. In the setup method, create a test driver to test the StreamTopologyBuilder topology.
// TODO: create the test driver
StreamTopologyBuilder builder = new StreamTopologyBuilder();
testDriver = new TopologyTestDriver(builder.buildTopology());
In the same method, create the serdes and test topics required for the test.
// TODO: Create test topics
wattsStream = testDriver.createInputTopic(
    "turbine-generated-watts",
    new IntegerSerializer(),
    new IntegerSerializer()
);

ObjectMapperSerde<MWattsMeasurement> measurementSerde =
    new ObjectMapperSerde<>(MWattsMeasurement.class);

measurementsStream = testDriver.createOutputTopic(
    "turbine-generated-mwatts",
    new IntegerDeserializer(),
    measurementSerde.deserializer()
);
In the same file, add a test case to verify that, for a value of 2500000 Watts, the topology writes a value of 2.5 MWatts.
// TODO: implement test
@Test
public void testMWattsConversion() {
    wattsStream.pipeInput(1, 2500000);

    TestRecord<Integer, MWattsMeasurement> record = measurementsStream.readRecord();

    assertEquals(2.5, record.getValue().megawatts);
}
Open a new terminal window, navigate to the troubleshooting-apps/energy-meter directory, and verify that the test passes.
[user@host energy-meter]$ ./mvnw test
...output omitted...
[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
...output omitted...
This concludes the lab.