Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise, you will use the Kafka Streams DSL for stream processing. You will apply different transformations to a stream of data: processing data records, converting between KStreams and KTables, and writing the results back to Kafka.
The example application for this exercise reads a stream of position messages sent by a set of vehicles, which continuously report their position to the vehicle-positions topic.
Each position sent to the vehicle-positions topic is a JSON object containing vehicle ID, latitude, longitude, and elevation values.
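For example, a position message might look like the following. The field values are illustrative; they match the fields that the producer script generates later in this exercise:

{
    "vehicleId": 5,
    "latitude": 41.2265693535002,
    "longitude": 4.398626580777269,
    "elevation": 0.03501904353249241
}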
You will find the solution files for this exercise in the AD482-apps repository, within the streams-data/solutions directory.
Outcomes
You should be able to use KTables, KStreams, and SerDes to process data streams.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment, and use the lab command to start the scenario for this exercise.
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ lab start streams-data
...output omitted...
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Make sure you copy the Bootstrap Server and Bootstrap Port values from the lab command output.
You will use these values to configure Kafka Streams.
The lab command copies the exercise files from the AD482-apps/streams-data/apps/vehicles directory in your local Git repository into the streams-data directory in your workspace.
Procedure 3.2. Instructions
Configure your Kafka Streams application.
Change to the streams-data directory.

(.venv) [user@host AD482]$ cd streams-data
(.venv) [user@host streams-data]$

This directory contains the Quarkus application for this exercise.
By using your editor of choice, open the src/main/resources/application.properties file, and configure the Kafka Streams properties as follows:

# TODO: add configuration values
quarkus.kafka-streams.bootstrap-servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT
quarkus.kafka-streams.topics = vehicle-positions,vehicle-feet-elevations

Replace YOUR_KAFKA_BOOTSTRAP_HOST and YOUR_KAFKA_BOOTSTRAP_PORT with the Bootstrap Server and Bootstrap Port values that you copied from the lab command output. Note that the rest of the Kafka-specific configuration values are in the .env file, which the lab command copied to your workspace.
Implement the code to read a stream of vehicle positions from the vehicle-positions topic. Each record in the vehicle-positions topic consists of a null key and a JSON value.

Inspect the source code of the VehiclePosition class. This class defines the format and structure of the messages that you receive from the vehicle-positions topic.

Edit the VehiclePositionsStream class. Add the code to create a SerDe (Serializer/Deserializer) for VehiclePosition objects.

public Topology buildTopology() {
    StreamsBuilder builder = new StreamsBuilder();

    // TODO: create serde to deserialize VehiclePosition messages
    ObjectMapperSerde<VehiclePosition> vehiclePositionSerde =
            new ObjectMapperSerde<>(VehiclePosition.class);

    ...output omitted...
}

The ObjectMapperSerde class serializes JSON messages by using the Jackson library. Notice that the constructor parameter specifies the class to deserialize into, which is VehiclePosition in this case.
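The exercise provides the VehiclePosition class for you. As a reference, a minimal version of such a class could look like the following sketch. The field names match the JSON messages; the actual class in the AD482-apps repository might differ in details such as annotations or accessors:

// Hypothetical sketch of the VehiclePosition message class.
// Jackson needs a no-argument constructor and accessible fields
// to deserialize the JSON messages.
public class VehiclePosition {
    public int vehicleId;     // identifier of the reporting vehicle
    public double latitude;   // degrees
    public double longitude;  // degrees
    public double elevation;  // meters above sea level

    public VehiclePosition() {
    }
}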
Add the code to create a KStream from the vehicle-positions topic.

public Topology buildTopology() {
    ...output omitted...

    // TODO: Create the stream from the "vehicle-positions" topic
    KStream<String, VehiclePosition> stream = builder.stream(
        "vehicle-positions",
        Consumed.with(stringSerde, vehiclePositionSerde)
    );

    ...output omitted...
}

Note the use of a string SerDe to deserialize null keys and the ObjectMapperSerde instance to deserialize VehiclePosition values.
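The stringSerde variable is referenced but not defined in this snippet. It presumably comes from the Serdes factory class earlier in the exercise code; a minimal sketch of such a declaration:

// Hypothetical declaration; the exercise file might already define an equivalent.
Serde<String> stringSerde = Serdes.String();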
Add the code to print each incoming vehicle position.

public Topology buildTopology() {
    ...output omitted...

    // TODO: print stream values
    stream.foreach((key, value) ->
        System.out.println("Received vehicle position: " + value)
    );

    ...output omitted...
}

Switch back to the terminal and run the application.
(.venv) [user@host streams-data]$ ./mvnw clean package quarkus:dev
...output omitted...
2021-07-16 15:46:59,182 INFO [org.apa.kaf.cli.con.int.ConsumerCoordinator] ...output omitted... Setting offset for partition vehicle-positions-0 to the committed offset ...output omitted...

Leave this terminal window open. The Quarkus dev mode hot-reloads the application if you make changes to the source code.
Note
The Quarkus hot-reload feature might not work well if your editor auto-saves changes. In this case, you can stop the application and run the ./mvnw clean package quarkus:dev command again.

Open a new terminal window, make sure you are in your workspace directory, activate the Python virtual environment, and run the streams-data/scripts/produce_vehicle_positions.py script.

[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ python streams-data/scripts/produce_vehicle_positions.py
...output omitted...
- Generated vehicle position to 'vehicle-positions' {'elevation': 0.03501904353249241, 'latitude': 41.2265693535002, 'longitude': 4.398626580777269, 'vehicleId': 5}
- Generated vehicle position to 'vehicle-positions' {'elevation': 0.2595584138525602, 'latitude': 39.816763660245876, 'longitude': 3.9794423934152765, 'vehicleId': 9}

The script produces random records to the vehicle-positions topic.

Leave this terminal window open.
Use the map transformation to create a new KStream instance. Map the stream of vehicle positions, in which each record includes the elevation in meters, to a new stream of elevations in feet. Send the new stream to the vehicle-feet-elevations topic.

Edit the VehiclePositionsStream class. Add the code to map the original stream to a stream of elevations in feet.

public Topology buildTopology() {
    ...output omitted...

    // TODO: map positions to elevations in feet
    // and send the stream to "vehicle-feet-elevations" topic
    stream
        .map((key, value) -> {
            Double feet = value.elevation * 3.28084;
            return KeyValue.pair(value.vehicleId, feet);
        })

    ...output omitted...
}

The map function passes each record of the original stream to the provided lambda expression, and writes the new record to a new KStream instance. In this case, the records of the new stream use the vehicle ID as the key, and the elevation in feet as the value.
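Because the lambda passed to map returns a new key, Kafka Streams marks the resulting stream for repartitioning before any downstream stateful operation. When a transformation changes only the value, the mapValues method is the cheaper choice because it preserves the key. The following line is an illustration only, not part of the exercise code:

// Hypothetical alternative: keep the original (null) key and transform only the value.
// mapValues does not change the key, so no repartitioning is needed downstream.
KStream<String, Double> elevationsInFeet =
        stream.mapValues(position -> position.elevation * 3.28084);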
Add the code to write the new stream to the vehicle-feet-elevations topic.

public Topology buildTopology() {
    ...output omitted...

    // TODO: map positions to elevations in feet
    // and send the stream to "vehicle-feet-elevations" topic
    stream
        .map((key, value) -> {
            Double feet = value.elevation * 3.28084;
            return KeyValue.pair(value.vehicleId, feet);
        })
        .to(
            "vehicle-feet-elevations",
            Produced.with(Serdes.Integer(), Serdes.Double())
        );

    ...output omitted...
}

Note how the application produces integer keys and double values for each record sent to the vehicle-feet-elevations topic.

After your changes, the Quarkus application should hot-reload automatically.
Note
Alternatively, you can stop the application and run the ./mvnw clean package quarkus:dev command again.
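Before consuming from the cluster, you can also verify the new branch of the topology locally with a unit test. The following is a minimal sketch that uses the TopologyTestDriver class from the org.apache.kafka:kafka-streams-test-utils artifact. The test class, the property values, and the direct instantiation of VehiclePositionsStream are assumptions for illustration; they are not part of the exercise:

import java.util.Properties;

import org.apache.kafka.common.serialization.DoubleDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

import io.quarkus.kafka.client.serialization.ObjectMapperSerde;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

// Hypothetical unit test for the map-and-forward branch of the topology.
public class VehiclePositionsStreamTest {

    @Test
    void mapsElevationInMetersToFeet() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "vehicles-test");
        // The test driver does not connect to a broker, so any value works here.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        // Assumes that VehiclePositionsStream can be instantiated directly.
        try (TopologyTestDriver driver = new TopologyTestDriver(
                new VehiclePositionsStream().buildTopology(), props)) {

            TestInputTopic<String, VehiclePosition> input = driver.createInputTopic(
                    "vehicle-positions",
                    new StringSerializer(),
                    new ObjectMapperSerde<>(VehiclePosition.class).serializer());

            TestOutputTopic<Integer, Double> output = driver.createOutputTopic(
                    "vehicle-feet-elevations",
                    new IntegerDeserializer(),
                    new DoubleDeserializer());

            // Assumes the public fields sketched earlier for VehiclePosition.
            VehiclePosition position = new VehiclePosition();
            position.vehicleId = 7;
            position.elevation = 1.0; // meters

            input.pipeInput(null, position);

            // 1 meter is 3.28084 feet.
            KeyValue<Integer, Double> record = output.readKeyValue();
            Assertions.assertEquals(7, record.key);
            Assertions.assertEquals(3.28084, record.value);
        }
    }
}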
Consume the messages from the vehicle-feet-elevations topic and verify that the messages include the vehicle ID and the elevation in feet.

In a new terminal window, run the oc exec command to execute the built-in kafka-console-consumer.sh script from inside the kafka container in the cluster.

[user@host ~]$ oc exec \
-it my-cluster-kafka-0 \
-c kafka -n RHT_OCP4_DEV_USER-kafka-cluster \
-- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic vehicle-feet-elevations \
--key-deserializer org.apache.kafka.common.serialization.IntegerDeserializer \
--value-deserializer org.apache.kafka.common.serialization.DoubleDeserializer \
--property print.key=true --property key.separator=" - "
...output omitted...
7 - 2.411428440449955
9 - 2.8870592230148593
0 - 9.113734181610747

Replace RHT_OCP4_DEV_USER with your OpenShift username. You can get this value from the tab in the ROL web interface.

You should see the records in the vehicleId - elevationInFeet format.

Terminate the consumer script and close this terminal window.
Count the number of reported positions by vehicle.
Transform the KStream object into a KGroupedStream object by using the groupBy method. Next, use the KGroupedStream#count method to create a KTable containing the number of positions by vehicle.

Edit the VehiclePositionsStream class. Add the code to group positions by vehicle ID.

public Topology buildTopology() {
    ...output omitted...

    // TODO: group positions by vehicleId
    KGroupedStream<Integer, VehiclePosition> positionsByVehicle = stream
        .groupBy(
            (key, value) -> value.vehicleId,
            Grouped.with(Serdes.Integer(), vehiclePositionSerde)
        );

    ...output omitted...
}

Notice the first parameter of the groupBy call. This lambda expression returns the key to group by. In this case, the key is vehicleId.

The second parameter specifies the SerDes required after the data is grouped. The Integer SerDe is required as the key SerDe, because the vehicle ID is an integer.
Add the code to count how many positions each vehicle has reported.

public Topology buildTopology() {
    ...output omitted...

    KGroupedStream<Integer, VehiclePosition> positionsByVehicle = stream
        .groupBy(
            (key, value) -> value.vehicleId,
            Grouped.with(Serdes.Integer(), vehiclePositionSerde)
        );

    // TODO: count positions by vehicle
    KTable<Integer, Long> countsByVehicle = positionsByVehicle.count();

    ...output omitted...
}

Note how the count operation generates a new KTable.
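The count method is a convenience aggregation. The same KGroupedStream also offers the reduce and aggregate methods for custom aggregations. As an illustration only, not part of the exercise, a table with the maximum elevation reported by each vehicle could be built as follows:

// Hypothetical example: track the highest elevation reported by each vehicle.
KTable<Integer, Double> maxElevationByVehicle = positionsByVehicle.aggregate(
        () -> Double.NEGATIVE_INFINITY,                                  // initial value per key
        (vehicleId, position, max) -> Math.max(max, position.elevation), // update on each record
        Materialized.with(Serdes.Integer(), Serdes.Double())             // state store serdes
);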
Add the code to print the count values.

To print the values of the countsByVehicle table, first convert it to a KStream, and then use the foreach method of the stream to print each record.

public Topology buildTopology() {
    ...output omitted...

    // TODO: print the count values
    countsByVehicle
        .toStream()
        .foreach((vehicleId, count) ->
            System.out.println(
                "Vehicle: " + vehicleId + " Positions count: " + count + "\n"
            )
        );

    ...output omitted...
}

After the application hot-reloads, verify that the application displays the counts.
...output omitted...
Vehicle: 7 Positions count: 17
Vehicle: 1 Positions count: 10
...output omitted...
Note
You might have to wait up to 30 seconds to see the first count message.
This is because of the Kafka Streams record caches and memory management policies. Kafka Streams uses the record cache to compact multiple records together before writing them to the internal store of a KTable.

Refer to Kafka Streams Memory Management for more information.
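If you want to see the count updates sooner while developing, you can shrink the record cache and shorten the commit interval. The following is a development-only sketch, assuming the kafka-streams.* property pass-through that the Quarkus Kafka Streams extension provides; the property values are illustrative and not part of the exercise:

# Hypothetical additions to application.properties, for development only:
# disable the record cache so updates flow downstream immediately...
kafka-streams.cache.max.bytes.buffering=0
# ...and commit more frequently (value in milliseconds).
kafka-streams.commit.interval.ms=1000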
Stop the Quarkus application and the produce_vehicle_positions.py script.
This concludes the guided exercise.