
Lab: Creating Asynchronous Services with Event Collaboration

In this lab, you will apply stateful and stateless transformations to the event streams of an electric company. The application is an enhancement requested by the company's management team, which wants to detect wind turbines with low profit margins so that it can make better business decisions.

You will apply stateful transformations to the earnings and expenses event streams to calculate the profit margin of each wind turbine. You will then apply stateless transformations to the calculated profit margins to raise an alert for turbines with a low margin.

Outcomes

You should be able to apply stateful and stateless transformations to a data stream.

You will find the solution files for this exercise in the AD482-apps repository, within the collaboration-async/solutions directory.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start collaboration-async

The lab command copies the exercise files from the AD482-apps/collaboration-async/apps directory in your local Git repository into the collaboration-async directory in your workspace.

Procedure 4.4. Instructions

  1. Create a topology in the WindTurbineProfitMarginsPipeline class that calculates the average earnings per wind turbine, and saves the results into a KTable.

    • The wind-turbine-earnings topic contains the WindTurbineEarningWasAdded events. Each event record has the wind turbine ID as the record key.

    • You must use the AverageData class to store the intermediate aggregated calculations. Materialize the aggregation in a store named earnings-aggregated-store.

    • Map the values of the aggregation to a KTable that contains the average earnings. Materialize the mapping in a store named wind-turbine-average-earnings-store.

    1. Navigate to the collaboration-async directory located in your workspace directory.

    2. By using your editor of choice, open the WindTurbineProfitMarginsPipeline class, and examine the code. The class defines all the SerDe objects, the state store names, the topic names, and the logic to start and stop the stream. You define the topology in the onStart method.
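
      For reference, SerDe objects such as earningEventSerde and averageDataSerde are typically Jackson-based wrappers around the event and aggregation classes. The following is only a sketch of how they might be declared, assuming Quarkus's ObjectMapperSerde; the exact declarations live in the exercise sources.

      // Minimal sketch (assumption): Jackson-based SerDes built with Quarkus's
      // io.quarkus.kafka.client.serialization.ObjectMapperSerde class.
      ObjectMapperSerde<WindTurbineEarningWasAdded> earningEventSerde =
              new ObjectMapperSerde<>(WindTurbineEarningWasAdded.class);
      ObjectMapperSerde<WindTurbineExpenseWasAdded> expenseEventSerde =
              new ObjectMapperSerde<>(WindTurbineExpenseWasAdded.class);
      ObjectMapperSerde<AverageData> averageDataSerde =
              new ObjectMapperSerde<>(AverageData.class);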

    3. Create a stream from the wind-turbine-earnings topic, and assign it to a variable.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Create a KStream for the earning events
          KStream<Integer, WindTurbineEarningWasAdded> earningsStream = builder.stream(
              WIND_TURBINE_EARNINGS_TOPIC,
              Consumed.with(Serdes.Integer(), earningEventSerde)
          );
      
          ...output omitted...
      }
    4. Calculate the intermediate earnings by first grouping the events in the earnings stream by key, and then aggregating the record values into the AverageData class. Materialize the resulting KTable as a store named earnings-aggregated-store.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Aggregate the earnings
          KTable<Integer, AverageData> aggregatedEarnings = earningsStream
          .groupByKey()
          .aggregate(
              AverageData::new,
              (key, value, aggregate) -> {
                  aggregate.increaseCount(1);
                  aggregate.increaseSum(value.amount);
      
                  return aggregate;
              },
              Materialized.<Integer, AverageData, KeyValueStore<Bytes, byte[]>>
                              as(AGGREGATED_EARNINGS_STORE)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(averageDataSerde)
          );
      
          ...output omitted...
      }
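
      The AverageData class itself is not shown in this lab. Judging from how the aggregation uses it (increaseCount, increaseSum, and the sum and count fields read later), it roughly resembles the following sketch; the actual class in the exercise sources might differ in its details.

      // Rough sketch of AverageData, inferred from its usage in this lab;
      // the actual class in AD482-apps might differ.
      public class AverageData {
          public int count = 0;
          public double sum = 0.0;

          public void increaseCount(int delta) {
              count += delta;
          }

          public void increaseSum(double amount) {
              sum += amount;
          }
      }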
    5. Map the aggregation values into a KTable that contains the average earnings. Materialize the KTable as a store named wind-turbine-average-earnings-store.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Calculate the average earnings
          KTable<Integer, Double> averageEarningsTable = aggregatedEarnings.mapValues(
              value -> value.sum / value.count,
              Materialized.<Integer, Double, KeyValueStore<Bytes, byte[]>>
                              as(AVERAGE_EARNINGS_STORE)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(Serdes.Double())
          );
      
          ...output omitted...
      }
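
      To see why this is a stateful transformation, consider how the materialized state evolves for a single turbine as earning events arrive. The amounts in the following walk-through are made up for illustration.

      // Illustration only, with made-up amounts for one turbine key.
      AverageData aggregate = new AverageData();   // count = 0, sum = 0.0

      // First WindTurbineEarningWasAdded event: amount = 100.0
      aggregate.increaseCount(1);                  // count = 1
      aggregate.increaseSum(100.0);                // sum = 100.0
      // mapValues emits 100.0 / 1 = 100.0 as the average earnings

      // Second WindTurbineEarningWasAdded event: amount = 200.0
      aggregate.increaseCount(1);                  // count = 2
      aggregate.increaseSum(200.0);                // sum = 300.0
      // mapValues emits 300.0 / 2 = 150.0 as the updated average earnings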
  2. Update the topology created in the WindTurbineProfitMarginsPipeline class to calculate the average expenses per wind turbine, and save the results into a KTable.

    • The wind-turbine-expenses topic contains the WindTurbineExpenseWasAdded events. Each event record has the wind turbine ID as the record key.

    • You must use the AverageData class to store the intermediate aggregated calculations. Materialize the aggregation in a store named expenses-aggregated-store.

    • Map the values of the aggregation to a KTable that contains the average expenses. Materialize the mapping in a store named wind-turbine-average-expenses-store.

    1. Create a stream from the wind-turbine-expenses topic, and assign it to a variable.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Create a KStream for the expense events
          KStream<Integer, WindTurbineExpenseWasAdded> expensesStream = builder.stream(
              WIND_TURBINE_EXPENSES_TOPIC,
              Consumed.with(Serdes.Integer(), expenseEventSerde)
          );
      
          ...output omitted...
      }
    2. Calculate the intermediate expenses by first grouping the events in the expenses stream by key, and then aggregating the record values into the AverageData class. Materialize the resulting KTable as a store named expenses-aggregated-store.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Aggregate the expenses
          KTable<Integer, AverageData> aggregatedExpenses = expensesStream
          .groupByKey()
          .aggregate(
              AverageData::new,
              (key, value, aggregate) -> {
                  aggregate.increaseCount(1);
                  aggregate.increaseSum(value.amount);
      
                  return aggregate;
              },
              Materialized.<Integer, AverageData, KeyValueStore<Bytes, byte[]>>
                              as(AGGREGATED_EXPENSES_STORE)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(averageDataSerde)
          );
      
          ...output omitted...
      }
    3. Map the aggregation values into a KTable that contains the average expenses. Materialize the KTable as a store named wind-turbine-average-expenses-store.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Calculate the average expenses
          KTable<Integer, Double> averageExpensesTable = aggregatedExpenses.mapValues(
              value -> value.sum / value.count,
              Materialized.<Integer, Double, KeyValueStore<Bytes, byte[]>>
                              as(AVERAGE_EXPENSES_STORE)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(Serdes.Double())
          );
      
          ...output omitted...
      }
  3. Update the topology created in the WindTurbineProfitMarginsPipeline class to join the average earnings and average expenses KTables. Use the WindTurbineProfitMarginWasCalculated event to calculate the profit margin per wind turbine, and send the result to a topic named wind-turbine-profit-margins.

    1. Join the average earnings and average expenses variables. Use the WindTurbineProfitMarginWasCalculated class as the value joiner, and transform the resulting KTable into a stream.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Calculate the profit margins
          averageEarningsTable.join(
              averageExpensesTable,
              WindTurbineProfitMarginWasCalculated::new
          ).toStream()
      
          ...output omitted...
      }
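
      The method reference WindTurbineProfitMarginWasCalculated::new acts as the ValueJoiner, so the class needs a constructor that accepts the average earnings and the average expenses and derives the profit margin from them. The sketch below shows one possible shape; the formula is an assumption, so refer to the class in the exercise sources.

      // Sketch of the event class, inferred from how the join uses it.
      // The formula is an assumption: margin = (earnings - expenses) / earnings.
      public class WindTurbineProfitMarginWasCalculated {
          public double profitMargin;

          public WindTurbineProfitMarginWasCalculated() {
              // Required by the Jackson-based SerDe for deserialization
          }

          public WindTurbineProfitMarginWasCalculated(Double averageEarnings, Double averageExpenses) {
              this.profitMargin = (averageEarnings - averageExpenses) / averageEarnings;
          }
      }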
    2. Send the records of the joined stream to a topic named wind-turbine-profit-margins.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Calculate the profit margins
          averageEarningsTable.join(
              averageExpensesTable,
              WindTurbineProfitMarginWasCalculated::new
          ).toStream()
          .to(
              WIND_TURBINE_PROFIT_MARGINS_TOPIC,
              Produced.with(Serdes.Integer(), profitEventsSerde)
          );
      
          ...output omitted...
      }
  4. Create a topology in the NotifyAboutLowProfitMarginPipeline class that sends an alert event to the low-profit-margin-alert topic when the profit margin of a wind turbine is lower than 0.10.

    The wind-turbine-profit-margins topic contains the WindTurbineProfitMarginWasCalculated events. Each event record has the wind turbine ID as the record key.

    1. Build a stream from the wind-turbine-profit-margins topic, and filter it to keep only the events with a profit margin lower than 0.10.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Build the stream topology
          builder.stream(
              WIND_TURBINE_PROFIT_MARGINS_TOPIC,
              Consumed.with(Serdes.Integer(), profitEventSerde)
          ).filter(
              (key, profit) -> profit.profitMargin < 0.10
          )
      
          ...output omitted...
      }
    2. Map the filtered events to a new event stream. Set the key of each new record to the same key as the records in the wind-turbine-profit-margins topic. Set the record value to a new LowProfitMarginWasDetected event.

      You can use the provided helper methods, such as logLowProfitMarginAlert, for debugging purposes.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Build the stream topology
          builder.stream(
              WIND_TURBINE_PROFIT_MARGINS_TOPIC,
              Consumed.with(Serdes.Integer(), profitEventSerde)
          ).filter(
              (key, profit) -> profit.profitMargin < 0.10
          )
          .map((key, profit) -> {
              logLowProfitMarginAlert(key, profit.profitMargin);
      
              return new KeyValue<>(
                  key,
                  new LowProfitMarginWasDetected(key, profit.profitMargin)
              );
          })
      
          ...output omitted...
      }
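
      The LowProfitMarginWasDetected event is built from the turbine ID and the calculated margin, so it is essentially a small data carrier along the following lines. This is only a sketch; the real class might carry additional fields.

      // Sketch inferred from new LowProfitMarginWasDetected(key, profit.profitMargin).
      public class LowProfitMarginWasDetected {
          public int windTurbineId;
          public double profitMargin;

          public LowProfitMarginWasDetected() {
              // Required by the Jackson-based SerDe for deserialization
          }

          public LowProfitMarginWasDetected(Integer windTurbineId, Double profitMargin) {
              this.windTurbineId = windTurbineId;
              this.profitMargin = profitMargin;
          }
      }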
    3. Send the LowProfitMarginWasDetected events to the low-profit-margin-alert topic.

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Build the stream topology
          builder.stream(
              WIND_TURBINE_PROFIT_MARGINS_TOPIC,
              Consumed.with(Serdes.Integer(), profitEventSerde)
          ).filter(
              (key, profit) -> profit.profitMargin < 0.10
          )
          .map((key, profit) -> {
              logLowProfitMarginAlert(key, profit.profitMargin);
      
              return new KeyValue<>(
                  key,
                  new LowProfitMarginWasDetected(key, profit.profitMargin)
              );
          })
          .to(
              LOW_PROFIT_MARGIN_TOPIC,
              Produced.with(Serdes.Integer(), alertsEventSerde)
          );
      
          ...output omitted...
      }
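
      Optionally, you can sanity-check a stateless topology like this one without a broker by using Kafka's TopologyTestDriver from the kafka-streams-test-utils artifact. The following standalone sketch is not part of the lab: it rebuilds an equivalent filter-and-map topology and relies on the assumed SerDes and event class shapes sketched earlier.

      // Standalone sketch, not part of the lab: exercising a filter/map topology
      // like the one above with TopologyTestDriver (kafka-streams-test-utils).
      // Event class shapes and SerDes are the assumptions sketched earlier.
      import java.util.Properties;

      import org.apache.kafka.common.serialization.Serdes;
      import org.apache.kafka.streams.KeyValue;
      import org.apache.kafka.streams.StreamsBuilder;
      import org.apache.kafka.streams.StreamsConfig;
      import org.apache.kafka.streams.TestInputTopic;
      import org.apache.kafka.streams.TestOutputTopic;
      import org.apache.kafka.streams.TopologyTestDriver;
      import org.apache.kafka.streams.kstream.Consumed;
      import org.apache.kafka.streams.kstream.Produced;

      import io.quarkus.kafka.client.serialization.ObjectMapperSerde;

      public class LowProfitMarginTopologySketch {

          public static void main(String[] args) {
              ObjectMapperSerde<WindTurbineProfitMarginWasCalculated> profitEventSerde =
                      new ObjectMapperSerde<>(WindTurbineProfitMarginWasCalculated.class);
              ObjectMapperSerde<LowProfitMarginWasDetected> alertEventSerde =
                      new ObjectMapperSerde<>(LowProfitMarginWasDetected.class);

              // Rebuild the same kind of stateless topology: filter low margins, map to alerts.
              StreamsBuilder builder = new StreamsBuilder();
              builder.stream(
                  "wind-turbine-profit-margins",
                  Consumed.with(Serdes.Integer(), profitEventSerde)
              ).filter(
                  (key, profit) -> profit.profitMargin < 0.10
              ).map(
                  (key, profit) -> new KeyValue<>(
                      key, new LowProfitMarginWasDetected(key, profit.profitMargin))
              ).to(
                  "low-profit-margin-alert",
                  Produced.with(Serdes.Integer(), alertEventSerde)
              );

              Properties config = new Properties();
              config.put(StreamsConfig.APPLICATION_ID_CONFIG, "low-profit-margin-sketch");
              config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");

              try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
                  TestInputTopic<Integer, WindTurbineProfitMarginWasCalculated> input =
                      driver.createInputTopic(
                          "wind-turbine-profit-margins",
                          Serdes.Integer().serializer(),
                          profitEventSerde.serializer());
                  TestOutputTopic<Integer, LowProfitMarginWasDetected> output =
                      driver.createOutputTopic(
                          "low-profit-margin-alert",
                          Serdes.Integer().deserializer(),
                          alertEventSerde.deserializer());

                  // A margin of (1000 - 913) / 1000 = 0.087 is below 0.10, so an alert is expected.
                  input.pipeInput(2, new WindTurbineProfitMarginWasCalculated(1000.0, 913.0));
                  System.out.println(output.readKeyValue());
              }
          }
      }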
  5. Execute the application, run the provided collaboration-async/scripts/produce_events.py Python script to test the complete implementation, and stop the Quarkus application.

    1. Return to the command line terminal, and start the application.

      (.venv) [user@host collaboration-async]$ ./mvnw quarkus:dev
      ...output omitted...
    2. Open a new terminal window, navigate to your workspace directory, and activate the Python virtual environment.

      [user@host ~]$ cd AD482
      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$
    3. Run the produce_events.py Python script.

      (.venv) [user@host AD482]$ python \
       collaboration-async/scripts/produce_events.py
      Sending earning and expense events...
      
      ...output omitted...
      
      - New WindTurbineExpenseWasAdded event
               Wind Turbine ID: 1
               Amount: 4200.0
    4. Return to the console running the Quarkus application. It might take some time for the application to process the events and display a log message about a detected low profit margin.

      ...output omitted... LowProfitMarginWasDetected - Turbine ID: 2 Profit Margin: 0.087
    5. Stop the application.

Evaluation

Go back to your workspace. Run the lab command to grade your work. Correct any reported failures and rerun the command until successful.

(.venv) [user@host AD482]$ lab grade collaboration-async

Finish

Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish collaboration-async

This concludes the lab.
