Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will implement Kafka consumers with the plain Java Kafka client library and with the SmallRye Reactive Messaging framework in Quarkus.
Outcomes
You should be able to implement a Kafka consumer using the plain Java client and the SmallRye Reactive Messaging extension for Quarkus.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to prepare your system for this exercise.
(.venv) [user@host AD482]$ lab start kafka-consumers
...output omitted...
· Verifying your Kafka settings ...output omitted...
- Bootstrap Server: my-cluster-kafka-cluster.apps.cluster.example.com
- Bootstrap Port: 443
...output omitted...
The script creates the humidity-conditions topic for you.
You will create a consumer that reads data from this topic.
Copy the Bootstrap Server and Bootstrap Port values.
You will use these values to connect to the Kafka cluster.
The lab command copies the exercise files from the AD482-apps/kafka-consumers/apps directory in your local Git repository into the kafka-consumers directory in your workspace.
Procedure 2.3. Instructions
Implement a basic consumer by using the KafkaConsumer class of the Kafka Java client.

Open the code for this exercise. From your AD482 workspace, navigate to the kafka-consumers/plain directory. This directory contains the basic Java application that you will use to implement the consumer.

Specify the Kafka configuration properties. You will need these properties when creating new instances of the KafkaConsumer class.

Open the src/main/java/com/redhat/telemetry/ConsumerApp.java file and complete the configureProperties method with the Kafka configuration properties, as follows:

private static Properties configureProperties() {
    Properties props = new Properties();

    // TODO: Add Kafka configuration properties
    props.put(
        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "humidityMonitoring");
    props.put(
        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.IntegerDeserializer");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(
        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        "ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");

    return props;
}
Bootstrap server host and port
Consumer group ID
Record key deserializer
Record value deserializer
Communication protocol
Absolute path to the location of the TrustStore file
Password for the TrustStore file
Make sure to replace the YOUR_KAFKA_BOOTSTRAP_HOST and YOUR_KAFKA_BOOTSTRAP_PORT strings with the values from the lab start script. You must also add the absolute paths to the key store and the trust store files that you created in the first exercise.

In the same file, create an instance of KafkaConsumer in the main method and subscribe the consumer to the humidity-conditions topic.

public static void main(String[] args) {
    // TODO: Create Kafka consumer
    Consumer<Void,Integer> consumer = new KafkaConsumer<>(configureProperties());

    consumer.subscribe(Collections.singletonList("humidity-conditions"));
}

Note how you create the consumer by passing the configuration properties that you previously specified.
Add the code to continuously read messages from the humidity-conditions topic.

public static void main(String[] args) {
    // TODO: Create Kafka consumer
    Consumer<Void,Integer> consumer = new KafkaConsumer<>(configureProperties());

    consumer.subscribe(Collections.singletonList("humidity-conditions"));

    while (true) {
        ConsumerRecords<Void, Integer> records = consumer.poll(
            Duration.ofMinutes(1)
        );

        for (ConsumerRecord<Void, Integer> record : records) {
            System.out.println("Received humidity value: " + record.value());
        }
    }
}

The consumer.poll method blocks the execution while it waits for new messages. You can control how long the poll method waits. In this case, the consumer blocks for up to one minute on each call.
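The while (true) loop never exits on its own; in this exercise you simply stop the application from the terminal. If you want the loop to exit cleanly, a common pattern (not part of the exercise code) is to guard the loop with a flag that a shutdown hook flips. The sketch below illustrates only the pattern, using an in-memory queue as a stand-in for the topic; with a real KafkaConsumer, the hook would also need to call consumer.wakeup() to unblock a pending poll.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);

        // Stand-in for the humidity-conditions topic in this sketch.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        queue.put(98);
        queue.put(66);
        queue.put(91);

        // With a real KafkaConsumer, this hook would also call
        // consumer.wakeup() to interrupt a blocked poll().
        Runtime.getRuntime().addShutdownHook(new Thread(() -> running.set(false)));

        while (running.get()) {
            // Stand-in for consumer.poll(Duration.ofMinutes(1)).
            Integer value = queue.poll(100, TimeUnit.MILLISECONDS);
            if (value == null) {
                break; // Demo only: stop when the queue is drained.
            }
            System.out.println("Received humidity value: " + value);
        }
    }
}
```

The flag plus wakeup() combination is what lets a consumer leave its group gracefully instead of being fenced off after a session timeout.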
Package and start the consumer application.
[user@host plain]$ ./mvnw package
...output omitted...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
...output omitted...

[user@host plain]$ java -jar target/consumer-1.0-SNAPSHOT.jar
...output omitted...
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-humidityMonitoring-1, groupId=humidityMonitoring] Setting offset for partition humidity-conditions-0 to the committed offset FetchPosition
...output omitted...

Leave the terminal window open.
Verify that the consumer can read the messages produced in the humidity-conditions topic. In a new terminal window, run the provided produce_integers.py Python script to write humidity values into this topic.

Open a new terminal window. Before running the Python producer script, make sure that the Python virtual environment from your workspace directory is activated.

[user@host ~]$ cd AD482
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$

Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Run the produce_integers.py Python script to write random humidity values to the humidity-conditions topic. This script is located in the kafka-consumers/scripts directory. Pass the topic name as the first argument to the script.

(.venv) [user@host AD482]$ python kafka-consumers/scripts/produce_integers.py \
humidity-conditions
...output omitted...
Sent 98
Sent 66
Sent 91

Switch back to the previous terminal window, where your Java consumer application is running. Verify that the consumer is receiving data.

...output omitted...
Received humidity value: 98
Received humidity value: 66
Received humidity value: 91
Stop the consumer Java application.
Create a new Kafka consumer by using Quarkus and the SmallRye Reactive Messaging framework. This consumer should read data from the same humidity-conditions topic, aggregate the data, and expose the aggregated data through a GET endpoint.

Navigate to the kafka-consumers/quarkus directory. This directory contains a basic Quarkus application.

Open the src/main/resources/application.properties file to specify the Kafka configuration.

To use the SmallRye Reactive Messaging framework, you must configure an incoming channel and a connector. The channel is how you identify messages from a given topic in a Java application. In this case, the incoming channel name is humidityConditions, and the channel is attached to the humidity-conditions Kafka topic. The connector is a specific implementation that connects to a message broker. In this case, you use the Kafka connector.

Set the configuration values as follows:
kafka.bootstrap.servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT
mp.messaging.incoming.humidityConditions.topic = humidity-conditions
mp.messaging.incoming.humidityConditions.value.deserializer = org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.humidityConditions.auto.offset.reset = latest
mp.messaging.incoming.humidityConditions.group.id = humidityMonitoring
kafka.security.protocol = SSL
kafka.ssl.truststore.location = /ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks
kafka.ssl.truststore.password = password
mp.messaging.incoming.humidityConditions.connector = smallrye-kafka
Bootstrap server host and port.
Kafka topic to read data from.
Record value deserializer.
Offset reset policy. In this case, consumption starts at the latest offset.
Consumer group ID.
Communication protocol.
Absolute path to the location of the TrustStore file.
Password for the TrustStore file.
Kafka connector.
Open the src/main/java/com/redhat/telemetry/HumidityConditionsConsumer.java file, and implement the consumer.

@Singleton
public class HumidityConditionsConsumer {

    public HumidityStats stats = new HumidityStats();

    @Incoming("humidityConditions")
    public void consume(int humidityValue) {
        stats.add(humidityValue);
        System.out.println("Received humidity value: " + humidityValue);
    }
}

The @Incoming annotation specifies the SmallRye incoming data channel and starts the consumption process.

The consume method runs when the Kafka connector reads a new value from the topic. SmallRye passes the new value as the humidityValue parameter.

Handle the received value. In this case, the stats object processes the value.

In the src/main/java/com/redhat/telemetry/HumidityStatsResource.java file, implement the REST endpoint to get the humidity stats.

@Path("/humidity")
public class HumidityStatsResource {

    @Inject
    HumidityConditionsConsumer consumer;

    @GET
    public HumidityStats getHumidityStats() {
        return consumer.stats;
    }
}

Run the Quarkus application.

[user@host quarkus]$ ./mvnw clean package quarkus:dev
...output omitted...
Received humidity value: 75
Received humidity value: 97

Recall that the producer is still running and writing messages. The consumer should start receiving those messages after a few seconds.
Navigate to the http://localhost:8080/humidity route. The response should be similar to the following:

{
    latest: 0.41,
    count: 43,
    sum: 21.650003,
    average: 0.5034885
}

If you make additional requests to the same route, then the response should change, as the consumer reads new values.
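The HumidityStats class is provided in the exercise files and is not listed in this exercise. As a rough sketch only, with field names assumed from the JSON response above (the provided implementation may differ), a running-aggregate class could look like this:

```java
// Hypothetical sketch of a running-aggregate class; the exercise ships its
// own HumidityStats implementation, which may differ from this one.
public class HumidityStats {
    public float latest;
    public int count;
    public float sum;
    public float average;

    // Update the running aggregates with a newly consumed humidity value.
    // Synchronized because the SmallRye consumer thread writes while the
    // REST endpoint thread reads.
    public synchronized void add(float value) {
        latest = value;
        count++;
        sum += value;
        average = sum / count;
    }
}
```

Because the REST resource returns this object directly, Quarkus serializes its public fields into the JSON document that you see in the response.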
Stop the producer and the Quarkus consumer.
This concludes the guided exercise.