Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will send telemetry measurements to Kafka.
You will create two producers:
A plain Java producer that sends a random integer to a topic named total-connected-devices.

A Quarkus producer that sends a record of temperatures to a topic named device-temperatures, composed of two partitions.
You will find the solution files for this exercise in the AD482-apps repository, within the kafka-producers/solutions directory.
Notice that you might need to replace some strings in the Java files.
Outcomes
You should be able to build producers that send data to Kafka topics.
From your workspace directory, with your Python virtual environment activated, use the lab command to prepare your system for this exercise.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

(.venv) [user@host AD482]$ lab start kafka-producers
Copy the Bootstrap Server and Bootstrap Port values.
You will use these values to connect to the Kafka cluster.
The lab command copies the exercise files from the AD482-apps/kafka-producers/apps directory in your local Git repository into the kafka-producers directory in your workspace.
Procedure 2.2. Instructions
From your workspace directory, navigate to the kafka-producers directory.

(.venv) [user@host AD482]$ cd kafka-producers
(.venv) [user@host kafka-producers]$

Verify that your project is set to RHT_OCP4_DEV_USER-kafka-cluster.

(.venv) [user@host kafka-producers]$ oc project
Using project "RHT_OCP4_DEV_USER-kafka-cluster" on server ...output omitted...
Create a plain Java producer with the Kafka Java client to send events to the total-connected-devices topic.

Examine and use the resources/create-topic.yaml template to create a topic named total-connected-devices.

(.venv) [user@host kafka-producers]$ oc process \
  -f resources/create-topic.yaml \
  -p TOPIC_NAME='total-connected-devices' \
  | oc apply -f -
kafkatopic.kafka.strimzi.io/total-connected-devices created

Note
By default, the template configures the topic to have 1 partition, and a replication factor of 1.
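The contents of the template are not reproduced in this exercise, but a Strimzi topic template of this kind typically renders a KafkaTopic custom resource along the following lines. This is an illustrative sketch only; the cluster label value is a placeholder, and the actual template in resources/create-topic.yaml is authoritative.

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: total-connected-devices
  labels:
    # The Strimzi topic operator only manages topics labeled with their cluster name.
    # "my-cluster" is a placeholder; the lab template uses the cluster created for you.
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1
```

The PARTITIONS template parameter used later in this exercise overrides the spec.partitions default.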
Move to the plain directory.

(.venv) [user@host kafka-producers]$ cd plain

By using your editor of choice, open the producer/src/main/java/com/redhat/telemetry/producer/ProducerApp.java file, and examine the code. The file includes all the required imports and defines two methods. You will write the implementation of these methods in this guided exercise.

package com.redhat.telemetry.producer;

import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;
public class ProducerApp {

    public static Properties configureProperties() {
        Properties props = new Properties();
        ...output omitted...
        return props;
    }

    public static void main(String[] args) {
        ...output omitted...
    }

    private static void printRecord(ProducerRecord record) {
        ...output omitted...
    }
}
Write the implementation of the configureProperties() method.

To configure the Kafka settings, use the Properties class to represent a set of properties, and define:

The bootstrap server
The record key serializer
The record value serializer
The type of connection
The location and password of the TrustStore file
The method should look like the following:
public static Properties configureProperties() {
    Properties props = new Properties();

    // TODO: configure the bootstrap server
    props.put(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT");

    // TODO: configure the key and value serializers
    props.put(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
    props.put(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.IntegerSerializer");

    // TODO: configure the SSL connection
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(
        SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
        "ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");

    return props;
}
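The IntegerSerializer configured above converts each value into a 4-byte, big-endian byte array before the producer transmits it. The following is an illustrative re-implementation of that encoding, not the Kafka class itself (which lives in the kafka-clients library):

```java
import java.util.Arrays;

public class IntegerSerializerSketch {

    // Illustrative sketch of what Kafka's IntegerSerializer produces:
    // the int value as 4 bytes, most significant byte first.
    public static byte[] serialize(int value) {
        return new byte[] {
            (byte) (value >>> 24),
            (byte) (value >>> 16),
            (byte) (value >>> 8),
            (byte) value
        };
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(89);
        System.out.println(Arrays.toString(bytes)); // [0, 0, 0, 89]
    }
}
```

This is why the consumer side must use the matching IntegerDeserializer: serializer and deserializer form a pair that agrees on the wire format.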
Write the implementation of the main() method to send 10 random integers to the total-connected-devices topic. Use the printRecord() helper method to print the record sent to the topic in the console.

The method should look like the following:

public static void main(String[] args) {
    // TODO: Implement the Kafka producer
    Random random = new Random();

    Producer<Void, Integer> producer = new KafkaProducer<>(configureProperties());

    for (int i = 0; i < 10; i++) {
        ProducerRecord<Void, Integer> record = new ProducerRecord<>(
            "total-connected-devices",
            random.nextInt(100));

        producer.send(record);
        printRecord(record);
    }

    producer.close();
}
Creates a Random object.

Creates a KafkaProducer with the configuration returned by the configureProperties() method. The default acknowledgment mode is 1.

Executes the logic of sending a random integer to the topic 10 times.
Creates a record.
Sets the topic that is going to receive the record.
Sets a random integer as the value of the record.
Sends the record to Kafka asynchronously.
Closes the connection to Kafka.
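Because send() is asynchronous, it returns before the record reaches the broker: the record is buffered and transmitted in the background, and close() blocks until pending sends complete. The following stdlib-only analogy (hypothetical class and method names, not the Kafka API) sketches why closing the producer matters:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy for the producer lifecycle: send() queues work and
// returns immediately, close() waits for queued work to finish, similar to
// KafkaProducer draining its internal buffer before shutting down.
public class AsyncSendSketch {

    private final ExecutorService sender = Executors.newSingleThreadExecutor();

    // Analogous to producer.send(record): non-blocking, returns a Future.
    public Future<String> send(String record) {
        return sender.submit(() -> "delivered: " + record);
    }

    // Analogous to producer.close(): blocks until pending sends complete.
    public void close() throws InterruptedException {
        sender.shutdown();
        sender.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        AsyncSendSketch producer = new AsyncSendSketch();
        Future<String> result = producer.send("42");
        producer.close(); // without this, in-flight sends could be lost at exit
        System.out.println(result.get()); // delivered: 42
    }
}
```

With the real KafkaProducer, skipping close() can likewise drop buffered records when the JVM exits.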
In your command-line terminal, navigate to the kafka-producers/plain directory. Compile the application.

(.venv) [user@host plain]$ ./mvnw clean package
...output omitted...
[INFO] BUILD SUCCESS
...output omitted...
In a new command-line terminal, execute the viewer application and leave it open. This application listens for records sent to the total-connected-devices topic. Then, run the producer application in your other command-line terminal and verify that the viewer application is receiving the records.

Open a new command-line terminal and move to the kafka-producers/plain directory in your workspace. Run the Java executable file for the viewer application. This application keeps waiting for new messages to consume.

(.venv) [user@host plain]$ java -jar viewer/target/viewer-1.0.jar
...output omitted...
Waiting for events...
Waiting for events...

In your other command-line terminal, run the Java executable file for the producer application. This application sends several messages to Kafka.

(.venv) [user@host plain]$ java -jar producer/target/producer-1.0.jar
...output omitted...
Sent record:
    Topic = total-connected-devices
    Partition = null
    Key = null
    Value = 89

Verify that the viewer application is receiving the records sent by the producer application.

...output omitted...
Received total-connected-devices: 24
Received total-connected-devices: 30
Received total-connected-devices: 91
Received total-connected-devices: 53
Received total-connected-devices: 82
Received total-connected-devices: 27
Received total-connected-devices: 89
Waiting for events...

Press CTRL + c to stop the command. Close the command-line terminal.
Create a producer in Quarkus to send events to the device-temperatures topic, and use the Device ID as the key of the records.

Navigate to the kafka-producers directory.

(.venv) [user@host plain]$ cd ..
(.venv) [user@host kafka-producers]$

Use the resources/create-topic.yaml template to create a topic named device-temperatures with 2 partitions.

(.venv) [user@host kafka-producers]$ oc process \
  -f resources/create-topic.yaml \
  -p TOPIC_NAME='device-temperatures' \
  -p PARTITIONS=2 \
  | oc apply -f -
kafkatopic.kafka.strimzi.io/device-temperatures created

Move to the quarkus directory.

(.venv) [user@host kafka-producers]$ cd quarkus

By using your editor of choice, open the src/main/resources/application.properties file, and configure the following Kafka settings:

The bootstrap server
The record key serializer
The record value serializer
The type of connection
The location and password of the TrustStore file
The Kafka connector
The file content should look like the following:
# TODO: configure the bootstrap server
kafka.bootstrap.servers = YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT

# TODO: configure the key and value serializers
mp.messaging.outgoing.device-temperatures.key.serializer = org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.device-temperatures.value.serializer = org.apache.kafka.common.serialization.IntegerSerializer

# TODO: configure the SSL connection
kafka.security.protocol = SSL
kafka.ssl.truststore.location = ABSOLUTE_PATH_TO_WORKSPACE_FOLDER/truststore.jks
kafka.ssl.truststore.password = password

# TODO: configure the Kafka connector
mp.messaging.outgoing.device-temperatures.connector = smallrye-kafka

# TODO: disable Quarkus dev services
quarkus.kafka.devservices.enabled = false

Bootstrap server host and port
Record key serializer
Record value serializer
Communication protocol
Absolute path to the location of the TrustStore file
Password for the TrustStore file
Kafka connector
Disable Quarkus Dev Services. When you run Quarkus in dev mode, a transient failure in the connection to the Kafka broker triggers the creation of a local Kafka cluster. To avoid this happening while testing the application, you must disable Dev Services.
Important
If you copy configuration values from the preceding code block, then make sure to remove trailing spaces.
Trailing spaces in configuration properties might cause unexpected errors, such as Quarkus not recognizing the security protocol or not finding the Kafka connector.
Open the src/main/java/com/redhat/telemetry/producer/ProducerApp.java file, and examine the code. The file includes all the required imports.

package com.redhat.telemetry.producer;

import java.time.Duration;
import java.util.Random;

import javax.enterprise.context.ApplicationScoped;

import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.kafka.Record;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.jboss.logging.Logger;

@ApplicationScoped
public class ProducerApp {

    private static final Logger LOG = Logger.getLogger(ProducerApp.class);

    private final Random random = new Random();

    // TODO: Implement the Kafka producer
}
Write the implementation of the Kafka producer to send random integers to the device-temperatures topic.

The method should look like the following:

// TODO: Implement the Kafka producer
@Outgoing("device-temperatures")
public Multi<Record<String, Integer>> generate() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
        .onOverflow().drop()
        .map(tick -> {
            String currentDevice = "device-" + random.nextInt(10);
            int currentMeasure = random.nextInt(100);

            LOG.infov(
                "Device ID: {0}, measure: {1}",
                currentDevice,
                currentMeasure);

            return Record.of(currentDevice, currentMeasure);
        });
}

Start the producer application.
(.venv) [user@host quarkus]$ ./mvnw clean package quarkus:dev
...output omitted...
Device ID: device-8, measure: 94
...output omitted...
Device ID: device-2, measure: 20
...output omitted...
Device ID: device-5, measure: 9
...output omitted...
Device ID: device-8, measure: 92
...output omitted...
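The reactive pipeline emits one keyed record per tick: each tick is mapped to a (device ID, measure) pair, which becomes the record key and value. The per-tick mapping can be sketched in plain Java, without the Mutiny stream machinery, using a hypothetical helper class:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Random;

// Hypothetical helper mirroring the body of the map() lambda in generate():
// each tick produces one (device ID, measure) pair, which Record.of() turns
// into the key and value of the outgoing Kafka record.
public class TickMappingSketch {

    private final Random random = new Random();

    public Map.Entry<String, Integer> onTick() {
        String currentDevice = "device-" + random.nextInt(10);
        int currentMeasure = random.nextInt(100);
        return new SimpleEntry<>(currentDevice, currentMeasure);
    }

    public static void main(String[] args) {
        TickMappingSketch sketch = new TickMappingSketch();
        for (int tick = 0; tick < 3; tick++) {
            Map.Entry<String, Integer> record = sketch.onTick();
            System.out.println(
                "Device ID: " + record.getKey()
                + ", measure: " + record.getValue());
        }
    }
}
```

In the real application, Mutiny drives this mapping once per second and the SmallRye Kafka connector sends each resulting Record to the device-temperatures channel.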
Visualize the messages sent to the device-temperatures topic by using a helper script.

Open a new terminal window, and navigate to your workspace directory. Make sure that the Python virtual environment is active.

[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Run the consume_integers.py helper script located in the kafka-producers/scripts directory to read all the messages available in the device-temperatures topic. It might take several seconds to connect to the Kafka cluster and start pulling messages from the topic.

(.venv) [user@host AD482]$ python kafka-producers/scripts/consume_integers.py \
  device-temperatures
...output omitted...
>>>> Partition: 1 Key: device-8 Measurement: 94
>>>> Partition: 1 Key: device-8 Measurement: 92
>>>>
...output omitted...

Notice that all the messages with the same key have the same partition assigned.
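This consistent key-to-partition mapping comes from the producer's default partitioner: for a record with a non-null key, it hashes the serialized key and takes the result modulo the partition count, so the same key always lands on the same partition. The following is a simplified stand-in for that logic (Kafka actually uses a murmur2 hash of the key bytes, not String.hashCode):

```java
public class KeyPartitioningSketch {

    // Simplified sketch of keyed partition assignment: a deterministic hash
    // of the key, forced non-negative, modulo the number of partitions.
    // Kafka's default partitioner uses murmur2 on the serialized key bytes,
    // but the principle is the same.
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition.
        System.out.println(
            partitionFor("device-8", 2) == partitionFor("device-8", 2)); // true
    }
}
```

This is why keyed records preserve per-device ordering: all measurements for one device go to one partition, and Kafka guarantees ordering within a partition.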
Stop the command, and close the terminal.
Return to the Quarkus terminal, stop the producer application, and navigate to the workspace directory.
This concludes the guided exercise.