
Guided Exercise: Preventing Duplication and Data Loss

In this exercise, you will learn how to prevent event loss and event duplication.

The Java application for this exercise implements the following processing pipeline:

  • Consume MovementReported events from the movement-reported-events topic.

  • Process events asynchronously to calculate speed.

  • Produce the speed value to the speed-values topic.

You must address data loss and duplication problems in this event processing pipeline.

You will find the solution files for this exercise in the AD482-apps repository, within the troubleshooting-duplication/solutions directory.

Outcomes

You should be able to prevent loss of events and event duplication by:

  • Deactivating the Kafka consumer autocommit property (enable.auto.commit).

  • Using transactions to commit consumer offsets together with the produced results.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start troubleshooting-duplication
...output omitted...

The lab command copies the exercise files from the AD482-apps/troubleshooting-duplication/apps directory in your local Git repository into the troubleshooting-duplication directory in your workspace.

Procedure 6.3. Instructions

  1. Move to the troubleshooting-duplication directory in your workspace.

    (.venv) [user@host AD482]$ cd troubleshooting-duplication
    (.venv) [user@host troubleshooting-duplication]$
  2. Examine the movement-processor/src/main/java/com/redhat/vehicles/MovementProcessor.java file. The application consumes MovementReported events, processes these events asynchronously by calculating speed, and sends the result to the speed-values topic.

    • Consumer offset autocommit is enabled by default.

    • The consumer is configured to commit the last consumed offset every second.

    • The consumer reads only one event per poll call.

    • The application simulates a delay in each processing step.

    Note that, in the following steps, you use additional consumers to inspect the speed-values topic and the committed offsets.
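The consumer behavior described in this step corresponds to standard Kafka consumer properties. The following minimal sketch builds a comparable configuration, assuming the values shown here: they are inferred from the description above, not copied from MovementProcessor.java. It uses plain string keys so that it compiles with the JDK alone; the class name AutocommitConsumerSettings is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper that mirrors the consumer behavior described in step 2.
// Property names are standard Kafka consumer settings; the values are assumptions.
class AutocommitConsumerSettings {

    static Properties build(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Autocommit is enabled by default; it is set explicitly here for clarity.
        props.setProperty("enable.auto.commit", "true");
        // Commit the position reached by poll() every 1000 ms (assumed value).
        props.setProperty("auto.commit.interval.ms", "1000");
        // Return at most one record from each poll() call.
        props.setProperty("max.poll.records", "1");
        // Without a committed offset, start from the beginning of the topic (assumed).
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        build("MovementReported-consumer-group")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With autocommit, the timer commits the position that poll() has reached, whether or not the asynchronous processing of the returned records has finished. That timing is what makes the data loss in the next step possible.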

  3. Run the applications to reproduce the data loss scenario.

    1. Run the producer script. The producer script generates five movement events.

      (.venv) [user@host troubleshooting-duplication]$ python \
       scripts/produce_events.py \
       movement-reported-events
      ...output omitted...
      - Generated event to 'movement-reported-events' topic
      {'distance': 50, 'time': 5}
    2. Open a new terminal window and run the kafka-console-consumer command to keep track of committed offsets. This command reads the __consumer_offsets topic, an internal topic in which Kafka stores the offsets that each consumer group commits.

      [user@host troubleshooting-duplication]$ oc exec \
       -it my-cluster-kafka-0 -- ./bin/kafka-console-consumer.sh \
       --bootstrap-server my-cluster-kafka-brokers:9092 \
       --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
       --topic __consumer_offsets

      Keep this terminal open for the rest of the exercise.

    3. Move to the movement-processor directory and run the Java consumer application.

      (.venv) [user@host troubleshooting-duplication]$ cd movement-processor
      (.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
      ...output omitted...
      Processing  { distance: 10, time: 5 }
      Speed 2.0 sent to topic
      
      
      Processing  { distance: 20, time: 20 }
      Speed 1.0 sent to topic
      
      
      Processing  { distance: 0, time: 0 }
      java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
      ...output omitted...

      Note that the application crashes when processing the third event, due to a division by zero.

    4. Switch to the terminal window where the kafka-console-consumer script is running. Notice that the consumer has committed the offsets for the first three events, even though the application failed while processing the third event. A committed offset is the next offset to read, so offset=3 means that the group resumes at the fourth event.

      [MovementReported-consumer-group,movement-reported-events,0]::OffsetAndMetadata(offset=1, ...output omitted...)
      [MovementReported-consumer-group,movement-reported-events,0]::OffsetAndMetadata(offset=2, ...output omitted...)
      [MovementReported-consumer-group,movement-reported-events,0]::OffsetAndMetadata(offset=3, ...output omitted...)
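Each line that the OffsetsMessageFormatter prints starts with the consumer group, topic, and partition, followed by the committed offset. The following sketch parses only the prefix visible in this exercise; the class name is hypothetical, and the pattern does not attempt to cover the omitted part of the line. Assuming that the partition starts at offset 0, the committed offset also equals the number of events that the group has marked as consumed.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the visible prefix of an OffsetsMessageFormatter line:
// [group,topic,partition]::OffsetAndMetadata(offset=N, ...
class CommittedOffsetLine {

    private static final Pattern PREFIX = Pattern.compile(
        "^\\[([^,\\]]+),([^,\\]]+),(\\d+)\\]::OffsetAndMetadata\\(offset=(\\d+)");

    static final class Entry {
        final String group;
        final String topic;
        final int partition;
        final long offset; // the NEXT offset the group reads from this partition

        Entry(String group, String topic, int partition, long offset) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    static Entry parse(String line) {
        Matcher m = PREFIX.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("Unrecognized line: " + line);
        }
        return new Entry(m.group(1), m.group(2),
            Integer.parseInt(m.group(3)), Long.parseLong(m.group(4)));
    }

    public static void main(String[] args) {
        Entry e = parse("[MovementReported-consumer-group,movement-reported-events,0]"
            + "::OffsetAndMetadata(offset=3, ...output omitted...)");
        // Assuming the partition starts at offset 0, offset 3 means that the events at
        // offsets 0, 1, and 2 are marked as consumed and the group resumes at offset 3.
        System.out.println(e.group + " resumes " + e.topic + "-" + e.partition
            + " at offset " + e.offset);
    }
}
```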
    5. Run the Java consumer application again. Verify that the application consumes the fourth and fifth events. The consumer keeps committing its position every second, so the committed offset should now be 5.

      (.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
      ...output omitted...
      
      Processing  { distance: 40, time: 10 }
      Speed 4.0 sent to topic
      
      Processing  { distance: 50, time: 5 }
      Speed 10.0 sent to topic
      
      ...output omitted...
    6. Stop the Java consumer application.

    7. Verify that the speed-values topic has received values for all the events except the third one. Use the kafka-console-consumer command in the Kafka cluster to read the speed-values topic.

      (.venv) [user@host movement-processor]$ oc exec \
       -it my-cluster-kafka-0 -- ./bin/kafka-console-consumer.sh \
       --bootstrap-server my-cluster-kafka-brokers:9092 \
       --value-deserializer org.apache.kafka.common.serialization.FloatDeserializer \
       --topic speed-values --from-beginning
      2.0
      1.0
      4.0
      10.0

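      The third speed value is missing, which means that the third event has been lost: the consumer committed its offset, but the application never produced its result.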

      Terminate the speed-values Kafka console consumer.
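The data loss in this step can be reduced to a small in-memory model. The following sketch is not Kafka code: it assumes, as a simplification, that autocommit commits a record's offset before the record's processing finishes, and it reuses the five {distance, time} events that the application logs in this step. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model of the data loss scenario (not Kafka code).
// Assumption: with autocommit, a record's offset is committed before its
// asynchronous processing finishes, as happened with the third event.
class AutocommitLossSimulation {

    // {distance, time} pairs, as logged by the application in this step.
    static final int[][] EVENTS = { {10, 5}, {20, 20}, {0, 0}, {40, 10}, {50, 5} };

    long committedOffset = 0;                          // next offset for the consumer group
    final List<Float> speedValues = new ArrayList<>(); // contents of speed-values

    /** One application run; it stops at the first failure, like the crashing app. */
    void runOnce() {
        for (long offset = committedOffset; offset < EVENTS.length; offset++) {
            int[] event = EVENTS[(int) offset];
            committedOffset = offset + 1;              // the commit runs ahead of processing
            try {
                // Integer division, so time == 0 throws ArithmeticException.
                speedValues.add((float) (event[0] / event[1]));
            } catch (ArithmeticException e) {
                System.out.println("Crash at offset " + offset + ": " + e.getMessage());
                return;                                // the offset is already committed
            }
        }
    }

    static List<Float> twoRuns() {
        AutocommitLossSimulation sim = new AutocommitLossSimulation();
        sim.runOnce(); // offsets 0 and 1 succeed, offset 2 crashes
        sim.runOnce(); // resumes at offset 3, so offset 2 is never processed
        return sim.speedValues;
    }

    public static void main(String[] args) {
        System.out.println(twoRuns()); // [2.0, 1.0, 4.0, 10.0]
    }
}
```

Because the commit runs ahead of the processing, the second run resumes at offset 3, and the event at offset 2 never produces a speed value.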

  4. Recreate the speed-values topic.

    1. Delete the topic.

      [user@host movement-processor]$ oc delete kafkatopic speed-values
      kafkatopic.kafka.strimzi.io "speed-values" deleted

      Important

      The AMQ Streams topic operator might take a few seconds to fully delete the topic.

      Make sure that the topic is deleted before recreating it. This is important to ensure that the recreated topic is empty. You can use the oc get kafkatopic command to verify this.

    2. Recreate the topic.

      (.venv) [user@host movement-processor]$ oc apply -f ../resources/topic.yaml
      kafkatopic.kafka.strimzi.io/speed-values created
  5. Deactivate autocommit to prevent the previous data loss scenario. Then verify that, if the application does not commit consumer offsets, it duplicates speed values when it restarts.

    1. Edit the src/main/java/com/redhat/vehicles/MovementProcessor.java file and change the consumer group ID.

      public static Properties getConsumerConfig() {
          ...output omitted...
      
          // TODO: change consumer group
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "MovementReported-consumer-group-v2");
      
          ...output omitted...
      
          return props;
      }

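      The application must start consuming events from offset 0 to recalculate all the speeds.

      Changing the consumer group ID assigns the application to a new consumer group. The new group has no committed offsets, so the consumer starts from the position that its auto.offset.reset property defines, which for this application is the beginning of the topic.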

      Important

      If you do not change the group ID, then the Java consumer application starts consuming from the latest offset committed by the consumer: offset 5.
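The rule that this step relies on can be summarized as a small decision function. The following sketch is a simplified model, not the Kafka client implementation: a committed offset for the group takes precedence; without one, auto.offset.reset selects earliest or latest, and any other value is treated as an error, similar to the none setting. The earliest value passed for this application is an assumption consistent with the behavior described above, and the class name is hypothetical.

```java
import java.util.Map;
import java.util.OptionalLong;

// Simplified model (not the Kafka client implementation) of how a consumer chooses
// its starting offset for a partition.
class StartPosition {

    static long resolve(OptionalLong committedOffset, String autoOffsetReset,
                        long logStartOffset, long logEndOffset) {
        if (committedOffset.isPresent()) {
            return committedOffset.getAsLong();     // the group's committed offset wins
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;              // replay the whole partition
            case "latest":
                return logEndOffset;                // read only new events
            default:
                // Similar to auto.offset.reset=none: fail instead of guessing.
                throw new IllegalStateException(
                    "No committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Committed offsets observed in this exercise (partition 0 holds five events).
        Map<String, Long> committed = Map.of("MovementReported-consumer-group", 5L);
        for (String group : new String[] {
                "MovementReported-consumer-group", "MovementReported-consumer-group-v2"}) {
            Long offset = committed.get(group);
            OptionalLong start = offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
            System.out.println(group + " starts at offset "
                + resolve(start, "earliest", 0, 5));
        }
    }
}
```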

    2. Deactivate the autocommit config.

      public static Properties getConsumerConfig() {
          ...output omitted...
      
          // TODO: disable autocommit
          props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
      
          return props;
      }
    3. Run the Java consumer application. The application consumes the first two events and sends the calculated speeds to the speed-values topic. The application crashes when trying to process the third event.

      (.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
      ...output omitted...
      Processing  { distance: 10, time: 5 }
      Speed 2.0 sent to topic
      
      
      Processing  { distance: 20, time: 20 }
      Speed 1.0 sent to topic
      
      
      Processing  { distance: 0, time: 0 }
      java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
      ...output omitted...
    4. Run the Java consumer application again. The application consumes the first two events again, because autocommit is disabled and the application does not commit offsets in any other way.

      (.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
      ...output omitted...
      Processing  { distance: 10, time: 5 }
      Speed 2.0 sent to topic
      
      
      Processing  { distance: 20, time: 20 }
      Speed 1.0 sent to topic
      
      ...output omitted...
    5. Verify that the speed-values topic contains duplicated speed values, as a result of processing the first two events twice.

      (.venv) [user@host movement-processor]$ oc exec \
       -it my-cluster-kafka-0 -- ./bin/kafka-console-consumer.sh \
       --bootstrap-server my-cluster-kafka-brokers:9092 \
       --value-deserializer org.apache.kafka.common.serialization.FloatDeserializer \
       --topic speed-values --from-beginning
      2.0
      1.0
      2.0
      1.0

      Terminate the speed-values Kafka console consumer.
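The following in-memory sketch, which is not Kafka code, compares the behavior observed in this step with a variant that commits each offset manually right after producing the result. The CommitPolicy names are hypothetical, the starting offset of 0 assumes the earliest reset behavior of a new group, and the zero time check stands in for the division-by-zero crash.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model (not Kafka code) of a consumer with autocommit disabled.
class ManualCommitSimulation {

    enum CommitPolicy { NEVER, AFTER_EACH_RESULT } // hypothetical names

    // {distance, time} pairs, as logged by the application.
    static final int[][] EVENTS = { {10, 5}, {20, 20}, {0, 0}, {40, 10}, {50, 5} };

    /** Runs the crashing application twice and returns the contents of speed-values. */
    static List<Float> twoRuns(CommitPolicy policy) {
        List<Float> speedValues = new ArrayList<>();
        long committedOffset = 0; // new group, starting from the beginning (assumed)
        for (int run = 0; run < 2; run++) {
            for (long offset = committedOffset; offset < EVENTS.length; offset++) {
                int[] event = EVENTS[(int) offset];
                if (event[1] == 0) {
                    break; // stands in for the division-by-zero crash
                }
                speedValues.add((float) event[0] / event[1]); // produce the result first
                if (policy == CommitPolicy.AFTER_EACH_RESULT) {
                    committedOffset = offset + 1;             // then commit the offset
                }
            }
        }
        return speedValues;
    }

    public static void main(String[] args) {
        System.out.println(twoRuns(CommitPolicy.NEVER));             // [2.0, 1.0, 2.0, 1.0]
        System.out.println(twoRuns(CommitPolicy.AFTER_EACH_RESULT)); // [2.0, 1.0]
    }
}
```

In this run, committing after each result avoids the duplicates, but producing the result and committing the offset remain two separate operations. A crash between them still makes the application produce the same result again after a restart, which is the gap that transactions close.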

  6. Recreate the speed-values topic.

    1. Delete the topic.

      [user@host movement-processor]$ oc delete kafkatopic speed-values
      kafkatopic.kafka.strimzi.io "speed-values" deleted
    2. Recreate the topic.

      (.venv) [user@host movement-processor]$ oc apply -f ../resources/topic.yaml
      kafkatopic.kafka.strimzi.io/speed-values created
  7. Use transactions to wrap event consumption, processing, offsets committing, and result production in a single, atomic operation.

    1. Edit the src/main/java/com/redhat/vehicles/MovementProcessor.java file to change the consumer group ID.

      public static Properties getConsumerConfig() {
          ...output omitted...
      
          // TODO: change consumer group
          props.put(
              ConsumerConfig.GROUP_ID_CONFIG,
              "MovementReported-consumer-group-v3"
          );
      
          ...output omitted...
      
          return props;
      }
    2. Add a transactional ID to the producer config. A transactional ID is required when using transactions.

      public static Properties getProducerConfig() {
          ...output omitted...
      
          // TODO: add transactional id
          props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "movement-processor");
      
          return props;
      }
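The following sketch collects the transaction-related settings with plain string keys. transactional.id, enable.idempotence, acks, and isolation.level are standard Kafka settings; the class and method names and the speed-values-reader group ID are hypothetical. Setting enable.idempotence and acks explicitly is a choice made here for clarity, because a transactional producer requires idempotence.

```java
import java.util.Properties;

// Sketch with plain string keys, so it compiles without the Kafka client library.
// The keys are standard Kafka settings; the class and method names are hypothetical.
class TransactionalSettings {

    static Properties producer(String transactionalId) {
        Properties props = new Properties();
        // Keep this ID stable across restarts of the same application instance.
        props.setProperty("transactional.id", transactionalId);
        // Transactions require an idempotent producer, which in turn requires acks=all.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        return props;
    }

    static Properties downstreamConsumer(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // Return only records from committed transactions (default: read_uncommitted).
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producer("movement-processor"));
        System.out.println(downstreamConsumer("speed-values-reader"));
    }
}
```

When a restarted instance calls initTransactions() with the same transactional ID, Kafka fences the previous producer and completes or aborts its unfinished transaction. Downstream consumers that must see only committed results need isolation.level set to read_committed, because the default, read_uncommitted, also returns records from aborted transactions.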
    3. Initialize the transactional state after creating the producer instance.

      public static void main(String[] args) throws InterruptedException, RuntimeException {
          ...output omitted...
          producer = new KafkaProducer<>(getProducerConfig());
      
          // TODO: initialize transactions
          producer.initTransactions();
      
          ...output omitted...
    4. Add the code to start the transaction after polling for new events. After the application processes the events and produces the result, send the offsets to the transaction and commit the transaction.

      while (true) {
          ...output omitted...
      
          // TODO: begin transaction
          producer.beginTransaction();
      
          List<CompletableFuture<Float>> futures = new ArrayList<>();
      
          for (ConsumerRecord<Void, MovementReported> record : records) {
              MovementReported event = record.value();
      
              futures.add(calculateSpeed(event)
                  .whenComplete((speed, t) -> {
                      // Produce a result only if the calculation succeeded
                      if (t == null) {
                          produceResult(speed);
                      }
                  })
                  .exceptionally(error -> {
                      error.printStackTrace();
                      // TODO: abort transaction
                      producer.abortTransaction();
                      System.exit(-1);
                      return null;
                  })
              );
          }
      
          // TODO: wait for async calculations to complete
          waitForCompletion(futures);
      
          // TODO: send offsets and commit transaction
          producer.sendOffsetsToTransaction(
              getConsumedOffsets(consumer),
              consumer.groupMetadata()
          );
          producer.commitTransaction();
      }
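The loop passes getConsumedOffsets(consumer) to sendOffsetsToTransaction(), but this step does not show that helper. Whatever its implementation, Kafka expects each committed offset to be the next offset to consume, that is, the last processed offset plus one. The following JDK-only sketch shows that arithmetic over a hypothetical Processed type; it is not the exercise's helper.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: computes the offsets to commit after processing a batch.
// Kafka expects each committed offset to be the NEXT offset to consume.
class OffsetsToCommit {

    /** Hypothetical stand-in for the partition and offset of a processed record. */
    static final class Processed {
        final int partition;
        final long offset;

        Processed(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    static Map<Integer, Long> compute(List<Processed> batch) {
        Map<Integer, Long> next = new TreeMap<>();
        for (Processed processed : batch) {
            // Last processed offset + 1; keep the highest value for each partition.
            next.merge(processed.partition, processed.offset + 1, Long::max);
        }
        return next;
    }

    public static void main(String[] args) {
        // Second transaction in this exercise: the record at offset 1 of partition 0.
        System.out.println(compute(List.of(new Processed(0, 1)))); // {0=2}
    }
}
```

This matches the committed offsets in the next steps: the transactions for the records at offsets 0 and 1 commit offsets 1 and 2.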
    5. Run the Java application. The application crashes when trying to process the third event.

      (.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
      ...output omitted...
      
      Processing  { distance: 10, time: 5 }
      Speed 2.0 sent to topic
      
      
      Processing  { distance: 20, time: 20 }
      Speed 1.0 sent to topic
      
      Processing  { distance: 0, time: 0 }
      java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
      
      ...output omitted...
    6. Switch to the terminal window where the kafka-console-consumer script is running for the __consumer_offsets topic. Verify that the MovementReported-consumer-group-v3 consumer group has committed only the first two offsets.

      [MovementReported-consumer-group-v3,movement-reported-events,0]::OffsetAndMetadata(offset=1, ...output omitted...)
      [MovementReported-consumer-group-v3,movement-reported-events,0]::OffsetAndMetadata(offset=2, ...output omitted...)
    7. Run the Java application again. The application starts from the third event and crashes again. Notice that the application does not consume the first two events again, because their offsets were committed by the successful transactions.

      (.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
      ...output omitted...
      Processing  { distance: 0, time: 0 }
      java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
      ...output omitted...
  8. Fix the error and verify that the Java application has processed each event exactly once.

    1. Edit the src/main/java/com/redhat/vehicles/MovementProcessor.java file to handle the division by zero case.

      private static CompletableFuture<Float> calculateSpeed(MovementReported event) {
          return CompletableFuture.supplyAsync(() -> {
              System.out.println("\n\nProcessing " + event);
      
              // TODO: handle division by 0
              if (event.time == 0) {
                  return (float) 0;
              }
      
              // Cast first so that the division is not truncated to an integer
              float speed = (float) event.distance / event.time;
              return speed;
          });
      }
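You can check the guard outside Kafka with a stand-in event type. The following sketch assumes that MovementReported has int distance and time fields, which is consistent with the "/ by zero" ArithmeticException (floating-point division by zero does not throw). The stand-in class and the truncatedSpeed method are hypothetical. The sketch also shows that int division truncates before the result is widened to float: this does not affect the events in this exercise, but it would for a movement such as distance 10 and time 4.

```java
import java.util.concurrent.CompletableFuture;

class SpeedCheck {

    // Hypothetical stand-in for the exercise's MovementReported event class.
    static final class MovementReported {
        final int distance;
        final int time;

        MovementReported(int distance, int time) {
            this.distance = distance;
            this.time = time;
        }
    }

    static CompletableFuture<Float> calculateSpeed(MovementReported event) {
        return CompletableFuture.supplyAsync(() -> {
            if (event.time == 0) {
                return 0f;                                // guard against division by zero
            }
            return (float) event.distance / event.time;  // floating-point division
        });
    }

    static float truncatedSpeed(MovementReported event) {
        return event.distance / event.time;              // int division, then widening to float
    }

    public static void main(String[] args) {
        System.out.println(calculateSpeed(new MovementReported(0, 0)).join());  // 0.0
        System.out.println(calculateSpeed(new MovementReported(10, 4)).join()); // 2.5
        System.out.println(truncatedSpeed(new MovementReported(10, 4)));        // 2.0
    }
}
```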
    2. Run the Java application again to process the remaining events.

      (.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
      ...output omitted...
    3. Terminate the Java application after the consumer processes the events.

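    4. Verify that the speed-values topic received exactly one speed value for each event. Because the application writes to this topic transactionally, use the --isolation-level read_committed option so that the console consumer returns only records from committed transactions.

      (.venv) [user@host movement-processor]$ oc exec \
       -it my-cluster-kafka-0 -- ./bin/kafka-console-consumer.sh \
       --bootstrap-server my-cluster-kafka-brokers:9092 \
       --isolation-level read_committed \
       --value-deserializer org.apache.kafka.common.serialization.FloatDeserializer \
       --topic speed-values --from-beginning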
      2.0
      1.0
      0.0
      4.0
      10.0
    5. Terminate all running scripts.
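The runs in steps 7 and 8 can be replayed in a small in-memory model of the transactional pipeline. The following sketch is not Kafka code: it assumes one record per transaction, as with one record per poll call in this exercise, and it models what a read_committed consumer of speed-values would see. A failed calculation discards the pending results and leaves the committed offset unchanged, which stands in for abortTransaction(). The class name and the guardZero flag are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified in-memory model (not Kafka code) of consume-process-produce with transactions.
// Assumptions: one record per transaction, and a read_committed view of speed-values.
class TransactionalPipelineSimulation {

    // {distance, time} pairs, as logged by the application.
    static final int[][] EVENTS = { {10, 5}, {20, 20}, {0, 0}, {40, 10}, {50, 5} };

    final List<Float> committedSpeedValues = new ArrayList<>(); // what read_committed sees
    long committedOffset = 0;                                    // next offset for the group

    /** One application run; guardZero models the fix from step 8. */
    void run(boolean guardZero) {
        for (long offset = committedOffset; offset < EVENTS.length; offset++) {
            int[] event = EVENTS[(int) offset];
            List<Float> pending = new ArrayList<>();   // beginTransaction()
            try {
                // Without the guard, integer division by zero throws ArithmeticException.
                float speed = (guardZero && event[1] == 0) ? 0f : (float) (event[0] / event[1]);
                pending.add(speed);                     // produce inside the transaction
            } catch (ArithmeticException e) {
                return;                                 // abortTransaction(): pending is dropped
            }
            // commitTransaction(): results and offset become visible together.
            committedSpeedValues.addAll(pending);
            committedOffset = offset + 1;
        }
    }

    public static void main(String[] args) {
        TransactionalPipelineSimulation sim = new TransactionalPipelineSimulation();
        sim.run(false); // commits offsets 0 and 1, aborts at offset 2
        sim.run(false); // resumes at offset 2 and aborts again
        sim.run(true);  // with the fix, processes offsets 2, 3, and 4
        System.out.println(sim.committedSpeedValues); // [2.0, 1.0, 0.0, 4.0, 10.0]
    }
}
```

The final list matches the speed values that the console consumer printed: each event contributes exactly one committed result, even though the application crashed twice on the third event.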

Finish

Go back to your workspace. Use the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish troubleshooting-duplication

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c