Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this lab, you will use Kafka, Kafka Streams, and Kafka Connect to build a streaming application.
You are hired by a company, SmartGardeners Inc. They want you to create an application that uses their sensor data and makes the relevant calculations.
The company already uses a legacy CRUD application with some data stored in a relational database. This relational database includes metadata for garden sensors registered in the system. You must use this metadata, as well as real-time sensor data, to build a new Streaming application.
The following are the main components of the system:

- PostgreSQL database: a database that stores the static sensor metadata. You must extract sensor data from this database, and write the data into the garden-sensors topic.
- garden-sensors: a service that reads real-time measurements from garden sensors and produces records to the garden-sensor-measurements and garden-sensor-measurements-repl topics. You must implement a producer that produces measurements to the garden-sensor-measurements topic.
- garden-streams: a service that reads data from the garden-sensors and garden-sensor-measurements-repl topics to process and aggregate the data. You must write a Kafka Streams topology to process the data and write the results into multiple Kafka topics.
- garden-back: a back-end service that reads events produced by the garden-streams and garden-sensors services, and makes the data available to the front end. You must implement the consumer that reads the records from the garden-sensor-measurements topic.
- garden-front: a front-end web application that displays the data exposed by the garden-back service.
Outcomes
You should be able to:
Apply a Kafka Connect connector to gather data from an external database and write the data into a Kafka topic.
Create Kafka producers and consumers.
Serialize and deserialize records with Service Registry by using the Avro format.
Use Kafka Streams to split a stream of records into multiple streams.
Enrich a stream with static data by using Kafka Streams.
Aggregate data from a stream over a specific time window by using Kafka Streams.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The OpenShift CLI (oc) installed.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
```
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
```

Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start comprehensive-review
Copy the Service Registry URL value.
You will use this value to connect to the service registry.
The lab command copies the exercise files from the AD482-apps/comprehensive-review/apps directory in your local Git repository into the comprehensive-review directory at the root of your workspace.
Procedure 7.1. Instructions
Create a PostgreSQL Debezium source connector by applying the AD482/comprehensive-review/resources/sensor-connector.yaml file. This file defines a KafkaConnector custom resource that reads sensor metadata from the PostgreSQL database, and writes the records into the garden-sensors topic.

Create a producer in the garden-sensors application that gathers sensor measurements and produces them into the garden-sensor-measurements topic. The producer must send measurement events every five seconds, and use the Avro format.

Before running the application, you must generate the com.redhat.training.gardens.event.SensorMeasurementTaken and com.redhat.training.gardens.event.SensorMeasurementType classes from the provided Avro schema. From the comprehensive-review/garden-sensors directory, run the ./mvnw generate-resources command to generate the relevant classes.

After implementing the producer, start the garden-sensors application in a separate terminal window. From the comprehensive-review/garden-sensors directory, run the ./mvnw package quarkus:dev command. The command makes the application available at http://localhost:8080.

Note that the garden-sensors service also includes an additional producer to replicate measurements to the garden-sensor-measurements-repl topic, without using Avro. This is necessary for the garden-streams service, which does not use Avro to deserialize sensor measurements.

Navigate to the garden-sensors application directory.

```
(.venv) [user@host comprehensive-review]$ cd garden-sensors
```

Open the src/main/resources/application.properties file and replace YOUR_SERVICE_REGISTRY_URL with the registry service URL provided in the start script output.

In the same file, add the required configuration for an outgoing channel named garden-sensor-measurements-out. Use the smallrye-kafka connector, and set garden-sensor-measurements as the channel topic. The configuration should look like the following.

```properties
...configuration omitted...
# TODO: configure an outgoing channel named "garden-sensor-measurements-out" for "garden-sensor-measurements" Kafka topic
mp.messaging.outgoing.garden-sensor-measurements-out.apicurio.registry.auto-register = true
mp.messaging.outgoing.garden-sensor-measurements-out.connector = smallrye-kafka
mp.messaging.outgoing.garden-sensor-measurements-out.topic = garden-sensor-measurements
...configuration omitted...
```

Generate the com.redhat.training.gardens.event.SensorMeasurementTaken and com.redhat.training.gardens.event.SensorMeasurementType classes from the provided Avro schema. Run the following command to generate the classes.

```
(.venv) [user@host garden-sensors]$ ./mvnw generate-resources
...output omitted...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
...output omitted...
```

Open the com.redhat.training.gardens.service.SensorMeasurementService class. Add a method that produces SensorMeasurementTaken event data to the garden-sensor-measurements-out channel. Use the generateEvent method to create a SensorMeasurementTaken event from sensor data.

```java
// TODO: Implement the Kafka producer
@Outgoing("garden-sensor-measurements-out")
@Broadcast
public Multi<Record<Integer, SensorMeasurementTaken>> measure() {
    return Multi.createFrom().ticks().every(Duration.ofMillis(5000))
        .onOverflow().drop()
        .map(tick -> {
            SensorMeasurementTaken event = generateEvent(
                sensorService.getSensor()
            );
            LOGGER.info("Sensor measurement taken: " + event);
            return Record.of(event.getSensorId(), event);
        });
}
```

Run the application. You should see the garden-sensors application logging the output of the sensor measurement event data and its replication to another topic.

```
[user@host garden-sensors]$ ./mvnw package quarkus:dev
...output omitted...
2021-10-19 13:56:20,022 INFO  [com.red.tra.gar.ser.SensorMeasurementService] (executor-thread-0) Sensor measurement taken: {"sensorId": 1, "value": 0.061971520044841855, "timestamp": 1634640980022, "type": "HUMIDITY"}
2021-10-19 13:56:20,563 INFO  [com.red.tra.gar.ser.SensorMeasurementReplicatorService] (vert.x-eventloop-thread-0) Sensor measurement event replicated: {"sensorId": 1, "value": 0.061971520044841855, "timestamp": 1634640980022, "type": "HUMIDITY"}
2021-10-19 13:56:25,014 INFO  [com.red.tra.gar.ser.SensorMeasurementService] (executor-thread-0) Sensor measurement taken: {"sensorId": 1, "value": 0.4865613400995872, "timestamp": 1634640985014, "type": "HUMIDITY"}
2021-10-19 13:56:25,177 INFO  [com.red.tra.gar.ser.SensorMeasurementReplicatorService] (vert.x-eventloop-thread-0) Sensor measurement event replicated: {"sensorId": 1, "value": 0.4865613400995872, "timestamp": 1634640985014, "type": "HUMIDITY"}
...output omitted...
```

Leave this terminal window open.
Create two consumers in the garden-back application. The garden-back application must consume raw sensor measurements produced by the garden-sensors service, and enriched sensor measurements produced by the garden-streams service.

The consumer implementations must have the following configurations.

| Consumed Kafka topic | Incoming channel name | Outgoing in-memory channel name | Deserialization |
|---|---|---|---|
| garden-sensor-measurements | garden-sensor-measurements-raw | in-memory-garden-sensor-measurements-raw | Automatic (Avro) |
| garden-sensor-measurements-enriched | garden-sensor-measurements-enriched | in-memory-garden-sensor-measurements-enriched | Custom |

For the custom deserialization, you must use the com.redhat.training.gardens.serde.SensorMeasurementEnrichedDeserializer deserializer class.

The garden-back service uses enriched sensor measurements for exposing aggregated data in the front-end service. The garden-streams service produces the enriched sensor measurements.

The garden-back service also exposes the raw sensor measurements for debugging purposes. The garden-sensors service produces the raw sensor measurements.

Before running the application, you must generate the com.redhat.training.gardens.event.SensorMeasurementTaken and com.redhat.training.gardens.event.SensorMeasurementType classes from the provided Avro schema. From the comprehensive-review/garden-back directory, run the ./mvnw generate-resources command to generate the relevant classes.

After implementing the consumers, start the garden-back service in a separate terminal window. From the comprehensive-review/garden-back directory, run the ./mvnw package quarkus:dev command. The command makes the application available at http://localhost:8081.

In a new terminal window, navigate to the garden-back application directory.

```
[user@host AD482]$ cd comprehensive-review/garden-back
```

In the src/main/resources/application.properties file, add the required configuration for an incoming channel named garden-sensor-measurements-enriched. Use the smallrye-kafka connector, and set garden-sensor-measurements-enriched as the channel topic. The configuration should look like the following.

```properties
# TODO: configure an incoming channel named "garden-sensor-measurements-enriched"
mp.messaging.incoming.garden-sensor-measurements-enriched.connector = smallrye-kafka
mp.messaging.incoming.garden-sensor-measurements-enriched.topic = garden-sensor-measurements-enriched
mp.messaging.incoming.garden-sensor-measurements-enriched.value.deserializer = com.redhat.training.gardens.serde.SensorMeasurementEnrichedDeserializer
```

In the same file, replace YOUR_SERVICE_REGISTRY_URL with your registry service URL.

In the same file, add the required configuration for an incoming channel named garden-sensor-measurements-raw, and set garden-sensor-measurements as the channel topic. The configuration should look like the following.

```properties
# TODO: configure an incoming channel named "garden-sensor-measurements-raw"
mp.messaging.incoming.garden-sensor-measurements-raw.connector = smallrye-kafka
mp.messaging.incoming.garden-sensor-measurements-raw.topic = garden-sensor-measurements
mp.messaging.incoming.garden-sensor-measurements-raw.enable.auto.commit = false
mp.messaging.incoming.garden-sensor-measurements-raw.auto.offset.reset = earliest
mp.messaging.incoming.garden-sensor-measurements-raw.apicurio.registry.use-specific-avro-reader = true
```

Generate the com.redhat.training.gardens.event.SensorMeasurementTaken and com.redhat.training.gardens.event.SensorMeasurementType classes from the provided Avro schema. Run the following command to generate the classes.

```
(.venv) [user@host garden-back]$ ./mvnw generate-resources
...output omitted...
[INFO] --- quarkus-maven-plugin:2.1.4.Final:generate-code (default) @ garden-back ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
...output omitted...
```

Open the com.redhat.training.gardens.rest.SensorResource class, and add a method that consumes SensorMeasurementEnriched data from the garden-sensor-measurements-enriched channel. This method must use an in-memory outgoing channel called in-memory-garden-sensor-measurements-enriched to expose the data via a REST endpoint.

```java
// TODO: Implement a Kafka consumer that returns "SensorMeasurementEnriched" data.
// Stream messages to an outgoing channel called "in-memory-garden-sensor-measurements-enriched"
@Incoming("garden-sensor-measurements-enriched")
@Outgoing("in-memory-garden-sensor-measurements-enriched")
@Broadcast
public SensorMeasurementEnriched consumeEnrichedSensorMeasurements(SensorMeasurementEnriched event) {
    return event;
}
```

In the same class, add a method that consumes SensorMeasurementTaken event data from the garden-sensor-measurements-raw channel. This method must use an in-memory outgoing channel called in-memory-garden-sensor-measurements-raw to expose the data via a REST endpoint. Use the createSensorMeasurementFromEvent method to create a SensorMeasurement object with the consumed SensorMeasurementTaken event data.

```java
// TODO: Implement a Kafka consumer that returns "SensorMeasurement" data.
// Stream messages to an outgoing channel called "in-memory-garden-sensor-measurements-raw"
@Incoming("garden-sensor-measurements-raw")
@Outgoing("in-memory-garden-sensor-measurements-raw")
@Broadcast
public SensorMeasurement consumeRawSensorMeasurements(SensorMeasurementTaken event) throws JsonProcessingException {
    SensorMeasurement sensorMeasurement = createSensorMeasurementFromEvent(event);
    return sensorMeasurement;
}
```

Run the garden-back service.

```
[user@host garden-back]$ ./mvnw package quarkus:dev
...output omitted...
2021-10-20 14:01:27,126 INFO  [io.quarkus] (Quarkus Main Thread) garden-back ... started in 4.112s. Listening on: http://localhost:8081
...output omitted...
```

Open your web browser and navigate to http://localhost:8081/sensor/measurements/raw. Verify that the web page continuously shows the streaming raw sensor measurement data.

```
...output omitted...
data: {"sensorId":4,"type":"TEMPERATURE", "value":38.58951966283255,"timestamp":1634683887382}
data: {"sensorId":3,"type":"WIND", "value":8.610418216859195,"timestamp":1634683892380}
data: {"sensorId":3,"type":"WIND", "value":14.241427507143873,"timestamp":1634683897380}
data: {"sensorId":3,"type":"WIND", "value":12.273055290240634,"timestamp":1634683902384}
...output omitted...
```
Important
Certain web browsers might try to download the stream as a file. In that case, verify that the file contains raw sensor measurements.
Alternatively, you can use a different browser or a tool such as curl.

Close the browser tab. Leave this terminal window open.
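As background for the custom deserialization used in this section: a Kafka deserializer's job is to turn a record's byte array into a typed object. The following self-contained sketch shows the general shape of such a class. The Deserializer interface here is a local stand-in for Kafka's interface of the same name, declared only so the sketch runs without the Kafka client jar, and the parsing is a placeholder, not the lab's SensorMeasurementEnrichedDeserializer implementation.

```java
import java.nio.charset.StandardCharsets;

public class DeserializerSketch {
    // Local stand-in for org.apache.kafka.common.serialization.Deserializer<T>,
    // declared here only so the sketch compiles without the Kafka client jar.
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    record SensorMeasurementEnriched(String raw) {}

    // A custom deserializer turns a record's bytes into a typed object.
    // The lab's SensorMeasurementEnrichedDeserializer would parse JSON here;
    // this placeholder just decodes the bytes as a string.
    static class SketchDeserializer implements Deserializer<SensorMeasurementEnriched> {
        public SensorMeasurementEnriched deserialize(String topic, byte[] data) {
            return new SensorMeasurementEnriched(new String(data, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        SketchDeserializer deserializer = new SketchDeserializer();
        SensorMeasurementEnriched value = deserializer.deserialize(
            "garden-sensor-measurements-enriched",
            "{\"sensorId\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(value.raw());
    }
}
```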
Start the front-end application in a separate terminal window.
From the comprehensive-review directory, run the python scripts/serve-frontend.py command.

This script makes the front-end application available at http://localhost:8083. Ensure the Python virtual environment is activated before running the Python script.

Open a new terminal window, and ensure you are in your workspace directory. Activate the Python virtual environment, navigate to the comprehensive-review directory, and run the front-end application.

```
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ cd comprehensive-review
(.venv) [user@host comprehensive-review]$ python scripts/serve-frontend.py
...output omitted...
```

Leave this terminal window open.

Open your web browser and navigate to http://localhost:8083. Verify that the dashboard lists no garden data.

Leave the browser tab open.
In the garden-streams service, read the sensor metadata and sensor measurements streams, and join them to create a stream of enriched sensor measurements.

The application already implements the following features:

- The Debezium source connector writes sensor metadata in the garden-sensors topic.
- The garden-sensors service writes sensor measurements in the garden-sensor-measurements-repl topic.

In the com.redhat.training.gardens.GardenStreamsTopology class, you must implement the code to write a stream of SensorMeasurementEnriched records to the garden-sensor-measurements-enriched topic. You must load the sensor metadata into a table, read the sensor measurements stream, and join them according to the following table:

| Stream | Topic | Key | Value |
|---|---|---|---|
| Sensors Stream | garden-sensors | Sensor ID (Integer) | Sensor metadata (Sensor) |
| Measurements Stream | garden-sensor-measurements-repl | Sensor ID (Integer) | Sensor measurement (SensorMeasurement) |
| Enriched Stream | garden-sensor-measurements-enriched | Sensor ID (Integer) | Sensor measurement + metadata (SensorMeasurementEnriched) |

Finally, run the garden-streams application. You must skip tests, because the application is not complete yet. Use the ./mvnw package quarkus:dev -DskipTests command.

In a new terminal window, navigate to the garden-streams service directory.

```
[user@host AD482]$ cd comprehensive-review/garden-streams
```

Edit the com.redhat.training.gardens.GardenStreamsTopology class to load the Sensor records into a global table.

```java
// TODO: Read sensors
GlobalKTable<Integer, Sensor> sensors = builder.globalTable(
    SENSORS_TOPIC,
    Consumed.with(Serdes.Integer(), sensorSerde));
```

Read the SensorMeasurement records as a stream.

```java
// TODO: Read sensor measurements
KStream<Integer, SensorMeasurement> sensorMeasurements = builder.stream(
    SENSOR_MEASUREMENTS_TOPIC,
    Consumed.with(Serdes.Integer(), sensorMeasurementSerde));
```

Join the stream of sensor measurements with the sensor table. Use the SensorMeasurementEnriched class to create joint records.

```java
// TODO: Join measurements with sensor table
KStream<Integer, SensorMeasurementEnriched> enrichedSensorMeasurements =
    sensorMeasurements.join(
        sensors,
        (sensorId, measurement) -> sensorId,
        (measurement, sensor) -> new SensorMeasurementEnriched(measurement, sensor));
```

Send the enriched sensor measurements to the garden-sensor-measurements-enriched topic.

```java
// TODO: Send enriched measurements to topic
enrichedSensorMeasurements.to(
    ENRICHED_SENSOR_MEASUREMENTS_TOPIC,
    Produced.with(Serdes.Integer(), sensorMeasurementEnrichedSerde));
```

Print the enrichedSensorMeasurements stream. This helps you to debug whether the topology is receiving and joining data.

```java
enrichedSensorMeasurements.print(Printed.toSysOut());
```

Run the garden-streams application. Note that you must use the -DskipTests parameter to skip the execution of unit tests, because the application is not complete yet.

```
[user@host garden-streams]$ ./mvnw package quarkus:dev -DskipTests
...output omitted...
[KSTREAM-LEFTJOIN-0000000005]: 4, com.redhat...SensorMeasurementEnriched@7627d833
[KSTREAM-LEFTJOIN-0000000005]: 1, com.redhat...SensorMeasurementEnriched@62bd28b4
```

Verify that the output displays SensorMeasurementEnriched records.

Leave this terminal window open. The Quarkus dev mode hot-reloads the application if you make changes to the source code.
Note
The Quarkus hot-reload feature might not work well if your editor automatically saves changes. In this case, you can stop the application and run the ./mvnw quarkus:dev command again.

Return to your browser tab and refresh the page. Verify that the Sensor Measurements table displays enriched sensor measurement data.
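Conceptually, the stream-to-GlobalKTable join performs a key lookup for each measurement in the sensors table and combines the measurement with the matching metadata. The following pure-Java sketch illustrates that idea with simplified, illustrative types; these are not the lab's classes.

```java
import java.util.HashMap;
import java.util.Map;

public class JoinSketch {
    record Sensor(int id, String gardenName) {}
    record Measurement(int sensorId, double value) {}
    record EnrichedMeasurement(int sensorId, double value, String gardenName) {}

    // The GlobalKTable behaves like a continuously updated lookup map:
    // for each streamed measurement, fetch the sensor metadata by key
    // and emit a combined record.
    public static EnrichedMeasurement join(Measurement m, Map<Integer, Sensor> sensors) {
        Sensor sensor = sensors.get(m.sensorId());
        return new EnrichedMeasurement(m.sensorId(), m.value(), sensor.gardenName());
    }

    public static void main(String[] args) {
        Map<Integer, Sensor> sensors = new HashMap<>();
        sensors.put(1, new Sensor(1, "North Garden"));

        EnrichedMeasurement enriched = join(new Measurement(1, 0.48), sensors);
        System.out.println(enriched.gardenName()); // prints North Garden
    }
}
```

In the real topology, the second argument of join selects the lookup key for each record, and the third combines the two values, which is the role SensorMeasurementEnriched plays above.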
In the garden-streams service, compute garden events based on sensor measurements to display them in the front end.

In the GardenStreamsTopology class, implement the necessary transformations to split the enriched sensor measurements stream by type. Next, create three new streams of events by applying conditions to measurement values, as specified in the following table:

| SensorMeasurementType | Condition | (Key, Event) | Topic |
|---|---|---|---|
| TEMPERATURE | value < 5.0 | (sensorId, LowTemperatureDetected) | garden-low-temperature-events |
| HUMIDITY | value < 0.2 | (sensorId, DryConditionsDetected) | garden-low-humidity-events |
| WIND | value > 10 | (sensorId, StrongWindDetected) | garden-strong-wind-events |

Important

If you experience problems after restarting the garden-streams service, then you might want to delete the Kafka Streams state directory. On Linux and macOS, this directory is located at /tmp/kafka-streams.

Create the GardenStreamsTopology#processTemperature private method to produce LowTemperatureDetected events.

```java
// TODO: implement temperature processor
private void processTemperature(KStream<Integer, SensorMeasurementEnriched> temperatureMeasurements) {
    temperatureMeasurements
        .filter((sensorId, measurement) -> measurement.value < LOW_TEMPERATURE_THRESHOLD_CELSIUS)
        .mapValues((measurement) -> new LowTemperatureDetected(measurement.gardenName,
            measurement.sensorId, measurement.value, measurement.timestamp))
        .to(LOW_TEMPERATURE_EVENTS_TOPIC,
            Produced.with(Serdes.Integer(), lowTemperatureEventSerde));
}
```

Create the GardenStreamsTopology#processHumidity private method to produce LowHumidityDetected events.

```java
// TODO: implement humidity processor
private void processHumidity(KStream<Integer, SensorMeasurementEnriched> humidityMeasurements) {
    humidityMeasurements
        .filter((sensorId, measurement) -> measurement.value < LOW_HUMIDITY_THRESHOLD_PERCENT)
        .mapValues((measurement) -> new LowHumidityDetected(measurement.gardenName,
            measurement.sensorId, measurement.value, measurement.timestamp))
        .to(LOW_HUMIDITY_EVENTS_TOPIC,
            Produced.with(Serdes.Integer(), lowHumidityEventSerde));
}
```

Create the GardenStreamsTopology#processWind private method to produce StrongWindDetected events.

```java
// TODO: implement wind processor
private void processWind(KStream<Integer, SensorMeasurementEnriched> windMeasurements) {
    windMeasurements
        .filter((sensorId, measurement) -> measurement.value > STRONG_WIND_THRESHOLD_MS)
        .mapValues((measurement) -> new StrongWindDetected(measurement.gardenName,
            measurement.sensorId, measurement.value, measurement.timestamp))
        .to(STRONG_WIND_EVENTS_TOPIC,
            Produced.with(Serdes.Integer(), strongWindEventSerde));
}
```

Split the enriched sensor measurements stream by type. The type is one of the values of the SensorMeasurementType enum.

```java
// TODO: split stream
enrichedSensorMeasurements
    .split()
    .branch((sensorId, measurement) -> measurement.type.equals(SensorMeasurementType.TEMPERATURE),
        Branched.withConsumer(this::processTemperature))
    .branch((sensorId, measurement) -> measurement.type.equals(SensorMeasurementType.HUMIDITY),
        Branched.withConsumer(this::processHumidity))
    .branch((sensorId, measurement) -> measurement.type.equals(SensorMeasurementType.WIND),
        Branched.withConsumer(this::processWind));
```

Wait until Quarkus restarts the application and the Kafka Streams engine switches to the RUNNING state. Alternatively, you can terminate the command and rerun the application.

```
2021-09-14 14:39:51,336 INFO [...] State transition from REBALANCING to RUNNING
```

Return to your browser tab and refresh the page. Verify that the dashboard displays events in the Garden Events table.
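Note that split() with branch() routes each record to the first branch whose predicate evaluates to true, similar to a chain of if / else if checks. The following pure-Java sketch expresses that routing as plain conditionals; it is an analogy with illustrative names, not the lab's code.

```java
import java.util.List;

public class BranchSketch {
    enum Type { TEMPERATURE, HUMIDITY, WIND }

    record Measurement(int sensorId, Type type, double value) {}

    // split().branch(...) sends each record to the first branch whose
    // predicate matches, like a chain of if / else if checks on the type.
    public static String route(Measurement m) {
        if (m.type() == Type.TEMPERATURE) {
            return "processTemperature";
        } else if (m.type() == Type.HUMIDITY) {
            return "processHumidity";
        } else {
            return "processWind";
        }
    }

    public static void main(String[] args) {
        List<Measurement> batch = List.of(
            new Measurement(4, Type.TEMPERATURE, 38.6),
            new Measurement(3, Type.WIND, 14.2));
        batch.forEach(m -> System.out.println(route(m)));
    }
}
```

Because each record goes to exactly one branch, the three processor methods receive disjoint sub-streams of the enriched measurements.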
In the garden-streams service, compute the last-minute status of each garden to display the values in the front end.

The application already implements the following features:

- The garden-back application exposes the /garden/statuses endpoint, which reads the garden-status-events topic and exposes a stream of server-sent events. Record values in the garden-status-events topic must be instances of GardenStatus. Each of these objects contains the garden name, the last reported measurements, and the trend.
- The garden-front application opens a connection to the /garden/statuses endpoint and displays the received values.

In the GardenStreamsTopology class, implement the necessary transformations to write a stream of GardenStatus objects to the garden-status-events Kafka topic. To achieve this, perform the following steps:

- Group the enriched sensor measurements stream by garden name.
- Add a time window of one minute.
- Use the GardenStatus class to aggregate the data. Add records with the GardenStatus#updateWith method.
- Transform the generated table of GardenStatus records to a stream.
- Map the stream of (windowedGardenName, gardenStatus) records into another stream of (null, gardenStatus) records.
- Send the stream to the garden-status-events topic.

In the GardenStreamsTopology class of the garden-streams service, group the enrichedSensorMeasurements by garden name, apply a window of one minute, and aggregate by using a GardenStatus object as the accumulator.

```java
// Aggregate enriched measurements
enrichedSensorMeasurements
    .groupBy(
        (sensorId, measurement) -> measurement.gardenName,
        Grouped.with(Serdes.String(), sensorMeasurementEnrichedSerde)
    )
    .windowedBy(
        TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1))
    )
    .aggregate(
        GardenStatus::new,
        (gardenName, measurement, gardenStatus) -> gardenStatus.updateWith(measurement),
        Materialized
            .<String, GardenStatus, WindowStore<Bytes, byte[]>>as(
                "garden-status-store"
            )
            .withKeySerde(Serdes.String())
            .withValueSerde(gardenStatusSerde))
```

Transform the resulting table into a stream. Map the stream of windowed records into another stream of (null, GardenStatus) records. Finally, write the resulting stream to the garden-status-events topic.

```java
    .aggregate(
        GardenStatus::new,
        (gardenName, measurement, gardenStatus) -> gardenStatus.updateWith(measurement),
        Materialized
            .<String, GardenStatus, WindowStore<Bytes, byte[]>>as(
                "garden-status-store"
            )
            .withKeySerde(Serdes.String())
            .withValueSerde(gardenStatusSerde))
    .toStream()
    .map((windowedGardenName, gardenStatus) ->
        new KeyValue<Void, GardenStatus>(null, gardenStatus))
    .to(
        GARDEN_STATUS_EVENTS_TOPIC,
        Produced.with(Serdes.Void(), gardenStatusSerde));
```

Wait for Quarkus to restart the service, or restart the application manually. You might have to wait a few seconds until the Kafka Streams engine reaches the RUNNING state after the restart.
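The TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1)) call defines tumbling, non-overlapping one-minute windows aligned to the epoch. The following self-contained Java sketch, an illustration rather than lab code, shows the rule that decides which window a record's timestamp belongs to:

```java
import java.time.Duration;

public class WindowAssignment {
    // Tumbling windows aligned to the epoch: a record with a given timestamp
    // falls into the window that starts at timestamp - (timestamp % windowSize).
    public static long windowStart(long timestampMs, Duration windowSize) {
        long size = windowSize.toMillis();
        return timestampMs - (timestampMs % size);
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        long t1 = 1634640980022L; // sample timestamp from the lab output
        long t2 = 1634640985014L; // a measurement taken five seconds later
        // Both measurements fall into the same one-minute window, so both
        // are folded into the same GardenStatus accumulator.
        System.out.println(windowStart(t1, oneMinute) == windowStart(t2, oneMinute)); // prints true
    }
}
```

Because the advance equals the window size, every record belongs to exactly one window; a smaller advance would create overlapping (hopping) windows instead.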
Refresh the front-end page. Verify that the page displays garden status.
The front end should display the status of four gardens. Each garden shows only one measurement type because just one sensor per garden is registered in the application.
This concludes the lab.