Guided Exercise: Receiving Data with Consumers

In this exercise you will implement Kafka consumers with the Java Kafka client library and the SmallRye Reactive Messaging framework in Quarkus.

Outcomes

You should be able to implement a Kafka consumer using the plain Java client and the SmallRye Reactive Messaging extension for Quarkus.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to prepare your system for this exercise.

(.venv) [user@host AD482]$ lab start kafka-consumers
...output omitted...
 · Verifying your Kafka settings ...output omitted...
    - Bootstrap Server: my-cluster-kafka-cluster.apps.cluster.example.com
    - Bootstrap Port: 443
...output omitted...

The script creates the humidity-conditions topic for you. You will create a consumer that reads data from this topic.

Copy the Bootstrap Server and Bootstrap Port values. You will use these values to connect to the Kafka cluster.

The lab command copies the exercise files from the AD482-apps/kafka-consumers/apps directory in your local Git repository into the kafka-consumers directory in your workspace.

Procedure 2.3. Instructions

  1. Implement a basic consumer by using the KafkaConsumer class of the Kafka Java client.

    1. Open the code for this exercise. From your AD482 workspace, navigate to the kafka-consumers/plain directory. This directory contains the basic Java application that you will use to implement the consumer.

    2. Specify the Kafka configuration properties. You will need these properties when creating new instances of the KafkaConsumer class.

      Open the src/main/java/com/redhat/telemetry/ConsumerApp.java file and complete the configureProperties method with the Kafka configuration properties, as follows:

      private static Properties configureProperties() {
          Properties props = new Properties();
      
          // TODO: Add Kafka configuration properties
          props.put(
              ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
              "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT" 1
          );
          props.put(ConsumerConfig.GROUP_ID_CONFIG, "humidityMonitoring"); 2
          props.put(
              ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringDeserializer" 3
          );
          props.put(
              ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.IntegerDeserializer" 4
          );
          props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); 5
          props.put(
              SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
              "ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks" 6
          );
          props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password"); 7
      
          return props;
      }

      1. Bootstrap server host and port
      2. Consumer group ID
      3. Record key deserializer
      4. Record value deserializer
      5. Communication protocol
      6. Absolute path to the location of the TrustStore file
      7. Password for the TrustStore file

      Make sure to replace the YOUR_KAFKA_BOOTSTRAP_HOST and YOUR_KAFKA_BOOTSTRAP_PORT strings with the values from the lab start script. You must also replace ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER with the absolute path to the directory containing the trust store file that you created in the first exercise.
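Hard-coding the bootstrap address works for this exercise, but a small helper can resolve it from the environment instead, so the source code stays free of cluster-specific values. The following sketch is optional and not part of the exercise files; the KAFKA_BOOTSTRAP variable name and the BootstrapConfig class are names chosen here for illustration only:

```java
import java.util.Map;

// Optional sketch, not part of the exercise files: resolve the Kafka
// bootstrap address from an environment variable, with a fallback value.
// The variable name KAFKA_BOOTSTRAP is an assumption made for this sketch.
public class BootstrapConfig {

    // Accepting the environment as a Map keeps the method easy to test.
    static String bootstrapServers(Map<String, String> env, String fallback) {
        String fromEnv = env.get("KAFKA_BOOTSTRAP");
        return (fromEnv != null && !fromEnv.isEmpty()) ? fromEnv : fallback;
    }

    public static void main(String[] args) {
        // In the real application, the result would be passed to props.put(...).
        System.out.println(bootstrapServers(System.getenv(), "localhost:9092"));
    }
}
```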

    3. In the same file, create an instance of KafkaConsumer in the main method and subscribe the consumer to the humidity-conditions topic.

      public static void main(String[] args) {
          // TODO: Create Kafka consumer
          Consumer<Void,Integer> consumer = new KafkaConsumer<>(configureProperties());
          consumer.subscribe(Collections.singletonList("humidity-conditions"));
      }

      Note how you create the consumer by passing the configuration properties that you previously specified.

    4. Add the code to continuously read messages from the humidity-conditions topic.

      public static void main(String[] args) {
          // TODO: Create Kafka consumer
          Consumer<Void,Integer> consumer = new KafkaConsumer<>(configureProperties());
          consumer.subscribe(Collections.singletonList("humidity-conditions"));
      
          while (true) {
              ConsumerRecords<Void, Integer> records = consumer.poll(
                  Duration.ofMinutes(1)
              );
      
              for (ConsumerRecord<Void, Integer> record : records) {
                  System.out.println("Received humidity value: " + record.value());
              }
          }
      }

      The consumer.poll method blocks execution while it waits for new messages. The Duration argument controls the maximum time that poll waits before returning; in this case, the consumer waits for up to one minute per call.
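The while (true) loop never exits, so the only way to stop this application is to kill the process. A common refinement is to guard the loop with a flag that a JVM shutdown hook clears, and to close the consumer when the loop ends. The sketch below shows only the control flow; the Kafka calls are represented by comments so it runs standalone, and the PollLoopSketch class and its stopAfter placeholder are assumptions for illustration, not part of the exercise files:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: the Kafka poll and close calls appear as comments so the
// shutdown pattern can be shown without a running cluster.
public class PollLoopSketch {

    static final AtomicBoolean running = new AtomicBoolean(true);

    // Returns how many poll iterations ran before the flag was cleared.
    // The stopAfter parameter stands in for an external shutdown signal.
    static int runLoop(int stopAfter) {
        running.set(true);
        int polls = 0;
        while (running.get()) {
            // Real application: consumer.poll(Duration.ofMinutes(1)) and the for loop
            polls++;
            if (polls >= stopAfter) {
                running.set(false); // in practice, the shutdown hook clears the flag
            }
        }
        // Real application: consumer.close() commits offsets and leaves the group cleanly.
        return polls;
    }

    public static void main(String[] args) {
        // Clear the flag when the JVM receives Ctrl+C or SIGTERM.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> running.set(false)));
        System.out.println("Polled " + runLoop(3) + " times before stopping");
    }
}
```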

  2. Package and start the consumer application.

    [user@host plain]$ ./mvnw package
    ...output omitted...
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    ...output omitted...
    [user@host plain]$ java -jar target/consumer-1.0-SNAPSHOT.jar
    ...output omitted...
    [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-humidityMonitoring-1, groupId=humidityMonitoring] Setting offset for partition humidity-conditions-0 to the committed offset FetchPosition ...output omitted...

    Leave the terminal window open.

  3. Verify that the consumer can read the messages produced in the humidity-conditions topic. In a new terminal window, run the provided produce_integers.py Python script to write humidity values into this topic.

    1. Open a new terminal window. Before running the Python producer script, make sure that the Python virtual environment from your workspace directory is activated.

      [user@host ~]$ cd AD482
      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$

      Important

      On Windows, use the Activate.ps1 script to activate your Python virtual environment.

      PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

    2. Run the produce_integers.py Python script to write random humidity values to the humidity-conditions topic. This script is located in the kafka-consumers/scripts directory. Pass the topic name as the first argument to the script.

      (.venv) [user@host AD482]$ python kafka-consumers/scripts/produce_integers.py \
       humidity-conditions
      ...output omitted...
      Sent 98
      Sent 66
      Sent 91

    3. Switch back to the previous terminal window, where your Java consumer application is running. Verify that the consumer is receiving data.

      ...output omitted...
      Received humidity value: 98
      Received humidity value: 66
      Received humidity value: 91

  4. Stop the consumer Java application.

  5. Create a new Kafka consumer by using Quarkus and the SmallRye Reactive Messaging framework. This consumer should read data from the same humidity-conditions topic, aggregate the data, and expose the aggregated data through a GET endpoint.

    1. Navigate to the kafka-consumers/quarkus directory. This directory contains a basic Quarkus application.

    2. Open the src/main/resources/application.properties file to specify the Kafka configuration.

      To use the SmallRye Reactive Messaging framework, you must configure an incoming channel and a connector. The channel is how you identify messages from a given topic in a Java application. In this case, the incoming channel name is humidityConditions. The channel is attached to the humidity-conditions Kafka topic. The connector is a specific implementation to connect to a message broker. In this case, you use the Kafka connector.

      Set the configuration values as follows:

      kafka.bootstrap.servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT 1
      
      mp.messaging.incoming.humidityConditions.topic = humidity-conditions 2
      
      mp.messaging.incoming.humidityConditions.value.deserializer = org.apache.kafka.common.serialization.IntegerDeserializer 3
      mp.messaging.incoming.humidityConditions.auto.offset.reset = latest 4
      mp.messaging.incoming.humidityConditions.group.id = humidityMonitoring 5
      
      kafka.security.protocol = SSL 6
      kafka.ssl.truststore.location = /ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks 7
      kafka.ssl.truststore.password = password 8
      
      mp.messaging.incoming.humidityConditions.connector = smallrye-kafka 9

      1. Bootstrap server host and port.
      2. Kafka topic to read data from.
      3. Record value deserializer.
      4. Offset reset policy. In this case, consumption starts at the latest offset.
      5. Consumer group ID.
      6. Communication protocol.
      7. Absolute path to the location of the TrustStore file.
      8. Password for the TrustStore file.
      9. Kafka connector.
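Note that the incoming channel keys above all follow the same SmallRye Reactive Messaging naming pattern, which is why the channel name in application.properties must match the name used in the Java code:

```
mp.messaging.incoming.<channel-name>.<attribute> = <value>
```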

    3. Open the src/main/java/com/redhat/telemetry/HumidityConditionsConsumer.java file, and implement the consumer.

      @Singleton
      public class HumidityConditionsConsumer {
      
          public HumidityStats stats = new HumidityStats();
      
          @Incoming("humidityConditions") 1
          public void consume(int humidityValue) { 2
              stats.add(humidityValue); 3
              System.out.println("Received humidity value: " + humidityValue);
          }
      
      }

      1. The @Incoming annotation specifies the SmallRye incoming data channel and starts the consumption process.
      2. The method to be executed when the Kafka connector reads a new value from the topic. SmallRye passes the new value as the humidityValue parameter.
      3. Handle the received value. In this case, the stats object processes the value.
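The HumidityStats class ships with the exercise files, so you do not need to write it. For reference, a minimal class compatible with the consume method above might look like the following hypothetical sketch; the real implementation may differ:

```java
// Hypothetical sketch: the real HumidityStats class is provided with the
// exercise files and may differ. It keeps running statistics over the
// received humidity values.
public class HumidityStats {

    public int latest;      // most recently received value
    public long count;      // number of values received
    public long sum;        // running total of all values
    public double average;  // sum divided by count

    // synchronized because the Kafka connector and incoming HTTP requests
    // may access the object from different threads
    public synchronized void add(int value) {
        latest = value;
        count++;
        sum += value;
        average = (double) sum / count;
    }
}
```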

    4. In the src/main/java/com/redhat/telemetry/HumidityStatsResource.java file, implement the REST endpoint to get the humidity stats.

      @Path("/humidity")
      public class HumidityStatsResource {
      
          @Inject
          HumidityConditionsConsumer consumer;
      
          @GET
          public HumidityStats getHumidityStats() {
              return consumer.stats;
          }
      }

    5. Run the Quarkus application.

      [user@host quarkus]$ ./mvnw clean package quarkus:dev
      ...output omitted...
      Received humidity value: 75
      Received humidity value: 97

      Recall that the producer is still running and writing messages. The consumer should start receiving those messages after a few seconds.

    6. Navigate to the http://localhost:8080/humidity route. The response should be similar to the following:

      {
          "latest": 0.41,
          "count": 43,
          "sum": 21.650003,
          "average": 0.5034885
      }

      If you make additional requests to the same route, then the response should change, as the consumer reads new values.

  6. Stop the producer and the Quarkus consumer.

Finish

Go back to your workspace. Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish kafka-consumers

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c