Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this lab, you will implement the business logic of a small application for an electric company. The application has two components:
The
WindTurbineclass simulates the energy generated by a wind turbine. This system will send each measured value to a Kafka topic.The
ReportingSystemclass processes the records sent by the wind turbines in batches of five. The reporting tool will sum the generated energy values of each block so the company can analyze the energy production.
Outcomes
You should be able to create a topic, send records to a topic with a producer, and process the records available in a topic with a consumer.
You will find the solution files for this exercise in the AD482-apps repository, within the kafka-basics/solutions directory.
Notice that you might need to replace some strings in the Java files.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start kafka-basics
Copy the Bootstrap Server, Bootstrap Port, and Cluster Namespace values.
You will use these values to connect to the Kafka cluster.
The lab command copies the exercise files, from the AD482-apps/kafka-basics/apps directory, which is in your local Git repository, into the kafka-basics directory, which is in your workspace.
Procedure 2.5. Instructions
In your workspace, move to the
kafka-basicsdirectory. Create a topic that conforms to the following specifications:Table 2.1. Topic Specifications
Name wind-turbine-productionPartitions 2Retention time (ms) 604800000Segment bytes 1073741824Cluster my-clusterBy using your editor of choice, open the
resources/application-topic.yamlfile, and use the Red Hat AMQ Streams topic operator to create the topic.The file content should look like the following:
apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: wind-turbine-production labels: strimzi.io/cluster: my-cluster spec: partitions: 2 replicas: 1 config: retention.ms: 604800000 segment.bytes: 1073741824Verify that your project is set to
.RHT_OCP4_DEV_USER-kafka-cluster(.venv) [user@host kafka-basics]$
oc projectUsing project "RHT_OCP4_DEV_USER-kafka-cluster" on server ...output omitted...Use the
occommand to create the topic in the Kafka cluster project.(.venv) [user@host kafka-basics]$
oc create -f resources/application-topic.yamlkafkatopic.kafka.strimzi.io/wind-turbine-production created
Move to the
energy-meterdirectory, which contains the source code of the application. Configure the Kafka connection security settings in theConfigclass. Use the following configuration properties to connect to the Kafka cluster:Security protocol: SSL
Location of your TrustStore file
Password:
password
Open the
Configclass and configure the connection protocol to use SSL in theconfigureConnectionSecuritymethod.// TODO: configure the connection protocol
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");Set the absolute path to the TrustStore file.
// TODO: configure the path to the truststore file
props.put( SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks" );Set the TrustStore password.
// TODO: configure the truststore password
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
Implement the business logic of the
WindTurbineclass. Given a sequence of electricity production values stored in an attribute namedenergyProductionSequence, create the business logic to send each one of the values to thewind-turbine-productiontopic. Send each integer record without a key.Configure the
WindTurbineclass as a producer that conforms to the following configuration:You can use the
printRecordhelper method for debugging purposes.Open the
WindTurbineclass and examine the code. You can use the helper methods for debugging purposes.Set the bootstrap server in the
configureProducermethod.// TODO: configure the bootstrap server
props.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT" );Set the key and value serializers in the
configureProducermethod.// TODO: configure the key serializer
props.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" );// TODO: configure the value serializerprops.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer" );Implement the business logic in the
mainmethod to send the energy production values to thewind-turbine-productiontopic. Do not assign a key to the records.The method should look like the following:
public static void main(String[] args) { // TODO: implement the business logicProducer<Void,Integer> producer = new KafkaProducer<>( configureProperties() ); for (int energyProduction : energyProductionSequence) { ProducerRecord<Void, Integer> record = new ProducerRecord<>( "wind-turbine-production", energyProduction ); producer.send(record); printRecord(record); } producer.close();}Execute the application to send the electricity production values to the
wind-turbine-productiontopic.(.venv) [user@host energy-meter]$
./mvnw compile exec:java \ "-Dexec.mainClass=com.redhat.energy.meter.producer.WindTurbine"...output omitted... Sent record: Topic = wind-turbine-production Partition = null Key = null Value = 700 ...output omitted...
Implement the business logic of the
ReportingSystemclass. This class consumes the measures available in thewind-turbine-productiontopic, sums the values of every 5 processed records, and stores the value in a file namedreport.txt.Configure the
ReportingSystemclass as a consumer that conforms to the following configuration:Table 2.3. Consumer Configuration
Group ID reportingSystemKey deserializer StringDeserializerValue deserializer IntegerDeserializerOffset reset config earliestYou can use the print helper methods for debugging purposes.
Open the
ReportingSystemclass and examine the code. You can use the helper methods for debugging purposes.Set the bootstrap server in the
configureConsumermethod.// TODO: set the bootstrap server
props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT" );Set the consumer group ID in the
configureConsumermethod.// TODO: set the consumer group ID
props.put( ConsumerConfig.GROUP_ID_CONFIG, "reportingSystem" );Set the key and value deserializers in the
configureConsumermethod.// TODO: set the key deserializer
props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer" );// TODO: set the value deserializerprops.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer" );Set the offset reset configuration to
earliestin theconfigureConsumermethod.// TODO: set the offset reset config
props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" );Implement the business logic in the
mainmethod to consume the measures available in thewind-turbine-productiontopic, sum the values of every 5 processed records, and store the value in a file namedreport.txt. You must use thesaveAggregationToFilehelper method to save the sum value.The method should look like the following:
public static void main( String[] args ) throws IOException { // TODO: implement the business logicConsumer<Void,Integer> consumer = new KafkaConsumer<>(configureProperties()); consumer.subscribe(Collections.singletonList("wind-turbine-production")); while (true) { ConsumerRecords<Void, Integer> records = consumer.poll(Duration.ofSeconds(10)); int aggregatedEnergy = 0; int processedRecords = 0; for (ConsumerRecord<Void, Integer> record : records) { printRecord(record); aggregatedEnergy += record.value(); processedRecords++; if (processedRecords % 5 == 0) { printAggregation(aggregatedEnergy); saveAggregationToFile(aggregatedEnergy); aggregatedEnergy = 0; } } }}Execute the application to process the electricity production values available in the
wind-turbine-productiontopic.(.venv) [user@host energy-meter]$
./mvnw compile exec:java \ "-Dexec.mainClass=com.redhat.energy.meter.consumer.ReportingSystem"...output omitted... Received record: Topic = wind-turbine-production Partition = 1 Key = null Value = 700 Writing aggregation result to file: 2500Wait until the consumer processes 5 records and writes the sum to the
report.txtfile. Press CTRL + c to stop the application.
This concludes the lab.