
Guided Exercise: Implementing Stateless Event Processing

In this exercise you will perform stateless transformations on streams of events to extend an application that simulates the intranet of a bank.

In this application you can create bank accounts with an initial balance, and operate on those balances. It is a simple CRUD application built with Quarkus that uses Panache to persist the data in an in-memory H2 database.

You will extend the application by assigning a type to each bank account based on the current balance, and by sending event alerts to topics.

Outcomes

You should be able to perform stateless transformations on streams of events.

You can find the solution files for this exercise in the AD482-apps repository, within the collaboration-stateless/solutions directory.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to start the scenario for this exercise.

(.venv) [user@host AD482]$ lab start collaboration-stateless

The lab command copies the exercise files from the AD482-apps/collaboration-stateless/apps/red-hat-bank directory in your local Git repository into the collaboration-stateless directory in your workspace.

Procedure 4.1. Instructions

  1. Navigate to the collaboration-stateless directory.

    (.venv) [user@host AD482]$ cd collaboration-stateless
    (.venv) [user@host collaboration-stateless]$
  2. Perform a stateless transformation on the BankAccountWasCreated stream of events. Process all the BankAccountWasCreated events and update the account type based on the initial balance by using the updateAccountTypeFromEvent method.

    1. Open the com.redhat.training.bank.stream.BankAccountWasCreatedPipeline class and examine the code.

      This class subscribes to the StartupEvent and ShutdownEvent events. You must implement the business logic in the onStart and onStop methods.

      The updateAccountTypeFromEvent method updates the type of the new bank account based on the initial balance.

      • If the balance is lower than 100000, then the account type should be regular.

      • Otherwise, the type should be premium.
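The rule above can be sketched as a pure function. This is only an illustration of the threshold logic, not the updateAccountTypeFromEvent implementation from the exercise. The AccountTypeRule class and the accountTypeFor method are hypothetical names, and the balance is assumed to be a whole number.

```java
// Hypothetical helper that mirrors the account type rule described above.
class AccountTypeRule {

    // Threshold taken from the exercise text.
    static final long PREMIUM_THRESHOLD = 100_000L;

    // Returns "regular" below the threshold and "premium" otherwise.
    static String accountTypeFor(long balance) {
        return balance < PREMIUM_THRESHOLD ? "regular" : "premium";
    }

    public static void main(String[] args) {
        System.out.println(accountTypeFor(3_333L));   // below the threshold
        System.out.println(accountTypeFor(100_000L)); // exactly at the threshold
    }
}
```

Note that a balance of exactly 100000 is not lower than the threshold, so under this reading of the rule it falls into the premium case.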

      Note

      To avoid an ambiguous dependency problem in CDI when multiple beans exist for the streams, this application does not use beans with the @Produces annotation.

    2. Edit the onStart method and implement the business logic that updates the account type. Use the foreach stateless transformation to loop through the events available in the bank-account-creation topic, and update the account type by using the updateAccountTypeFromEvent method.

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Update the account type on each event
          builder.stream(
              BANK_ACCOUNT_WAS_CREATED_TOPIC,
              Consumed.with(Serdes.Long(), eventSerde)
          ).foreach((key, creation) -> {
              updateAccountTypeFromEvent(creation);
          });
      
          ...output omitted...
      }
    3. Create a KafkaStreams object, assign it to the streams property, and call its start method.

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Create a Kafka streams and start it
          streams = new KafkaStreams(
              builder.build(),
              generateStreamConfig()
          );
      
          streams.start();
      }
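The exercise does not show the body of the generateStreamConfig helper. As a rough sketch of what such a helper might return, the following example builds a java.util.Properties object with the two settings that Kafka Streams requires: application.id and bootstrap.servers. The class name, the method parameters, and both values are assumptions for illustration, and the real helper in the exercise probably sets additional properties.

```java
import java.util.Properties;

// Hypothetical stand-in for the generateStreamConfig() helper used above.
class StreamConfigSketch {

    // Builds the minimum configuration that Kafka Streams requires.
    static Properties generateStreamConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        // Identifies the streams application; also used as the consumer group ID.
        props.put("application.id", applicationId);
        // Kafka brokers that the application connects to.
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Both values are placeholders, not the ones used in the exercise.
        Properties props = generateStreamConfig("example-app", "localhost:9092");
        System.out.println(props.getProperty("application.id"));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

Kafka Streams treats instances that share an application.id as members of the same application, so pipelines that run different topologies typically use different IDs.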
    4. Close the streams processor in the onStop method.

      The method should look like the following:

      void onStop(@Observes ShutdownEvent shutdownEvent) {
          // TODO: Close the stream on shutdown
          streams.close();
      }
  3. Test the event processing implementation.

    1. Return to the command line terminal, and start the application.

      (.venv) [user@host collaboration-stateless]$ ./mvnw clean package quarkus:dev
      ...output omitted...
      ...output omitted... Listening on: http://localhost:8080
      ...output omitted...
    2. Open a web browser, and navigate to http://localhost:8080. This is the Intranet page.

    3. In the Create a Bank Account area, type 3333 in the Initial Balance field, and click Create. Notice that the front end displays the created account at the bottom of the page without a type.

      The application stores the new bank account in the database, and sends the creation event to the bank-account-creation topic. The processing pipeline that you just created processes this event asynchronously. For that reason, the bank account type is still empty when the front end first displays the account.

    4. Return to the command line terminal and verify that the processor executed the stateless transformation.

      ...output omitted...
      ...output omitted... BankAccountWasCreated - ID: 3, Balance: 3333
      ...output omitted... Updated Bank Account - ID: 3 - Type: regular
    5. Return to the browser window and refresh the page. Notice that the processor updated the type of account in the background, and now the front end displays the account type. This is an example of eventual consistency. The event stored in the topic guarantees that the bank account data will be consistent at some point in the future, but not necessarily immediately.
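The delay between creating the account and seeing its type can be modeled without Kafka. The following sketch is an in-memory analogy only: a queue stands in for the bank-account-creation topic, and a method that drains the queue stands in for the stream processor. All names are hypothetical, and the balance is assumed to be a whole number.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model of the flow: the write and the type update happen at different times.
class EventualConsistencySketch {

    // accountId -> account type; null means "not assigned yet".
    static final Map<Long, String> accountTypes = new HashMap<>();
    // accountId -> balance.
    static final Map<Long, Long> balances = new HashMap<>();
    // Stands in for the bank-account-creation topic.
    static final Queue<Long> creationEvents = new ArrayDeque<>();

    // Stores the account without a type and publishes a creation event.
    static void createAccount(long id, long balance) {
        balances.put(id, balance);
        accountTypes.put(id, null);
        creationEvents.add(id);
    }

    // Plays the role of the stream processor: drains pending events and sets the type.
    static void processPendingEvents() {
        Long id;
        while ((id = creationEvents.poll()) != null) {
            accountTypes.put(id, balances.get(id) < 100_000L ? "regular" : "premium");
        }
    }

    public static void main(String[] args) {
        createAccount(3L, 3_333L);
        System.out.println("Before processing: " + accountTypes.get(3L));
        processPendingEvents();
        System.out.println("After processing: " + accountTypes.get(3L));
    }
}
```

Between the call to createAccount and the call to processPendingEvents, a reader sees the account without a type, which corresponds to what the front end displays before you refresh the page.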

  4. Perform a stateless transformation on the AmountWasDeposited stream of events. Keep only the events for deposits greater than 1000, map each remaining event to a HighValueDepositWasDetected event, and send it to the high-value-deposit-alert topic. You can use the helper methods for debugging purposes.

    1. Open the com.redhat.training.bank.stream.AmountWasDepositedPipeline class and examine the code. You must implement the business logic in the onStart and onStop methods.

    2. Edit the onStart method and implement a topology that performs the following transformations on the events available in the bank-account-deposit topic:

      • Keep only the AmountWasDeposited events with an amount greater than 1000.

      • Map the filtered AmountWasDeposited events to HighValueDepositWasDetected events.

      • Send the new events to the high-value-deposit-alert topic, and use the deposit bankAccountId field as the record key for the events.

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Build the stream topology
          builder.stream(
              AMOUNT_WAS_DEPOSITED_TOPIC,
              Consumed.with(Serdes.Long(), depositEventSerde)
          ).filter(
              (key, deposit) -> deposit.amount > 1000
          ).map((key, deposit) -> {
              logHighValueDepositAlert(deposit.bankAccountId, deposit.amount);
      
              return new KeyValue<>(
                  deposit.bankAccountId,
                  new HighValueDepositWasDetected(deposit.bankAccountId, deposit.amount)
              );
          }).to(
              HIGH_VALUE_DEPOSIT_TOPIC,
              Produced.with(Serdes.Long(), highValueEventSerde)
          );
      
          ...output omitted...
      }
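The filter and map steps in this topology behave like their counterparts in java.util.stream, except that Kafka Streams applies them to an unbounded stream of records. The following in-memory analogy uses no Kafka classes. The Deposit and Alert classes are simplified, hypothetical stand-ins for the AmountWasDeposited and HighValueDepositWasDetected events, and the amount is assumed to be a whole number.

```java
import java.util.List;
import java.util.stream.Collectors;

// In-memory analogy of the filter -> map steps; no Kafka involved.
class HighValueDepositSketch {

    // Simplified stand-in for the AmountWasDeposited event.
    static class Deposit {
        final long bankAccountId;
        final long amount;

        Deposit(long bankAccountId, long amount) {
            this.bankAccountId = bankAccountId;
            this.amount = amount;
        }
    }

    // Simplified stand-in for a keyed HighValueDepositWasDetected record.
    static class Alert {
        final long key;    // record key: the bank account ID
        final long amount;

        Alert(long key, long amount) {
            this.key = key;
            this.amount = amount;
        }
    }

    // Keeps deposits above 1000 and turns each one into an alert keyed by account ID.
    static List<Alert> detectHighValueDeposits(List<Deposit> deposits) {
        return deposits.stream()
            .filter(deposit -> deposit.amount > 1000)
            .map(deposit -> new Alert(deposit.bankAccountId, deposit.amount))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Alert> alerts = detectHighValueDeposits(List.of(
            new Deposit(1L, 1500L),
            new Deposit(1L, 1000L),  // not greater than 1000, so it is dropped
            new Deposit(2L, 35000L)
        ));
        alerts.forEach(alert -> System.out.println(alert.key + " -> " + alert.amount));
    }
}
```

Each step looks only at the current record and keeps no state between records, which is what makes the transformation stateless.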
    3. Create a KafkaStreams object, assign it to the streams property, and call its start method.

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Create a Kafka streams and start it
          streams = new KafkaStreams(
              builder.build(),
              generateStreamConfig()
          );
      
          streams.start();
      }
    4. Close the streams processor in the onStop method.

      The method should look like the following:

      void onStop(@Observes ShutdownEvent shutdownEvent) {
          // TODO: Close the stream on shutdown
          streams.close();
      }
  5. Test the event processing implementation.

    1. Open a new tab in the browser, and navigate to http://localhost:8080/alerts.html. The Alerting Events page displays all the events fired by the Quarkus application. Keep this tab open.

    2. Return to the Intranet tab in your browser. If Quarkus did not detect the changes in your application, then refresh the page to force Quarkus to rebuild the application.

    3. In the Account #1 area, type 1500 in the Amount field, and click Deposit.

    4. Return to the command line terminal and verify that the processor executed the stateless transformation.

      ...output omitted...
      ...output omitted... AmountWasDeposited - Account ID: 1, Amount: 1500
      ...output omitted... HighValueDepositWasDetected - Account ID:1 Amount:1500
    5. Return to the Alerting Events tab in your browser, and notice that the front end displays the HighValueDepositWasDetected event.

  6. Perform a stateless transformation on the AmountWasWithdrawn stream of events. Use an inverse filter to discard events with an amount lower than or equal to 50, and split the remaining events by using the following rules:

    • If the withdrawn amount is in the range (50, 1000], then send a LowRiskWithdrawnWasDetected event to the low-risk-withdrawn-alert topic.

    • If the withdrawn amount is in the range (1000, 3000], then send a ModerateRiskWithdrawnWasDetected event to the moderate-risk-withdrawn-alert topic.

    • In any other case, send a HighRiskWithdrawnWasDetected event to the high-risk-withdrawn-alert topic.

    1. Open the com.redhat.training.bank.stream.AmountWasWithdrawnPipeline class and examine the code. You must implement the onStart and onStop methods. You can use the helper methods for debugging purposes.

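    2. Edit the onStart method and implement an inverse filter to avoid processing low amount withdrawals. Use the filterNot transformation to discard events with an amount lower than or equal to 50, which keeps only the events with an amount greater than 50.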

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Add inverse filter
          KStream<Long, AmountWasWithdrawn> mainStream = builder.stream(
              AMOUNT_WAS_WITHDRAWN_TOPIC,
              Consumed.with(Serdes.Long(), withdrawalEventSerde)
          ).filterNot((key, withdrawal) -> withdrawal.amount <= 50);
      
          ...output omitted...
      }
    3. Split the stream of events by using the following rules:

      • If the withdrawn amount is in the range (50, 1000], then send a LowRiskWithdrawnWasDetected event to the low-risk-withdrawn-alert topic.

      • If the withdrawn amount is in the range (1000, 3000], then send a ModerateRiskWithdrawnWasDetected event to the moderate-risk-withdrawn-alert topic.

      • In any other case, send a HighRiskWithdrawnWasDetected event to the high-risk-withdrawn-alert topic.

      Process each branch independently, and delegate the processing logic of each branch to the processLowAmountEvents, processModerateAmountEvents, and processHighAmountEvents methods.

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Split the stream
          mainStream.split()
              .branch(
                  (key, withdrawal) -> withdrawal.amount > 50
                          && withdrawal.amount <= 1000,
                  Branched.withConsumer(this::processLowAmountEvents)
              )
              .branch(
                  (key, withdrawal) -> withdrawal.amount > 1000
                          && withdrawal.amount <= 3000,
                  Branched.withConsumer(this::processModerateAmountEvents)
              )
              .branch(
                  (key, withdrawal) -> true,
                  Branched.withConsumer(this::processHighAmountEvents)
              );
      
          ...output omitted...
      }
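When a stream is split, Kafka Streams sends each record to the first branch whose predicate matches, so the order of the branch calls matters and the final catch-all predicate only receives records that no earlier branch accepted. The following pure-Java model illustrates that first-match behavior for the rules above. The WithdrawalRiskSketch class, the Risk enum, and the riskFor method are hypothetical names, and the amount is assumed to be a whole number.

```java
// Pure-Java model of the inverse filter and the three branches; the first matching rule wins.
class WithdrawalRiskSketch {

    enum Risk { IGNORED, LOW, MODERATE, HIGH }

    static Risk riskFor(long amount) {
        // Mirrors filterNot: amounts of 50 or less never reach the branches.
        if (amount <= 50) {
            return Risk.IGNORED;
        }
        // First branch: (50, 1000].
        if (amount <= 1000) {
            return Risk.LOW;
        }
        // Second branch: (1000, 3000].
        if (amount <= 3000) {
            return Risk.MODERATE;
        }
        // Catch-all branch: everything that did not match an earlier branch.
        return Risk.HIGH;
    }

    public static void main(String[] args) {
        for (long amount : new long[] {50, 100, 1000, 1500, 3000, 6000}) {
            System.out.println(amount + " -> " + riskFor(amount));
        }
    }
}
```

Because the inverse filter already removed amounts of 50 or less, the lower-bound checks in the branch predicates of the solution are redundant but harmless, and they keep each predicate readable on its own.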
    4. Implement the processing logic for the branch with amounts in the range (50, 1000]. Edit the processLowAmountEvents method. Map the low risk branch to a LowRiskWithdrawnWasDetected event, and send it to the low-risk-withdrawn-alert topic.

      The method should look like the following:

      private void processLowAmountEvents(KStream<Long, AmountWasWithdrawn> stream) {
          // TODO: process the low amount branch
          stream.map((key, withdrawal) -> {
              logLowRiskWithdrawn(withdrawal.bankAccountId, withdrawal.amount);
      
              return new KeyValue<>(
                  withdrawal.bankAccountId,
                  new LowRiskWithdrawnWasDetected(
                          withdrawal.bankAccountId,
                          withdrawal.amount
                  )
              );
          }).to(
              LOW_RISK_WITHDRAWN_TOPIC,
              Produced.with(Serdes.Long(), lowRiskEventSerde)
          );
      }
    5. Implement the processing logic for the branch with amounts in the range (1000, 3000]. Edit the processModerateAmountEvents method. Map the moderate risk branch to a ModerateRiskWithdrawnWasDetected event, and send it to the moderate-risk-withdrawn-alert topic.

      The method should look like the following:

      private void processModerateAmountEvents(KStream<Long, AmountWasWithdrawn> stream) {
          // TODO: process the moderate amount branch
          stream.map((key, withdrawal) -> {
              logModerateRiskWithdrawn(withdrawal.bankAccountId, withdrawal.amount);
      
              return new KeyValue<>(
                  withdrawal.bankAccountId,
                  new ModerateRiskWithdrawnWasDetected(
                          withdrawal.bankAccountId,
                          withdrawal.amount
                  )
              );
          }).to(
              MODERATE_RISK_WITHDRAWN_TOPIC,
              Produced.with(Serdes.Long(), moderateRiskEventSerde)
          );
      }
    6. Implement the processing logic for the branch that covers any other cases. Edit the processHighAmountEvents method. Map the high risk branch to a HighRiskWithdrawnWasDetected event and send it to the high-risk-withdrawn-alert topic.

      The method should look like the following:

      private void processHighAmountEvents(KStream<Long, AmountWasWithdrawn> stream) {
          // TODO: process the high amount branch
          stream.map((key, withdrawal) -> {
              logHighRiskWithdrawn(withdrawal.bankAccountId, withdrawal.amount);
      
              return new KeyValue<>(
                  withdrawal.bankAccountId,
                  new HighRiskWithdrawnWasDetected(
                          withdrawal.bankAccountId,
                          withdrawal.amount
                  )
              );
          }).to(
              HIGH_RISK_WITHDRAWN_TOPIC,
              Produced.with(Serdes.Long(), highRiskEventSerde)
          );
      }
    7. Edit the onStart method. Create a KafkaStreams object, assign it to the streams property, and call its start method.

      The method should look like the following:

      void onStart(@Observes StartupEvent startupEvent) {
          ...output omitted...
      
          // TODO: Create a Kafka streams and start it
          streams = new KafkaStreams(
              builder.build(),
              generateStreamConfig()
          );
      
          streams.start();
      }
    8. Close the streams processor in the onStop method.

      The method should look like the following:

      void onStop(@Observes ShutdownEvent shutdownEvent) {
          // TODO: Close the stream on shutdown
          streams.close();
      }
  7. Test the event processing implementation.

    1. Return to the Alerting Events tab in your browser and refresh the page.

    2. Return to the Intranet tab in your browser. In the Account #2 area, type 35000 in the Amount field, and click Deposit.

    3. In the Account #2 area, type 100 in the Amount field, and click Withdraw.

    4. In the Account #2 area, type 1500 in the Amount field, and click Withdraw.

    5. In the Account #2 area, type 6000 in the Amount field, and click Withdraw.

    6. Return to the Alerting Events tab in your browser and verify the existence of the following events for the bank account with ID 2:

      • A HighValueDepositWasDetected event with amount 35000.

      • A LowRiskWithdrawnWasDetected event with amount 100.

      • A ModerateRiskWithdrawnWasDetected event with amount 1500.

      • A HighRiskWithdrawnWasDetected event with amount 6000.

    7. Return to the command line, press Ctrl+C to stop the application, and return to the workspace directory.

Finish

Run the lab command to complete this exercise. This is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish collaboration-stateless

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c