
Chapter 6. Troubleshooting AMQ Streams Applications

Abstract

Goal Handle common problems in Kafka and AMQ Streams applications.
Objectives
  • Handle out-of-order or late events.

  • Configure producer retries and idempotence.

  • Prevent duplication and data loss.

  • Implement test cases in event-driven applications.

Sections
  • Maintaining Message Ordering (and Guided Exercise)

  • Configuring Producer Retries and Idempotence (and Guided Exercise)

  • Preventing Duplication and Data Loss (and Guided Exercise)

  • Implementing Test Cases in Event-driven Applications (and Guided Exercise)

Lab

Troubleshooting AMQ Streams Applications

Maintaining Message Ordering

Objectives

After completing this section, you should be able to handle out-of-order or late events.

Describing Ordering in Distributed Applications

In non-distributed applications, the application sets and enforces the execution order. For example, when a user calls a REST endpoint, a synchronous function that implements the endpoint waits for the results of each line before continuing with the execution. In distributed, event-driven applications, events are produced and consumed asynchronously. Event ordering refers to the order in which consumers receive events. In many business domains, the order of the events is critical.

For example, consider the possible result of receiving the following events:

Table 6.1. Changing Event Order Can Change the State of a System

First event                    | Second event                   | Result
Acme stock price: $7           | Acme stock price: $5           | Acme stock trending down
Acme stock price: $5           | Acme stock price: $7           | Acme stock trending up
Delete user Acme               | Create user Acme               | User Acme exists in the system
Create user Acme               | Delete user Acme               | User Acme is absent from the system
Deposit $100 to account Acme   | Withdraw $50 from account Acme | Withdrawal permitted
Withdraw $50 from account Acme | Deposit $100 to account Acme   | Withdrawal forbidden

In the preceding example, incorrect order can cause faulty logic, such as deleting a nonexistent user.

Defining Apache Kafka Polling Behavior

Apache Kafka does not guarantee record order for topics. Kafka preserves the record order only within a topic partition. A Kafka consumer polls from all of its assigned partitions at the same time. Depending on a number of factors, such as network connectivity or broker utilization, a Kafka consumer receives responses with records from multiple partitions in a non-deterministic order.

Recall that when a Kafka topic contains only one partition, a consumer receives records in order. This is because with one partition, the consumer fetches records from a single ordered log, which the leader broker of that partition serves. Therefore, the consumer receives the records in the order preserved within that partition.

For more information about one-partition Kafka topics, see the section called “Creating Topics”.
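To make the polling behavior concrete, the following minimal sketch simulates a consumer that receives records from two partitions in a random order. It uses only the Java standard library (Java 16 or later). The `PartitionRecord` type and the record-by-record random interleaving are simplifying assumptions for illustration, not Kafka client code: real consumers receive batches per partition. The sketch shows that records from one partition keep their relative order, while the overall order across partitions varies between runs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Random;

public class PollingOrderSketch {

    // Hypothetical record: the partition it came from and its offset in that partition.
    public record PartitionRecord(int partition, long offset) {}

    // Simulates a consumer that receives records from several partitions.
    // Each partition is read front to back, but which partition responds next
    // is random, modeling non-deterministic fetch responses.
    public static List<PartitionRecord> poll(List<List<PartitionRecord>> partitions, long seed) {
        Random random = new Random(seed);
        List<Deque<PartitionRecord>> queues = new ArrayList<>();
        for (List<PartitionRecord> partition : partitions) {
            queues.add(new ArrayDeque<>(partition));
        }
        List<PartitionRecord> received = new ArrayList<>();
        while (queues.stream().anyMatch(q -> !q.isEmpty())) {
            List<Deque<PartitionRecord>> pending = queues.stream().filter(q -> !q.isEmpty()).toList();
            Deque<PartitionRecord> next = pending.get(random.nextInt(pending.size()));
            received.add(next.poll());
        }
        return received;
    }

    // Returns true if the records of one partition appear in increasing offset order.
    public static boolean partitionOrdered(List<PartitionRecord> received, int partition) {
        long lastOffset = -1;
        for (PartitionRecord r : received) {
            if (r.partition() == partition) {
                if (r.offset() <= lastOffset) {
                    return false;
                }
                lastOffset = r.offset();
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<List<PartitionRecord>> partitions = List.of(
            List.of(new PartitionRecord(0, 0), new PartitionRecord(0, 1), new PartitionRecord(0, 2)),
            List.of(new PartitionRecord(1, 0), new PartitionRecord(1, 1), new PartitionRecord(1, 2)));
        for (long seed = 1; seed <= 3; seed++) {
            List<PartitionRecord> received = poll(partitions, seed);
            System.out.println(received + " ordered per partition: "
                + (partitionOrdered(received, 0) && partitionOrdered(received, 1)));
        }
    }
}
```

Each run may print a different global order, but the per-partition check always holds, which is the guarantee Kafka gives.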

Defining Event Flows

Event order tends to be crucial for event flows. An event flow is a logical group of events that model a business process. For example, the following series of events creates a mortgage pre-approval event flow:

Mortgage application pre-approval flow

  • CustomerAppliedForMortgage

  • CalculatedMonthlyPayment

  • CalculatedEligibility

  • DeterminedPreApproval

In the preceding flow, each event might depend on the result of previous events. For example, determining if a customer is eligible for a mortgage can depend on calculated monthly payments. Pre-approval can then depend on information from all of the preceding events. If a consumer receives the CalculatedEligibility event but no other events from the flow, then it cannot determine the next action.

Note

You can manage distributed transactions, for example, by implementing the Saga pattern. Managing distributed transactions is out of the scope of this course. See the references section for more information about distributed transactions.

Event flows can span multiple services. Each event in the flow might be produced by a different microservice in your application. If you can identify event flows within your application, then consider creating topics per event flow. This enables the partial ordering of events emitted by multiple producers within a topic.

Guaranteeing Order with Kafka

You can enforce event order in the following ways depending on the requirements for your use case:

Total order

All consumers receive all of the events in the order that producers emitted them.

Partial order

Related events, such as events from an event flow, are ordered. However, unrelated events are not ordered relative to each other.

If your application requires ordering, then consider whether you need total order. You can achieve partial order by assigning the same key to related events.

For example, consider a stock market ticker application. Each event in the application contains a stock ticker symbol and the current value, such as ACME, 15. If you use the stock ticker symbol as the event key, then all events related to the same stock are written to the same partition. Consequently, a single consumer receives all ticks for a given stock.

The preceding example, however, does not maintain total order because of the polling behavior: a consumer receives responses with records from multiple partitions in a non-deterministic order. For example, a Kafka application might process the ACME ticks before the AACME ticks, even if the AACME ticks were produced first. However, every event that contains the ACME key is sent to the same partition, and because that partition is ordered, the consumer receives all ACME records in order.
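The following sketch illustrates why keys give partial order: the partition is a deterministic function of the key, so all ACME ticks land in the same partition and keep their send order. Kafka's default partitioner hashes the serialized key bytes with murmur2 and takes the result modulo the number of partitions. This sketch uses `String.hashCode()` as a simplified stand-in, so the partition numbers it computes differ from the ones Kafka would choose. The `Tick` type and the three-partition layout are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class KeyPartitioningSketch {

    // Hypothetical stock tick event: the ticker symbol is the record key.
    public record Tick(String symbol, int price) {}

    // Simplified stand-in for Kafka's default partitioner. Kafka uses a murmur2
    // hash of the key bytes; String.hashCode() is used here only for illustration.
    public static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Appends each tick to the partition chosen by its key. Each partition list
    // behaves like an append-only log, so send order is kept within a partition.
    public static Map<Integer, List<Tick>> produce(List<Tick> ticks, int numPartitions) {
        Map<Integer, List<Tick>> log = new TreeMap<>();
        for (Tick tick : ticks) {
            int partition = partitionFor(tick.symbol(), numPartitions);
            log.computeIfAbsent(partition, p -> new ArrayList<>()).add(tick);
        }
        return log;
    }

    public static void main(String[] args) {
        List<Tick> ticks = List.of(
            new Tick("ACME", 15), new Tick("AACME", 7),
            new Tick("ACME", 16), new Tick("AACME", 8), new Tick("ACME", 14));
        produce(ticks, 3).forEach((partition, records) ->
            System.out.println("partition " + partition + ": " + records));
    }
}
```

Depending on the hash, ACME and AACME may or may not share a partition, but the ACME ticks always appear in the order 15, 16, 14 within their partition.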

Describing Event Time

Distributed systems use event time to determine the order in which the events occur. You can use event time to implement processing logic even when your application receives an event late, or out of order. All Kafka records have a timestamp.

Recall that you can use the following timestamp types:

  • Event time: time when the event happened.

  • Processing time: time when a Kafka application processed the event.

  • Ingestion time: time when a Kafka broker received the event.

In a Kafka producer application, you can specify a timestamp when instantiating the record.
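For example, the following sketch uses the `ProducerRecord` constructor that accepts a partition and a timestamp, so that the record carries the event time instead of the time of sending. The topic name `stock-ticks` and the event time are hypothetical. The sketch only builds the record; sending it requires a configured `KafkaProducer`. Passing `null` as the partition lets the partitioner choose the partition from the key.

```java
import java.time.Instant;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EventTimeRecordSketch {

    // Builds a record whose timestamp is the time the event happened (event time).
    public static ProducerRecord<String, String> tickRecord(String symbol, String price, Instant eventTime) {
        return new ProducerRecord<>(
            "stock-ticks",            // hypothetical topic name
            null,                     // let the partitioner choose the partition from the key
            eventTime.toEpochMilli(), // record timestamp in milliseconds since the epoch
            symbol,                   // key
            price);                   // value
    }

    public static void main(String[] args) {
        Instant eventTime = Instant.parse("2021-06-01T09:30:00Z");
        ProducerRecord<String, String> record = tickRecord("ACME", "15", eventTime);
        System.out.println(record.timestamp());
    }
}
```

If the topic sets `message.timestamp.type` to `LogAppendTime`, then the broker replaces this timestamp with the ingestion time.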

In Kafka Streams, you can use a TimestampExtractor implementation to extract the timestamp from a record and embed it into a stream.

You can use the following timestamp extractors:

FailOnInvalidTimestamp (default)

Use the record's timestamp, and throw an exception if a record contains an invalid (negative) timestamp.

LogAndSkipOnInvalidTimestamp

Log records with invalid timestamps and skip the records to continue the stream.

UsePartitionTimeOnInvalidTimestamp

Use the record's timestamp. If the record does not have a valid timestamp, then the extractor uses the partition time, that is, the highest valid timestamp observed so far in the same topic partition.

WallclockTimestampExtractor

Returns the current time in milliseconds from the system clock. This extractor discards the record timestamp and uses the processing time.

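Configure the timestamp extractor on the Consumed object that you pass to the StreamsBuilder.stream() method, for example:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.UsePartitionTimeOnInvalidTimestamp;

// class omitted

// Read String keys and values, and fall back to the partition time
// when a record timestamp is invalid
new StreamsBuilder().stream("input-topic",
  Consumed.with(Serdes.String(), Serdes.String())
    .withTimestampExtractor(new UsePartitionTimeOnInvalidTimestamp()));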
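You can also pass your own TimestampExtractor implementation to withTimestampExtractor. The following sketch assumes a hypothetical value format `SYMBOL,PRICE,EVENT_TIME_MILLIS`, such as `ACME,15,1622539800000`, and reads the event time from the value. When the value cannot be parsed, it falls back to the `partitionTime` argument, which Kafka Streams sets to the highest valid timestamp extracted so far in the record's partition, or -1 if that is unknown.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Hypothetical extractor for values formatted as "SYMBOL,PRICE,EVENT_TIME_MILLIS".
public class TickEventTimeExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof String) {
            String[] fields = ((String) value).split(",");
            if (fields.length == 3) {
                try {
                    long eventTime = Long.parseLong(fields[2].trim());
                    if (eventTime >= 0) {
                        return eventTime; // event time embedded in the payload
                    }
                } catch (NumberFormatException e) {
                    // not a number: fall through to the partition time below
                }
            }
        }
        // Fall back to the highest valid timestamp seen so far in this
        // partition (Kafka Streams passes -1 if it is not known yet).
        return partitionTime;
    }
}
```

You would then use it as `Consumed.with(Serdes.String(), Serdes.String()).withTimestampExtractor(new TickEventTimeExtractor())`. Whether to fall back to the partition time, skip the record, or fail is a design choice similar to the one the built-in extractors make.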

Describing Windowing

Kafka Streams enables you to group records by the record key. Windowing enables you to further split each key-based group into time intervals.

Kafka Streams enables you to define the following windows:

Hopping time windows

Hopping windows create overlapping windows of a fixed time interval advanced by hops, or advance intervals.

For example, you can define 30-minute windows that advance every minute. This selects all events with the same key in 30-minute intervals where the lower bound is inclusive and the upper bound is exclusive.

If the window starts from midnight (00:00), then you create time intervals, such as:

  • Window 1: [00:00, 00:30)

  • Window 2: [00:01, 00:31)

  • Window 3: [00:02, 00:32)

    And so on. Note that window 1 and window 2 overlap on the interval [00:01, 00:30), that is, from 1 minute after midnight up to, but excluding, 30 minutes after midnight. This means that one event can belong to multiple windows.

    Hopping time windows are useful for reacting to the number of events within a specified time frame. This enables you, for example, to monitor the number of cars passing a sensor within a specified interval to detect a possible traffic jam.

    Use the TimeWindows class to create a hopping time window, for example:

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.TimeWindows;
    
    TimeWindows.of(Duration.ofMinutes(30))
      .advanceBy(Duration.ofMinutes(1));
Tumbling windows

Tumbling windows create non-overlapping windows of a fixed time interval. Tumbling windows are a special case of hopping windows, where the window size and the advance interval are identical. As with hopping windows, the lower bound is inclusive and the upper bound is exclusive.

For example, if you define a 30-minute hopping window that advances every 30 minutes, then you create tumbling windows:

  • Window 1: [00:00, 00:30)

  • Window 2: [00:30, 01:00)

  • Window 3: [01:00, 01:30)

    And so on. Because tumbling windows are mutually exclusive, an event always belongs to only one window.

    Tumbling windows are useful when you want to execute logic based on all events within a specified interval. This enables you, for example, to infer the most and least frequented train stations during specified times.

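    Use the TimeWindows class without specifying the advance interval to create a tumbling window. The following example also sets a grace period of 1 minute for late events:

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.TimeWindows;
    
    // 30-minute tumbling windows: the advance interval defaults to the window size
    TimeWindows.of(Duration.ofMinutes(30))
      // accept records that arrive up to 1 minute after the window ends
      .grace(Duration.ofMinutes(1));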
Sliding windows

Sliding windows create overlapping windows of a fixed time interval. A sliding window models a window that slides over the time axis and includes events with the same key if the difference between their event timestamps is within the window size. Both window bounds are inclusive.

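Because events are included in the window based on their timestamp differences, the SlidingWindows API requires you to configure a grace period explicitly. A grace period configures the wait time for late events. For other window types, when you do not set a grace period, it defaults to approximately 24 hours.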

Sliding windows are similar to hopping time windows with a small advance time configuration, such as 1ms. Sliding windows are useful for continuously reacting to the number of events within a specified timeframe. This enables you, for example, to detect suspicious behavior, like login events from geographically distant places within several seconds.

For example, consider the following events:

  • Event 1 (E1) timestamp: 00:00

  • Event 2 (E2) timestamp: 00:05

  • Event 3 (E3) timestamp: 00:06

    Considering a duration of 3 minutes and a grace period of 1 minute, Kafka Streams matches the following windows:

  • Window 1: [E1]

  • Window 2: [E2]

  • Window 3: [E2, E3]

  • Window 4: [E3]

    Window 4 starts 1 millisecond after E2, so it contains only E3.

    Use the SlidingWindows class to create a sliding window, for example:

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.SlidingWindows;
    
    var durationPeriod = Duration.ofMinutes(3);
    var gracePeriod = Duration.ofMinutes(1);
    SlidingWindows.withTimeDifferenceAndGrace(durationPeriod, gracePeriod);
Session windows

Session windows create non-overlapping windows of dynamic size. The size of a session window is dynamic because the window closes after a period of inactivity, called the inactivity gap, that separates groups of events.

For example, if you create session windows with an inactivity gap of 3 minutes, then an event whose timestamp is more than 3 minutes after the previous event with the same key starts a new window. Consider the following events:

  • Event 1 (E1) timestamp: 00:00

  • Event 2 (E2) timestamp: 00:05

  • Event 3 (E3) timestamp: 00:06

    Kafka Streams matches the following windows:

  • Window 1: [E1]

  • Window 2: [E2, E3]

    Session windows are useful for session-based metric analysis, such as time spent on a website per website visit.

    Use the SessionWindows class to create a session window, for example:

    import java.time.Duration;
    import org.apache.kafka.streams.kstream.SessionWindows;
    
    SessionWindows.with(Duration.ofMinutes(3));
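The window boundaries described above can be reproduced with plain arithmetic. The following stdlib-only sketch is not Kafka Streams code, and its class and method names are hypothetical. It assumes epoch-aligned hopping and tumbling windows with an inclusive start and an exclusive end, and session windows that split when the gap between consecutive events exceeds the inactivity gap, matching the examples in this section. Timestamps are minutes after midnight to match those examples, whereas Kafka Streams works in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowBoundsSketch {

    // Start times of all hopping windows [start, start + size) that contain timestamp t.
    // Windows are aligned to 0 (midnight in these examples).
    public static List<Long> hoppingWindowStarts(long t, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still contains t.
        long first = Math.max(0, t - size + advance) / advance * advance;
        for (long start = first; start <= t; start += advance) {
            starts.add(start);
        }
        return starts;
    }

    // A tumbling window is a hopping window whose advance equals its size,
    // so each timestamp belongs to exactly one window.
    public static long tumblingWindowStart(long t, long size) {
        return t - (t % size);
    }

    // Groups sorted timestamps into sessions: a gap larger than inactivityGap
    // between consecutive events closes the current session and starts a new one.
    public static List<List<Long>> sessions(List<Long> sortedTimestamps, long inactivityGap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : sortedTimestamps) {
            if (!current.isEmpty() && t - current.get(current.size() - 1) > inactivityGap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hopping: 30-minute windows advancing every minute, event at 00:05.
        System.out.println(hoppingWindowStarts(5, 30, 1));
        // Tumbling: 30-minute windows, event at 00:45.
        System.out.println(tumblingWindowStart(45, 30));
        // Session: 3-minute inactivity gap, events at 00:00, 00:05, and 00:06.
        System.out.println(sessions(List.of(0L, 5L, 6L), 3));
    }
}
```

For example, an event at 00:05 belongs to the six hopping windows that start at 00:00 through 00:05, an event at 00:45 belongs to the tumbling window [00:30, 01:00), and the session example yields [[0], [5, 6]], matching the windows listed for session windows.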

Use the Kafka Streams DSL to implement event windowing. The following is an example code snippet that uses a window:

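import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

// class omitted

new StreamsBuilder().stream("input-topic",
    Consumed.with(Serdes.String(), Serdes.String()))
  .groupByKey()
  .windowedBy( // 1
    TimeWindows.of(Duration.ofSeconds(10)) // 2
      .grace(Duration.ofSeconds(5))) // 3
  .count() // 4
  .suppress(
    Suppressed.untilWindowCloses(
      Suppressed.BufferConfig.unbounded())) // 5
  .toStream() // 6
  .print(Printed.toSysOut());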

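1

Specifies windowing on the grouped stream.

2

Uses a 10-second tumbling window.

3

Sets a 5-second grace period for late records.

4

Counts the records per key in each window, as an example processing action.

5

Suppresses intermediate results, so that each window emits a single final result after it closes, that is, after the window end plus the grace period.

6

Converts the result to a stream and prints it, as an example of further processing.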

Handling Out-of-order Events

Out-of-order (OOO) or late events are events that arrive later than expected. For example, an event with timestamp 00:00 can arrive at 00:30 due to various issues, such as a network outage or a consumer issue.

In stateless applications, late or out-of-order events typically do not affect the processing logic. In applications that use stateful operations, such as joins or aggregations, a late event can cause incorrect results.

You can implement several strategies for dealing with late events, such as:

Discard the event

Discarding OOO or late events is a suitable strategy for low-priority events where loss of events does not make a significant difference.

For example, when reading a temperature sensor value every second, a late event is probably no longer relevant.

Increase the grace period

You can gracefully handle OOO or late events by using windowing with a grace period. A grace period configures wait time for late or OOO events. This means that the overall application is more resilient to small network partitions or other intermittent issues.

However, a longer grace period delays the final result of each window, because a window does not close until the grace period after the window end has passed.
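The following stdlib-only sketch models how the grace period chooses between discarding and accepting late events, for a tumbling-window count similar to the earlier Kafka Streams example. It is a hypothetical, simplified model, not the Kafka Streams implementation. It assumes that stream time is the highest timestamp seen so far, that a window closes once stream time reaches the window end plus the grace period, that events for a closed window are discarded, and that only final counts are emitted, as with suppress(untilWindowCloses(...)). Timestamps are in seconds for readability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GracePeriodSketch {

    // Final counts per window start, plus the late timestamps that were dropped.
    public record Result(Map<Long, Integer> finalCounts, List<Long> discarded) {}

    // Counts events in tumbling windows [start, start + size), in arrival order.
    public static Result run(List<Long> arrivals, long size, long grace) {
        Map<Long, Integer> open = new TreeMap<>();
        Map<Long, Integer> emitted = new TreeMap<>();
        List<Long> discarded = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;

        for (long ts : arrivals) {
            // Stream time is the highest timestamp seen so far; late events do not rewind it.
            streamTime = Math.max(streamTime, ts);
            long start = ts - (ts % size);
            if (start + size + grace <= streamTime) {
                // The event's window is already closed, so the late event is discarded.
                discarded.add(ts);
            } else {
                open.merge(start, 1, Integer::sum);
            }
            // Close every window whose end plus grace period has passed and emit its final count.
            for (var it = open.entrySet().iterator(); it.hasNext(); ) {
                var window = it.next();
                if (window.getKey() + size + grace <= streamTime) {
                    emitted.put(window.getKey(), window.getValue());
                    it.remove();
                }
            }
        }
        // Windows that are still open have not produced a final result yet.
        return new Result(emitted, discarded);
    }

    public static void main(String[] args) {
        List<Long> arrivals = List.of(1L, 4L, 12L, 3L, 17L, 26L, 8L); // seconds
        for (long grace : new long[] {0, 5, 20}) {
            System.out.println("grace " + grace + "s: " + run(arrivals, 10, grace));
        }
    }
}
```

With 10-second windows and the arrival order 1, 4, 12, 3, 17, 26, 8, a grace period of 0 seconds discards the late events at 3 and 8. A grace period of 5 seconds counts the event at 3 but still discards the event at 8. A grace period of 20 seconds discards nothing, but no window has closed yet, so no final result is available. This illustrates the trade-off between completeness and latency.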

Revision: ad482-1.8-cc2ae1c