Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise, you will scale up the application to observe how Kafka Streams behaves with two application instances.
The example application for this exercise reads temperature measurements for different locations from a Kafka topic.
Outcomes
You should be able to scale up a Kafka Streams application by repartitioning the events stored in a topic.
You will find the solution files for this exercise in the AD482-apps repository, within the collaboration-partition/solutions directory.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start collaboration-partition
...output omitted...
The lab command copies the exercise files from the AD482-apps/collaboration-partition/apps directory in your local Git repository into the collaboration-partition directory in your workspace.
It also copies the .env-streams file from your workspace to the temperature-monitor application directory as the .env file.
Procedure 4.3. Instructions
Move to the collaboration-partition directory.
From your workspace directory, open a command line terminal, and move to the collaboration-partition directory.
(.venv) [user@host AD482]$ cd collaboration-partition
(.venv) [user@host collaboration-partition]$
Examine the application code, which is under the temperature-monitor directory.
By using your editor of choice, open the TransformTemperature class to observe the stream topology.
The topology consumes the events available in the temperatures-in-celsius topic, converts each temperature from Celsius to Fahrenheit, and finally writes the converted measurement into the measured-temperatures topic. The lab start command created both topics with one partition.
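The per-event conversion that the topology applies can be sketched in plain Java. This is a minimal sketch, not the code of TransformTemperature: the class name TemperatureConversion and the method celsiusToFahrenheit are hypothetical. It uses the standard formula Fahrenheit = Celsius × 9/5 + 32, and the sample values match the application log output that appears in this exercise.

```java
import java.util.Locale;

// Hypothetical helper mirroring the Celsius-to-Fahrenheit step of the
// TransformTemperature topology; the real class may implement it differently.
final class TemperatureConversion {

    private TemperatureConversion() {
    }

    // Fahrenheit = Celsius * 9 / 5 + 32, computed in floating point
    static double celsiusToFahrenheit(int celsius) {
        return celsius * 9.0 / 5.0 + 32.0;
    }

    public static void main(String[] args) {
        // Sample values taken from the application logs of this exercise
        for (int celsius : new int[] {32, 6}) {
            // prints "32 C -> 89.6 F", then "6 C -> 42.8 F"
            System.out.println(String.format(
                Locale.ROOT, "%d C -> %.1f F", celsius, celsiusToFahrenheit(celsius)));
        }
    }
}
```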
Move to the temperature-monitor directory and run two instances of the stream application to process the Celsius temperature measurements.
Return to the command line terminal, move to the temperature-monitor directory, and start the application.
(.venv) [user@host collaboration-partition]$ cd temperature-monitor
(.venv) [user@host temperature-monitor]$ ./mvnw clean package quarkus:dev
...output omitted...
Open a new terminal window, navigate to the collaboration-partition/temperature-monitor directory of your workspace, and run another instance of the stream application.
[user@host temperature-monitor]$ ./mvnw clean package quarkus:dev
...output omitted...
Wait until both instances change their status from REBALANCING to RUNNING.
...output omitted...
... INFO [org.apa.kaf.str.KafkaStreams] ... State transition from REBALANCING to RUNNING
Run the provided produce_temperatures.py Python script to generate 5 temperature measurements in Celsius degrees.
The script sends an event composed of the following fields:
locationId: random integer between 1 and 10 that indicates the location of the temperature measurement.
measure: random integer between -15 and 50 that simulates a temperature measurement in Celsius degrees.
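For reference, the event that the script produces can be modeled in Java as follows. This is a sketch under stated assumptions: the field names locationId and measure match the ones that the repartitioning code of this exercise reads (measure.locationId, measure.measure), but the class name TemperatureEvent, its constructor, and the randomEvents helper are hypothetical, and the ranges assume that both bounds are inclusive.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical Java model of the event that produce_temperatures.py sends.
final class TemperatureEvent {
    public int locationId; // 1..10, location of the measurement
    public int measure;    // -15..50, temperature in Celsius degrees

    TemperatureEvent(int locationId, int measure) {
        this.locationId = locationId;
        this.measure = measure;
    }

    // Builds random events within the ranges described for the script,
    // assuming both bounds are inclusive.
    static List<TemperatureEvent> randomEvents(int count, Random random) {
        List<TemperatureEvent> events = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            int locationId = 1 + random.nextInt(10); // 1..10
            int measure = -15 + random.nextInt(66);  // -15..50
            events.add(new TemperatureEvent(locationId, measure));
        }
        return events;
    }

    public static void main(String[] args) {
        for (TemperatureEvent e : randomEvents(5, new Random())) {
            System.out.println("Temperature sent - " + e.measure
                + "C (Location ID: " + e.locationId + ")");
        }
    }
}
```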
Open a new terminal window, navigate to your workspace directory, and activate the Python virtual environment.
[user@host ~]$ cd AD482
[user@host AD482]$ source .venv/bin/activate
(.venv) [user@host AD482]$
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Run the produce_temperatures.py Python script. You must provide the topic and the number of events to produce. Produce 5 events.
(.venv) [user@host AD482]$ python \
  collaboration-partition/scripts/produce_temperatures.py \
  temperatures-in-celsius 5
Sending 5 temperatures to the 'temperatures-in-celsius' topic...
Temperature sent - 46ºC (Location ID: 3)
...output omitted...
Temperature sent - -11ºC (Location ID: 2)
Verify that only one of the instances is processing events. The temperatures-in-celsius topic has a single partition, and a partition can be consumed by only one instance of the application at a time.
Return to the terminal windows that are running the stream application and verify that only one of the instances processes the events.
...output omitted...
... INFO [com.red.mon.str.TransformTemperature] ... Transforming 32ºC to ºF...
... INFO [com.red.mon.str.TransformTemperature] ... Temp. transformed 32ºC -> 89.6ºF (ID: 7)
Terminate all the stream application instances by pressing CTRL + c.
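The following simplified model illustrates why the second instance stays idle. Kafka Streams divides the processing into tasks, one per input partition, and each task runs in exactly one instance. The model is NOT the actual Kafka Streams assignor: the class name PartitionAssignmentModel and the round-robin strategy are assumptions, used only to show that with one partition and two instances, one instance receives no work, while two partitions let both instances work.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of how application instances share the
// partitions of an input topic. Each partition is owned by one instance only.
final class PartitionAssignmentModel {

    private PartitionAssignmentModel() {
    }

    // Returns, for each instance, the list of partitions it owns.
    static List<List<Integer>> assign(int partitions, int instances) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % instances).add(p); // round-robin
        }
        return assignment;
    }

    // Number of instances that own at least one partition.
    static long activeInstances(List<List<Integer>> assignment) {
        return assignment.stream().filter(parts -> !parts.isEmpty()).count();
    }

    public static void main(String[] args) {
        // temperatures-in-celsius: 1 partition, 2 instances
        System.out.println(assign(1, 2)); // prints [[0], []]
        // temperatures-in-celsius-repartitioned: 2 partitions, 2 instances
        System.out.println(assign(2, 2)); // prints [[0], [1]]
    }
}
```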
Repartition the temperature measurements stream. Send the events to a new topic named temperatures-in-celsius-repartitioned that has 2 partitions. Use the locationId field as the key in the new topic, and perform the temperature transformations by consuming the events sent to this new topic.
Note
The lab start command created the topic with 2 partitions.
Open the TransformTemperature class and update the reading topic to use temperatures-in-celsius-repartitioned.
...output omitted...
// Reading topic
static final String TEMPERATURES_TOPIC = "temperatures-in-celsius-repartitioned";
...output omitted...
Rename the RepartitionStream.java.TEMPLATE file to RepartitionStream.java, and open the file.
Implement the repartitioning in the onStart method.
The repartition implementation should look like the following:
// TODO: Implement the topology for the repartitioning
stream
    // Re-key each event by its location ID
    .map((key, measure) -> {
        LOGGER.infov(
            "Repartitioning ID {0}, {1}ºC ...",
            measure.locationId,
            measure.measure
        );
        return new KeyValue<>(measure.locationId, measure);
    })
    // Write the re-keyed events to the 2-partition topic
    .to(
        TEMPERATURES_REPARTITIONED_TOPIC,
        Produced.with(Serdes.Integer(), temperaturesEventSerde)
    );
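To see how the new key decides the target partition, the following sketch computes the partition of an Integer key. It assumes that the records are written with the producer's default key-based partitioning (no custom StreamPartitioner in Produced), which hashes the serialized key with murmur2. The class KeyPartitioningSketch is hypothetical; it re-implements murmur2 so that it runs without the Kafka client library, so treat it as illustrative and refer to Utils.murmur2 in the Kafka client for the authoritative version.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of key-based partitioning for Integer keys.
final class KeyPartitioningSketch {

    private KeyPartitioningSketch() {
    }

    // Big-endian 4-byte encoding, as produced by Serdes.Integer()
    static byte[] serializeInt(int value) {
        return new byte[] {
            (byte) (value >>> 24), (byte) (value >>> 16),
            (byte) (value >>> 8), (byte) value
        };
    }

    // 32-bit murmur2 hash with the seed used by the Kafka client
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;

        for (int i = 0; i < length / 4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                | ((data[i4 + 1] & 0xff) << 8)
                | ((data[i4 + 2] & 0xff) << 16)
                | ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1 to 3 bytes (not reached for 4-byte keys)
        final int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
                break;
            default:
                break;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition = positive(murmur2(serialized key)) % number of partitions
    static int partitionFor(int key, int numPartitions) {
        return (murmur2(serializeInt(key)) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // Target partition of each location ID in a 2-partition topic
        Map<Integer, Integer> partitionByLocation = new TreeMap<>();
        for (int locationId = 1; locationId <= 10; locationId++) {
            partitionByLocation.put(locationId, partitionFor(locationId, 2));
        }
        System.out.println(partitionByLocation);
    }
}
```

Because the partition depends only on the serialized key, every measurement with the same locationId lands in the same partition, which preserves the per-location order of the events.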
Execute two instances of the application to scale up the transformation process.
Return to the command line terminal, and start two instances of the temperature-monitor application. Wait until both instances change their status from REBALANCING to RUNNING.
...output omitted...
... INFO [org.apa.kaf.str.KafkaStreams] ... State transition from REBALANCING to RUNNING
Notice that the repartition stream consumes the events already stored in the temperatures-in-celsius topic and sends them to the temperatures-in-celsius-repartitioned topic. When the measurement events arrive at the repartitioned topic, the load is balanced between the two instances of the application.
Return to the terminal window you used to run the produce_temperatures.py Python script, and produce 10 new temperature measurements.
(.venv) [user@host AD482]$ python \
  collaboration-partition/scripts/produce_temperatures.py \
  temperatures-in-celsius 10
Sending 10 temperatures to the 'temperatures-in-celsius' topic...
Temperature sent - 13ºC (Location ID: 8)
...output omitted...
Temperature sent - 20ºC (Location ID: 3)
Verify that all the application instances are processing events.
...output omitted...
... INFO [com.red.mon.str.TransformTemperature] ... Transforming 6ºC to ºF...
... INFO [com.red.mon.str.TransformTemperature] ... Temp. transformed 6ºC -> 42.8ºF (ID: 3)
...output omitted...
Notice that, when the Python script sends new temperatures to the temperatures-in-celsius topic, the RepartitionStream class repartitions the events to the temperatures-in-celsius-repartitioned topic. The temperatures-in-celsius-repartitioned topic has two partitions, and the load is balanced between the two instances of the application. Each instance is responsible for processing the events stored in one partition. Because the events are keyed by locationId, all the measurements for a given location go to the same partition.
Stop the stream application instances, and close the terminal windows you used to run them.
This concludes the guided exercise.