Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will configure retries, timeouts, and idempotence.
Outcomes
You should be able to use timeouts, retries, and idempotence settings to control how Kafka producers handle errors when sending data.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start troubleshooting-retries
...output omitted...
The lab command copies the exercise files from the AD482-apps/troubleshooting-retries/apps directory in your local Git repository into the troubleshooting-retries directory in your workspace.
It also copies the .env file in your workspace to the troubleshooting-retries/timestamps directory.
Procedure 6.2. Instructions
Move to the troubleshooting-retries directory.
From your workspace directory, open a terminal window and navigate to the troubleshooting-retries directory.
(.venv) [user@host AD482]$ cd troubleshooting-retries
(.venv) [user@host troubleshooting-retries]$
The source code for this exercise is in this directory.
Start the Python consumer script and the Java producer application, and verify that the consumer receives messages.
Start the Python consumer script.
(.venv) [user@host troubleshooting-retries]$ python \
  scripts/consume_timestamps.py timestamps
Consuming messages from 'timestamps'...
In a new command-line terminal, move to the troubleshooting-retries/timestamps directory in your workspace. Compile and run the Java application. The application produces 10 timestamp messages.
[user@host timestamps]$ ./mvnw compile exec:java \
  "-Dexec.mainClass=com.redhat.telemetry.ProducerApp"
...output omitted...
Message sent: 162797775623
Message sent: 162797775627
...output omitted...
Verify that the consumer receives the 10 produced messages. Under normal conditions, no duplicates should appear.
...output omitted...
NEW VALUE: 1627910953982
Messages received: 10
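The source of the consumer script is not shown in this exercise, but its output suggests simple bookkeeping. It prints every value, keeps a running count, and flags values that it has already seen. The following Java class is a hypothetical sketch of that logic, not the code of consume_timestamps.py. The DuplicateTracker name is illustrative, and the sketch assumes that each timestamp value is unique unless the producer sends it more than once.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the consumer's duplicate bookkeeping; this is not the code of
// consume_timestamps.py, whose source is not shown in the exercise.
class DuplicateTracker {
    private final Set<Long> seen = new HashSet<>();
    private int received = 0;
    private int duplicates = 0;

    // Records one consumed timestamp and returns true if the same value was seen before.
    boolean record(long value) {
        received++;
        boolean duplicate = !seen.add(value); // Set.add returns false if the value is already present
        if (duplicate) {
            duplicates++;
        }
        return duplicate;
    }

    int received() {
        return received;
    }

    int duplicates() {
        return duplicates;
    }

    public static void main(String[] args) {
        DuplicateTracker tracker = new DuplicateTracker();
        // Sample values only; the second value is delivered twice.
        long[] values = {1000L, 2000L, 2000L, 3000L};
        for (long value : values) {
            boolean duplicate = tracker.record(value);
            System.out.println("NEW VALUE: " + value + (duplicate ? " DUPLICATE!" : ""));
            System.out.println("Messages received: " + tracker.received());
        }
    }
}
```

The set of seen values is what makes duplicates visible. The received count alone cannot distinguish a resent timestamp from a new one.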
Make message deliveries fail in the producer due to timeouts and verify that the consumer does not receive any messages.
Edit the src/main/java/com/redhat/telemetry/ProducerApp.java file to set the delivery timeout to 1 millisecond.
// TODO: configure timeouts
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 1);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
The delivery timeout must be greater than or equal to linger.ms + request.timeout.ms, so you must also set the request timeout. The linger.ms value is 0 by default.
Restart the Python consumer.
Recompile and run the Java app. Verify that the producer times out repeatedly.
[user@host timestamps]$ ./mvnw compile exec:java \
  "-Dexec.mainClass=com.redhat.telemetry.ProducerApp"
...output omitted...
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker my-cluster-kafka-bootstrap-your-user-kafka-cluster.apps.cluster.example.com:443 (id: -1 rack: null) disconnected
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker my-cluster-kafka-bootstrap-your-user-kafka-cluster.apps.cluster.example.com:443 (id: -1 rack: null) disconnected
Verify that the consumer does not receive any new messages.
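The rule from the first timeout step, delivery.timeout.ms >= linger.ms + request.timeout.ms, is plain arithmetic. The producer enforces it itself. The sketch below only mirrors the check with java.util.Properties and the string keys behind the ProducerConfig constants, so it runs without the Kafka client library. The class name is hypothetical. The fallback values are assumptions: the client defaults of 120000 ms for delivery.timeout.ms and 30000 ms for request.timeout.ms, plus the 0 ms linger.ms default stated in this exercise.

```java
import java.util.Properties;

// Mirrors the producer rule delivery.timeout.ms >= linger.ms + request.timeout.ms.
// The Kafka producer performs its own check; this class only reproduces the arithmetic.
class TimeoutConfigCheck {
    // Fallbacks used when a key is missing (assumed client defaults; linger.ms per this exercise).
    static final String DEFAULT_DELIVERY_TIMEOUT_MS = "120000";
    static final String DEFAULT_REQUEST_TIMEOUT_MS = "30000";
    static final String DEFAULT_LINGER_MS = "0";

    static boolean isValid(Properties props) {
        long delivery = Long.parseLong(
                props.getProperty("delivery.timeout.ms", DEFAULT_DELIVERY_TIMEOUT_MS));
        long request = Long.parseLong(
                props.getProperty("request.timeout.ms", DEFAULT_REQUEST_TIMEOUT_MS));
        long linger = Long.parseLong(props.getProperty("linger.ms", DEFAULT_LINGER_MS));
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("delivery.timeout.ms", "1");
        // Only the delivery timeout is lowered: 1 < 0 + 30000, so the check fails.
        System.out.println("delivery=1 only: " + isValid(props));
        props.setProperty("request.timeout.ms", "1");
        // Both values lowered, as in the exercise: 1 >= 0 + 1.
        System.out.println("delivery=1, request=1: " + isValid(props));
    }
}
```

If only delivery.timeout.ms is set to 1, the default request timeout makes the configuration inconsistent. That is why the exercise lowers both values.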
Increase the delivery timeout to 60 seconds, but keep the request timeout low. With this configuration, the producer can retry a delivery repeatedly within the 60-second window, while the low request timeout causes some of those delivery attempts to fail.
Edit the src/main/java/com/redhat/telemetry/ProducerApp.java file to set delivery.timeout.ms to 60000 and request.timeout.ms to 150.
// TODO: configure timeouts
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 150);
Recompile and restart the Java application, and keep adjusting the request.timeout.ms value until you see both successful deliveries and errors.
Important
Suitable timeout values depend on the state of the cluster and the network, so you might need to adjust the request.timeout.ms configuration value. Tune the ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG property value until you see output similar to the following, which shows a combination of successful deliveries and errors.
[user@host timestamps]$ ./mvnw compile exec:java \
  "-Dexec.mainClass=com.redhat.telemetry.ProducerApp"
...output omitted...
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition timestamps-0 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
Message sent: 1627977756160
Message sent: 1627977758162
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 43 on topic-partition timestamps-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION. Error Message: Disconnected from node 0
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition timestamps-0 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
Message sent: 1627977760167
...output omitted...
Verify that the consumer detects duplicate messages.
...output omitted...
NEW VALUE: 1627980985868
Messages received: 11
NEW VALUE: 1627980985868
DUPLICATE!
Messages received: 12
NEW VALUE: 1627980989879
Messages received: 13
The consumer receives more than 10 messages because the producer duplicated some of them during retries. The broker stored some messages successfully, but because the request timeout is very low, the producer did not wait long enough to receive the acknowledgment (ACK). Therefore, the producer sent those messages again, and the broker stored them a second time.
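The following toy simulation is written in plain Java, without the Kafka client. It models the sequence described above: the broker appends every write it receives, and the producer resends a record once when its acknowledgment is lost. The class and method names are hypothetical, and the sketch assumes exactly one retry per lost acknowledgment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy simulation, not Kafka code: a broker without idempotence appends every write it
// receives, and the producer resends a record once if its acknowledgment is lost.
class AckLossSimulation {
    // Sends each value in order and returns the resulting broker log.
    // lostFirstAck lists the values whose first acknowledgment never reaches the producer.
    static List<Long> send(long[] values, Set<Long> lostFirstAck) {
        List<Long> brokerLog = new ArrayList<>();
        for (long value : values) {
            brokerLog.add(value); // the first attempt is stored by the broker
            if (lostFirstAck.contains(value)) {
                // The request timed out before the ACK arrived, so the producer retries.
                // Nothing marks this write as a resend, so the broker stores it again.
                brokerLog.add(value);
            }
        }
        return brokerLog;
    }

    public static void main(String[] args) {
        // Sample values only; the ACK for 2000 is lost once.
        List<Long> log = send(new long[] {1000L, 2000L, 3000L}, Set.of(2000L));
        System.out.println("Stored records: " + log); // [1000, 2000, 2000, 3000]
    }
}
```

Nothing in a write identifies it as a resend, so the broker stores the second copy. That second copy is the duplicate that the consumer reports.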
If the Java application is still running, then stop it. Stop the Python consumer as well.
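The WARN line retrying (2147483646 attempts left) shows that the default retry count is effectively unlimited. In this step, the 60-second delivery timeout is therefore what bounds retries. A rough upper bound on the number of attempts can be sketched under a simplifying assumption: each attempt takes at most request.timeout.ms and is followed by a fixed retry.backoff.ms pause. The 100 ms backoff used here is the long-standing retry.backoff.ms default, which is an assumption. Newer clients can increase the backoff between attempts, and real attempts also spend time on metadata refreshes and reconnections, so treat the result as an estimate. The RetryBudget name is hypothetical.

```java
// Rough estimate of how many send attempts fit inside delivery.timeout.ms.
// Simplified model: each attempt lasts at most request.timeout.ms and is followed by a
// fixed retry.backoff.ms pause; the real producer schedule can differ.
class RetryBudget {
    // Assumes a valid configuration, that is, deliveryTimeoutMs >= requestTimeoutMs.
    static long maxAttempts(long deliveryTimeoutMs, long requestTimeoutMs,
                            long retryBackoffMs, long retries) {
        // n attempts need n * request + (n - 1) * backoff <= delivery,
        // that is, n <= (delivery + backoff) / (request + backoff).
        long byTime = (deliveryTimeoutMs + retryBackoffMs) / (requestTimeoutMs + retryBackoffMs);
        long byCount = retries + 1; // the first attempt plus the configured retries
        return Math.min(byTime, byCount);
    }

    public static void main(String[] args) {
        // Values from this step, with retries left at Integer.MAX_VALUE as the log suggests.
        System.out.println(maxAttempts(60_000, 150, 100, Integer.MAX_VALUE)); // 240
        // Values from the previous step: a single attempt of at most 1 ms.
        System.out.println(maxAttempts(1, 1, 100, Integer.MAX_VALUE)); // 1
    }
}
```

With the values from this step, the estimate is 240 attempts. The producer therefore has many opportunities to resend a record that the broker already stored.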
Enable idempotence and verify that the consumer does not receive duplicates.
Edit the src/main/java/com/redhat/telemetry/ProducerApp.java file to enable idempotence.
// TODO: configure idempotence
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Start the Python consumer again.
Recompile and start the Java application. Verify that some delivery errors still occur. If no errors occur, then decrease the request.timeout.ms value and try again. If no messages are sent, then increase the request.timeout.ms value and try again.
Verify that the consumer receives no duplicates.
...output omitted...
NEW VALUE: 1627981176772
Messages received: 9
NEW VALUE: 1627981178775
Messages received: 10
Stop both applications.
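Idempotence removes the duplicates because the broker can recognize a resend. The idempotent producer tags its writes with a producer ID and per-partition sequence numbers, and the broker discards a write whose sequence number it has already stored. The following toy model shows that idea for a single partition. It is a simplification: a real broker tracks sequence numbers per batch, keeps only a small window of recent batches, and also uses producer epochs. The class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of idempotent writes to a single partition (simplified: real brokers track
// sequence numbers per batch, keep a small window of recent batches, and use epochs).
class IdempotentBroker {
    private final Map<Long, Integer> lastSequence = new HashMap<>(); // producer ID -> last stored sequence
    private final List<Long> log = new ArrayList<>();

    // Returns true if the record was appended and false if it was a duplicate.
    // A real broker still acknowledges the duplicate, so the producer stops retrying.
    boolean append(long producerId, int sequence, long value) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // this sequence number is already in the log
        }
        if (sequence != last + 1) {
            throw new IllegalStateException(
                    "Out-of-order sequence " + sequence + ", expected " + (last + 1));
        }
        lastSequence.put(producerId, sequence);
        log.add(value);
        return true;
    }

    List<Long> log() {
        return log;
    }

    public static void main(String[] args) {
        IdempotentBroker broker = new IdempotentBroker();
        long producerId = 1L; // sample ID; real IDs are assigned by the cluster
        broker.append(producerId, 0, 1000L);
        broker.append(producerId, 1, 2000L);
        broker.append(producerId, 1, 2000L); // resend after a lost ACK is discarded
        broker.append(producerId, 2, 3000L);
        System.out.println("Stored records: " + broker.log()); // [1000, 2000, 3000]
    }
}
```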
Disable retries and verify that the producer does not report the successful delivery of all 10 messages. With retries disabled, if the producer does not receive an acknowledgment, then it does not send the message again.
Edit the src/main/java/com/redhat/telemetry/ProducerApp.java file to set retries to 0:
// TODO: configure retries
props.put(ProducerConfig.RETRIES_CONFIG, 0);
Deactivate idempotence. Idempotence must be false if retries are disabled.
// TODO: configure idempotence
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
Start the Python consumer again.
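The rule that idempotence must be off when retries are disabled is one of several documented requirements of the idempotent producer. The others are acks set to all and max.in.flight.requests.per.connection no greater than 5. The following sketch checks those requirements with java.util.Properties, so it runs without the Kafka client library. The class name is hypothetical. Missing keys are treated as satisfying each constraint, because client defaults differ across Kafka versions.

```java
import java.util.Properties;

// Mirrors the documented requirements for enable.idempotence=true.
// Missing keys are treated as satisfying each constraint; this sketch does not
// model the version-specific defaults of the Kafka client.
class IdempotenceConfigCheck {
    static boolean isConsistent(Properties props) {
        boolean idempotent = Boolean.parseBoolean(props.getProperty("enable.idempotence", "false"));
        if (!idempotent) {
            return true; // without idempotence, none of these constraints apply
        }
        long retries = Long.parseLong(props.getProperty("retries", "1"));
        String acks = props.getProperty("acks", "all");
        int maxInFlight = Integer.parseInt(
                props.getProperty("max.in.flight.requests.per.connection", "5"));
        boolean acksAll = "all".equals(acks) || "-1".equals(acks); // -1 is equivalent to all
        return retries > 0 && acksAll && maxInFlight <= 5;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("retries", "0");
        props.setProperty("enable.idempotence", "true");
        System.out.println("retries=0, idempotence on: " + isConsistent(props));  // false
        props.setProperty("enable.idempotence", "false");
        System.out.println("retries=0, idempotence off: " + isConsistent(props)); // true
    }
}
```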
Recompile and start the Java application. Check whether any errors occur.
If no errors occur, then decrease the request.timeout.ms value and try again. If the producer is not able to send any messages, then increase the request.timeout.ms value and try again.
Verify that the producer was not able to send all 10 messages successfully.
...output omitted...
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition timestamps-0 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
org.apache.kafka.common.errors.NetworkException: Disconnected from node 0
Sent: 1627980201314
Successfully sent messages: 6
...output omitted...
The producer considers a message successfully sent only when it receives the acknowledgment from the Kafka broker. A message can reach the broker even if its acknowledgment never reaches the producer.
Therefore, although the Java producer reports fewer than 10 successfully sent messages, the Python consumer might receive all of them.
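The gap between what the producer reports and what the consumer receives can be modeled with three possible outcomes for a single attempt. The following plain-Java sketch is a toy model with hypothetical names. It assumes retries=0, so each record gets exactly one attempt.

```java
import java.util.List;

// Toy model (not Kafka code) of a producer with retries=0: every record gets exactly one attempt.
class NoRetryAccounting {
    enum Attempt { STORED_AND_ACKED, STORED_ACK_LOST, NOT_STORED }

    // What the producer reports: only records whose ACK arrived count as sent.
    static long reportedAsSent(List<Attempt> attempts) {
        return attempts.stream().filter(a -> a == Attempt.STORED_AND_ACKED).count();
    }

    // What the consumer can read: every record the broker stored, acknowledged or not.
    static long storedOnBroker(List<Attempt> attempts) {
        return attempts.stream().filter(a -> a != Attempt.NOT_STORED).count();
    }

    public static void main(String[] args) {
        List<Attempt> attempts = List.of(
                Attempt.STORED_AND_ACKED, Attempt.STORED_ACK_LOST,
                Attempt.STORED_AND_ACKED, Attempt.NOT_STORED);
        System.out.println("Producer reports sent: " + reportedAsSent(attempts)); // 2
        System.out.println("Consumer can receive: " + storedOnBroker(attempts));  // 3
    }
}
```

No record gets a second attempt, so the model never stores a record twice. This matches the expectation that the consumer receives no duplicates in this step.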
Verify that the consumer did not receive duplicates.
Stop both applications, if they are still running.
This concludes the guided exercise.