Preventing Duplication and Data Loss

Objectives

After completing this section, you should be able to prevent duplication and data loss.

Determining Duplication and Data Loss Scenarios

Message duplication and loss can corrupt your application's event history. These situations can occur in different scenarios, for example:

  • A user mistakenly performs the same action twice in the UI.

  • A communication error between the application and the Kafka broker.

  • An application crash while processing an event. If the event was processed but not marked as consumed, then the application might process the same event after a restart.

Handling Duplication in Application Requests

When users generate duplication at the boundaries of the application, such as user interfaces or APIs, the application receives duplicated commands, queries, or input parameters. For example, a UI error in the sign-up process of an application might cause the back-end to receive a duplicated CreateUser command.

Duplicate Requests in Transactional Applications

In ACID transactional applications, the database constraints normally prevent data inconsistencies.

Note

ACID is the acronym for atomicity, consistency, isolation, and durability, which are properties of database transactions.

For example, if a user tries to create an account with the same email twice, then the database prevents that from happening due to primary or unique key constraints. The database makes the second request fail.

Duplicate Requests in Event-driven Applications

In event-driven applications, it is common to use an eventual consistency approach because it improves your application's performance. An eventually consistent system might temporarily return different results for the same operation, but eventually converges to a consistent result.

For example, consider a system where a user can sign up from a UI. The UI sends requests to a back end with two application instances. The application saves the information in a database where data is written to the primary instance and data is read from the replicas. The primary database instance eventually updates the replicas with the latest data written. Because the update of the replicas can take some time, it is possible that the sign-up application reads inconsistent information during an undefined period of time. However, the system is eventually consistent. The following diagram shows this behavior.

Figure 6.4: Sign-up application
  1. The user produces two identical UI requests. The first one is served by Instance 1 and the second one by Instance 2.

  2. The sign-up application checks if a user with the same email exists. If Request 1 is written into the database first, then Request 2 should be discarded. However, because it might take some time to sync the replicas, the validation does not work and Instance 2 reads an incorrect value.

  3. Eventually, replicas are updated, and two users with the same email are in the system.

In an ACID database, Request 2 would have been locked until Request 1 was replicated across all replicas. Locking means that the application has to wait and, therefore, it affects the performance of the system.

Kafka Streams provides tools that might help de-duplicate input events, such as filtering transformations and tables.
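
For illustration, the following minimal sketch uses a Kafka Streams table to collapse duplicated sign-up events. The topic names and the assumption that the user email is the record key are hypothetical, not part of the previous example.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;

public class SignupDeduplication {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "signup-deduplication");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Reading the topic as a table keeps only the latest event per key,
        // so repeated sign-up requests with the same email collapse into one record.
        KTable<String, String> users = builder.table("signup-requests");
        users.toStream().to("deduplicated-signups");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}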

Handling Duplication Caused by Application Failures

Application internals might also generate duplication and data loss. A network delay, an application crash, or any other unexpected failure can break the event stream processing flow and cause duplicate or lost events.

Consumer Offsets

Kafka consumers normally commit the offset of the messages that they consume to the Kafka broker, to keep track of what they have already processed.

By default, Kafka enables an autocommit feature for consumers. Autocommit commits the offset of the last records that the consumer has read, at most every five seconds. However, the commit request is only sent during a call to the poll method. This means that although autocommit is configured to run every five seconds, if the poll method is executed every twenty seconds, then offsets are actually committed every twenty seconds, as part of the next poll call. The autocommit period is configurable.
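
The following sketch illustrates this behavior with comments. It is a minimal sketch, assuming a consumer created with the default settings (enable.auto.commit set to true, auto.commit.interval.ms set to 5000); the timings mentioned are only an example.

while (true) {
    // Autocommit requests are sent from inside poll: if at least five seconds
    // have passed since the last commit, the offsets returned by the previous
    // poll call are committed here.
    ConsumerRecords<String, String> records = consumer.poll(100);

    for (ConsumerRecord<String, String> record : records) {
        // If processing the batch takes twenty seconds, no commit happens
        // during that time, because poll is not being called.
    }
}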

The message delivery semantics can also be considered from the consumer side:

  • At most once: guarantees that a record is never duplicated, but it might be lost.

  • At least once: guarantees that a record is never lost, but it might be duplicated.

  • Exactly once: guarantees that a record is never lost, and it is never duplicated.

However, autocommitting offsets can cause problems when the application fails.

Failure Scenarios

Consider a consumer using autocommit with the default period of five seconds. If the consumer takes too long to process a record and throws an exception after six seconds, then the record's offset might already have been committed, even though the processing failed.

In the case of a failure, you might lose records or read duplicate records.

Reading duplicate records

Duplicate records occur when the offset committed is smaller than the real offset processed by the consumer. This is usually caused when a rebalance is triggered before the consumer commits its latest offset or when the application processes some records but is unable to commit the offset.

Consider a consumer with an autocommit interval of 10 seconds. The polling operation occurs within the first five seconds, as shown in the following diagram.

Figure 6.5: Reading duplicate records timeline

However, a partition rebalance happens after the second poll. From that moment, the partition is reassigned, and therefore the original consumer is unable to send a commit request for it. Although five records were processed before the rebalance, the consumer did not commit their offset. The consumer that is newly assigned the partition reads records from the last committed offset. Therefore, the five records are read again, because the committed offset for the partition is smaller than the real offset that the original consumer processed.

Missing records

Missing records can occur when the committed offset is larger than the real offset processed by the consumer. It is usually caused when the offset is committed but some records could not be processed in the consumer application.

Consider a consumer with an autocommit interval of five seconds. The polling operation happens within those five seconds, as shown in the following diagram.

Figure 6.6: Missing records timeline

The application starts processing the records. Then, at time=5s, during the second poll, autocommit requests to commit the offset of the last record, which is 5. Meanwhile, the application keeps processing the records. A few seconds later, the application crashes before it finishes processing all the records. Because the offset is already committed, no consumer reads those unprocessed records again, and therefore they are lost.

The processing logic of your application can also lead to missing records. For example, consider that you make an asynchronous request to an external service after executing the poll method. Because the request is asynchronous, the consumer might commit the offsets of the records before you get a response from the external service. If the response eventually turns out to be an error, then the offsets are already committed and the records are effectively lost.
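
The following sketch illustrates the problem. The external service client is hypothetical; the point is that the polling loop continues, so autocommit can commit offsets before the asynchronous call completes.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // Hypothetical non-blocking call to an external service.
        CompletableFuture
            .supplyAsync(() -> externalService.process(record.value()))
            .exceptionally(error -> {
                // When this error handler runs, the next poll might already have
                // autocommitted the record offset, so the record is lost.
                return null;
            });
    }
    // The loop polls again immediately. Autocommit can run during that poll,
    // regardless of whether the asynchronous calls above have finished.
}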

Depending on the business logic of your application, there are several ways in which you can achieve an exactly once message delivery semantic to avoid data duplication and data loss.

Solution 1. Tuning Kafka Consumer Configuration

On the consumer side, autocommit might cause both data duplication and data loss. To avoid these issues, you can disable the autocommit feature by setting the enable.auto.commit property to false. If you want to keep it, then you can control how often it is triggered by setting the auto.commit.interval.ms property.
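
For reference, the following is a minimal sketch of both options; the one-second interval is only an example value.

Properties props = new Properties();

// Disable autocommit to take full control of when offsets are committed.
props.put("enable.auto.commit", "false");

// Alternatively, keep autocommit enabled and tune how often it is triggered,
// for example every second instead of the default five seconds.
// props.put("enable.auto.commit", "true");
// props.put("auto.commit.interval.ms", "1000");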

If you disable autocommit, then you must manually commit the offsets of the consumer by using the commitSync and commitAsync methods of the KafkaConsumer class.

The commitSync method blocks the current thread until the Kafka broker returns a response. This results in higher latency because your application waits for a success or an error response, and it retries sending the commit in the case of a transient error.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100); 1
    for (ConsumerRecord<String, String> record : records) {
        // Record processing
    }

    consumer.commitSync(); 2
}

1

The consumer polls the records.

2

commitSync commits the offsets of the last messages returned by the poll method and blocks the while(true) loop until a response is received.

The commitAsync method does not wait for the response from the broker. The consumer keeps on polling and processing records, and when the response from the broker is ready, a callback is executed to handle the response. This results in lower latency because the thread polling the records is not blocked. However, a lower offset than the real one can be committed because no order is guaranteed.

For example, if you send an asynchronous commit for offset 10 and it gets delayed because of a networking issue, then another commit for offset 15 can arrive at the broker first. Because the broker stores the offset of the last commit request that it receives, the delayed request eventually sets the committed offset back to 10, even though records up to offset 15 were already read. This behavior causes duplicates.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        // Record processing
    }

    // The callback handles the broker response without blocking the polling loop
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            // Log the failure. Avoid retrying here, because a newer commit
            // might already be in flight and a retry could commit an older offset.
        }
    });
}

If you are using Quarkus to consume Kafka messages, then you can also select the commit strategy that an incoming channel uses by setting the mp.messaging.incoming.my-channel.commit-strategy property. By default, the framework disables autocommit. There are three possible values, described in the following list and illustrated in the example after it:

  • throttled: periodically commits the highest offset of the processed messages in the application. The period is specified by the auto.commit.interval.ms property. If an offset cannot be committed, then the application is marked as unhealthy. This strategy provides an at least once message delivery semantic, which means that duplicates are possible.

  • latest: commits the offset every time that a record is acknowledged, which reduces the application performance. This strategy provides an at least once message delivery semantic if the channel does not perform any asynchronous processing.

  • ignore: performs no commit. It delegates committing to the underlying Kafka client. This is the default strategy when enable.auto.commit is true.
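
For example, a minimal application.properties sketch follows; the my-channel channel name is a placeholder, as in the property name above.

# Select the commit strategy for the 'my-channel' incoming channel.
mp.messaging.incoming.my-channel.commit-strategy=throttled

# Period used by the throttled strategy to commit the highest processed offset.
mp.messaging.incoming.my-channel.auto.commit.interval.ms=5000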

Solution 2. Transactions

If your application consumes messages from a topic, processes them, and produces the result to another topic, then you can use the transactional consume-produce feature that Kafka offers. A transaction makes the consuming, processing, and producing steps atomic, which means that either all records are consumed, processed, and produced or none are.

For example, consider that your application consumes records from the A topic, processes the records, and produces those records to the B topic. If your application has consumed, processed, and produced record 1, but it fails while processing record 2, then the whole transaction is aborted and record 1 is not lost: it is processed again when the application recovers.

The producer does most of the work in the transaction. You must assign a transactional identifier to each producer, which uniquely identifies it across restarts. If a producer crashes and leaves uncompleted transactions, then the transactional identifier is used to identify which producer started those transactions.

The following snippet shows a transaction implementation.

KafkaProducer producer = createProducer("transactional.id", "my-transactional-id"); 1

producer.initTransactions(); 2

KafkaConsumer consumer = createConsumer(
  "bootstrap.servers", "localhost:9092",
  "group.id", "my-group-id"); 3

consumer.subscribe(singleton("inputTopic"));

while (true) {
  ConsumerRecords records = consumer.poll(Long.MAX_VALUE);

  producer.beginTransaction(); 4
  for (ConsumerRecord record : records) {
    // Processing
    producer.send(producerRecord("outputTopic", record)); 5
  }
  producer.sendOffsetsToTransaction(currentOffsets(consumer), group); 6
  producer.commitTransaction(); 7
}

1

You create the producer with a unique transactional identifier.

2

The producer initializes the transaction. Based on the transactional identifier, it cleans up any resources left over from a previous producer session, for example, transactions that a crashed producer left open.

3

You create the consumer.

4

For every batch of records that the poll method fetches, you start a transaction.

5

You send the record to the output topic. In a real-world application, you would have done some processing of the record.

6

You append the offsets of the processed records to the transaction.

7

You notify that the transaction is completed.

If something goes wrong between the begin and commit stages, then the transaction is aborted, leaving the committed offsets and the output topic unaffected. This protects the application against duplicate messages published to the output topic, and against lost messages from the input topic.
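
The abort is explicit in the application code. The following sketch extends the previous example with simplified error handling; the catch clauses are one reasonable way to structure it, not the only possible approach.

try {
    producer.beginTransaction();
    for (ConsumerRecord record : records) {
        // Processing
        producer.send(producerRecord("outputTopic", record));
    }
    producer.sendOffsetsToTransaction(currentOffsets(consumer), group);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another producer with the same transactional identifier took over:
    // this producer cannot continue, so close it.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so that neither the produced records nor the
    // consumed offsets become visible to read_committed consumers.
    producer.abortTransaction();
}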

Because of the consume, process, produce nature of Kafka transactions, Kafka Streams uses them internally.

Solution 3. Exactly-once Processing with Kafka Streams

Kafka Streams supports exactly-once processing (EOS). To achieve this message delivery semantic, Kafka Streams uses the idempotence and transaction features provided by Kafka.

You can enable exactly-once processing by setting the processing.guarantee configuration value to exactly_once or exactly_once_beta. The exactly_once_beta value improves performance and requires a Kafka cluster at version 2.5 or newer. The default value for this property is at_least_once.

Enabling the exactly-once guarantee changes the default values of some Kafka configuration properties, such as enable.idempotence and commit.interval.ms.
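
The following is a minimal sketch of enabling the guarantee when configuring a Kafka Streams application; the application identifier and bootstrap servers are placeholders.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// Enable exactly-once processing. With a Kafka cluster at version 2.5 or newer,
// StreamsConfig.EXACTLY_ONCE_BETA can be used instead for better performance.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);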
