Lab: Troubleshooting AMQ Streams Applications

In this lab, you will troubleshoot a wind turbine data streaming system.

The system includes the following applications:

  • A wind turbine data producer application that writes wind turbine power measurements to Kafka.

  • A Kafka Streams energy meter application, which processes the measurements and displays the power data in a front-end web application.

Outcomes

You should be able to:

  • Configure retries and idempotence.

  • Handle out-of-order events when aggregating streaming data.

  • Implement unit tests in Kafka Streams.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment, and use the lab command to start the scenario for this exercise.

[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start troubleshooting-apps

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

The lab command copies the exercise files from the AD482-apps/troubleshooting-apps/apps directory in your local Git repository into the troubleshooting-apps directory at the root of your workspace.

Procedure 6.5. Instructions

  1. Start the applications for this exercise. Run each of them in separate terminal windows.

    • Energy meter back end: From the troubleshooting-apps/energy-meter directory, run the ./mvnw quarkus:dev command. The Quarkus development server makes the application available at http://localhost:8080. The logs show that the application starts the Kafka Streams engine but does not receive any messages.

    • Turbine data producer: From the troubleshooting-apps/turbine-data-producer directory, run the ./mvnw compile exec:java "-Dexec.mainClass=com.redhat.energy.TurbineDataProducerApp" command. The logs display disconnection errors caused by an incorrect configuration.

    • Energy meter front end: From the troubleshooting-apps directory, run the python scripts/serve-frontend.py command. Make sure that the Python virtual environment is activated before running this Python script. A simple Python web server makes the front-end application available at http://localhost:8081. The web dashboard lists the wind turbines, but the power production values display 0.

    1. Navigate to the energy-meter directory and run the energy meter back-end application.

      [user@host AD482]$ cd troubleshooting-apps/energy-meter
      [user@host energy-meter]$ ./mvnw quarkus:dev
      ...output omitted...
      2021-09-23 15:49:29,149 INFO  [org.apa.kaf.str.KafkaStreams] (energy-meter-troubleshooting-...omitted... State transition from REBALANCING to RUNNING

      Leave this terminal window open. The Quarkus dev mode hot-reloads the application if you make changes to the source code.

      Note

      The Quarkus hot-reload feature might not work well if your editor auto saves changes. In this case, you can stop the application and run the ./mvnw quarkus:dev command again.

    2. Open a new terminal window, navigate to the troubleshooting-apps/turbine-data-producer directory, and run the turbine data producer.

      [user@host AD482]$ cd troubleshooting-apps/turbine-data-producer
      [user@host turbine-data-producer]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.energy.TurbineDataProducerApp"
      ...output omitted...
      [kafka-producer-network-thread | producer-2] WARN org.apache...NetworkClient - [Producer clientId=producer-2] Bootstrap broker ...output omitted... disconnected

      The logs display disconnection warnings when the producer tries to write data to Kafka.

      Leave this terminal window open.

    3. Open a new terminal window, and make sure you are in your workspace directory. Activate the Python virtual environment, navigate to the troubleshooting-apps directory, and run the front-end application.

      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$ cd troubleshooting-apps
      (.venv) [user@host troubleshooting-apps]$ python scripts/serve-frontend.py
      ...output omitted...

      Open your web browser and navigate to http://localhost:8081. Verify that the dashboard displays power production values of 0 MWatts.

      Leave this terminal window and the browser tab open.

  2. Fix the turbine power data production problem.

    The turbine data producer sends power measurements to Kafka topics. This is a simulation of wind turbines producing data. Due to a restrictive network timeout configuration, the produced measurements do not reach the Kafka broker.

    Edit the TurbineDataProducerApp class from the turbine-data-producer application to adjust the request timeout configuration and enable idempotence. Next, verify that the energy meter application receives 10 measurements in each 10-second time window.

    1. Edit the TurbineDataProducerApp class.

      In the setProducerConfig method, activate idempotence and increase the value of the REQUEST_TIMEOUT_MS_CONFIG property.

      // TODO: fix delivery issues
      props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
      props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

    2. Restart the turbine data producer. The application should not display errors.

      [user@host turbine-data-producer]$ ./mvnw compile exec:java "-Dexec.mainClass=com.redhat.energy.TurbineDataProducerApp"
      ...output omitted...
      Produced { turbineId: 1, power: 1875333 }
      Produced { turbineId: 2, power: 1767793 }

      Navigate back to the energy meter front-end browser tab. Verify that the Reported values (last 10 seconds) field displays a value of 10 received measurements in the last 10 seconds.

      Important

      You can experiment with different timeout configurations.

      Note that the timeout is subject to the state of the cluster and the network delay. If the timeout is too low, then few retries succeed and the Reported values (last 10 seconds) field displays values below 10. If the retries cause duplication, then the Reported values (last 10 seconds) field displays values higher than 10.

      30 seconds, however, should be sufficient.
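The producer settings from this step can be summarized in a small, library-free sketch. It uses the plain configuration key names (request.timeout.ms, enable.idempotence, acks, retries, and max.in.flight.requests.per.connection) rather than the ProducerConfig constants, so it runs without the Kafka client on the classpath. The class and method names are hypothetical and are not part of the lab code. The check only mirrors the documented rule that an idempotent producer requires acks=all, retries greater than 0, and at most 5 in-flight requests per connection.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the lab sources.
class ProducerReliabilitySketch {

    // Builds the reliability-related producer settings as plain string keys.
    static Properties reliableProducerProps(int requestTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        props.setProperty("enable.idempotence", "true");
        // Stated explicitly here so that the constraints are visible.
        props.setProperty("acks", "all");
        props.setProperty("retries", Integer.toString(Integer.MAX_VALUE));
        props.setProperty("max.in.flight.requests.per.connection", "5");
        return props;
    }

    // Returns false when idempotence is enabled together with a conflicting setting.
    // Missing values are treated as compatible (an assumption of this sketch).
    static boolean satisfiesIdempotenceRules(Properties props) {
        if (!Boolean.parseBoolean(props.getProperty("enable.idempotence", "false"))) {
            return true; // nothing to validate without idempotence
        }
        String acks = props.getProperty("acks", "all");
        int retries = Integer.parseInt(props.getProperty("retries", "1"));
        int inFlight = Integer.parseInt(
                props.getProperty("max.in.flight.requests.per.connection", "5"));
        boolean acksOk = acks.equals("all") || acks.equals("-1");
        return acksOk && retries > 0 && inFlight <= 5;
    }

    public static void main(String[] args) {
        Properties props = reliableProducerProps(30_000);
        System.out.println("request.timeout.ms = " + props.getProperty("request.timeout.ms"));
        System.out.println("idempotence rules satisfied: " + satisfiesIdempotenceRules(props));
    }
}
```

Setting acks to 1 or retries to 0 in the returned properties makes satisfiesIdempotenceRules return false, which is a quick way to spot a conflicting override before starting the producer.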

  3. Handle out-of-order events.

    The company needs to process each power measurement according to the time when the turbine collected it. Due to problems in the internal I/O of the turbines, some messages are delayed by approximately 11 seconds before the producer writes the record to Kafka.

    You must perform the following steps:

    • Edit the TurbineDataProducerApp class to use the wind turbine timestamps. To activate this feature, use the emitter.withTurbineTimestamps() method.

    • Verify that the count shown in the front end is lower than 10.

    • Adjust the Kafka Streams topology of the energy meter back end to handle late events.

    1. Edit the TurbineDataProducerApp class of the turbine data producer. Activate the wind turbine timestamps feature.

      // TODO: Activate the turbine timestamps feature
      emitter.withTurbineTimestamps();

    2. Restart the turbine data producer. The output should display the timestamp when the turbine produces each record.

      [user@host turbine-data-producer]$ ./mvnw compile exec:java "-Dexec.mainClass=com.redhat.energy.TurbineDataProducerApp"
      ...output omitted...
      Produced { turbineId: 2, power: 1813523 } - Timestamp: 1632474226000
      Produced { turbineId: 3, power: 1814831 } - Timestamp: 1632474216000 - DELAYED

    3. Navigate back to the energy meter front-end browser tab. Verify that the Reported values (last 10 seconds) field displays a count less than 10. You can also check the logs of the energy-meter back end.

      MAP - Turbine: 3 | 1612075 Watts -> 1.612075 MWatts
      COUNT - Turbine: 1 | Count:7
      COUNT - Turbine: 2 | Count:7
      COUNT - Turbine: 3 | Count:7

    4. Edit the StreamTopologyBuilder class of the energy meter back end. Set a grace period of 12 seconds.

      wattsValuesStream
          .groupByKey()
          .windowedBy(
              TimeWindows
                  .of(Duration.ofSeconds(10))
                  .advanceBy(Duration.ofSeconds(10))
                  // TODO: adjust grace period
                  .grace(Duration.ofSeconds(12))
          )
      ...output omitted...

    5. Return to your browser tab and refresh the page. Check the back-end application logs, and wait until the Kafka Streams engine switches to the RUNNING state.

      Note

      If you experience problems while restarting the application, then stop the Quarkus application and run the ./mvnw quarkus:dev command again.

      2021-09-14 14:39:51,336 INFO  [org.apa.kaf.str.KafkaStreams] (energy-meter-streams-2571d5e9-9883-4d7b-9f21-8f30ad65c2b2-StreamThread-1) stream-client [energy-meter-streams-2571d5e9-9883-4d7b-9f21-8f30ad65c2b2] State transition from REBALANCING to RUNNING

      In the browser tab, verify that the Reported values (last 10 seconds) field is 10.
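The effect of the grace period can be reproduced with a small simulation that does not need Kafka. The sketch below assumes the usual rule for tumbling windows: a record is still accepted while the end of its window plus the grace period is later than the stream time, which is the highest timestamp seen so far. The class and method names are hypothetical, the two timestamps are the ones from the producer output in this step, and the zero grace period is only an illustrative value, because the original setting of the lab topology is not shown here.

```java
import java.time.Duration;

// Hypothetical simulation for illustration; it does not use the Kafka Streams API.
class GracePeriodSketch {

    // Start of the tumbling window that contains the given event timestamp.
    static long windowStart(long timestampMs, Duration size) {
        return timestampMs - (timestampMs % size.toMillis());
    }

    // A record is accepted while its window end is later than (stream time - grace).
    static boolean isAccepted(long recordTsMs, long streamTimeMs,
                              Duration size, Duration grace) {
        long windowEnd = windowStart(recordTsMs, size) + size.toMillis();
        long closeTime = streamTimeMs - grace.toMillis();
        return windowEnd > closeTime;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        long streamTime = 1632474226000L; // on-time record from turbine 2
        long delayedTs = 1632474216000L;  // delayed record from turbine 3

        System.out.println("grace 0 s:  accepted = "
                + isAccepted(delayedTs, streamTime, size, Duration.ZERO));
        System.out.println("grace 12 s: accepted = "
                + isAccepted(delayedTs, streamTime, size, Duration.ofSeconds(12)));
    }
}
```

In this simulation, the delayed record belongs to the window that ends at 1632474220000, which is already behind the stream time when the record arrives. Without a grace period the record is discarded, which matches the counts below 10; with a 12-second grace period the window is still open. Under this rule, a grace period slightly longer than the expected delay is enough, because a record delayed by about 11 seconds arrives at most about 11 seconds after its window ends.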

  4. Implement a unit test to verify that the conversion from Watts to MWatts is correct.

    The energy meter back end reads power measurements in Watts from the turbine-generated-watts topic. The topology converts the values from Watts to MWatts. In the conversion, the topology generates a stream of MWattsMeasurement objects and materializes the new stream to the turbine-generated-mwatts topic.

    You must write a unit test that, given a Watts measurement in the turbine-generated-watts topic, verifies that the topology writes the corresponding MWatts value in the turbine-generated-mwatts topic.

    Note

    1 MWatt = 1,000,000 Watts.

    1. Edit the StreamTopologyBuilderTest class. In the setup method, create a test driver to test the StreamTopologyBuilder topology.

      // TODO: create the test driver
      StreamTopologyBuilder builder = new StreamTopologyBuilder();
      testDriver = new TopologyTestDriver(builder.buildTopology());

    2. In the same method, create the serdes and test topics required for the test.

      // TODO: Create test topics
      wattsStream = testDriver.createInputTopic(
          "turbine-generated-watts",
          new IntegerSerializer(),
          new IntegerSerializer()
      );
      
      ObjectMapperSerde<MWattsMeasurement> measurementSerde = new ObjectMapperSerde<>(MWattsMeasurement.class);
      measurementsStream = testDriver.createOutputTopic(
          "turbine-generated-mwatts",
          new IntegerDeserializer(),
          measurementSerde.deserializer()
      );

    3. In the same file, add a test case to verify that for a value of 2500000 Watts, the topology writes a value of 2.5 MWatts.

      // TODO: implement test
      @Test
      public void testMWattsConversion() {
          wattsStream.pipeInput(1, 2500000);
      
          TestRecord<Integer, MWattsMeasurement> record = measurementsStream.readRecord();
      
          assertEquals(2.5, record.getValue().megawatts);
      }

    4. Open a new terminal window, navigate to the troubleshooting-apps/energy-meter directory, and verify that the test passes.

      [user@host energy-meter]$ ./mvnw test
      ...output omitted...
      [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0
      [INFO]
      [INFO] ------------------------------------------------------------------------
      [INFO] BUILD SUCCESS
      [INFO] ------------------------------------------------------------------------
      ...output omitted...
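The arithmetic that the unit test verifies can also be checked in isolation. The following sketch assumes that the conversion is a plain division by 1,000,000 and that the result is a double; the lab does not show the type of the megawatts field, and the class and method names here are hypothetical. It also compares floating-point values with a small tolerance, in the same spirit as the three-argument assertEquals(expected, actual, delta) method in JUnit, which is a safer habit when the expected value is not exactly representable.

```java
// Hypothetical helper for illustration; not part of the lab sources.
class WattsConversionSketch {

    static final double WATTS_PER_MEGAWATT = 1_000_000.0;

    // Converts a power value in Watts to MWatts.
    static double toMegawatts(int watts) {
        return watts / WATTS_PER_MEGAWATT;
    }

    // Tolerance-based comparison for floating-point results.
    static boolean closeTo(double expected, double actual, double delta) {
        return Math.abs(expected - actual) <= delta;
    }

    public static void main(String[] args) {
        // Value used in the unit test of this step.
        System.out.println(closeTo(2.5, toMegawatts(2_500_000), 1e-9));
        // Value taken from the MAP log line of the energy meter back end.
        System.out.println(closeTo(1.612075, toMegawatts(1_612_075), 1e-9));
    }
}
```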

Evaluation

Do not stop the running applications.

Open a new terminal window, and make sure you are in your workspace directory. Activate the Python virtual environment, and use the lab command to grade your work. Correct any reported failures and rerun the command until successful.

(.venv) [user@host AD482]$ lab grade troubleshooting-apps

Terminate all the running applications.

Finish

Use the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish troubleshooting-apps

This concludes the lab.

Revision: ad482-1.8-cc2ae1c