
Guided Exercise: Configuring Producer Retries and Idempotence

In this exercise you will configure retries, timeouts, and idempotence.

Outcomes

You should be able to use timeouts, retries, and idempotence settings to control how Kafka producers handle errors when sending data.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start troubleshooting-retries
...output omitted...

The lab command copies the exercise files from the AD482-apps/troubleshooting-retries/apps directory of your local Git repository into the troubleshooting-retries directory of your workspace. It also copies the .env file from your workspace to the troubleshooting-retries/timestamps directory.

Procedure 6.2. Instructions

  1. Move to the troubleshooting-retries directory.

    1. In a terminal window, navigate from your workspace directory to the troubleshooting-retries directory.

      (.venv) [user@host AD482]$ cd troubleshooting-retries
      (.venv) [user@host troubleshooting-retries]$

      The source code for this exercise is in this directory.

  2. Start the Python consumer script and the Java producer application, and verify that the consumer receives messages.

    1. Start the consumer Python script.

      (.venv) [user@host troubleshooting-retries]$ python \
      scripts/consume_timestamps.py timestamps
      Consuming messages from 'timestamps'...
    2. In a new command-line terminal, move to the troubleshooting-retries/timestamps directory in your workspace. Compile and run the Java application by executing ./mvnw compile exec:java "-Dexec.mainClass=com.redhat.telemetry.ProducerApp". The application produces 10 timestamp messages.

      [user@host timestamps]$ ./mvnw compile exec:java \
      "-Dexec.mainClass=com.redhat.telemetry.ProducerApp"
      ...output omitted...
      Message sent: 162797775623
      Message sent: 162797775627
      ...output omitted...
    3. Verify that the consumer receives the 10 produced messages. Under normal conditions, no duplicates should appear.

      ...output omitted...
      NEW VALUE: 1627910953982
      Messages received: 10
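
The consume_timestamps.py script is not listed in this exercise. Purely as a hypothetical illustration, the following Java sketch models the kind of bookkeeping that the consumer output suggests: count every received value and flag any value that was already seen. The DuplicateTracker class and its methods are invented for this example and are not taken from the script.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the consumer's bookkeeping, not the actual script.
class DuplicateTracker {
    private final Set<Long> seen = new HashSet<>();
    private int received = 0;
    private int duplicates = 0;

    // Records one value and returns true if it was already seen.
    boolean record(long value) {
        received++;
        boolean duplicate = !seen.add(value); // add() returns false for repeats
        if (duplicate) {
            duplicates++;
        }
        return duplicate;
    }

    int received() {
        return received;
    }

    int duplicates() {
        return duplicates;
    }

    public static void main(String[] args) {
        DuplicateTracker tracker = new DuplicateTracker();
        // Sample values, including one repeat, to exercise the duplicate check.
        for (long value : List.of(1627980985868L, 1627980985868L, 1627980989879L)) {
            System.out.println("NEW VALUE: " + value);
            if (tracker.record(value)) {
                System.out.println("DUPLICATE!");
            }
            System.out.println("Messages received: " + tracker.received());
        }
    }
}
```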
  3. Make message deliveries fail in the producer due to timeouts and verify that the consumer does not receive any messages.

    1. Edit the src/main/java/com/redhat/telemetry/ProducerApp.java file to set the delivery timeout to 1 millisecond.

      // TODO: configure timeouts
      props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 1);
      props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);

      The delivery timeout must be greater than or equal to linger.ms + request.timeout.ms, so you must also set the request timeout. By default, linger.ms is 0.
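
The rule above can be sketched as a small pre-flight check. This stand-alone Java example uses java.util.Properties with the literal property names behind the ProducerConfig constants, falls back to Kafka's documented defaults of 120000 ms for delivery.timeout.ms and 30000 ms for request.timeout.ms, and assumes that linger.ms is 0 when unset, as this exercise states. The TimeoutCheck class is hypothetical; the Kafka client enforces the rule itself when you create the producer.

```java
import java.util.Properties;

// Hypothetical check of: delivery.timeout.ms >= linger.ms + request.timeout.ms
class TimeoutCheck {
    static boolean isValid(Properties props) {
        // Fall back to the documented defaults when a property is not set.
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms", "120000"));
        long request = Long.parseLong(props.getProperty("request.timeout.ms", "30000"));
        // Assumes linger.ms defaults to 0, as stated in this exercise.
        long linger = Long.parseLong(props.getProperty("linger.ms", "0"));
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("delivery.timeout.ms", "1");
        // 1 < 0 + 30000: invalid while request.timeout.ms keeps its default.
        System.out.println(isValid(props)); // false

        props.setProperty("request.timeout.ms", "1");
        // 1 >= 0 + 1: valid, matching the values used in this step.
        System.out.println(isValid(props)); // true
    }
}
```

This is why setting only the delivery timeout to 1 millisecond is not enough: the default request timeout alone already exceeds it.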

    2. Restart the Python consumer.

    3. Recompile and run the Java app. Verify that the producer times out repeatedly.

      [user@host timestamps]$ ./mvnw compile exec:java \
      "-Dexec.mainClass=com.redhat.telemetry.ProducerApp"
      ...output omitted...
      [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker my-cluster-kafka-bootstrap-your-user-kafka-cluster.apps.cluster.example.com:443 (id: -1 rack: null) disconnected
      [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Bootstrap broker my-cluster-kafka-bootstrap-your-user-kafka-cluster.apps.cluster.example.com:443 (id: -1 rack: null) disconnected
    4. Verify that the consumer does not receive any new messages.

  4. Increase the delivery timeout to 60 seconds, but keep the request timeout low. The longer delivery timeout gives the producer time to retry each delivery repeatedly, while the low request timeout makes some of those attempts fail.

    1. Edit the src/main/java/com/redhat/telemetry/ProducerApp.java file to set delivery.timeout.ms to 60000 and request.timeout.ms to 150.

      // TODO: configure timeouts
      props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 60000);
      props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 150);
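
The interplay between the two timeouts can be illustrated with a deliberately simplified, hypothetical model. It is not Kafka's sender logic: it assumes that every failed attempt costs the full request timeout plus a fixed backoff, and that no new attempt starts once the delivery timeout has elapsed. The attempt outcomes are supplied as input rather than observed from a real cluster, and the RetryModel class is invented for this example.

```java
import java.util.List;

// Simplified, hypothetical model of retries bounded by a delivery timeout.
class RetryModel {
    // Returns the 1-based number of the attempt that succeeded, or -1 if the
    // delivery timeout expired or the supplied outcomes ran out first.
    static int deliver(List<Boolean> attemptSucceeds, long deliveryTimeoutMs,
                       long requestTimeoutMs, long backoffMs) {
        long elapsedMs = 0;
        for (int attempt = 1; attempt <= attemptSucceeds.size(); attempt++) {
            if (elapsedMs >= deliveryTimeoutMs) {
                return -1; // delivery timeout expired: give up
            }
            if (attemptSucceeds.get(attempt - 1)) {
                return attempt; // ACK received within the request timeout
            }
            // Assumed cost of a failed attempt: full request timeout plus backoff.
            elapsedMs += requestTimeoutMs + backoffMs;
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Boolean> outcomes = List.of(false, false, true);
        // 60 s delivery timeout: two failures cost 500 ms, the third attempt succeeds.
        System.out.println(deliver(outcomes, 60000, 150, 100)); // 3
        // 1 ms delivery timeout: the first failure already exhausts the budget.
        System.out.println(deliver(outcomes, 1, 1, 100)); // -1
    }
}
```

In the log output of the next step, the producer reports 2147483646 attempts left, so in practice the delivery timeout, not the retry count, is what bounds the retries.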
    2. Recompile and restart the Java application, and keep adjusting the request.timeout.ms value until you see both successful deliveries and errors.

      Important

      The appropriate timeout depends on the state of the cluster and the network, so you might need to adjust the request.timeout.ms value. Tune the ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG property value until you see output similar to the following, which combines successful deliveries and errors.

      [user@host timestamps]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.telemetry.ProducerApp"
      ...output omitted...
      [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition timestamps-0 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
      Message sent: 1627977756160
      Message sent: 1627977758162
      [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Got error produce response with correlation id 43 on topic-partition timestamps-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION. Error Message: Disconnected from node 0
      [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition timestamps-0 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
      Message sent: 1627977760167
      ...output omitted...
    3. Verify that the consumer detects duplicate messages.

      ...output omitted...
      NEW VALUE: 1627980985868
      Messages received: 11
      
      NEW VALUE: 1627980985868
      DUPLICATE!
      Messages received: 12
      
      NEW VALUE: 1627980989879
      Messages received: 13

      The consumer receives more than 10 messages because the retries duplicated some of them. The broker stored some messages successfully, but the request timeout was so low that the producer did not wait long enough to receive the acknowledgment (ACK). Without an ACK, the producer retried those messages, and the broker stored them a second time.
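
The following Java sketch is a simplified, hypothetical model of a partition log, not Kafka's broker code. It shows why a retry after a lost ACK stores the same record twice and, for comparison, how a log that tracks a producer ID and a sequence number per message, which is the mechanism that Kafka's idempotent producer relies on, can drop the repeated attempt. The LogModel class and its methods are invented for this example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of a partition log (not Kafka's implementation).
class LogModel {
    private final List<Long> log = new ArrayList<>();
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private final boolean idempotent;

    LogModel(boolean idempotent) {
        this.idempotent = idempotent;
    }

    // Appends the value unless idempotence is on and this producer's
    // sequence number has already been written.
    void append(long producerId, int sequence, long value) {
        if (idempotent) {
            Integer last = lastSequence.get(producerId);
            if (last != null && sequence <= last) {
                return; // repeated attempt: already stored, drop it
            }
            lastSequence.put(producerId, sequence);
        }
        log.add(value);
    }

    List<Long> records() {
        return log;
    }

    // One message: the first attempt is stored but its ACK is lost (the
    // request times out), so the producer retries with the same sequence.
    static List<Long> sendWithLostAck(boolean idempotent, long value) {
        LogModel partition = new LogModel(idempotent);
        partition.append(1L, 0, value); // stored, ACK never arrives
        partition.append(1L, 0, value); // retry of the same message
        return partition.records();
    }

    public static void main(String[] args) {
        System.out.println(sendWithLostAck(false, 1627980985868L)); // [1627980985868, 1627980985868]
        System.out.println(sendWithLostAck(true, 1627980985868L));  // [1627980985868]
    }
}
```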

    4. If the Java application is still running, then stop it. Stop the Python consumer as well.

  5. Enable idempotence and verify that the consumer does not receive duplicates.

    1. Edit the src/main/java/com/redhat/telemetry/ProducerApp.java file to enable idempotence.

      // TODO: configure idempotence
      props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    2. Start the Python consumer again.

    3. Recompile and start the Java application. Verify that some delivery errors still occur. If no errors occur, then decrease the request.timeout.ms value and try again. If no messages are sent, then increase the request.timeout.ms value and try again.

    4. Verify that the consumer receives no duplicates.

      ...output omitted...
      NEW VALUE: 1627981176772
      Messages received: 9
      
      NEW VALUE: 1627981178775
      Messages received: 10
    5. Stop both applications.

  6. Disable retries and check that the producer does not confirm the delivery of all 10 messages. With retries disabled, if the producer does not receive an acknowledgment, then it does not send the message again.

    1. Edit the src/main/java/com/redhat/telemetry/ProducerApp.java file to set retries to 0:

      // TODO: configure retries
      props.put(ProducerConfig.RETRIES_CONFIG, 0);
    2. Deactivate idempotence. Idempotence must be false if retries are disabled.

      // TODO: configure idempotence
      props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
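
The Kafka producer documentation lists the settings that idempotence depends on: retries greater than 0, acks set to all, and max.in.flight.requests.per.connection no greater than 5. The following hypothetical Java helper sketches that rule with java.util.Properties and literal property names. It only checks settings that are present and treats missing ones as compatible, which is an assumption about client defaults; the Kafka client performs its own validation when you create the producer.

```java
import java.util.Properties;

// Hypothetical check of the settings that enable.idempotence=true requires.
class IdempotenceCheck {
    static boolean isCompatible(Properties props) {
        // Only check when idempotence is explicitly enabled.
        if (!"true".equals(props.getProperty("enable.idempotence"))) {
            return true;
        }
        // Missing properties are treated as compatible (assumed defaults).
        String acks = props.getProperty("acks", "all");
        int retries = Integer.parseInt(props.getProperty("retries", "1"));
        int maxInFlight = Integer.parseInt(
                props.getProperty("max.in.flight.requests.per.connection", "5"));
        boolean acksAll = acks.equals("all") || acks.equals("-1");
        return acksAll && retries > 0 && maxInFlight <= 5;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("retries", "0");
        // Idempotence with retries disabled is rejected.
        System.out.println(isCompatible(props)); // false

        props.setProperty("enable.idempotence", "false");
        // With idempotence off, retries=0 is accepted, as in this step.
        System.out.println(isCompatible(props)); // true
    }
}
```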
    3. Start the Python consumer again.

    4. Recompile and start the Java application. Check whether any errors occur.

      If no errors occur, then decrease the request.timeout.ms value and try again. If the producer is not able to send any messages, then increase the request.timeout.ms value and try again.

    5. Verify that the producer was not able to send 10 messages successfully.

      ...output omitted...
      [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.producer.internals.Sender - [Producer clientId=producer-1] Received invalid metadata error in produce request on partition timestamps-0 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
      org.apache.kafka.common.errors.NetworkException: Disconnected from node 0
      Sent: 1627980201314
      Successfully sent messages: 6
      ...output omitted...

      The producer counts a message as successfully sent only when it receives an acknowledgment from the Kafka broker. However, a message can reach the broker even if its acknowledgment never reaches the producer.

      Therefore, although the Java producer cannot confirm that it sent all 10 messages, the Python consumer might receive all of them.
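
The gap between the two counts can be modeled with a short, hypothetical Java sketch. Each message is sent once, because retries are disabled, and ends in one of three outcomes invented for this model. The producer counts only acknowledged messages, while the consumer can read everything the broker stored.

```java
import java.util.List;

// Hypothetical model of why producer and consumer counts can differ when
// retries are disabled.
class AckCount {
    // Possible fates of a single send attempt (invented for this model).
    enum SendOutcome { STORED_AND_ACKED, STORED_ACK_LOST, NOT_STORED }

    // The producer counts a message as sent only if its ACK arrived.
    static long producerCount(List<SendOutcome> outcomes) {
        return outcomes.stream().filter(o -> o == SendOutcome.STORED_AND_ACKED).count();
    }

    // The consumer can read every message that the broker stored.
    static long consumerCount(List<SendOutcome> outcomes) {
        return outcomes.stream().filter(o -> o != SendOutcome.NOT_STORED).count();
    }

    public static void main(String[] args) {
        List<SendOutcome> outcomes = List.of(
                SendOutcome.STORED_AND_ACKED,
                SendOutcome.STORED_ACK_LOST,
                SendOutcome.STORED_AND_ACKED,
                SendOutcome.NOT_STORED);
        System.out.println("Producer counts as sent: " + producerCount(outcomes)); // 2
        System.out.println("Consumer can read: " + consumerCount(outcomes));       // 3
    }
}
```

Because each message is attempted only once, the consumer count can exceed the producer count without any duplicates appearing.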

    6. Verify that the consumer did not receive duplicates.

    7. Stop both applications, if they are still running.

Finish

Go back to your workspace. Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish troubleshooting-retries

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c