Lab: Introducing Kafka and AMQ Streams Concepts

In this lab, you will implement the business logic of a small application for an electric company. The application has two components:

  • The WindTurbine class simulates the energy generated by a wind turbine. It sends each measured value to a Kafka topic.

  • The ReportingSystem class processes the records sent by the wind turbines in batches of five. It sums the generated energy values of each batch so that the company can analyze the energy production.

Outcomes

You should be able to create a topic, send records to a topic with a producer, and process the records available in a topic with a consumer.

You will find the solution files for this exercise in the AD482-apps repository, within the kafka-basics/solutions directory. Note that you might need to replace some strings in the Java files.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start kafka-basics

Copy the Bootstrap Server, Bootstrap Port, and Cluster Namespace values from the command output. You will use these values to connect to the Kafka cluster.

The lab command copies the exercise files from the AD482-apps/kafka-basics/apps directory of your local Git repository into the kafka-basics directory of your workspace.

Procedure 2.5. Instructions

  1. In your workspace, move to the kafka-basics directory. Create a topic that conforms to the following specifications:

    Table 2.1. Topic Specifications

    Name                 wind-turbine-production
    Partitions           2
    Retention time (ms)  604800000
    Segment bytes        1073741824
    Cluster              my-cluster
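
The two numeric settings in the table are easier to sanity-check once converted to human units. The following sketch uses only the Java standard library. The class and method names (TopicSpecUnits, retentionDays, segmentGiB) are illustrative and are not part of the lab files.

```java
import java.time.Duration;

// Illustrative helper, not part of the lab sources.
class TopicSpecUnits {

    // Converts a retention time in milliseconds to whole days.
    static long retentionDays(long retentionMs) {
        return Duration.ofMillis(retentionMs).toDays();
    }

    // Converts a segment size in bytes to whole gibibytes (1 GiB = 1024^3 bytes).
    static long segmentGiB(long segmentBytes) {
        return segmentBytes / (1024L * 1024L * 1024L);
    }

    public static void main(String[] args) {
        System.out.println(retentionDays(604_800_000L) + " days"); // 7 days
        System.out.println(segmentGiB(1_073_741_824L) + " GiB");   // 1 GiB
    }
}
```

In other words, the topic keeps records for seven days and uses log segments of 1 GiB.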

    1. By using your editor of choice, open the resources/application-topic.yaml file, and use the Red Hat AMQ Streams topic operator to create the topic.

      The file content should look like the following:

      apiVersion: kafka.strimzi.io/v1beta2
      kind: KafkaTopic
      metadata:
        name: wind-turbine-production
        labels:
          strimzi.io/cluster: my-cluster
      spec:
        partitions: 2
        replicas: 1
        config:
          retention.ms: 604800000
          segment.bytes: 1073741824
    2. Verify that your project is set to RHT_OCP4_DEV_USER-kafka-cluster.

      (.venv) [user@host kafka-basics]$ oc project
      Using project "RHT_OCP4_DEV_USER-kafka-cluster" on server ...output omitted...
    3. Use the oc command to create the topic in the Kafka cluster project.

      (.venv) [user@host kafka-basics]$ oc create -f resources/application-topic.yaml
      kafkatopic.kafka.strimzi.io/wind-turbine-production created
  2. Move to the energy-meter directory, which contains the source code of the application. Configure the Kafka connection security settings in the Config class. Use the following configuration properties to connect to the Kafka cluster:

    • Security protocol: SSL

    • Location of your TrustStore file

    • Password: password

    1. Open the Config class and configure the connection protocol to use SSL in the configureConnectionSecurity method.

      // TODO: configure the connection protocol
      props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    2. Set the absolute path to the TrustStore file.

      // TODO: configure the path to the truststore file
      props.put(
          SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
          "ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks"
      );
    3. Set the TrustStore password.

      // TODO: configure the truststore password
      props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "password");
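
As a cross-check of the three settings above, the following sketch builds the same configuration with the plain property names that the Kafka client constants resolve to. It runs without the Kafka client library. The class name, the method name, and the /home/user/AD482 workspace path are placeholders for illustration, and the sketch assumes that truststore.jks sits in the workspace directory.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Illustrative sketch, not the lab's Config class.
class SecuritySettingsSketch {

    // Builds the three SSL client settings from a workspace directory.
    static Properties sslSettings(String workspaceDir, String password) {
        // Resolve truststore.jks against the workspace and make the path absolute.
        Path truststore = Paths.get(workspaceDir, "truststore.jks").toAbsolutePath();

        Properties props = new Properties();
        // String values of CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
        // SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, and
        // SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, in that order.
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", truststore.toString());
        props.put("ssl.truststore.password", password);
        return props;
    }

    public static void main(String[] args) {
        // "/home/user/AD482" is a placeholder workspace path.
        sslSettings("/home/user/AD482", "password")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Building the path with Paths.get and toAbsolutePath is one way to avoid hard-coding a path that differs between Linux and Windows workstations.
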
  3. Implement the business logic of the WindTurbine class. Given a sequence of electricity production values stored in an attribute named energyProductionSequence, create the business logic to send each one of the values to the wind-turbine-production topic. Send each integer record without a key.

    Configure the WindTurbine class as a producer that conforms to the following configuration:

    Table 2.2. Producer Configuration

    Key serializer    StringSerializer
    Value serializer  IntegerSerializer
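
As background for the value serializer: Kafka transmits record values as bytes, and the IntegerSerializer writes each Integer as four bytes in big-endian order. The sketch below reproduces that encoding with java.nio.ByteBuffer so that it runs without the Kafka client library. IntegerWireFormat, encode, and decode are illustrative names, not Kafka APIs.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative model of the 4-byte big-endian integer encoding.
class IntegerWireFormat {

    // Encodes an int as 4 bytes; ByteBuffer is big-endian by default.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }

    // Decodes 4 big-endian bytes back into an int.
    static int decode(byte[] data) {
        if (data.length != Integer.BYTES) {
            throw new IllegalArgumentException("Expected 4 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    public static void main(String[] args) {
        byte[] bytes = encode(700);
        System.out.println(Arrays.toString(bytes)); // [0, 0, 2, -68]
        System.out.println(decode(bytes));          // 700
    }
}
```

The consumer must use the matching deserializer to read these values back. Because the records in this lab carry no key, the key serializer has nothing to encode.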

    You can use the printRecord helper method for debugging purposes.

    1. Open the WindTurbine class and examine the code.

    2. Set the bootstrap server in the configureProducer method.

      // TODO: configure the bootstrap server
      props.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
          "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT"
      );
    3. Set the key and value serializers in the configureProducer method.

      // TODO: configure the key serializer
      props.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer"
      );
      
      // TODO: configure the value serializer
      props.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.IntegerSerializer"
      );
    4. Implement the business logic in the main method to send the energy production values to the wind-turbine-production topic. Do not assign a key to the records.

      The method should look like the following:

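      public static void main(String[] args) {
          // TODO: implement the business logic
          Producer<Void,Integer> producer = new KafkaProducer<>(
                  configureProperties()
          );
      
          for (int energyProduction : energyProductionSequence) {
              // Only the topic and the value are given, so the record key is null.
              ProducerRecord<Void, Integer> record = new ProducerRecord<>(
                      "wind-turbine-production",
                      energyProduction
              );
      
              // send() is asynchronous and returns before the broker acknowledges the record.
              producer.send(record);
              printRecord(record);
          }
      
          // close() waits for the records sent so far to complete before returning.
          producer.close();
      }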
    5. Execute the application to send the electricity production values to the wind-turbine-production topic.

      (.venv) [user@host energy-meter]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.energy.meter.producer.WindTurbine"
      ...output omitted...
      Sent record:
              Topic = wind-turbine-production
              Partition = null
              Key = null
              Value = 700
      ...output omitted...
  4. Implement the business logic of the ReportingSystem class. This class consumes the measures available in the wind-turbine-production topic, sums the values of every 5 processed records, and stores the value in a file named report.txt.

    Configure the ReportingSystem class as a consumer that conforms to the following configuration:

    Table 2.3. Consumer Configuration

    Group ID             reportingSystem
    Key deserializer     StringDeserializer
    Value deserializer   IntegerDeserializer
    Offset reset config  earliest
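
The offset reset setting only matters when the consumer group has no committed offset for a partition. The following sketch is a simplified model of that decision, written without the Kafka client library. The class, method, and parameter names are hypothetical, and the model ignores the case of a committed offset that is out of range.

```java
import java.util.OptionalLong;

// Simplified, illustrative model of how a consumer picks its starting offset.
class OffsetResetModel {

    static long startingOffset(OptionalLong committed, String resetPolicy,
                               long beginningOffset, long endOffset) {
        // A committed offset always wins over the reset policy.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return beginningOffset; // read everything still retained
            case "latest":
                return endOffset;       // read only records that arrive later
            default:
                throw new IllegalStateException(
                        "No committed offset and no usable reset policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // A new group with "earliest" starts at the beginning of the partition.
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0L, 10L)); // 0
        // A group with a committed offset resumes from it.
        System.out.println(startingOffset(OptionalLong.of(4L), "earliest", 0L, 10L)); // 4
    }
}
```

With earliest, the first run of the reportingSystem group can read the records that the producer sent before the consumer started. A later run with the same group ID typically resumes from the committed offsets instead.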

    You can use the print helper methods for debugging purposes.

    1. Open the ReportingSystem class and examine the code.

    2. Set the bootstrap server in the configureConsumer method.

      // TODO: set the bootstrap server
      props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          "YOUR_KAFKA_BOOTSTRAP_HOST:YOUR_KAFKA_BOOTSTRAP_PORT"
      );
    3. Set the consumer group ID in the configureConsumer method.

      // TODO: set the consumer group ID
      props.put(
          ConsumerConfig.GROUP_ID_CONFIG,
          "reportingSystem"
      );
    4. Set the key and value deserializers in the configureConsumer method.

      // TODO: set the key deserializer
      props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringDeserializer"
      );
      
      // TODO: set the value deserializer
      props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.IntegerDeserializer"
      );
    5. Set the offset reset configuration to earliest in the configureConsumer method.

      // TODO: set the offset reset config
      props.put(
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
          "earliest"
      );
    6. Implement the business logic in the main method to consume the measures available in the wind-turbine-production topic, sum the values of every 5 processed records, and store the value in a file named report.txt. You must use the saveAggregationToFile helper method to save the sum value.

      The method should look like the following:

      public static void main( String[] args ) throws IOException {
          // TODO: implement the business logic
          Consumer<Void,Integer> consumer = new KafkaConsumer<>(configureProperties());
          consumer.subscribe(Collections.singletonList("wind-turbine-production"));
      
          // Keep the counters outside the poll loop so that a batch of five
          // records can span several poll() calls.
          int aggregatedEnergy = 0;
          int processedRecords = 0;
      
          while (true) {
              ConsumerRecords<Void, Integer> records = consumer.poll(Duration.ofSeconds(10));
      
              for (ConsumerRecord<Void, Integer> record : records) {
                  printRecord(record);
      
                  aggregatedEnergy += record.value();
                  processedRecords++;
      
                  // Every fifth record closes a batch: report the sum and start over.
                  if (processedRecords % 5 == 0) {
                      printAggregation(aggregatedEnergy);
                      saveAggregationToFile(aggregatedEnergy);
                      aggregatedEnergy = 0;
                  }
              }
          }
      }
    7. Execute the application to process the electricity production values available in the wind-turbine-production topic.

      (.venv) [user@host energy-meter]$ ./mvnw compile exec:java \
       "-Dexec.mainClass=com.redhat.energy.meter.consumer.ReportingSystem"
      ...output omitted...
      Received record:
              Topic = wind-turbine-production
              Partition = 1
              Key = null
              Value = 700
      Writing aggregation result to file: 2500
    8. Wait until the consumer processes 5 records and writes the sum to the report.txt file. Press Ctrl+C to stop the application.
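
The batching rule in this step (sum every five processed records) can be tried out in isolation from the Kafka client. The sketch below is a stand-alone model with hypothetical names and made-up sample values. Each list stands in for the records returned by one poll, and the running sum and counter are kept across polls because a single poll can return fewer than five records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative model of the five-record aggregation, without Kafka.
class BatchSumSketch {

    private static final int BATCH_SIZE = 5;

    private int runningSum = 0;
    private int recordsInBatch = 0;
    private final List<Integer> completedSums = new ArrayList<>();

    // Processes the values returned by one simulated poll.
    void processPoll(List<Integer> values) {
        for (int value : values) {
            runningSum += value;
            recordsInBatch++;

            if (recordsInBatch == BATCH_SIZE) {
                // The lab code would call saveAggregationToFile at this point.
                completedSums.add(runningSum);
                runningSum = 0;
                recordsInBatch = 0;
            }
        }
    }

    List<Integer> completedSums() {
        return completedSums;
    }

    int pendingRecords() {
        return recordsInBatch;
    }

    public static void main(String[] args) {
        BatchSumSketch sketch = new BatchSumSketch();
        sketch.processPoll(Arrays.asList(700, 500, 300)); // first poll: 3 records
        sketch.processPoll(Arrays.asList(400, 600, 100)); // second poll: 3 records
        System.out.println(sketch.completedSums());  // [2500]
        System.out.println(sketch.pendingRecords()); // 1
    }
}
```

The first five values add up to 2500 even though they arrive in two polls, and the sixth value waits for the next batch.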

Evaluation

Go back to your workspace. Run the lab command to grade your work. Correct any reported failures and rerun the command until successful.

(.venv) [user@host AD482]$ lab grade kafka-basics

Finish

Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish kafka-basics

This concludes the lab.

Revision: ad482-1.8-cc2ae1c