
Lab: Building Applications with the Streams API

In this lab, you will process data streams in a Quarkus application by using the Kafka Streams library.

You will build the stream processing topology required for a wind turbine monitoring dashboard. This application should handle input data streams from the following Kafka topics:

  • turbines. A stream of wind turbines registered in the system.

  • turbine-generated-watts. A live stream of how much power each turbine generates, in Watts.

The application should show general information about each turbine, power production in real time, in Megawatts (MWatts), and the number of power production values processed per turbine. By using Kafka Streams, you must process the input streams in the application's back end to display these values in the front end.

Outcomes

You should be able to:

  • Install and configure Kafka Streams in a Quarkus application.

  • Create a processing topology with the Kafka Streams DSL.

  • Read streams from and write streams to Kafka topics.

  • Use core DSL abstractions: KStream and KTable.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment, and use the lab command to start the scenario for this exercise.

[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start streams-building

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Copy the Bootstrap Server and Bootstrap Port values from the output of the lab command. You will use these values to connect to the Kafka cluster.

The lab command copies the exercise files from the AD482-apps/streams-building/apps/energy-meter directory in your local Git repository into the streams-building directory at the root of your workspace.

Procedure 3.3. Instructions

  1. Navigate to the streams-building directory and briefly inspect the back-end and the front-end applications.

    The energy-meter directory contains the Quarkus Kafka Streams back-end application, which includes the following elements:

    • The api package, which exposes Kafka Streams stores and Kafka topics to the front end (see the sketch at the end of this step).

    • The records package, which includes classes that specify the format of stream records.

    • The StreamTopologyBuilder class. This is the only class that you must edit in this exercise to populate the stores and topics exposed to the front end.

    The energy-meter-front directory contains a React application, which connects to the back end and displays the processed data in a web dashboard.
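
    For orientation, the endpoints in the api package read materialized Kafka Streams state stores. The following is a minimal, hypothetical sketch of how such an endpoint can query a key/value store; the class name is illustrative and this is not the lab's exact code. It relies on the KafkaStreams bean that the Quarkus Kafka Streams extension provides for injection:

      import java.util.ArrayList;
      import java.util.List;
      import javax.inject.Inject;
      import javax.ws.rs.GET;
      import javax.ws.rs.Path;
      import org.apache.kafka.streams.KafkaStreams;
      import org.apache.kafka.streams.StoreQueryParameters;
      import org.apache.kafka.streams.state.QueryableStoreTypes;
      import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

      @Path("/turbines")
      public class TurbinesResource {

          @Inject
          KafkaStreams streams; // managed by the Quarkus Kafka Streams extension

          @GET
          public List<WindTurbine> getTurbines() {
              // Look up the read-only view of the "turbines-store" state store
              ReadOnlyKeyValueStore<Integer, WindTurbine> store = streams.store(
                  StoreQueryParameters.fromNameAndType(
                      "turbines-store",
                      QueryableStoreTypes.keyValueStore()));

              // Collect every stored WindTurbine value
              List<WindTurbine> turbines = new ArrayList<>();
              store.all().forEachRemaining(entry -> turbines.add(entry.value));
              return turbines;
          }
      }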

  2. Install the Quarkus extension for Kafka Streams in the back-end application.

    1. Navigate to the energy-meter directory.

      (.venv) [user@host streams-building]$ cd energy-meter
    2. Run the following command to add the kafka-streams Quarkus extension to your application:

      (.venv) [user@host energy-meter]$ ./mvnw quarkus:add-extension \
       -Dextensions="kafka-streams"
      ...output omitted...
      [INFO] [SUCCESS] ✅  Extension io.quarkus:quarkus-kafka-streams has been installed
      ...output omitted...
  3. Configure Kafka Streams in the back-end application, according to the following specifications:

    Table 3.2. Kafka Streams Configuration

    Property          Value
    Application ID    energy-meter-streams
    Bootstrap server  Kafka bootstrap host and port
    Topics            turbines, turbine-generated-watts, turbine-generated-mwatts, turbine-stats

    1. By using your editor of choice, open the src/main/resources/application.properties file, and add the following configuration parameters.

      # TODO: configure Kafka Streams
      quarkus.kafka-streams.application-id=energy-meter-streams
      quarkus.kafka-streams.bootstrap-servers=YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT
      quarkus.kafka-streams.topics=turbines,turbine-generated-watts,turbine-generated-mwatts,turbine-stats

      Make sure to replace the YOUR_KAFKA_BOOTSTRAP_HOST and YOUR_KAFKA_BOOTSTRAP_PORT strings with the values from the lab start script.
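
      Note that the Quarkus Kafka Streams extension passes any property prefixed with kafka-streams. through as-is to the Kafka Streams engine, so you can tune the engine from the same file. For example, the following optional property adjusts the record cache size (shown only as an illustration; the exercise does not require it):

      # Optional pass-through property for the Kafka Streams engine
      kafka-streams.cache.max.bytes.buffering=10240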

  4. Start the applications for this exercise. Run each of them in a separate terminal window.

    • Back-end application: From the streams-building/energy-meter directory, run the ./mvnw quarkus:dev command. The application should be available at http://localhost:8080. The logs should display an error caused by an empty Kafka Streams topology.

    • Front-end application: From the streams-building directory, run the python scripts/serve-frontend.py command. A simple Python web server should make the front-end application available at http://localhost:8081.

    • Producer script: From the streams-building directory, run the python scripts/produce_turbines.py command. This script should produce three wind turbines into the turbines Kafka topic, and continuous energy production Watts values for each turbine into the turbine-generated-watts topic.

    Make sure that the Python virtual environment is activated before running the Python scripts.

    1. Run the Quarkus application.

      [user@host energy-meter]$ ./mvnw quarkus:dev
      ...output omitted...
      2021-09-13 13:44:20,514 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile dev): org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.

      The application fails because the processing topology is empty.
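
      The Quarkus Kafka Streams extension builds the engine from a CDI producer method that returns a Topology. As a rough sketch, assuming a skeleton similar to, but not necessarily identical to, the lab's StreamTopologyBuilder class:

      import javax.enterprise.context.ApplicationScoped;
      import javax.enterprise.inject.Produces;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.Topology;

      @ApplicationScoped
      public class StreamTopologyBuilder {

          @Produces
          public Topology buildTopology() {
              StreamsBuilder builder = new StreamsBuilder();
              // With no sources registered, builder.build() returns an empty
              // topology, which triggers the TopologyException shown above.
              return builder.build();
          }
      }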

      Leave this terminal window open. The Quarkus dev mode hot-reloads the application if you make changes to the source code.

      Note

      The Quarkus hot-reload feature might not work well if your editor automatically saves changes. In this case, you can stop the application and run the ./mvnw quarkus:dev command again.

    2. Open a new terminal window and make sure you are in your workspace directory. Activate the Python virtual environment, navigate to the streams-building directory, and run the producer script.

      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$ cd streams-building
      (.venv) [user@host streams-building]$ python scripts/produce_turbines.py
      - Registered new turbine in 'turbines' topic
      (1,
       {'cutOutWindSpeed': 50,
        'description': 'Wind Turbine A',
        'id': 1,
        'powerCapacity': 2000000})
      
      ...output omitted...
      
      - Produced power in 'turbine-generated-watts' topic
      (1, 1462802)
      
      ...output omitted...

      Leave this terminal window open.

    3. Open a new terminal window, and make sure you are in your workspace directory. Activate the Python virtual environment, navigate to the streams-building directory, and run the front-end application.

      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$ cd streams-building
      (.venv) [user@host streams-building]$ python scripts/serve-frontend.py
      ...output omitted...

      Open your web browser and navigate to http://localhost:8081. The dashboard should list no wind turbines.

      Leave this terminal window and the browser tab open.

  5. Display the list of registered wind turbines in the dashboard.

    The application already implements the following features:

    • The back end exposes the /turbines endpoint, which gathers the list of registered wind turbines from a Kafka Streams store called turbines-store. This store should contain WindTurbine objects.

    • The front end fetches wind turbines from the /turbines endpoint and displays them.

    You must implement the code to populate the turbines-store store with WindTurbine records by reading the turbines Kafka topic as a KTable. Read the records from the turbines topic according to this specification:

             Content                Type
      Key    Wind turbine ID        Integer
      Value  Wind turbine instance  WindTurbine

    Use the same specification to materialize the KTable object as the turbines-store store.

    Finally, refresh the dashboard page in your browser, wait until Quarkus restarts the Kafka Streams engine, and verify that the dashboard displays three wind turbines.

    1. Open the StreamTopologyBuilder class and add the code to create the WindTurbine serde.

      // TODO: Create wind turbine serde
      ObjectMapperSerde<WindTurbine> turbineSerde = new ObjectMapperSerde<>(WindTurbine.class);
    2. Use the StreamsBuilder#table method to read the turbines Kafka topic into a KTable. Specify the key/value serdes to consume from the Kafka topic, and the key/value serdes to materialize the table as the turbines-store store.

      // TODO: read the "turbines" topic as a KTable
      builder.table(
          "turbines",
          Consumed.with(Serdes.Integer(), turbineSerde),
          Materialized
              .<Integer, WindTurbine, KeyValueStore<Bytes, byte[]>>as("turbines-store")
              .withKeySerde(Serdes.Integer())
              .withValueSerde(turbineSerde)
      );
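
      Because a KTable models a changelog, the materialized turbines-store keeps only the latest WindTurbine value for each key: re-registering a turbine updates its entry rather than appending a duplicate record.
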
    3. Return to your browser tab and refresh the page. Inspect the back-end application logs. Wait until the Kafka Streams engine switches to the RUNNING state.

      Note

      If you experience problems while restarting the application, then stop the Quarkus application and run the ./mvnw quarkus:dev command again.

      2021-09-14 14:39:51,336 INFO  [org.apa.kaf.str.KafkaStreams] (energy-meter-streams-2571d5e9-9883-4d7b-9f21-8f30ad65c2b2-StreamThread-1) stream-client [energy-meter-streams-2571d5e9-9883-4d7b-9f21-8f30ad65c2b2] State transition from REBALANCING to RUNNING

      In the browser tab, verify that the dashboard displays three wind turbines.

  6. Display real-time power generation values in the dashboard.

    The application already implements the following features:

    • The back end exposes the /turbines/generated-power endpoint, which reads the turbine-generated-mwatts Kafka topic and exposes a stream of server-sent events (see the sketch after this list). Records in the turbine-generated-mwatts topic should use MWattsMeasurement instances as values.

    • The front end opens a connection to the /turbines/generated-power endpoint and displays the received values.
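
    For reference, a server-sent events endpoint such as /turbines/generated-power is typically written in Quarkus by returning a Mutiny Multi bound to the Kafka topic through a reactive messaging channel. The following hypothetical sketch only illustrates the pattern; the class name and channel binding are assumptions, and the lab's actual implementation may differ:

      import javax.inject.Inject;
      import javax.ws.rs.GET;
      import javax.ws.rs.Path;
      import javax.ws.rs.Produces;
      import javax.ws.rs.core.MediaType;
      import org.eclipse.microprofile.reactive.messaging.Channel;
      import io.smallrye.mutiny.Multi;

      @Path("/turbines")
      public class GeneratedPowerResource {

          // Channel assumed to be bound to the "turbine-generated-mwatts"
          // topic in the application configuration
          @Inject
          @Channel("turbine-generated-mwatts")
          Multi<MWattsMeasurement> measurements;

          @GET
          @Path("/generated-power")
          @Produces(MediaType.SERVER_SENT_EVENTS)
          public Multi<MWattsMeasurement> stream() {
              // Each emitted MWattsMeasurement becomes a server-sent event
              return measurements;
          }
      }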

    You must implement the code to write a stream of MWattsMeasurement objects to the turbine-generated-mwatts Kafka topic. To this end, you must perform the following steps:

    • Read the turbine-generated-watts topic as a KStream.

    • Map the stream to a new stream of MWattsMeasurement objects.

    • Write the resulting stream to the turbine-generated-mwatts Kafka topic.

    Perform these operations according to this table:

             Original content                Original type  New content                          New type
      Key    Wind turbine ID                 Integer        Wind turbine ID                      Integer
      Value  Generated power value in Watts  Integer        Object encapsulating MWatts values   MWattsMeasurement

    Note

    1 MWatt = 1,000,000 Watts

    Finally, refresh the front-end page again and wait until Quarkus restarts the Kafka Streams engine. Verify that the page displays consumption values in MWatts.

    1. Switch to the StreamTopologyBuilder class file. Use the StreamsBuilder#stream method to read the turbine-generated-watts Kafka topic as a KStream. Specify the required serdes to consume each record's key and value.

      // TODO: read the "turbine-generated-watts" topic as a KStream
      KStream<Integer, Integer> wattsValuesStream = builder.stream(
          "turbine-generated-watts",
          Consumed.with(Serdes.Integer(), Serdes.Integer())
      );
    2. Create a serde for MWattsMeasurement objects.

      // TODO: Create MWattsMeasurement serde
      ObjectMapperSerde<MWattsMeasurement> mwattsMeasurementSerde = new ObjectMapperSerde<>(MWattsMeasurement.class);
    3. Map the values of the wattsValuesStream to a new KStream of MWattsMeasurement objects. Write the new stream to the turbine-generated-mwatts Kafka topic. Use the MWattsMeasurement serde to serialize the values written to the topic.

      // TODO: map the watts stream into a new mwatts stream
      wattsValuesStream.map((turbineId, watts) -> {
          Double megawatts = (double) watts / 1000000;
          MWattsMeasurement measurement = new MWattsMeasurement(turbineId, megawatts);
          System.out.println(
              "MAP - Turbine: " + turbineId +
              " | " + watts + " Watts -> " + megawatts + " MWatts"
          );
          return KeyValue.pair(turbineId, measurement);
      }).to(
          "turbine-generated-mwatts",
          Produced.with(Serdes.Integer(), mwattsMeasurementSerde)
      );
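
      Note that the record key does not change in this transformation. KStream#mapValues would express the same logic and, unlike map, does not flag the stream for repartitioning when a stateful operation follows; map works here because the stream is written directly to a topic.
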
    4. Refresh the front-end page again. Verify that the page displays consumption values in MWatts. You might have to wait a few seconds until the Kafka Streams engine reaches the RUNNING state after the restart.

  7. Display power generation value counts in the dashboard.

    The application already implements the following features:

    • The back end exposes the /turbines/measurements-count endpoint, which reads the turbine-stats Kafka topic and exposes a stream of server-sent events. Record values in the turbine-stats topic should be instances of WindTurbineStats. Each one of these objects contains the turbine ID and the number of reported measurements.

    • The front end opens a connection to the /turbines/measurements-count endpoint and displays the received values.

    You must implement the code to write a stream of WindTurbineStats objects to the turbine-stats Kafka topic. To this end, perform the following steps:

    • Group the previously created KStream object by turbine ID.

    • Count the values in the grouped stream.

    • Transform the generated counts KTable to a stream.

    • Map the stream of (turbineId, count) records into another stream of (turbineId, WindTurbineStats) records.

    • Write the resulting stream to the turbine-stats topic.

    1. Create the serde for WindTurbineStats objects.

      // TODO: Create WindTurbineStats serde
      ObjectMapperSerde<WindTurbineStats> statsSerde = new ObjectMapperSerde<>(WindTurbineStats.class);
    2. Group the wattsValuesStream by key and count the records. Transform the resulting table into a stream. Map the stream of (turbineId, count) records into another stream of (turbineId, WindTurbineStats) records. Finally, write the resulting stream to the turbine-stats topic.

      // TODO: count measurements by turbine and write results to a new stream
      wattsValuesStream
          .groupByKey()
          .count()
          .toStream()
          .map((turbineId, count) -> {
              WindTurbineStats stats = new WindTurbineStats(turbineId, count);
              System.out.println("COUNT - Turbine: " + turbineId + " | Count:" + count);
              return KeyValue.pair(turbineId, stats);
          })
          .to(
              "turbine-stats",
              Produced.with(Serdes.Integer(), statsSerde)
          );
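
      The count() operation is stateful: Kafka Streams backs it with a local state store and a compacted changelog topic, so the per-turbine counts survive application restarts.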

      An API endpoint makes the turbine-stats Kafka topic available to the front end.

    3. Refresh the front-end page again. Verify that the page displays the counts.

Evaluation

Do not stop the running applications.

Open a new terminal window, and make sure you are in your workspace directory. Activate the Python virtual environment, and use the lab command to grade your work. Correct any reported failures and rerun the command until successful.

(.venv) [user@host AD482]$ lab grade streams-building

Terminate all the running applications.

Finish

Use the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish streams-building

This concludes the lab.

Revision: ad482-1.8-cc2ae1c