Guided Exercise: Maintaining Message Ordering

Outcomes

In this exercise, you will handle out-of-order or late events.

You should be able to process a stream of events and implement a strategy for handling out-of-order or late events. First, you will discard late events, and then you will update the implementation to process the late events received within the grace period.

You will find the solution files for this exercise in the AD482-apps repository, within the troubleshooting-order/solutions directory. Notice that you might need to copy the .env-streams file available in your workspace directory.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • The oc command-line binary.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment:

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise:

(.venv) [user@host AD482]$ lab start troubleshooting-order

Procedure 6.1. Instructions

  1. Navigate to the troubleshooting-order directory.

    (.venv) [user@host AD482]$ cd troubleshooting-order
  2. Create a stream processor that implements a strategy to discard out-of-order or late events.

    Consume the PotentialCustomersWereDetected events sent to the potential-customers-detected topic. Create a tumbling window with a size of 10 seconds and no grace period. Count the records in each window, and print the resulting KTable to the standard output.

    1. Open the TumblingWindows class and examine the code.

    2. Implement the discard strategy in the onStart method.

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Build the stream topology
          builder.stream(
                  POTENTIAL_CUSTOMERS_TOPIC,
                  Consumed.with(Serdes.String(), customersEventSerde)
          ).groupByKey() // 1
          .windowedBy( // 2
                  TimeWindows.of(Duration.ofSeconds(WINDOW_SIZE)) // 3
                          .grace(Duration.ofSeconds(0)) // 4
          ).count() // 5
          .toStream()
          .print(Printed.toSysOut());
      
      ...output omitted...

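      1: Groups events that share the same key.
      2: Groups the events of each key into time windows.
      3: Creates a tumbling window of WINDOW_SIZE seconds.
      4: Sets the grace period of the window. With a grace period of zero seconds, events that arrive after their window has closed are discarded.
      5: Counts the number of events in each window.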
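      The grace period decides whether a late event still counts. The following plain-Java sketch, which has no Kafka dependency, models that decision under stated assumptions: stream time is the largest event timestamp observed so far, windows are aligned to the epoch, and a window accepts events until stream time reaches the window end plus the grace period. The GraceModel class and its methods are hypothetical; the model approximates the Kafka Streams behavior rather than reproducing its implementation.

```java
import java.time.Duration;

// Hypothetical model of tumbling-window admission; not Kafka Streams code.
class GraceModel {

    // Start of the epoch-aligned tumbling window that contains the timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Assumption: an event is accepted while its window is still open,
    // that is, while stream time has not reached window end + grace.
    static boolean isAccepted(long timestampMs, long streamTimeMs, Duration size, Duration grace) {
        long sizeMs = size.toMillis();
        long windowEnd = windowStart(timestampMs, sizeMs) + sizeMs; // exclusive upper bound
        return windowEnd + grace.toMillis() > streamTimeMs;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        long streamTime = 25_000L; // hypothetical: latest timestamp seen so far

        // An event at 13 s belongs to window [10 s, 20 s), which closed at stream time 20 s.
        System.out.println(isAccepted(13_000L, streamTime, size, Duration.ofSeconds(0)));  // false
        // With a 12 s grace period, that window stays open until stream time 32 s.
        System.out.println(isAccepted(13_000L, streamTime, size, Duration.ofSeconds(12))); // true
    }
}
```

      With a zero-second grace period, an event is dropped as soon as stream time passes the end of its window; a longer grace period keeps the window open for late arrivals.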

    3. Return to the command-line terminal and start the application.

      (.venv) [user@host troubleshooting-order]$ ./mvnw clean package quarkus:dev
      ...output omitted...

      The stream processor listens for new events in the potential-customers-detected topic.

  3. Verify that the stream processor discards late events produced to the potential-customers-detected topic. In a new terminal window, run the provided produce_late_events.py Python script to write on-time and late events to this topic.

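    The script manually assigns the event time of each event, setting it to the current timestamp. For every 10 events, the script sets the timestamp of one event to 11 seconds earlier than the current timestamp; that event is a late event. Because the window size is 10 seconds, the window that a late event belongs to ends before the current time, so without a grace period that window is already closed when the event arrives.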
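    The following Java sketch mirrors the timestamp schedule described above. It is not the provided Python script, and the LateEventSchedule class and its timestampFor method are hypothetical. It assumes a zero-based event index, so every tenth event (index 9, 19, and so on) is the late one, which matches the sample output where the last of the 10 events is reported as late.

```java
// Hypothetical mirror of the produce_late_events.py timestamp schedule.
class LateEventSchedule {

    static final long LATENESS_MS = 11_000L; // late events are 11 seconds behind
    static final int LATE_EVERY = 10;        // one late event per 10 events

    // Returns the event timestamp for the zero-based index, given the current time.
    static long timestampFor(int index, long nowMs) {
        boolean late = (index + 1) % LATE_EVERY == 0;
        return late ? nowMs - LATENESS_MS : nowMs;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            long now = System.currentTimeMillis();
            long ts = timestampFor(i, now);
            System.out.printf("Event %d - timestamp %d (%s)%n", i + 1, ts, ts < now ? "late" : "on time");
        }
    }
}
```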

    1. Open a new terminal window. From your workspace directory, activate the Python virtual environment, and then move to the troubleshooting-order directory.

      [user@host ~]$ cd AD482
      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$ cd troubleshooting-order
      (.venv) [user@host troubleshooting-order]$
    2. Run the produce_late_events.py Python script. You must provide the topic and the number of events to produce. Produce 10 events.

      (.venv) [user@host troubleshooting-order]$ python scripts/produce_late_events.py \
       potential-customers-detected 10
      Sending 10 events to the 'potential-customers-detected' topic...
      Event sent - timestamp 1627552463862 (on time)
      ...output omitted...
      Event sent - timestamp 1627552462012 (late)

      The timestamp assigned to each event determines the window to which it belongs.
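      Tumbling windows in Kafka Streams are aligned to the epoch, so the start of an event's window can be computed by rounding its timestamp down to a multiple of the window size. The following sketch applies that arithmetic to the two timestamps from the sample output; the WindowAssignment class name is hypothetical.

```java
// Computes the epoch-aligned tumbling window [start, end) for an event timestamp.
class WindowAssignment {

    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs); // round down to a multiple of size
    }

    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs; // exclusive upper bound
    }

    public static void main(String[] args) {
        long size = 10_000L; // WINDOW_SIZE of 10 seconds, in milliseconds
        long[] timestamps = {1627552463862L, 1627552462012L}; // from the sample output
        for (long ts : timestamps) {
            System.out.printf("%d -> [%d/%d)%n", ts, windowStart(ts, size), windowEnd(ts, size));
        }
    }
}
```

      Both timestamps map to the window that starts at 1627552460000 and ends at 1627552470000, even though the second one is the late event.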

    3. Return to the Quarkus terminal and verify that the stream processor consumed the events and printed the KTable. Note that you might have to wait up to 30 seconds for the KTable output, and that your output might differ slightly.

      ...output omitted...
      [KTABLE-TOSTREAM-0000000003]: [front-area@1627552460000/1627552470000], 6
      [KTABLE-TOSTREAM-0000000003]: [front-area@1627552470000/1627552480000], 3
      ...output omitted...

      The preceding output displays the number of events counted in each window. Each window specifies a range of timestamps: the lower bound is inclusive, and the upper bound is exclusive. Verify that the sum of the counts across all windows is 9, because the stream processor discarded the late event.
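      To check this output programmatically, you could parse each printed line. The following sketch assumes the line format shown in the sample output, [processor-name]: [key@start/end], count; the WindowedCountLine class and its regular expression are hypothetical, not part of the exercise. It sums the counts and confirms that a timestamp equal to a window's upper bound falls outside that window.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the lines printed by Printed.toSysOut() in this exercise.
class WindowedCountLine {

    // Matches "[PROCESSOR]: [key@start/end], count".
    private static final Pattern LINE =
            Pattern.compile("\\[[^\\]]+\\]: \\[(.+)@(\\d+)/(\\d+)\\], (\\d+)");

    final String key;
    final long start; // inclusive lower bound
    final long end;   // exclusive upper bound
    final long count;

    WindowedCountLine(String key, long start, long end, long count) {
        this.key = key;
        this.start = start;
        this.end = end;
        this.count = count;
    }

    static WindowedCountLine parse(String line) {
        Matcher m = LINE.matcher(line.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected line: " + line);
        }
        return new WindowedCountLine(m.group(1), Long.parseLong(m.group(2)),
                Long.parseLong(m.group(3)), Long.parseLong(m.group(4)));
    }

    // Lower bound inclusive, upper bound exclusive.
    boolean contains(long timestampMs) {
        return timestampMs >= start && timestampMs < end;
    }

    public static void main(String[] args) {
        List<String> output = List.of(
                "[KTABLE-TOSTREAM-0000000003]: [front-area@1627552460000/1627552470000], 6",
                "[KTABLE-TOSTREAM-0000000003]: [front-area@1627552470000/1627552480000], 3");
        long total = 0;
        for (String line : output) {
            total += parse(line).count;
        }
        System.out.println("Total events counted: " + total); // 9 of the 10 produced
        System.out.println(parse(output.get(0)).contains(1627552470000L)); // false: end is exclusive
    }
}
```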

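  4. Update the stream processor to process late events that arrive within a grace period of 12 seconds. Because the script produces late events 11 seconds behind the current time, a 12-second grace period is long enough to accept them.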

    1. Open the TumblingWindows class and update the grace period of the window.

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Build the stream topology
          builder.stream(
                  POTENTIAL_CUSTOMERS_TOPIC,
                  Consumed.with(Serdes.String(), customersEventSerde)
          ).groupByKey()
          .windowedBy(
                  TimeWindows.of(Duration.ofSeconds(WINDOW_SIZE))
                          .grace(Duration.ofSeconds(12))
          ).count()
          .toStream()
          .print(Printed.toSysOut());
      
      ...output omitted...
    2. In the Quarkus terminal, press s to restart the application.

    3. The stream processor reprocesses all the events stored in the potential-customers-detected topic.

      ...output omitted...
      [KTABLE-TOSTREAM-0000000003]: [front-area@1627552470000/1627552480000], 3
      [KTABLE-TOSTREAM-0000000003]: [front-area@1627552460000/1627552470000], 7
      ...output omitted...

      Verify that the sum of the counts across all windows is 10, because the grace period allows the stream processor to count the late event.
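      The following sketch applies an admission rule to the late event from the sample output to show why the grace period changes the result. The rule is a simplified model and an assumption, not Kafka Streams source code: stream time is the largest timestamp seen so far, and a window accepts events until stream time reaches the window end plus the grace period. The late event, 1627552462012, was generated 11 seconds before the script's current time, so stream time at its arrival was at most 1627552473012; because the window starting at 1627552470000 already held events, it was at least 1627552470000. The LateEventCheck class is hypothetical.

```java
import java.time.Duration;

// Checks whether the sample late event is counted for a given grace period.
class LateEventCheck {

    // Assumption: accept while window end + grace is still ahead of stream time.
    static boolean isAccepted(long timestampMs, long streamTimeMs, Duration size, Duration grace) {
        long sizeMs = size.toMillis();
        long windowEnd = timestampMs - Math.floorMod(timestampMs, sizeMs) + sizeMs; // exclusive end
        return windowEnd + grace.toMillis() > streamTimeMs;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofSeconds(10);
        long lateEvent = 1627552462012L;       // late timestamp from the sample output
        long[] streamTimes = {1627552470000L,  // lowest plausible stream time
                              1627552473012L}; // late timestamp + 11 s
        for (long streamTime : streamTimes) {
            System.out.printf("stream time %d: grace 0 s -> %b, grace 12 s -> %b%n",
                    streamTime,
                    isAccepted(lateEvent, streamTime, size, Duration.ZERO),
                    isAccepted(lateEvent, streamTime, size, Duration.ofSeconds(12)));
        }
    }
}
```

      In this model, a grace period at least as long as the expected lateness is enough: the event's window always ends after the event's own timestamp, so window end plus grace stays ahead of a stream time that is at most that lateness ahead of the event. Here, 12 seconds covers the 11-second lateness with a one-second margin.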

    4. Stop the command, and close the terminal.

    5. Stop the application, close the late events producer terminal, and navigate to the workspace directory.

Finish

Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish troubleshooting-order

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c