Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
Outcomes
In this exercise you will handle out-of-order or late events.
You should be able to process a stream of events and implement a strategy for handling out-of-order or late events. First you will discard late events, and next you will update the implementation to process the late events received in the grace period.
You will find the solution files for this exercise in the AD482-apps repository, within the troubleshooting-order/solutions directory.
Note that you might need to copy the .env-streams file available in your workspace directory.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
The oc command-line binary.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment:
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Use the lab command to start the scenario for this exercise:
(.venv) [user@host AD482]$ lab start troubleshooting-order
Procedure 6.1. Instructions
Navigate to the troubleshooting-order directory.
(.venv) [user@host AD482]$ cd troubleshooting-order
Create a stream processor that implements a strategy to discard out-of-order or late events.
Consume the PotentialCustomersWereDetected events sent to the potential-customers-detected topic. Create a tumbling window with a size of 10 seconds without a grace period. Count the records, and print the KTable to the standard output.
Open the TumblingWindows class and examine the code.
Implement the discard strategy in the onStart method.
The method should look like the following:
void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...
    // TODO: Build the stream topology
    builder.stream(
        POTENTIAL_CUSTOMERS_TOPIC,
        Consumed.with(Serdes.String(), customersEventSerde)
    ).groupByKey()
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(WINDOW_SIZE)).grace(Duration.ofSeconds(0))
    ).count()
    .toStream()
    .print(Printed.toSysOut());
    ...output omitted...
Return to the command line terminal, and start the application.
(.venv) [user@host troubleshooting-order]$ ./mvnw clean package quarkus:dev
...output omitted...
The stream processor listens for new events in the potential-customers-detected topic.
Verify that the stream processor discards late events produced in the potential-customers-detected topic. In a new terminal window, run the provided produce_late_events.py Python script to write on-time and late events to this topic.
The script assigns the event time manually, setting it to the current timestamp. For every tenth event, the script instead sets the timestamp to 11 seconds earlier than the current timestamp, which makes that event a late event.
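The timestamp rule described above can be sketched in Java. This is only an illustration of the rule, not the course's Python script: the class and method names are hypothetical, and it assumes that the late event is the last one of every ten, which matches the sample output shown later in this exercise.

```java
final class LateEventTimestamps {
    // From the text: a late event is stamped 11 seconds before the current time.
    static final long LATE_OFFSET_MS = 11_000L;
    // From the text: one late event every 10 events.
    static final int LATE_EVERY = 10;

    /** Returns the event time for the n-th event (1-based) given the current clock value. */
    static long eventTimestamp(int n, long nowMs) {
        // Assumption: the 10th, 20th, ... event is the late one.
        boolean late = n % LATE_EVERY == 0;
        return late ? nowMs - LATE_OFFSET_MS : nowMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        for (int n = 1; n <= 10; n++) {
            long ts = eventTimestamp(n, now);
            String label = ts < now ? "(late)" : "(on time)";
            System.out.println("Event " + n + " - timestamp " + ts + " " + label);
        }
    }
}
```

Because the event time is set explicitly rather than taken from the moment of arrival, the late event belongs to an earlier window than the events sent just before it.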
Open a new terminal window. Navigate to your workspace directory, activate the Python virtual environment, and then move to the troubleshooting-order directory.
[user@host ~]$ cd AD482
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$ cd troubleshooting-order
(.venv) [user@host troubleshooting-order]$
Run the produce_late_events.py Python script. You must provide the topic and the number of events to produce. Produce 10 events.
(.venv) [user@host troubleshooting-order]$ python scripts/produce_late_events.py \
potential-customers-detected 10
Sending 10 events to the 'potential-customers-detected' topic...
Event sent - timestamp 1627552463862 (on time)
...output omitted...
Event sent - timestamp 1627552462012 (late)
The timestamp assigned to each event determines the window to which it belongs.
Return to the Quarkus terminal, and verify that the stream processor consumed the events and printed the KTable. Note that you might have to wait up to 30 seconds for the KTable output, and the output might be slightly different.
...output omitted...
[KTABLE-TOSTREAM-0000000003]: [front-area@1627552460000/1627552470000], 6
[KTABLE-TOSTREAM-0000000003]: [front-area@1627552470000/1627552480000], 3
...output omitted...
The preceding output displays the number of events processed in each window. Each window covers a range of timestamps: the lower bound is inclusive and the upper bound is exclusive. Verify that the counts of all windows add up to 9, because the stream processor discarded the one late event.
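The window boundaries in the output can be reproduced with a little arithmetic. The following Java sketch assumes 10-second tumbling windows aligned to the epoch, which is consistent with the boundaries printed above. The class and method names are hypothetical, and the third sample timestamp is made up to show the exclusive upper bound.

```java
final class TumblingWindowBounds {
    /** Inclusive start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Exclusive end of the same window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long sizeMs = 10_000L; // 10-second window, as in the exercise
        long[] samples = {
            1627552463862L, // on-time event from the sample output
            1627552462012L, // late event from the sample output
            1627552470000L  // hypothetical event exactly on a window boundary
        };
        for (long ts : samples) {
            System.out.println(ts + " -> [" + windowStart(ts, sizeMs)
                + ", " + windowEnd(ts, sizeMs) + ")");
        }
    }
}
```

The first two timestamps map to the window 1627552460000/1627552470000, so the late event belongs to the first window even though it arrives last. The boundary timestamp maps to the window that starts at 1627552470000, because the upper bound of the previous window is exclusive.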
Update the stream processor to process late events that arrive in a grace period of 12 seconds.
Open the TumblingWindows class and update the grace period of the window.
The method should look like the following:
void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...
    // TODO: Build the stream topology
    builder.stream(
        POTENTIAL_CUSTOMERS_TOPIC,
        Consumed.with(Serdes.String(), customersEventSerde)
    ).groupByKey()
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(WINDOW_SIZE)).grace(Duration.ofSeconds(12))
    ).count()
    .toStream()
    .print(Printed.toSysOut());
    ...output omitted...
In the command line terminal, press to restart the application.
The stream processor reprocesses all the events stored in the potential-customers-detected topic.
...output omitted...
[KTABLE-TOSTREAM-0000000003]: [front-area@1627552470000/1627552480000], 3
[KTABLE-TOSTREAM-0000000003]: [front-area@1627552460000/1627552470000], 7
...output omitted...
Verify that the counts of all windows add up to 10, because the 12-second grace period allows the stream processor to include the late event.
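The difference between the two runs comes down to one comparison. The following Java sketch is a simplified model of that decision, not the Kafka Streams implementation: it assumes that a window keeps accepting records while the stream time is earlier than the window end plus the grace period. The class and method names are hypothetical, and the stream time is approximated as the late event's timestamp plus 11 seconds.

```java
final class GracePeriodCheck {
    /**
     * Simplified model: a window accepts a record while the stream time
     * has not yet reached the window end plus the grace period.
     */
    static boolean accepts(long windowEndMs, long graceMs, long streamTimeMs) {
        return streamTimeMs < windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        // Exclusive end of the window that contains the late event (from the sample output).
        long windowEndMs = 1627552470000L;
        // Assumption: stream time when the late event arrives is its timestamp plus 11 seconds.
        long streamTimeMs = 1627552462012L + 11_000L;

        System.out.println("grace 0 s accepts late event:  "
            + accepts(windowEndMs, 0L, streamTimeMs));
        System.out.println("grace 12 s accepts late event: "
            + accepts(windowEndMs, 12_000L, streamTimeMs));
    }
}
```

Under these assumptions, the check is false with no grace period and true with a grace period of 12 seconds, which matches the first window count changing from 6 to 7.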
Stop the command, and close the terminal.
Stop the application, close the late events producer terminal, and navigate to the workspace directory.
This concludes the guided exercise.