Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this lab, you will process data streams in a Quarkus application by using the Kafka Streams library.
You will build the stream processing topology required for a wind turbine monitoring dashboard. This application should handle input data streams from the following Kafka topics:
turbines. A stream of wind turbines registered in the system.
turbine-generated-watts. A live stream of how much power each turbine generates, in Watts.
The application should show general information about each turbine, real-time power production in Megawatts (MWatts), and the number of power production values processed per turbine. By using Kafka Streams, you must process the input streams in the application's back end to display these values in the front end.
Outcomes
You should be able to:
Install and configure Kafka Streams in a Quarkus application.
Create a processing topology with the Kafka Streams DSL.
Read streams from and write streams to Kafka topics.
Use core DSL abstractions: KStream and KTable.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment, and use the lab command to start the scenario for this exercise.
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start streams-building
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Copy the Bootstrap Server and Bootstrap Port values.
You will use these values to connect to the Kafka cluster.
The lab command copies the exercise files from the AD482-apps/streams-building/apps/energy-meter directory in your local Git repository into the streams-building directory at the root of your workspace.
Procedure 3.3. Instructions
Navigate to the streams-building directory and briefly inspect the back-end and the front-end applications.

The energy-meter directory contains the Quarkus Kafka Streams back-end application, which includes the following elements:

The api package, which exposes Kafka Streams stores and Kafka topics to the front end.
The records package, which includes classes that specify the format of stream records.
The StreamTopologyBuilder class. This is the only class that you must edit in this exercise, to populate the stores and topics exposed to the front end.

The energy-meter-front directory contains a React application, which connects to the back end and displays the processed data in a web dashboard.

Install the Quarkus extension for Kafka Streams in the back-end application.
Navigate to the energy-meter directory.

(.venv) [user@host streams-building]$ cd energy-meter

Run the following command to add the kafka-streams Quarkus extension to your application:

(.venv) [user@host energy-meter]$ ./mvnw quarkus:add-extension \
    -Dextensions="kafka-streams"
...output omitted...
[INFO] [SUCCESS] ✅ Extension io.quarkus:quarkus-kafka-streams has been installed
...output omitted...
Configure Kafka Streams in the back-end application, according to the following specifications:
Table 3.2. Kafka Streams Configuration
Application ID: energy-meter-streams
Bootstrap server: Kafka bootstrap host and port
Topics: turbines, turbine-generated-watts, turbine-generated-mwatts, turbine-stats

By using your editor of choice, open the src/main/resources/application.properties file, and add the following configuration parameters.

# TODO: configure Kafka Streams
quarkus.kafka-streams.application-id=energy-meter-streams
quarkus.kafka-streams.bootstrap-servers=YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT
quarkus.kafka-streams.topics=turbines,turbine-generated-watts,turbine-generated-mwatts,turbine-stats

Make sure to replace the YOUR_KAFKA_BOOTSTRAP_HOST and YOUR_KAFKA_BOOTSTRAP_PORT strings with the values from the lab start script.
Start the applications for this exercise. Run each of them in a separate terminal window.
Back-end application: From the streams-building/energy-meter directory, run the ./mvnw quarkus:dev command. The application should be available at http://localhost:8080. The logs should display an error caused by an empty Kafka Streams topology.

Front-end application: From the streams-building directory, run the python scripts/serve-frontend.py command. A simple Python web server should make the front-end application available at http://localhost:8081.

Producer script: From the streams-building directory, run the python scripts/produce_turbines.py command. This script should produce three wind turbines into the turbines Kafka topic, and continuous energy production values in Watts for each turbine into the turbine-generated-watts topic.
Make sure that the Python virtual environment is activated before running the Python scripts.
Run the Quarkus application.
[user@host energy-meter]$ ./mvnw quarkus:dev
...output omitted...
2021-09-13 13:44:20,514 ERROR [io.qua.run.Application] (Quarkus Main Thread) Failed to start application (with profile dev): org.apache.kafka.streams.errors.TopologyException: Invalid topology: Topology has no stream threads and no global threads, must subscribe to at least one source topic or global table.

The application fails because the processing topology is empty.
Leave this terminal window open. The Quarkus dev mode hot-reloads the application if you make changes to the source code.
Note
The Quarkus hot-reload feature might not work well if your editor auto-saves changes. In this case, you can stop the application and run the ./mvnw quarkus:dev command again.

Open a new terminal window and make sure you are in your workspace directory. Activate the Python virtual environment, navigate to the streams-building directory, and run the producer script.

[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ cd streams-building
(.venv) [user@host streams-building]$ python scripts/produce_turbines.py
- Registered new turbine in 'turbines' topic (1, {'cutOutWindSpeed': 50, 'description': 'Wind Turbine A', 'id': 1, 'powerCapacity': 2000000})
...output omitted...
- Produced power in 'turbine-generated-watts' topic (1, 1462802)
...output omitted...

Leave this terminal window open.
Open a new terminal window, and make sure you are in your workspace directory. Activate the Python virtual environment, navigate to the streams-building directory, and run the front-end application.

[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ cd streams-building
(.venv) [user@host streams-building]$ python scripts/serve-frontend.py
...output omitted...

Open your web browser and navigate to http://localhost:8081. The dashboard should list no wind turbines.

Leave this terminal window and the browser tab open.
Display the list of registered wind turbines in the dashboard.
The application already implements the following features:
The back end exposes the /turbines endpoint, which gathers the list of registered wind turbines from a Kafka Streams store, called turbines-store. This store should contain WindTurbine objects.

The front end fetches wind turbines from the /turbines endpoint and displays them.

You must implement the code to populate the turbines-store store with WindTurbine records, by reading the turbines Kafka topic as a KTable. Read the records from the turbines topic, according to this specification:

Key: Wind turbine ID (Integer)
Value: Wind turbine instance (WindTurbine)

Use the same specification to materialize the KTable object as the turbines-store store.

Finally, refresh the dashboard page in your browser, wait until Quarkus restarts the Kafka Streams engine, and verify that the dashboard displays three wind turbines.
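The distinction between a KStream and a KTable drives this step: a stream keeps every record, while a table keeps only the latest value for each key. The following plain-Java sketch (not the Kafka Streams API) illustrates that last-value-per-key behavior; the turbine IDs and Watts values are made up for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamVsTable {
    public static void main(String[] args) {
        // A stream view keeps every event: two readings for turbine 1, one for turbine 2.
        List<Map.Entry<Integer, Integer>> stream = new ArrayList<>();
        stream.add(Map.entry(1, 1_400_000));
        stream.add(Map.entry(1, 1_500_000));
        stream.add(Map.entry(2, 900_000));

        // A table view retains only the latest value per key, as a KTable does.
        Map<Integer, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<Integer, Integer> event : stream) {
            table.put(event.getKey(), event.getValue());
        }

        System.out.println(stream.size()); // 3: all events remain in the stream view
        System.out.println(table.get(1));  // 1500000: only the latest value for key 1
    }
}
```

In the exercise, this is why the turbines topic is read as a KTable: each turbine ID should map to exactly one current WindTurbine record in turbines-store.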
Open the StreamTopologyBuilder class and add the code to create the WindTurbine serde.

// TODO: Create wind turbine serde
ObjectMapperSerde<WindTurbine> turbineSerde = new ObjectMapperSerde<>(WindTurbine.class);

Use the StreamsBuilder#table method to read the turbines Kafka topic into a KTable. Specify the key/value serdes to consume from the Kafka topic, and the key/value serdes to materialize the table as the turbines-store store.

// TODO: read the "turbines" topic as a KTable
builder.table(
    "turbines",
    Consumed.with(Serdes.Integer(), turbineSerde),
    Materialized
        .<Integer, WindTurbine, KeyValueStore<Bytes, byte[]>>as("turbines-store")
        .withKeySerde(Serdes.Integer())
        .withValueSerde(turbineSerde)
);

Return to your browser tab and refresh the page. Inspect the back-end application logs. Wait until the Kafka Streams engine switches to the RUNNING state.
Note
If you experience problems while restarting the application, then stop the Quarkus application and run the ./mvnw quarkus:dev command again.

2021-09-14 14:39:51,336 INFO [org.apa.kaf.str.KafkaStreams] (energy-meter-streams-2571d5e9-9883-4d7b-9f21-8f30ad65c2b2-StreamThread-1) stream-client [energy-meter-streams-2571d5e9-9883-4d7b-9f21-8f30ad65c2b2] State transition from REBALANCING to RUNNING

In the browser tab, verify that the dashboard displays three wind turbines.
Display real-time power generation values in the dashboard.
The application already implements the following features:
The back end exposes the /turbines/generated-power endpoint, which reads the turbine-generated-mwatts Kafka topic and exposes a stream of server-sent events. Records in the turbine-generated-mwatts topic should use MWattsMeasurement instances as values.

The front end opens a connection to the /turbines/generated-power endpoint and displays the received values.

You must implement the code to write a stream of MWattsMeasurement objects to the turbine-generated-mwatts Kafka topic. To this end, you must perform the following steps:

Read the turbine-generated-watts topic as a KStream.
Map the stream to a new stream of MWattsMeasurement objects.
Write the resulting stream to the turbine-generated-mwatts Kafka topic.
Perform these operations according to this table:
Key: Wind turbine ID (Integer) -> Wind turbine ID (Integer)
Value: Generated power value in Watts (Integer) -> Object encapsulating MWatts values (MWattsMeasurement)

Note
1 MWatt = 1,000,000 Watts
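The conversion can be checked with plain Java arithmetic. Note the cast to double in the sketch below: without it, integer division would truncate 1462802 Watts to 1 MWatt instead of 1.462802. The sample value is taken from the producer output shown earlier.

```java
public class WattsToMWatts {
    public static void main(String[] args) {
        int watts = 1_462_802; // a sample value from the producer output

        // Integer division truncates the fractional part: 1462802 / 1000000 == 1
        System.out.println(watts / 1_000_000);

        // Casting to double keeps the fraction: 1.462802 MWatts
        double megawatts = (double) watts / 1_000_000;
        System.out.println(megawatts);
    }
}
```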
Finally, refresh the front-end page again and wait until Quarkus restarts the Kafka Streams engine. Verify that the page displays production values in MWatts.
Switch to the StreamTopologyBuilder class file. Use the StreamsBuilder#stream method to read the turbine-generated-watts Kafka topic as a KStream. Specify the required serdes to consume each record's key and value.

// TODO: read the "turbine-generated-watts" topic as a KStream
KStream<Integer, Integer> wattsValuesStream = builder.stream(
    "turbine-generated-watts",
    Consumed.with(Serdes.Integer(), Serdes.Integer())
);

Create a serde for MWattsMeasurement objects.

// TODO: Create MWattsMeasurement serde
ObjectMapperSerde<MWattsMeasurement> mwattsMeasurementSerde = new ObjectMapperSerde<>(MWattsMeasurement.class);

Map the values of the wattsValuesStream to a new KStream of MWattsMeasurement objects. Write the new stream to the turbine-generated-mwatts Kafka topic. Use the MWattsMeasurement serde to materialize this stream into the topic.

// TODO: map the watts stream into a new mwatts stream
wattsValuesStream.map((turbineId, watts) -> {
    Double megawatts = (double) watts / 1000000;
    MWattsMeasurement measurement = new MWattsMeasurement(turbineId, megawatts);
    System.out.println(
        "MAP - Turbine: " + turbineId + " | " + watts + " Watts -> " + megawatts + " MWatts"
    );
    return KeyValue.pair(turbineId, measurement);
}).to(
    "turbine-generated-mwatts",
    Produced.with(Serdes.Integer(), mwattsMeasurementSerde)
);

Refresh the front-end page again. Verify that the page displays production values in MWatts. You might have to wait a few seconds until the Kafka Streams engine reaches the RUNNING state after the restart.
Display power generation value counts in the dashboard.
The application already implements the following features:
The back end exposes the /turbines/measurements-count endpoint, which reads the turbine-stats Kafka topic and exposes a stream of server-sent events. Record values in the turbine-stats topic should be instances of WindTurbineStats. Each one of these objects contains the turbine ID and the number of reported measurements.

The front end opens a connection to the /turbines/measurements-count endpoint and displays the received values.

You must implement the code to write a stream of WindTurbineStats objects to the turbine-stats Kafka topic. To this end, perform the following steps:

Group the previously created KStream object by turbine ID.
Count the values in the grouped stream.
Transform the generated counts KTable to a stream.
Map the stream of (turbineId, count) records into another stream of (turbineId, WindTurbineStats) records.
Write the resulting stream to the turbine-stats topic.
Create the serde for the WindTurbineStats class.

// TODO: Create WindTurbineStats serde
ObjectMapperSerde<WindTurbineStats> statsSerde = new ObjectMapperSerde<>(WindTurbineStats.class);

Group the wattsValuesStream by key and count the records. Transform the resulting table into a stream. Map the stream of (turbineId, count) records into another stream of (turbineId, WindTurbineStats) records. Finally, write the resulting stream to the turbine-stats topic.

// TODO: count measurements by turbine and write results to a new stream
wattsValuesStream
    .groupByKey()
    .count()
    .toStream()
    .map((turbineId, count) -> {
        WindTurbineStats stats = new WindTurbineStats(turbineId, count);
        System.out.println("COUNT - Turbine: " + turbineId + " | Count: " + count);
        return KeyValue.pair(turbineId, stats);
    })
    .to(
        "turbine-stats",
        Produced.with(Serdes.Integer(), statsSerde)
    );

An API endpoint makes the turbine-stats Kafka topic available to the front end.

Refresh the front-end page again. Verify that the page displays the counts.
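As a mental model for the groupByKey().count() step, counting records per key in a one-shot batch looks like the following plain-Java sketch (not the Kafka Streams API; in Kafka Streams the count is updated continuously as new records arrive, and the sample turbine IDs here are made up):

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CountByTurbine {
    public static void main(String[] args) {
        // Each element is the ID of a turbine that reported one power measurement.
        List<Integer> measurements = List.of(1, 2, 1, 3, 1, 2);

        // Group by turbine ID and count, analogous to groupByKey().count() on a KStream.
        Map<Integer, Long> counts = measurements.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

        System.out.println(counts.get(1)); // 3 measurements for turbine 1
        System.out.println(counts.get(2)); // 2 measurements for turbine 2
    }
}
```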
This concludes the lab.