Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this lab, you will apply stateful and stateless transformations to the event streams of an electric company. The application is an enhancement requested by the company's management team. They want to detect wind turbines with low profit margins so that they can make better business decisions.
You will apply stateful transformations to the earnings and expenses event streams to calculate the profit margin, and stateless transformations to the calculated profit margin to raise an alert for turbines with a low margin.
Outcomes
You should be able to apply stateful and stateless transformations in a data stream.
You will find the solution files for this exercise in the AD482-apps repository, within the collaboration-async/solutions directory.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start collaboration-async
The lab command copies the exercise files from the AD482-apps/collaboration-async/apps directory in your local Git repository into the collaboration-async directory in your workspace.
Procedure 4.4. Instructions
Create a topology in the WindTurbineProfitMarginsPipeline class that calculates the average earnings per wind turbine, and saves the results into a KTable.

The wind-turbine-earnings topic contains the WindTurbineEarningWasAdded events. Each event record has the wind turbine ID as the record key.

You must use the AverageData class to store the intermediate aggregated calculations. Materialize the aggregation in a store named earnings-aggregated-store.

Map the values of the aggregation to a KTable that contains the average earnings. Materialize the mapping in a store named wind-turbine-average-earnings-store.
Navigate to the collaboration-async directory located in your workspace directory.

By using your editor of choice, open the WindTurbineProfitMarginsPipeline class, and examine the code. The class defines all the SerDe objects, the state store names, the topic names, and the logic to start and stop the stream. You define the topology in the onStart method.

Create a stream from the wind-turbine-earnings topic, and assign it to a variable.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Create a KStream for the earning events
        KStream<Integer, WindTurbineEarningWasAdded> earningsStream = builder.stream(
            WIND_TURBINE_EARNINGS_TOPIC,
            Consumed.with(Serdes.Integer(), earningEventSerde)
        );

        ...output omitted...
    }

Calculate the intermediate earnings by first grouping the events stored in the earnings stream by key, and then aggregating the record values in the AverageData class. Materialize the resulting KTable as a store named earnings-aggregated-store.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Aggregate the earnings
        KTable<Integer, AverageData> aggregatedEarnings = earningsStream
            .groupByKey()
            .aggregate(
                AverageData::new,
                (key, value, aggregate) -> {
                    aggregate.increaseCount(1);
                    aggregate.increaseSum(value.amount);
                    return aggregate;
                },
                Materialized.<Integer, AverageData, KeyValueStore<Bytes, byte[]>>
                    as(AGGREGATED_EARNINGS_STORE)
                    .withKeySerde(Serdes.Integer())
                    .withValueSerde(averageDataSerde)
            );

        ...output omitted...
    }

Map the aggregation values into a KTable that contains the average earnings. Materialize the KTable as a store named wind-turbine-average-earnings-store.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Calculate the average earnings
        KTable<Integer, Double> averageEarningsTable = aggregatedEarnings.mapValues(
            value -> value.sum / value.count,
            Materialized.<Integer, Double, KeyValueStore<Bytes, byte[]>>
                as(AVERAGE_EARNINGS_STORE)
                .withKeySerde(Serdes.Integer())
                .withValueSerde(Serdes.Double())
        );

        ...output omitted...
    }
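The lab provides the AverageData class for you; its exact contents are in the AD482-apps repository. As a reference, a minimal sketch that is consistent with how the aggregator above uses it (an increaseCount method, an increaseSum method, and public sum and count fields read by mapValues) might look like the following. Treat this as an illustration only; the real class may differ, for example in its serialization details:

```java
// Hypothetical sketch of the AverageData accumulator used by the aggregation.
// The actual class in the AD482-apps repository may differ.
public class AverageData {
    public double sum = 0.0;
    public int count = 0;

    // Called once per record to track how many values were aggregated.
    public void increaseCount(int delta) {
        this.count += delta;
    }

    // Called once per record to accumulate the event amount.
    public void increaseSum(double amount) {
        this.sum += amount;
    }
}
```

With this shape, the mapValues step in the topology computes value.sum / value.count to turn the running accumulator into an average.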
Update the topology created in the WindTurbineProfitMarginsPipeline class to calculate the average expenses per wind turbine, and save the results into a KTable.

The wind-turbine-expenses topic contains the WindTurbineExpenseWasAdded events. Each event record has the wind turbine ID as the record key.

You must use the AverageData class to store the intermediate aggregated calculations. Materialize the aggregation in a store named expenses-aggregated-store.

Map the values of the aggregation to a KTable that contains the average expenses. Materialize the mapping in a store named wind-turbine-average-expenses-store.
Create a stream from the wind-turbine-expenses topic, and assign it to a variable.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Create a KStream for the expense events
        KStream<Integer, WindTurbineExpenseWasAdded> expensesStream = builder.stream(
            WIND_TURBINE_EXPENSES_TOPIC,
            Consumed.with(Serdes.Integer(), expenseEventSerde)
        );

        ...output omitted...
    }

Calculate the intermediate expenses by first grouping the events stored in the expenses stream by key, and then aggregating the record values in the AverageData class. Materialize the resulting KTable as a store named expenses-aggregated-store.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Aggregate the expenses
        KTable<Integer, AverageData> aggregatedExpenses = expensesStream
            .groupByKey()
            .aggregate(
                AverageData::new,
                (key, value, aggregate) -> {
                    aggregate.increaseCount(1);
                    aggregate.increaseSum(value.amount);
                    return aggregate;
                },
                Materialized.<Integer, AverageData, KeyValueStore<Bytes, byte[]>>
                    as(AGGREGATED_EXPENSES_STORE)
                    .withKeySerde(Serdes.Integer())
                    .withValueSerde(averageDataSerde)
            );

        ...output omitted...
    }

Map the aggregation values into a KTable that contains the average expenses. Materialize the KTable as a store named wind-turbine-average-expenses-store.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Calculate the average expenses
        KTable<Integer, Double> averageExpensesTable = aggregatedExpenses
            .mapValues(
                value -> value.sum / value.count,
                Materialized.<Integer, Double, KeyValueStore<Bytes, byte[]>>
                    as(AVERAGE_EXPENSES_STORE)
                    .withKeySerde(Serdes.Integer())
                    .withValueSerde(Serdes.Double())
            );

        ...output omitted...
    }
Update the topology created in the WindTurbineProfitMarginsPipeline class to join the average earnings and average expenses KTables. Use the WindTurbineProfitMarginWasCalculated event to calculate the profit margin per wind turbine, and send the result to a topic named wind-turbine-profit-margins.

Join the average earnings and average expenses variables. Use the WindTurbineProfitMarginWasCalculated class as the value joiner, and transform the resulting KTable into a stream.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Calculate the profit margins
        averageEarningsTable.join(
            averageExpensesTable,
            WindTurbineProfitMarginWasCalculated::new
        ).toStream()

        ...output omitted...
    }

Send the records available in the joining stream to a topic named wind-turbine-profit-margins.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Calculate the profit margins
        averageEarningsTable.join(
            averageExpensesTable,
            WindTurbineProfitMarginWasCalculated::new
        ).toStream().to(
            WIND_TURBINE_PROFIT_MARGINS_TOPIC,
            Produced.with(Serdes.Integer(), profitEventsSerde)
        );

        ...output omitted...
    }
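Because the join operates on two KTable<Integer, Double> tables, the value joiner receives the average earnings and the average expenses for a turbine and must produce a WindTurbineProfitMarginWasCalculated event. The lab does not show the body of that class; one plausible sketch, assuming the margin is computed as (earnings - expenses) / earnings, is shown below. Both the field name and the formula are assumptions for illustration; check the class in the AD482-apps repository for the actual calculation:

```java
// Hypothetical sketch of the join result event.
// ASSUMPTION: margin = (avg earnings - avg expenses) / avg earnings.
// The actual class in AD482-apps may compute it differently.
public class WindTurbineProfitMarginWasCalculated {
    public double profitMargin;

    // Two-argument constructor matching ValueJoiner<Double, Double, WindTurbineProfitMarginWasCalculated>
    public WindTurbineProfitMarginWasCalculated(Double avgEarnings, Double avgExpenses) {
        this.profitMargin = (avgEarnings - avgExpenses) / avgEarnings;
    }
}
```

The two-argument constructor is what lets the WindTurbineProfitMarginWasCalculated::new method reference stand in for the ValueJoiner in the join call.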
Create a topology in the NotifyAboutLowProfitMarginPipeline class that sends an alert event to the low-profit-margin-alert topic when the profit margin of a wind turbine is lower than 0.10.

The wind-turbine-profit-margins topic contains the WindTurbineProfitMarginWasCalculated events. Each event record has the wind turbine ID as the record key.

Build a stream from the wind-turbine-profit-margins topic, and filter all the events with a profit margin lower than 0.10.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Build the stream topology
        builder.stream(
            WIND_TURBINE_PROFIT_MARGINS_TOPIC,
            Consumed.with(Serdes.Integer(), profitEventSerde)
        ).filter(
            (key, profit) -> profit.profitMargin < 0.10
        )

        ...output omitted...
    }

Map the filtered events to a new events stream. Set the key of the new record to be the same as the records in the wind-turbine-profit-margins topic. Set the record value to a new LowProfitMarginWasDetected class. You can use the helper methods for debugging purposes.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Build the stream topology
        builder.stream(
            WIND_TURBINE_PROFIT_MARGINS_TOPIC,
            Consumed.with(Serdes.Integer(), profitEventSerde)
        ).filter(
            (key, profit) -> profit.profitMargin < 0.10
        ).map((key, profit) -> {
            logLowProfitMarginAlert(key, profit.profitMargin);
            return new KeyValue<>(
                key,
                new LowProfitMarginWasDetected(key, profit.profitMargin)
            );
        })

        ...output omitted...
    }

Send the LowProfitMarginWasDetected events to the low-profit-margin-alert topic.

    void onStart(@Observes StartupEvent startupEvent) {
        ...output omitted...

        // TODO: Build the stream topology
        builder.stream(
            WIND_TURBINE_PROFIT_MARGINS_TOPIC,
            Consumed.with(Serdes.Integer(), profitEventSerde)
        ).filter(
            (key, profit) -> profit.profitMargin < 0.10
        ).map((key, profit) -> {
            logLowProfitMarginAlert(key, profit.profitMargin);
            return new KeyValue<>(
                key,
                new LowProfitMarginWasDetected(key, profit.profitMargin)
            );
        }).to(
            LOW_PROFIT_MARGIN_TOPIC,
            Produced.with(Serdes.Integer(), alertsEventSerde)
        );

        ...output omitted...
    }
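Conceptually, the filter and map steps above apply a simple per-record rule: drop every record whose margin is at or above 0.10, and re-wrap each remaining record as an alert event under the same key. The following plain java.util.stream pipeline illustrates that logic over in-memory records. This is only an analogy, not Kafka Streams code, and the Profit and Alert record types are simplified stand-ins for the lab's event classes:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LowMarginFilterDemo {
    // Simplified stand-in for the profit margin event value.
    record Profit(double profitMargin) {}

    // Simplified stand-in for the LowProfitMarginWasDetected alert event.
    record Alert(int turbineId, double profitMargin) {}

    // Applies the same per-record logic as the topology:
    // keep only margins below 0.10, then re-wrap each one as an alert.
    static List<Alert> lowMarginAlerts(Map<Integer, Profit> profitsByTurbine) {
        return profitsByTurbine.entrySet().stream()
            .filter(entry -> entry.getValue().profitMargin() < 0.10)
            .map(entry -> new Alert(entry.getKey(), entry.getValue().profitMargin()))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Alert> alerts = lowMarginAlerts(Map.of(
            1, new Profit(0.25),
            2, new Profit(0.087)
        ));
        System.out.println(alerts); // only turbine 2 triggers an alert
    }
}
```

In the real topology the same decision happens one record at a time as events arrive, and the surviving alerts are produced to the low-profit-margin-alert topic instead of being collected into a list.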
Execute the application, run the provided collaboration-async/scripts/produce_events.py Python script to test the complete implementation, and stop the Quarkus application.

Return to the command line terminal, and start the application.

    (.venv) [user@host collaboration-async]$ ./mvnw quarkus:dev
    ...output omitted...

Open a new terminal window, navigate to your workspace directory, and activate the Python virtual environment.

    [user@host ~]$ cd AD482
    [user@host AD482]$ source .venv/bin/activate
    (.venv) [user@host AD482]$

Run the produce_events.py Python script.

    (.venv) [user@host AD482]$ python \
    collaboration-async/scripts/produce_events.py
    Sending earning and expense events...
    ...output omitted...
    - New WindTurbineExpenseWasAdded event
        Wind Turbine ID: 1
        Amount: 4200.0

Return to the console running the Quarkus application. It might take some time for the application to process the events and display a log message about a detected low profit margin.

    ...output omitted...
    LowProfitMarginWasDetected - Turbine ID: 2 Profit Margin: 0.087

Stop the application.
This concludes the lab.