Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will learn how to prevent loss of events and event duplication.
The Java application for this exercise implements the following processing pipeline:
Consume MovementReported events from the movement-reported-events topic.
Process events asynchronously to calculate speed.
Produce the speed value to the speed-values topic.
You must address data loss and duplication problems in this event processing pipeline.
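The application follows the common consume-process-produce pattern. The following is a simplified sketch of that loop, reusing the calculateSpeed and produceResult methods that the application defines; the poll duration is an assumption and this is not the exact content of MovementProcessor.java:

// Simplified consume-process-produce sketch; the real application adds
// configuration, error handling, and the fixes applied in this exercise.
while (true) {
    ConsumerRecords<Void, MovementReported> records =
        consumer.poll(Duration.ofSeconds(1));

    for (ConsumerRecord<Void, MovementReported> record : records) {
        MovementReported event = record.value();
        // Calculate the speed asynchronously, then produce the result
        // to the speed-values topic.
        calculateSpeed(event).thenAccept(speed -> produceResult(speed));
    }
}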
You will find the solution files for this exercise in the AD482-apps repository, within the troubleshooting-duplication/solutions directory.
Outcomes
You should be able to prevent loss of events and event duplication by:
Deactivating the autocommit Kafka config property.
Using transactions and committing offsets manually.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start troubleshooting-duplication
...output omitted...
The lab command copies the exercise files from the AD482-apps/troubleshooting-duplication/apps directory in your local Git repository into the troubleshooting-duplication directory in your workspace.
Procedure 6.3. Instructions
Move to the troubleshooting-duplication directory in your workspace.

(.venv) [user@host AD482]$ cd troubleshooting-duplication
(.venv) [user@host troubleshooting-duplication]$

Examine the movement-processor/src/main/java/com/redhat/vehicles/MovementProcessor.java file. The application consumes MovementReported events, processes these events asynchronously by calculating speed, and sends the result to the speed-values topic. Note the following details; a configuration sketch follows this list.

Consumer offset autocommit is enabled by default.
The consumer is configured to commit the last consumed offset each second.
The consumer only reads one event on each poll call.
The application simulates a delay in each processing step.
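As a reference, a minimal sketch of this consumer configuration might look like the following. The exact property values in getConsumerConfig() can differ, and using max.poll.records is an assumption about how the one-event-per-poll behavior is achieved:

Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "MovementReported-consumer-group");
// Autocommit enabled: offsets are committed automatically every second
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
// Only one event is returned by each poll call
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);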
Note that, in the following steps, you use additional consumers to inspect the speed-values topic and the committed offsets.

Run the apps to start the data loss scenario.
Run the producer script. The producer script generates five movement events.
(.venv) [user@host troubleshooting-duplication]$ python \
    scripts/produce_events.py \
    movement-reported-events
...output omitted...
 - Generated event to 'movement-reported-events' topic {'distance': 50, 'time': 5}

Open a new terminal window and run the kafka-console-consumer command to keep track of consumed offsets. This command reads the __consumer_offsets topic. This is a special topic that Kafka uses to keep track of the messages consumed by each consumer group.

[user@host troubleshooting-duplication]$ oc exec \
    -it my-cluster-kafka-0 -- ./bin/kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-brokers:9092 \
    --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
    --topic __consumer_offsets

Keep this terminal open for the rest of the exercise.
Move to the movement-processor directory and run the Java consumer application.

(.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
    "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
...output omitted...
Processing { distance: 10, time: 5 }
Speed 2.0 sent to topic
Processing { distance: 20, time: 20 }
Speed 1.0 sent to topic
Processing { distance: 0, time: 0 }
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
...output omitted...

Note that the application crashes when processing the third event, due to a division by zero.
Switch to the terminal window where the kafka-console-consumer script is running. Notice how the consumer has committed the offsets for the first three events, even though the application has failed when processing the third event.

[MovementReported-consumer-group,movement-reported-events,0]::OffsetAndMetadata(offset=1, ...output omitted...)
[MovementReported-consumer-group,movement-reported-events,0]::OffsetAndMetadata(offset=2, ...output omitted...)
[MovementReported-consumer-group,movement-reported-events,0]::OffsetAndMetadata(offset=3, ...output omitted...)
Run the Java consumer application again. Verify that the app consumes the 4th and 5th events. The consumer keeps reporting the last consumed offset, which should be five, every second.
(.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
    "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
...output omitted...
Processing { distance: 40, time: 10 }
Speed 4.0 sent to topic
Processing { distance: 50, time: 5 }
Speed 10.0 sent to topic
...output omitted...

Stop the Java consumer application.
Verify that the speed-values topic has received values for all the events except for the third one. Use the kafka-console-consumer command in the Kafka cluster to read the speed-values topic.

(.venv) [user@host movement-processor]$ oc exec \
    -it my-cluster-kafka-0 -- ./bin/kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-brokers:9092 \
    --value-deserializer org.apache.kafka.common.serialization.FloatDeserializer \
    --topic speed-values --from-beginning
2.0
1.0
4.0
10.0

This means that the third event has been lost.
Terminate the speed-values Kafka console consumer.
Recreate the speed-values topic.

Delete the topic.

[user@host movement-processor]$ oc delete kafkatopic speed-values
kafkatopic.kafka.strimzi.io "speed-values" deleted

Important
The AMQ Streams topic operator might take a few seconds to fully delete the topic.
Make sure that the topic is deleted before recreating it. This is important to ensure that the recreated topic is empty. You can use the oc get kafkatopic command to verify this.

Recreate the topic.

(.venv) [user@host movement-processor]$ oc apply -f ../resources/topic.yaml
kafkatopic.kafka.strimzi.io/speed-values created
Deactivate the autocommit feature to prevent the previous data loss scenario. Additionally, verify that not committing consumer offsets duplicates speed values when the application restarts.
Edit the src/main/java/com/redhat/vehicles/MovementProcessor.java file and change the consumer group ID.

public static Properties getConsumerConfig() {
    ...output omitted...
    // TODO: change consumer group
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "MovementReported-consumer-group-v2");
    ...output omitted...
    return props;
}

The application must start consuming events from offset 0 to recalculate all the speeds.
Changing the consumer group ID assigns the application to a new consumer group. This resets the offsets of the consumer application.
Important
If you do not change the group ID, then the Java consumer application starts consuming from the latest offset committed by the consumer: offset 5.
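Where a new consumer group with no committed offsets starts reading depends on the auto.offset.reset consumer property. The behavior described here, restarting from offset 0, assumes a value equivalent to earliest; check the actual value in getConsumerConfig(). A sketch of that assumed setting:

// Assumed setting: with no committed offsets, the new consumer group
// starts reading from the earliest available offset in the topic.
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");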
Deactivate the autocommit config.
public static Properties getConsumerConfig() {
    ...output omitted...
    // TODO: disable autocommit
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    return props;
}

Run the Java consumer application. The app consumes the first two events and sends the calculated speed to the speed-values topic. The app crashes when trying to process the third event.

(.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
    "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
...output omitted...
Processing { distance: 10, time: 5 }
Speed 2.0 sent to topic
Processing { distance: 20, time: 20 }
Speed 1.0 sent to topic
Processing { distance: 0, time: 0 }
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
...output omitted...

Run the Java consumer application again. The application consumes the first two events again, because the application did not commit the offsets.
(.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
    "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
...output omitted...
Processing { distance: 10, time: 5 }
Speed 2.0 sent to topic
Processing { distance: 20, time: 20 }
Speed 1.0 sent to topic
...output omitted...

Verify that the speed-values topic contains duplicated speed values, as a result of processing the first two events twice.

(.venv) [user@host movement-processor]$ oc exec \
    -it my-cluster-kafka-0 -- ./bin/kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-brokers:9092 \
    --value-deserializer org.apache.kafka.common.serialization.FloatDeserializer \
    --topic speed-values --from-beginning
2.0
1.0
2.0
1.0

Terminate the speed-values Kafka console consumer.
Recreate the speed-values topic.

Delete the topic.

[user@host movement-processor]$ oc delete kafkatopic speed-values
kafkatopic.kafka.strimzi.io "speed-values" deleted

Recreate the topic.

(.venv) [user@host movement-processor]$ oc apply -f ../resources/topic.yaml
kafkatopic.kafka.strimzi.io/speed-values created
Use transactions to wrap event consumption, processing, offset commits, and result production in a single, atomic operation.
Edit the src/main/java/com/redhat/vehicles/MovementProcessor.java file to change the consumer group ID.

public static Properties getConsumerConfig() {
    ...output omitted...
    // TODO: change consumer group
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "MovementReported-consumer-group-v3");
    ...output omitted...
    return props;
}

Add a transactional ID to the producer config. A transactional ID is required when using transactions.

public static Properties getProducerConfig() {
    ...output omitted...
    // TODO: add transactional id
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "movement-processor");
    return props;
}

Initialize the transactional state after creating the producer instance.

public static void main(String[] args) throws InterruptedException, RuntimeException {
    ...output omitted...
    producer = new KafkaProducer<>(getProducerConfig());

    // TODO: initialize transactions
    producer.initTransactions();
    ...output omitted...

Add the code to start the transaction after polling for new events. After the application processes the events and produces the result, send the offsets to the transaction and commit the transaction.

while (true) {
    ...output omitted...
    // TODO: begin transaction
    producer.beginTransaction();

    List<CompletableFuture<Float>> futures = new ArrayList<>();

    for (ConsumerRecord<Void, MovementReported> record : records) {
        MovementReported event = record.value();
        futures.add(calculateSpeed(event)
            .whenComplete((speed, t) -> {
                produceResult(speed);
            })
            .exceptionally(error -> {
                error.printStackTrace();
                // TODO: abort transaction
                producer.abortTransaction();
                System.exit(-1);
                return null;
            })
        );
    }

    // TODO: wait for async calculations to complete
    waitForCompletion(futures);

    // TODO: send offsets and commit transaction
    producer.sendOffsetsToTransaction(
        getConsumedOffsets(consumer),
        consumer.groupMetadata()
    );
    producer.commitTransaction();
}

Run the Java application. The application crashes when trying to process the third event.
(.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
    "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
...output omitted...
Processing { distance: 10, time: 5 }
Speed 2.0 sent to topic
Processing { distance: 20, time: 20 }
Speed 1.0 sent to topic
Processing { distance: 0, time: 0 }
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
...output omitted...

Switch to the terminal window where the kafka-console-consumer script is running for the __consumer_offsets topic. Verify that the MovementReported-consumer-group-v3 consumer group has committed only the first two offsets.

[MovementReported-consumer-group-v3,movement-reported-events,0]::OffsetAndMetadata(offset=1, ...output omitted...)
[MovementReported-consumer-group-v3,movement-reported-events,0]::OffsetAndMetadata(offset=2, ...output omitted...)
Run the Java application again. The application starts consuming the third event, and crashes again. Note how the application does not try to consume the first two events.
(.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
    "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
...output omitted...
Processing { distance: 0, time: 0 }
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
...output omitted...
Fix the error and verify that the Java application has processed each event exactly once.
Edit the src/main/java/com/redhat/vehicles/MovementProcessor.java file to handle the division by zero case.

private static CompletableFuture<Float> calculateSpeed(MovementReported event) {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("\n\nProcessing " + event);

        // TODO: handle division by 0
        if (event.time == 0) {
            return (float) 0;
        }

        float speed = event.distance / event.time;
        return speed;
    });
}

Run the Java application again to process the remaining events.

(.venv) [user@host movement-processor]$ ./mvnw compile exec:java \
    "-Dexec.mainClass=com.redhat.vehicles.MovementProcessor"
...output omitted...

Terminate the Java application after the consumer processes the events.
Verify that the speed-values topic received exactly one speed value for each event.

(.venv) [user@host movement-processor]$ oc exec \
    -it my-cluster-kafka-0 -- ./bin/kafka-console-consumer.sh \
    --bootstrap-server my-cluster-kafka-brokers:9092 \
    --value-deserializer org.apache.kafka.common.serialization.FloatDeserializer \
    --topic speed-values --from-beginning
2.0
1.0
0.0
4.0
10.0

Terminate all running scripts.
This concludes the guided exercise.