
Guided Exercise: Partitioning Stream Processing for Scalability

In this exercise, you will scale up an application to observe how Kafka Streams distributes work between two application instances.

The example application for this exercise reads temperature measurements for different locations from a Kafka topic.

Outcomes

You should be able to scale up a Kafka Streams application by repartitioning the events stored in a topic.

You will find the solution files for this exercise in the AD482-apps repository, within the collaboration-partition/solutions directory.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start collaboration-partition
...output omitted...

The lab command copies the exercise files from the AD482-apps/collaboration-partition/apps directory in your local Git repository into the collaboration-partition directory in your workspace. It also copies the .env-streams file from your workspace into the temperature-monitor application directory and names the copy .env.

Procedure 4.3. Instructions

  1. Move to the collaboration-partition directory.

    1. From your workspace directory, open a command line terminal, and move to the collaboration-partition directory.

      (.venv) [user@host AD482]$ cd collaboration-partition
      (.venv) [user@host collaboration-partition]$
  2. Examine the application code, which is under the temperature-monitor directory.

    1. Using your editor of choice, open the TransformTemperature class and examine the stream topology.

      The topology consumes the events available in the temperatures-in-celsius topic, converts the temperature from Celsius to Fahrenheit, and finally writes the transformation into the measured-temperatures topic. The lab start command created both topics with one partition.
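The per-record work of this topology is a plain Celsius-to-Fahrenheit conversion. The following dependency-free Java sketch reproduces only that conversion and a log-style message, so it can run without Kafka. It is not the TransformTemperature class: the TemperatureEvent record and the method names are hypothetical, and the field names are assumed from the locationId and measure fields that the exercise's producer script sends.

```java
import java.util.Locale;

// Dependency-free sketch of the per-record logic in the TransformTemperature topology.
// Assumption: each event carries a locationId and a Celsius measure. The record and
// method names are hypothetical; the real class receives values from Kafka via a Serde.
public class TemperatureTransformSketch {

    // Hypothetical event type: location identifier plus a temperature in degrees Celsius.
    public record TemperatureEvent(int locationId, int measure) {}

    // Standard conversion formula: F = C * 9/5 + 32.
    public static double toFahrenheit(double celsius) {
        return celsius * 9.0 / 5.0 + 32.0;
    }

    // Builds a message shaped like the application's "Temp. transformed" log line.
    public static String describe(TemperatureEvent event) {
        double fahrenheit = toFahrenheit(event.measure());
        return String.format(Locale.ROOT, "Temp. transformed %d\u00BAC -> %.1f\u00BAF (ID: %d)",
                event.measure(), fahrenheit, event.locationId());
    }

    public static void main(String[] args) {
        // Converts the 32 C reading for location 7 that appears in the sample logs.
        System.out.println(describe(new TemperatureEvent(7, 32)));
    }
}
```

Running main prints a line equivalent to the sample log output later in this exercise: 32ºC converts to 89.6ºF.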

  3. Move to the temperature-monitor directory and run two instances of the stream application to process the Celsius temperature measurements.

    1. Return to the command line terminal, move to the temperature-monitor directory, and start the application.

      (.venv) [user@host collaboration-partition]$ cd temperature-monitor
      (.venv) [user@host temperature-monitor]$ ./mvnw clean package quarkus:dev
      ...output omitted...
    2. Open a new terminal window, navigate to the collaboration-partition/temperature-monitor directory of your workspace, and run another instance of the stream application.

      [user@host temperature-monitor]$ ./mvnw clean package quarkus:dev
      ...output omitted...
    3. Wait until both instances change their status from REBALANCING to RUNNING.

      ...output omitted...
      ... INFO [org.apa.kaf.str.KafkaStreams] ... State transition from REBALANCING to RUNNING
  4. Run the provided produce_temperatures.py Python script to generate 5 temperature measurements in degrees Celsius.

    The script sends an event composed of the following fields:

    • locationId: random integer between 1 and 10 that indicates the location of the temperature measurement.

    • measure: random integer between -15 and 50 that simulates a temperature measurement in degrees Celsius.

    1. Open a new terminal window, navigate to your workspace directory, and activate the Python virtual environment.

      [user@host ~]$ cd AD482
      [user@host AD482]$ source .venv/bin/activate
      (.venv) [user@host AD482]$

      Important

      On Windows, use the Activate.ps1 script to activate your Python virtual environment.

      PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
    2. Run the produce_temperatures.py Python script. You must provide the topic and the number of events to produce. Produce 5 events.

      (.venv) [user@host AD482]$ python \
       collaboration-partition/scripts/produce_temperatures.py \
       temperatures-in-celsius 5
      Sending 5 temperatures to the 'temperatures-in-celsius' topic...
      Temperature sent - 46ºC  (Location ID: 3)
      ...output omitted...
      Temperature sent - -11ºC  (Location ID: 2)
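The events that the script produces can be modeled in Java as shown below. This is a hedged sketch, not a port of produce_temperatures.py: the class and method names are hypothetical, both range bounds are assumed to be inclusive, and the generator takes a seed only so that runs are reproducible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Java model of the events that produce_temperatures.py sends (hypothetical names).
// Assumption: "between 1 and 10" and "between -15 and 50" include both bounds.
public class TemperatureEventGenerator {

    public record TemperatureEvent(int locationId, int measure) {}

    private final Random random;

    public TemperatureEventGenerator(long seed) {
        this.random = new Random(seed); // seeded so that runs are reproducible
    }

    public TemperatureEvent next() {
        int locationId = 1 + random.nextInt(10); // 1..10
        int measure = -15 + random.nextInt(66);  // -15..50 degrees Celsius
        return new TemperatureEvent(locationId, measure);
    }

    public List<TemperatureEvent> batch(int count) {
        List<TemperatureEvent> events = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            events.add(next());
        }
        return events;
    }

    public static void main(String[] args) {
        TemperatureEventGenerator generator = new TemperatureEventGenerator(42L);
        for (TemperatureEvent e : generator.batch(5)) {
            System.out.printf("Temperature sent - %d\u00BAC  (Location ID: %d)%n",
                    e.measure(), e.locationId());
        }
    }
}
```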
  5. Verify that only one of the instances processes events, because the temperatures-in-celsius topic has only one partition.

    1. Return to the terminal windows that are running the streams application and verify that only one of the instances processes the events.

      ...output omitted...
      ... INFO [com.red.mon.str.TransformTemperature] ... Transforming 32ºC to ºF...
      ... INFO [com.red.mon.str.TransformTemperature] ... Temp. transformed 32ºC -> 89.6ºF (ID: 7)
    2. Terminate both stream application instances by pressing Ctrl+C.
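The single-partition limit that step 5 demonstrates can be modeled with a few lines of plain Java. Kafka Streams creates one task per partition of an input topic, and only one instance owns a given task at a time, so a second instance stays idle when there is only one partition. The round-robin rule below is a simplifying assumption; the real Kafka Streams assignor also considers stickiness and state, but the upper bound on busy instances is the same.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of task (partition) ownership across application instances.
// Assumption: round-robin assignment, used here only to illustrate the limit that
// at most one instance works on each partition of an input topic.
public class PartitionAssignmentSketch {

    // Returns, for each instance index, the list of partitions it owns.
    public static List<List<Integer>> assign(int partitions, int instances) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(p % instances).add(p);
        }
        return owned;
    }

    // Counts the instances that receive at least one partition.
    public static int busyInstances(int partitions, int instances) {
        int busy = 0;
        for (List<Integer> tasks : assign(partitions, instances)) {
            if (!tasks.isEmpty()) {
                busy++;
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        System.out.println("1 partition,  2 instances: " + assign(1, 2));
        System.out.println("2 partitions, 2 instances: " + assign(2, 2));
    }
}
```

With one partition, the second instance owns nothing; with two partitions, each instance owns one. Adding instances beyond the partition count leaves the extra instances idle.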

  6. Repartition the temperature measurements stream. Send the events to a new topic named temperatures-in-celsius-repartitioned, which has 2 partitions. Use the locationId field as the key in the new topic, and perform the temperature transformations by consuming the events sent to this new topic.

    Note

    The lab start command already created the temperatures-in-celsius-repartitioned topic with 2 partitions.

    1. Open the TransformTemperature class and update the reading topic to use temperatures-in-celsius-repartitioned.

      ...output omitted...
      
      // Reading topic
      static final String TEMPERATURES_TOPIC = "temperatures-in-celsius-repartitioned";
      
      ...output omitted...
    2. Rename the RepartitionStream.java.TEMPLATE file to RepartitionStream.java, and open the file.

    3. Implement the repartitioning in the onStart method.

      The repartition implementation should look like the following:

      // TODO: Implement the topology for the repartitioning
      stream.map(
          (key, measure) -> {
              LOGGER.infov(
                  "Repartitioning ID {0}, {1}ºC ...",
                  measure.locationId, measure.measure
              );
      
              return new KeyValue<>(measure.locationId, measure);
          }
      ).to(
          TEMPERATURES_REPARTITIONED_TOPIC,
          Produced.with(Serdes.Integer(), temperaturesEventSerde)
      );
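The map(...).to(...) call above does two things to each record: it replaces the record key with locationId, and the new key then determines the destination partition. The plain-Java sketch below isolates both effects. The partitioner is a stand-in (non-negative remainder of the key's hash code); Kafka's default partitioning hashes the serialized key bytes with murmur2, so the real partition for a given locationId will generally differ. What carries over is the property that equal keys always land in the same partition. All class and method names are hypothetical.

```java
import java.util.List;

// Sketch of the re-keying performed by RepartitionStream, without Kafka dependencies.
// Assumption: partitionFor is a stand-in for Kafka's murmur2-based default partitioning.
public class RepartitionSketch {

    public record TemperatureEvent(int locationId, int measure) {}

    // Re-keyed record: the new key plus the unchanged value,
    // analogous to new KeyValue<>(measure.locationId, measure).
    public record KeyedEvent(int key, TemperatureEvent value) {}

    // Replaces whatever key the record had with its locationId.
    public static KeyedEvent rekey(TemperatureEvent event) {
        return new KeyedEvent(event.locationId(), event);
    }

    // Stand-in partitioner: deterministic, so equal keys map to the same partition.
    public static int partitionFor(int key, int numPartitions) {
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        List<TemperatureEvent> input = List.of(
                new TemperatureEvent(3, 46),
                new TemperatureEvent(8, 13),
                new TemperatureEvent(3, 20));
        for (TemperatureEvent event : input) {
            KeyedEvent keyed = rekey(event);
            System.out.println("locationId " + keyed.key() + " -> partition "
                    + partitionFor(keyed.key(), 2));
        }
    }
}
```

Both location 3 events go to the same partition, which is why all measurements for one location are processed by the same application instance.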
  7. Run two instances of the application to scale up the transformation process.

    1. Return to the command line terminals and start two instances of the temperature-monitor application, as you did in step 3. Wait until both instances change their status from REBALANCING to RUNNING.

      ...output omitted...
      ... INFO [org.apa.kaf.str.KafkaStreams] ... State transition from REBALANCING to RUNNING

      Notice that the repartitioning stream consumes the events already stored in the temperatures-in-celsius topic and sends them to the temperatures-in-celsius-repartitioned topic. When the measurement events arrive at the repartitioned topic, the load is balanced between the two instances of the application.

    2. Return to the terminal window you used to run the produce_temperatures.py Python script, and produce 10 new temperature measurements.

      (.venv) [user@host AD482]$ python \
       collaboration-partition/scripts/produce_temperatures.py \
       temperatures-in-celsius 10
      Sending 10 temperatures to the 'temperatures-in-celsius' topic...
      Temperature sent - 13ºC  (Location ID: 8)
      ...output omitted...
      Temperature sent - 20ºC  (Location ID: 3)
    3. Verify that both application instances are processing events.

      ...output omitted...
      ... INFO [com.red.mon.str.TransformTemperature] ... Transforming 6ºC to ºF...
      ... INFO [com.red.mon.str.TransformTemperature] ... Temp. transformed 6ºC -> 42.8ºF (ID: 3)
      ...output omitted...

      Notice that when the Python script sends new temperatures to the temperatures-in-celsius topic, the RepartitionStream class repartitions the events to the temperatures-in-celsius-repartitioned topic. Because this topic has two partitions, the load is balanced between the two instances of the application: each instance processes the events stored in one partition.
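The end state of step 7 can be summarized by routing a batch of keyed events to the instance that owns each partition. This sketch assumes that partition p is owned by instance p and again uses a stand-in partitioner (key modulo partition count) instead of Kafka's murmur2-based default, so the exact split in a real run will differ. The sample readings are taken from the outputs shown in this exercise; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch: which instance processes which events once the topic has two partitions.
// Assumptions: partition p is owned by instance p, and the partitioner is a stand-in.
public class InstanceRoutingSketch {

    public record TemperatureEvent(int locationId, int measure) {}

    static final int PARTITIONS = 2;

    // Groups events by owning instance, preserving the order in which they were sent.
    public static Map<Integer, List<TemperatureEvent>> routeToInstances(List<TemperatureEvent> events) {
        Map<Integer, List<TemperatureEvent>> byInstance = new TreeMap<>();
        for (TemperatureEvent event : events) {
            int partition = Math.floorMod(Integer.hashCode(event.locationId()), PARTITIONS);
            byInstance.computeIfAbsent(partition, p -> new ArrayList<>()).add(event);
        }
        return byInstance;
    }

    public static void main(String[] args) {
        List<TemperatureEvent> events = List.of(
                new TemperatureEvent(8, 13), new TemperatureEvent(3, 6),
                new TemperatureEvent(3, 20), new TemperatureEvent(7, 32));
        routeToInstances(events).forEach((instance, assigned) ->
                System.out.println("instance " + instance + " processes " + assigned));
    }
}
```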

  8. Stop both stream application instances and close the terminal windows that you used to run them.

Finish

Go back to your workspace. Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish collaboration-partition

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c