Guided Exercise: Sending Data with Producers

In this exercise you will send telemetry measurements to Kafka.

You will create two producers:

  • A plain Java producer that sends a random integer to a topic named total-connected-devices.

  • A Quarkus producer that sends a record of temperatures to a topic composed of two partitions, and named device-temperatures.

You will find the solution files for this exercise in the AD482-apps repository, within the kafka-producers/solutions directory. Note that you might need to replace placeholder strings in the Java files, such as the Kafka bootstrap server and the path to the TrustStore file.

Outcomes

You should be able to build producers that send data to Kafka topics.

From your workspace directory, with your Python virtual environment activated, use the lab command to prepare your system for this exercise.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
(.venv) [user@host AD482]$ lab start kafka-producers

Copy the Bootstrap Server and Bootstrap Port values. You will use these values to connect to the Kafka cluster.

The lab command copies the exercise files from the AD482-apps/kafka-producers/apps directory in your local Git repository into the kafka-producers directory in your workspace.

Procedure 2.2. Instructions

  1. From your workspace directory, navigate to the kafka-producers directory.

    (.venv) [user@host AD482]$ cd kafka-producers
    (.venv) [user@host kafka-producers]$
    1. Verify that your project is set to RHT_OCP4_DEV_USER-kafka-cluster.

      (.venv) [user@host kafka-producers]$ oc project
      Using project "RHT_OCP4_DEV_USER-kafka-cluster" on server ...output omitted...
  2. Create a plain Java producer with the Kafka Java client to send events to the total-connected-devices topic.

    1. Examine and use the resources/create-topic.yaml template to create a topic named total-connected-devices.

      (.venv) [user@host kafka-producers]$ oc process \
       -f resources/create-topic.yaml \
       -p TOPIC_NAME='total-connected-devices' \
       | oc apply -f -
      kafkatopic.kafka.strimzi.io/total-connected-devices created

      Note

      By default, the template configures the topic to have 1 partition, and a replication factor of 1.

    2. Move to the plain directory.

      (.venv) [user@host kafka-producers]$ cd plain
    3. Using your editor of choice, open the producer/src/main/java/com/redhat/telemetry/producer/ProducerApp.java file, and examine the code. The file includes all the required imports and declares the configureProperties() and main() methods, which you implement in this guided exercise, along with a printRecord() helper method.

      package com.redhat.telemetry.producer;
      
      import java.util.Properties;
      import java.util.Random;
      
      import org.apache.kafka.clients.CommonClientConfigs; // (1)
      import org.apache.kafka.clients.producer.KafkaProducer; // (2)
      import org.apache.kafka.clients.producer.Producer; // (3)
      import org.apache.kafka.clients.producer.ProducerConfig; // (4)
      import org.apache.kafka.clients.producer.ProducerRecord; // (5)
      import org.apache.kafka.common.config.SslConfigs; // (6)
      
      public class ProducerApp {
          public static Properties configureProperties() {
              Properties props = new Properties();
      
              ...output omitted...
      
              return props;
          }
      
          public static void main(String[] args) {
              ...output omitted...
          }
      
          private static void printRecord(ProducerRecord record) {
              ...output omitted...
          }
      }

      1

      Configuration settings shared by Kafka client applications.

      2

      Kafka client that publishes records to a Kafka cluster.

      3

      Interface for the KafkaProducer class.

      4

      Configuration for the Kafka producer.

      5

      Record published to a Kafka cluster.

      6

      Configuration settings for the SSL protocol.

    4. Write the implementation of the configureProperties() method.

      To configure the Kafka settings, use the Properties class to represent a set of properties, and define:

      • The bootstrap server

      • The record key serializer

      • The record value serializer

      • The type of connection

      • The location and password of the TrustStore file

      The method should look like the following:

      public static Properties configureProperties() {
          Properties props = new Properties();
      
          // TODO: configure the bootstrap server
          props.put(
                  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT" // (1)
          );
      
          // TODO: configure the key and value serializers
          props.put(
                  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer" // (2)
          );
          props.put(
                  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.IntegerSerializer" // (3)
          );
      
          // TODO: configure the SSL connection
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); // (4)
          props.put(
                  SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
                  "ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks" // (5)
          );
          props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"); // (6)
      
          return props;
      }

      1

      Bootstrap server host and port

      2

      Record key serializer

      3

      Record value serializer

      4

      Communication protocol

      5

      Absolute path to the location of the TrustStore file

      6

      Password for the TrustStore file
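
      The placeholder values in configureProperties() can also be supplied from outside the code. The following is a minimal sketch and not part of the exercise files: the ProducerSettings class and the KAFKA_BOOTSTRAP, TRUSTSTORE_PATH, and TRUSTSTORE_PASSWORD names are hypothetical. It uses the literal property keys that the ProducerConfig, CommonClientConfigs, and SslConfigs constants stand for, so it compiles with only the JDK.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration; the exercise itself hard-codes these values.
class ProducerSettings {

    // Builds producer properties from a map of externally supplied settings.
    // The setting names (KAFKA_BOOTSTRAP, TRUSTSTORE_PATH, TRUSTSTORE_PASSWORD) are assumptions.
    static Properties fromSettings(Map<String, String> settings) {
        Properties props = new Properties();
        props.put("bootstrap.servers", require(settings, "KAFKA_BOOTSTRAP"));
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", require(settings, "TRUSTSTORE_PATH"));
        props.put("ssl.truststore.password", require(settings, "TRUSTSTORE_PASSWORD"));
        return props;
    }

    // Fails fast when a setting is missing, and trims stray whitespace.
    private static String require(Map<String, String> settings, String name) {
        String value = settings.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // Sample values only; in practice you could pass System.getenv() instead.
        Properties props = fromSettings(Map.of(
                "KAFKA_BOOTSTRAP", "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT",
                "TRUSTSTORE_PATH", "ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks",
                "TRUSTSTORE_PASSWORD", "password"));
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

      Passing the returned Properties object to the KafkaProducer constructor would work the same way as with the hard-coded version. The benefit is that the bootstrap address and the TrustStore password stay out of the source file.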

    5. Write the implementation of the main() method to send 10 random integers to the total-connected-devices topic. Use the printRecord() helper method to print each sent record to the console.

      The method should look like the following:

      public static void main(String[] args) {
          // TODO: Implement the Kafka producer
          Random random = new Random(); // (1)
          Producer<Void,Integer> producer = new KafkaProducer<>( // (2)
                   configureProperties()
          );
      
          for (int i = 0; i < 10; i++) { // (3)
              ProducerRecord<Void, Integer> record = new ProducerRecord<>( // (4)
                      "total-connected-devices", // (5)
                      random.nextInt(100) // (6)
              );
      
              producer.send(record); // (7)
              printRecord(record);
          }
      
          producer.close(); // (8)
      }

      1

      Creates a random number generator.

      2

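      Creates a KafkaProducer with the configuration returned by the configureProperties() method. Because the configuration does not set acks, the producer uses the default acknowledgment mode of the client library: 1 in Kafka clients earlier than 3.0, and all from 3.0 onward.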

      3

      Executes the logic of sending a random integer to the topic 10 times.

      4

      Creates a record.

      5

      Sets the topic that is going to receive the record.

      6

      Sets a random integer as the value of the record.

      7

      Sends the record to Kafka asynchronously.

      8

      Closes the connection to Kafka.
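
      The record value travels to Kafka as bytes. Kafka's IntegerSerializer encodes an int as four bytes in big-endian order. The following minimal sketch reproduces that encoding with the JDK's ByteBuffer so that you can see what a consumer has to decode. The IntegerEncodingSketch class is hypothetical and does not call the Kafka library.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of a 4-byte big-endian integer encoding.
class IntegerEncodingSketch {

    // Encodes an int as 4 bytes; ByteBuffer is big-endian by default.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decodes 4 big-endian bytes back into an int.
    static int decode(byte[] bytes) {
        if (bytes.length != Integer.BYTES) {
            throw new IllegalArgumentException("Expected 4 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        byte[] encoded = encode(89);
        System.out.println(Arrays.toString(encoded)); // prints [0, 0, 0, 89]
        System.out.println(decode(encoded));          // prints 89
    }
}
```

      A consumer that reads this topic must use a matching integer deserializer. Reading the same four bytes as a string would not produce the number.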

    6. In your command-line terminal, navigate to the kafka-producers/plain directory. Compile the application.

      (.venv) [user@host plain]$ ./mvnw clean package
      ...output omitted...
      [INFO] BUILD SUCCESS
      ...output omitted...
  3. In a new command-line terminal, run the viewer application and leave it running. This application listens for records sent to the total-connected-devices topic. Then, run the producer application in your other command-line terminal and verify that the viewer application receives the records.

    1. Open a new command-line terminal and move to the kafka-producers/plain directory in your workspace. Run the Java executable file for the viewer application. The application keeps waiting for new messages to consume.

      (.venv) [user@host plain]$ java -jar viewer/target/viewer-1.0.jar
      ...output omitted...
      Waiting for events...
      Waiting for events...
    2. In your other command-line terminal, run the Java executable file for the producer application. This application sends several messages to Kafka.

      (.venv) [user@host plain]$ java -jar producer/target/producer-1.0.jar
      ...output omitted...
      Sent record:
              Topic = total-connected-devices
              Partition = null
              Key = null
              Value = 89
    3. Verify that the viewer application is receiving the records sent by the producer application.

      ...output omitted...
      Received total-connected-devices: 24
      Received total-connected-devices: 30
      Received total-connected-devices: 91
      Received total-connected-devices: 53
      Received total-connected-devices: 82
      Received total-connected-devices: 27
      Received total-connected-devices: 89
      Waiting for events...

      Press Ctrl+C to stop the command. Close the command-line terminal.

  4. Create a producer in Quarkus to send events to the device-temperatures topic, and use the device ID as the key of the records.

    1. Navigate to the kafka-producers directory.

      (.venv) [user@host plain]$ cd ..
      (.venv) [user@host kafka-producers]$
    2. Use the resources/create-topic.yaml template to create a topic named device-temperatures with 2 partitions.

      (.venv) [user@host kafka-producers]$ oc process \
       -f resources/create-topic.yaml \
       -p TOPIC_NAME='device-temperatures' \
       -p PARTITIONS=2 \
       | oc apply -f -
      kafkatopic.kafka.strimzi.io/device-temperatures created
    3. Move to the quarkus directory.

      (.venv) [user@host kafka-producers]$ cd quarkus
    4. By using your editor of choice, open the src/main/resources/application.properties file, and configure the following Kafka settings:

      • The bootstrap server

      • The record key serializer

      • The record value serializer

      • The type of connection

      • The location and password of the TrustStore file

      • The Kafka connector

      The file content should look like the following:

      # TODO: configure the bootstrap server (1)
      kafka.bootstrap.servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT
      
      # TODO: configure the key (2) and value (3) serializers
      mp.messaging.outgoing.device-temperatures.key.serializer = org.apache.kafka.common.serialization.StringSerializer
      mp.messaging.outgoing.device-temperatures.value.serializer = org.apache.kafka.common.serialization.IntegerSerializer
      
      # TODO: configure the SSL connection: protocol (4), TrustStore location (5), TrustStore password (6)
      kafka.security.protocol = SSL
      kafka.ssl.truststore.location = ABSOLUTE_PATH_TO_WORKSPACE_FOLDER/truststore.jks
      kafka.ssl.truststore.password = password
      
      # TODO: configure the Kafka connector (7)
      mp.messaging.outgoing.device-temperatures.connector = smallrye-kafka
      
      # TODO: disable Quarkus dev services (8)
      quarkus.kafka.devservices.enabled = false

      1

      Bootstrap server host and port

      2

      Record key serializer

      3

      Record value serializer

      4

      Communication protocol

      5

      Absolute path to the location of the TrustStore file

      6

      Password for the TrustStore file

      7

      Kafka connector

      8

      Disable Quarkus Dev Services. When you run Quarkus in dev mode, a transient failure in the connection to the Kafka broker triggers the creation of a local Kafka cluster. To avoid this happening while testing the application, you must disable Dev Services.

      Important

      If you copy configuration values from the preceding code block, then make sure to remove trailing spaces.

      Trailing spaces in configuration properties might cause unexpected errors, such as Quarkus not recognizing the security protocol or not finding the Kafka connector.
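
      To see why a trailing space matters, consider how the JDK's java.util.Properties class parses a line: it skips whitespace before the value but keeps whitespace after it. The following minimal sketch demonstrates that behavior. The TrailingSpaceDemo class is hypothetical, and the sketch assumes that the framework's configuration loader treats values in a similar way.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo of how trailing whitespace survives properties parsing.
class TrailingSpaceDemo {

    // Parses one properties line and returns the value stored under the key.
    static String parseValue(String line, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "kafka.security.protocol";
        String clean = parseValue(key + " = SSL", key);
        String padded = parseValue(key + " = SSL ", key);

        System.out.println("[" + clean + "]");     // prints [SSL]
        System.out.println("[" + padded + "]");    // prints [SSL ]
        System.out.println(padded.equals("SSL"));  // prints false
    }
}
```

      The padded value no longer equals SSL, which is consistent with the kind of error described in the preceding note.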

    5. Open the src/main/java/com/redhat/telemetry/producer/ProducerApp.java file, and examine the code. The file includes all the required imports.

      package com.redhat.telemetry.producer;
      
      import java.time.Duration;
      import java.util.Random;
      
      import javax.enterprise.context.ApplicationScoped;
      
      import io.smallrye.mutiny.Multi; // (1)
      import io.smallrye.reactive.messaging.kafka.Record; // (2)
      import org.eclipse.microprofile.reactive.messaging.Outgoing; // (3)
      import org.jboss.logging.Logger;
      
      @ApplicationScoped
      public class ProducerApp {
      
          private static final Logger LOG = Logger.getLogger(ProducerApp.class);
      
          private final Random random = new Random();
      
          // TODO: Implement the Kafka producer
      
      }

      1

      Reactive stream type from the SmallRye Mutiny library that emits multiple items

      2

      Representation of a Kafka record

      3

      Annotation for outgoing messages

    6. Write the implementation of the Kafka producer to send random integers to the device-temperatures topic.

      The method should look like the following:

      // TODO: Implement the Kafka producer
      @Outgoing("device-temperatures") // (1)
      public Multi<Record<String, Integer>> generate() {
          return Multi.createFrom().ticks().every(Duration.ofSeconds(1)) // (2)
                  .onOverflow().drop()
                  .map(tick -> {
                      String currentDevice = "device-" + random.nextInt(10);
                      int currentMeasure = random.nextInt(100);
      
                      LOG.infov("Device ID: {0}, measure: {1}",
                              currentDevice,
                              currentMeasure
                      );
      
                      return Record.of(currentDevice, currentMeasure); // (3)
                  });
      }

      1

      Annotation indicating to which topic the application sends messages.

      2

      Returns a Mutiny stream that emits a temperature measurement every second.

      3

      Kafka record composed of a device ID as the key and a temperature measurement as the value.

    7. Start the producer application.

      (.venv) [user@host quarkus]$ ./mvnw clean package quarkus:dev
      ...output omitted...
      ...output omitted... Device ID: device-8, measure: 94
      ...output omitted... Device ID: device-2, measure: 20
      ...output omitted... Device ID: device-5, measure: 9
      ...output omitted... Device ID: device-8, measure: 92
      ...output omitted...
  5. Visualize the messages sent to the device-temperatures topic by using a helper script.

    1. Open a new terminal window, and navigate to your workspace directory. Make sure that the Python virtual environment is active.

      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$

      Important

      On Windows, use the Activate.ps1 script to activate your Python virtual environment.

      PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
    2. Run the consume_integers.py helper script located in the kafka-producers/scripts directory to read all the messages available in the device-temperatures topic. It might take several seconds to connect to the Kafka cluster and start pulling messages from the topic.

      (.venv) [user@host AD482]$ python kafka-producers/scripts/consume_integers.py \
       device-temperatures
      ...output omitted...
      >>>>
              Partition: 1
              Key: device-8
              Measurement: 94
      >>>>
              Partition: 1
              Key: device-8
              Measurement: 92
      >>>>
      ...output omitted...

      Notice that all the messages with the same key have the same partition assigned.

      Stop the command, and close the terminal.
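
      Messages with the same key land in the same partition because the producer derives the partition from a hash of the serialized key, modulo the number of partitions. Kafka's default partitioner uses a murmur2 hash for this. The following minimal sketch substitutes the JDK's Arrays.hashCode as a stand-in, so the partition numbers it prints do not match the ones Kafka assigns. The KeyPartitionSketch class is hypothetical and only illustrates that the mapping is deterministic.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of key-based partition selection.
class KeyPartitionSketch {

    // Maps a key to a partition index in the range [0, numPartitions).
    // Arrays.hashCode is a stand-in; Kafka's default partitioner uses murmur2.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 2; // device-temperatures has 2 partitions
        for (String key : new String[] {"device-8", "device-2", "device-8"}) {
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

      Both device-8 lines print the same partition, which mirrors the output of the helper script. Because the hash functions differ, use the sketch only to reason about the behavior, not to predict actual partition assignments.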

    3. Return to the Quarkus terminal, stop the producer application, and navigate to the workspace directory.

Finish

Go back to your workspace. Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish kafka-producers

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c