Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will perform stateless transformations on streams of events to extend an application that simulates the intranet of a bank.
In this application you can create bank accounts with an initial balance, and operate on those balances. It is a simple CRUD application built with Quarkus that uses Panache to persist the data in an in-memory H2 database.
You will extend the application by assigning a type to each bank account based on the current balance, and by sending event alerts to topics.
Outcomes
You should be able to perform stateless transformations on streams of events.
You can find the solution files for this exercise in the AD482-apps repository, within the collaboration-stateless/solutions directory.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.
(.venv) [user@host AD482]$ lab start collaboration-stateless
The lab command copies the exercise files from the AD482-apps/collaboration-stateless/apps/red-hat-bank directory in your local Git repository into the collaboration-stateless directory in your workspace.
Procedure 4.1. Instructions
Navigate to the collaboration-stateless directory.

(.venv) [user@host AD482]$ cd collaboration-stateless
(.venv) [user@host collaboration-stateless]$

Perform a stateless transformation on the BankAccountWasCreated stream of events. Process all the BankAccountWasCreated events and update the account type based on the initial balance by using the updateAccountTypeFromEvent method.

Open the com.redhat.training.bank.stream.BankAccountWasCreatedPipeline class and examine the code.

This class subscribes to the StartupEvent and ShutdownEvent events. You must implement the business logic in the onStart and onStop methods.

The updateAccountTypeFromEvent method updates the type of the new bank account based on the initial balance:

If the balance is lower than 100000, then the account type should be regular.

Otherwise, the type should be premium.
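The body of updateAccountTypeFromEvent is provided by the exercise files. The rule it applies can be sketched in plain Java; the accountTypeFor helper name is hypothetical and not part of the application:

```java
public class AccountTypeRule {
    // Hypothetical helper illustrating the account-type rule:
    // balances below 100000 are "regular", all others are "premium".
    static String accountTypeFor(long balance) {
        return balance < 100000 ? "regular" : "premium";
    }

    public static void main(String[] args) {
        System.out.println(accountTypeFor(3333));    // regular
        System.out.println(accountTypeFor(100000));  // premium
    }
}
```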
Note

This application does not use beans with the @Produces annotation, to avoid an ambiguous dependency problem with CDI when you have multiple beans for the streams.

Edit the onStart method and implement the business logic that updates the account type. Use the foreach stateless transformation to loop through the events available in the bank-account-creation topic, and update the account type by using the updateAccountTypeFromEvent method.

The method should look like the following:
void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...

    // TODO: Update the account type on each event
    builder.stream(
        BANK_ACCOUNT_WAS_CREATED_TOPIC,
        Consumed.with(Serdes.Long(), eventSerde)
    ).foreach((key, creation) -> {
        updateAccountTypeFromEvent(creation);
    });

    ...output omitted...
}

Create a KafkaStreams object, assign it to the streams object property, and execute the start method.

The method should look like the following:

void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...

    // TODO: Create a Kafka streams and start it
    streams = new KafkaStreams(
        builder.build(),
        generateStreamConfig()
    );

    streams.start();
}

Close the streams processor in the onStop method.

The method should look like the following:

void onStop(@Observes ShutdownEvent shutdownEvent) {
    // TODO: Close the stream on shutdown
    streams.close();
}
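The generateStreamConfig helper is provided by the exercise files. A typical Kafka Streams configuration built with java.util.Properties might look like the following sketch; the application id and bootstrap servers values are placeholder assumptions, and the lab environment supplies the real ones:

```java
import java.util.Properties;

public class StreamConfigSketch {
    // Hypothetical stand-in for generateStreamConfig(); the real values
    // come from the exercise configuration, not from this sketch.
    static Properties generateStreamConfig() {
        Properties props = new Properties();
        // Unique id for this streams application; also prefixes internal topic names.
        props.put("application.id", "bank-account-was-created-pipeline");
        // Kafka bootstrap servers; placeholder address for this sketch.
        props.put("bootstrap.servers", "localhost:9092");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(generateStreamConfig());
    }
}
```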
Test the event processing implementation.
Return to the command line terminal, and start the application.
(.venv) [user@host collaboration-stateless]$ ./mvnw clean package quarkus:dev
...output omitted...
Listening on: http://localhost:8080
...output omitted...

Open a new web browser, and navigate to http://localhost:8080. This is the Intranet page.

In the area, type 3333 in the field, and click . Notice that the front end displays the created account at the bottom of the page without a type.

The application stores the new bank account in the database, and sends the creation event to the bank-account-creation topic. The processing pipeline that you just created processes this event asynchronously. For that reason, the bank account type is initially empty.

Return to the command line terminal and verify that the processor executed the stateless transformation.

...output omitted...
BankAccountWasCreated - ID: 3, Balance: 3333
...output omitted...
Updated Bank Account - ID: 3 - Type: regular

Return to the browser window and refresh the page. Notice that the processor updated the type of the account in the background, and now the front end displays the account type. This is an example of eventual consistency. The event stored in the topic guarantees that the bank account data will be consistent at some point in the future, but not necessarily immediately.
Perform a stateless transformation on the AmountWasDeposited stream of events. Filter all the events for a deposit greater than 1000, map each filtered event to a HighValueDepositWasDetected event, and send it to the high-value-deposit-alert topic. You can use the helper methods for debugging purposes.

Open the com.redhat.training.bank.stream.AmountWasDepositedPipeline class and examine the code. You must implement the business logic in the onStart and onStop methods.

Edit the onStart method and implement a topology that performs the following transformations on the events available in the bank-account-deposit topic:

Filter all AmountWasDeposited events with an amount greater than 1000.

Map the filtered AmountWasDeposited events to HighValueDepositWasDetected events.

Send the new events to the high-value-deposit-alert topic, and use the deposit bankAccountId field as the record key for the events.

The method should look like the following:

void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...

    // TODO: Build the stream topology
    builder.stream(
        AMOUNT_WAS_DEPOSITED_TOPIC,
        Consumed.with(Serdes.Long(), depositEventSerde)
    ).filter(
        (key, deposit) -> deposit.amount > 1000
    ).map((key, deposit) -> {
        logHighValueDepositAlert(deposit.bankAccountId, deposit.amount);

        return new KeyValue<>(
            deposit.bankAccountId,
            new HighValueDepositWasDetected(deposit.bankAccountId, deposit.amount)
        );
    }).to(
        HIGH_VALUE_DEPOSIT_TOPIC,
        Produced.with(Serdes.Long(), highValueEventSerde)
    );

    ...output omitted...
}

Create a KafkaStreams object, assign it to the streams object property, and execute the start method.

The method should look like the following:

void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...

    // TODO: Create a Kafka streams and start it
    streams = new KafkaStreams(
        builder.build(),
        generateStreamConfig()
    );

    streams.start();
}

Close the streams processor in the onStop method.

The method should look like the following:

void onStop(@Observes ShutdownEvent shutdownEvent) {
    // TODO: Close the stream on shutdown
    streams.close();
}
Test the event processing implementation.
Open a new tab in the browser, and navigate to http://localhost:8080/alerts.html. In this Alerting Events page you will see all the events fired by the Quarkus application. Keep this tab open.

Return to the Intranet tab in your browser. If Quarkus did not detect the changes in your application, then refresh the page to force Quarkus to rebuild the application.

In the area, type 1500 in the field, and click .

Return to the command line terminal and verify that the processor executed the stateless transformation.

...output omitted...
AmountWasDeposited - Account ID: 1, Amount: 1500
...output omitted...
HighValueDepositWasDetected - Account ID:1 Amount:1500

Return to the Alerting Events tab in your browser, and notice that the front end displays the HighValueDepositWasDetected event.
Perform a stateless transformation on the AmountWasWithdrawn stream of events. Perform an inverse filtering on events with an amount lower than or equal to 50, and split the filtered events by using the following rules:

If the withdrawn amount is in the range (50 - 1000], then send a LowRiskWithdrawnWasDetected event to the low-risk-withdrawn-alert topic.

If the withdrawn amount is in the range (1000 - 3000], then send a ModerateRiskWithdrawnWasDetected event to the moderate-risk-withdrawn-alert topic.

In any other case, send a HighRiskWithdrawnWasDetected event to the high-risk-withdrawn-alert topic.
Open the com.redhat.training.bank.stream.AmountWasWithdrawnPipeline class and examine the code. You must implement the onStart and onStop methods. You can use the helper methods for debugging purposes.

Edit the onStart method and implement an inverse filter to avoid processing low amount withdrawals. Filter events with an amount greater than 50.

The method should look like the following:

void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...

    // TODO: Add inverse filter
    KStream<Long, AmountWasWithdrawn> mainStream = builder.stream(
        AMOUNT_WAS_WITHDRAWN_TOPIC,
        Consumed.with(Serdes.Long(), withdrawalEventSerde)
    ).filterNot((key, withdrawal) -> withdrawal.amount <= 50);

    ...output omitted...
}

Split the stream of events by using the following rules:

If the withdrawn amount is in the range (50 - 1000], then send a LowRiskWithdrawnWasDetected event to the low-risk-withdrawn-alert topic.

If the withdrawn amount is in the range (1000 - 3000], then send a ModerateRiskWithdrawnWasDetected event to the moderate-risk-withdrawn-alert topic.

In any other case, send a HighRiskWithdrawnWasDetected event to the high-risk-withdrawn-alert topic.

Process each branch completely independently, and decouple the processing logic of each branch into the processLowAmountEvents, processModerateAmountEvents, and processHighAmountEvents methods.

The method should look like the following:

void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...

    // TODO: Split the stream
    mainStream.split()
        .branch(
            (key, withdrawal) -> withdrawal.amount > 50 && withdrawal.amount <= 1000,
            Branched.withConsumer(this::processLowAmountEvents)
        )
        .branch(
            (key, withdrawal) -> withdrawal.amount > 1000 && withdrawal.amount <= 3000,
            Branched.withConsumer(this::processModerateAmountEvents)
        )
        .branch(
            (key, withdrawal) -> true,
            Branched.withConsumer(this::processHighAmountEvents)
        );

    ...output omitted...
}

Implement the processing logic for the branch with amounts in the range
(50 - 1000]. Edit the processLowAmountEvents method. Map the low risk branch to a LowRiskWithdrawnWasDetected event, and send it to the low-risk-withdrawn-alert topic.

The method should look like the following:

private void processLowAmountEvents(KStream<Long, AmountWasWithdrawn> stream) {
    // TODO: process the low amount branch
    stream.map((key, deposit) -> {
        logLowRiskWithdrawn(deposit.bankAccountId, deposit.amount);

        return new KeyValue<>(
            deposit.bankAccountId,
            new LowRiskWithdrawnWasDetected(
                deposit.bankAccountId, deposit.amount
            )
        );
    }).to(
        LOW_RISK_WITHDRAWN_TOPIC,
        Produced.with(Serdes.Long(), lowRiskEventSerde)
    );
}

Implement the processing logic for the branch with amounts in the range (1000 - 3000]. Edit the processModerateAmountEvents method. Map the moderate risk branch to a ModerateRiskWithdrawnWasDetected event, and send it to the moderate-risk-withdrawn-alert topic.

The method should look like the following:

private void processModerateAmountEvents(KStream<Long, AmountWasWithdrawn> stream) {
    // TODO: process the moderate amount branch
    stream.map((key, deposit) -> {
        logModerateRiskWithdrawn(deposit.bankAccountId, deposit.amount);

        return new KeyValue<>(
            deposit.bankAccountId,
            new ModerateRiskWithdrawnWasDetected(
                deposit.bankAccountId, deposit.amount
            )
        );
    }).to(
        MODERATE_RISK_WITHDRAWN_TOPIC,
        Produced.with(Serdes.Long(), moderateRiskEventSerde)
    );
}

Implement the processing logic for the branch that covers any other cases. Edit the processHighAmountEvents method. Map the high risk branch to a HighRiskWithdrawnWasDetected event, and send it to the high-risk-withdrawn-alert topic.

The method should look like the following:

private void processHighAmountEvents(KStream<Long, AmountWasWithdrawn> stream) {
    // TODO: process the high amount branch
    stream.map((key, deposit) -> {
        logHighRiskWithdrawn(deposit.bankAccountId, deposit.amount);

        return new KeyValue<>(
            deposit.bankAccountId,
            new HighRiskWithdrawnWasDetected(
                deposit.bankAccountId, deposit.amount
            )
        );
    }).to(
        HIGH_RISK_WITHDRAWN_TOPIC,
        Produced.with(Serdes.Long(), highRiskEventSerde)
    );
}

Edit the onStart method. Create a KafkaStreams object, assign it to the streams object property, and execute the start method.

The method should look like the following:

void onStart(@Observes StartupEvent startupEvent) {
    ...output omitted...

    // TODO: Create a Kafka streams and start it
    streams = new KafkaStreams(
        builder.build(),
        generateStreamConfig()
    );

    streams.start();
}

Close the streams processor in the onStop method.

The method should look like the following:

void onStop(@Observes ShutdownEvent shutdownEvent) {
    // TODO: Close the stream on shutdown
    streams.close();
}
Test the event processing implementation.
Return to the Alerting Events tab in your browser and refresh the page.
Return to the Intranet tab in your browser. In the area, type 35000 in the field, and click .

In the area, type 100 in the field, and click .

In the area, type 1500 in the field, and click .

In the area, type 6000 in the field, and click .

Return to the Alerting Events tab in your browser and verify the existence of the following events for the bank account with ID 2:

A HighValueDepositWasDetected event with amount 35000.

A LowRiskWithdrawnWasDetected event with amount 100.

A ModerateRiskWithdrawnWasDetected event with amount 1500.

A HighRiskWithdrawnWasDetected event with amount 6000.
Return to the command line, press Ctrl+C to stop the application, and return to the workspace directory.
This concludes the guided exercise.