
Guided Exercise: Defining Data Formats and Structures

In this exercise you will produce and consume messages that use schemas as contracts.

The application used in this exercise simulates the intranet of a bank. In this application you can create bank accounts with an initial balance, and operate on those balances. It is a simple CRUD application built with Quarkus that uses Panache to persist the data in an in-memory H2 database.

During this exercise you will use messages to extend the application features. You will assign a type to each bank account based on the current balance.
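
For reference, the exercise sources already define the bank account as a Panache entity. The following is only a rough sketch, assuming an entity with the fields used later in this exercise (id, balance, and profile); depending on the Quarkus version, the persistence annotations might come from the jakarta.persistence package instead of javax.persistence.

import javax.persistence.Entity;

import io.quarkus.hibernate.orm.panache.PanacheEntity;

// Hypothetical sketch; the real class ships with the exercise sources.
@Entity
public class BankAccount extends PanacheEntity {
    // PanacheEntity already provides the generated "id" field.
    public Long balance;   // initial and current balance
    public String profile; // account type, set later by the event consumer
}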

You will find the solution files for this exercise in the AD482-apps repository, within the kafka-schemas/solutions directory. Notice that you might need to replace some strings in the application.properties file.

Outcomes

You should be able to produce and consume messages that use schemas as contracts.

To perform this exercise, ensure you have the following:

  • Access to a configured and running OpenShift cluster.

  • Access to an installed and running Kafka instance in the OpenShift cluster.

  • A configured Python virtual environment, including the grading scripts for this course.

  • The JDK installed.

From your workspace directory, activate the Python virtual environment.

[user@host AD482]$ source .venv/bin/activate

Important

On Windows, use the Activate.ps1 script to activate your Python virtual environment.

PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1

Use the lab command to prepare your system for this exercise.

(.venv) [user@host AD482]$ lab start kafka-schemas
...output omitted...

Copy the Service Registry URL value. You will use this value to connect to the Red Hat Integration Service Registry.

The lab command copies the exercise files from the AD482-apps/kafka-schemas/apps/red-hat-bank directory, which is in your local Git repository, into the kafka-schemas directory, which is in your workspace. It also copies the .env file from your workspace to the kafka-schemas directory.

Procedure 2.4. Instructions

  1. From your workspace, move to the kafka-schemas directory, which contains the source code of the application. Produce messages to the new-bank-account topic when you create new bank accounts. Use a schema registry to decouple the data schemas from the application.

    1. Navigate to the kafka-schemas directory.

    2. Open a web browser, and navigate to the Service Registry URL displayed by the lab start command. Notice that the Service Registry has no artifacts stored.

      Leave this browser tab open.

      Note

      The Service Registry might take some time to be available on the provided Service Registry URL.
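
      If you want to confirm from the command line that the registry is reachable, one option is to query its REST API. The following request is a sketch that assumes the registry exposes the standard Apicurio Registry v2 API at the URL reported by the lab command; an empty artifact list in the JSON response is expected at this point.

      (.venv) [user@host kafka-schemas]$ curl http://YOUR_SERVICE_REGISTRY_URL/apis/registry/v2/search/artifacts
      ...output omitted...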

    3. By using your editor of choice, open the pom.xml file and add the quarkus-apicurio-registry-avro dependency.

      The file content should look like the following:

      ...output omitted...
      <dependencies>
        <!-- TODO: Add the registry dependency -->
        <dependency>
          <groupId>io.quarkus</groupId>
          <artifactId>quarkus-apicurio-registry-avro</artifactId>
        </dependency>
      ...output omitted...
    4. Define the AVRO schema for a NewBankAccount message. This message must include the bank account ID and the current balance. Both fields must use the Avro long type.

      Create the src/main/avro/new-bank-account.avsc file, and add the AVRO schema.

      The file content should look like the following:

      {
        "namespace": "com.redhat.training.bank.message",
        "type": "record",
        "name": "NewBankAccount",
        "fields": [
          { "name": "id", "type": "long" },
          { "name": "balance", "type": "long" }
        ]
      }

      During the compilation phase, Quarkus transforms the AVRO schema into a Java class, and stores it in the target/generated-sources/avsc directory. For this exercise, Quarkus generates the target/generated-sources/avsc/com/redhat/training/bank/message/NewBankAccount.java file at compilation time.

      You use this class to publish a notification message when you create a new bank account.
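
      The generated class follows the standard Avro specific record conventions. The following snippet is for illustration only, to show how the rest of the exercise interacts with the class; the actual generated file contains additional Avro plumbing, such as the embedded schema and a builder.

      // Illustrative only: creating and reading a generated NewBankAccount record.
      NewBankAccount event = new NewBankAccount(1L, 50000L); // all-args constructor
      long id = event.getId();           // generated accessor, used later by the consumer
      long balance = event.getBalance(); // generated accessor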

    5. Open the src/main/java/com/redhat/training/bank/resource/AccountResource.java file, and add an import for the generated Java class.

      The file content should look like the following:

      ...output omitted...
      
      // TODO: Import the NewBankAccount message
      import com.redhat.training.bank.message.NewBankAccount;
      import com.redhat.training.bank.model.BankAccount;
      ...output omitted...
    6. In the same file, add an Emitter for the new-bank-account-out channel. The Emitter must send a NewBankAccount message.

      The file content should look like the following:

      ...output omitted...
      
          // TODO: Add an emitter for the bank account creation messages
          @Inject @Channel("new-bank-account-out")
          Emitter<NewBankAccount> emitter;
      
      ...output omitted...
    7. Implement the sendMessageAboutNewBankAccount method in the AccountResource class to send NewBankAccount messages to Kafka.

      The method should look like the following:

      private void sendMessageAboutNewBankAccount(BankAccount bankAccount)
      {
          LOGGER.info(
                  "New Bank Account - ID: " + bankAccount.id
                  + ", Balance: " + bankAccount.balance
          );
      
          // TODO: Send a message about the new bank account
          emitter.send(
                  new NewBankAccount(
                          bankAccount.id,
                          bankAccount.balance
                  )
          );
      }
    8. Configure the outgoing channel for the new bank account messages. Open the src/main/resources/application.properties file, and configure the outgoing channel to use a service registry.

      The file content should look like the following:

      ...output omitted...
      
      # TODO: configure the outgoing channel
      mp.messaging.connector.smallrye-kafka.apicurio.registry.url = http://YOUR_SERVICE_REGISTRY_URL/apis/registry/v2 1
      mp.messaging.outgoing.new-bank-account-out.connector = smallrye-kafka
      mp.messaging.outgoing.new-bank-account-out.topic = new-bank-account
      mp.messaging.outgoing.new-bank-account-out.apicurio.registry.auto-register = true 2

      1 URL of the Red Hat Integration Service Registry.

      2 Option to enable the publishing of artifacts in the Service Registry.

      Note

      Quarkus automatically detects the appropriate serializer based on the @Channel declaration, the structure of the message, and the presence of the Apicurio Registry libraries. In this case, the selected serializer for the outgoing channel is io.apicurio.registry.serde.avro.AvroKafkaSerializer.
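
      If you prefer not to rely on auto-detection, you can also declare the serializer explicitly. The following optional line is a sketch that follows the standard SmallRye Kafka connector property naming; it is not required for this exercise.

      mp.messaging.outgoing.new-bank-account-out.value.serializer = io.apicurio.registry.serde.avro.AvroKafkaSerializer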

  2. Consume the AVRO-serialized messages from the new-bank-account topic, and update the account type. Use a schema registry to decouple the data schemas from the application.

    1. Process the messages, and update the bank account records stored in the database.

      • Find the database record based on the id sent in the event. If there is no database record with the specified ID, then log a message about the error.

      • If the database record exists and the balance is lower than 100000, then assign the value regular to the profile field. Otherwise, assign the value premium.

      Open the src/main/java/com/redhat/training/bank/consumer/NewBankAccountConsumer.java file and implement the consumer.

      The file content should look like the following:

      ...output omitted...
      
      @ApplicationScoped
      public class NewBankAccountConsumer {
          private static final Logger LOGGER = Logger.getLogger(NewBankAccountConsumer.class);
      
          // TODO: Create the consumer implementation
          @Incoming("new-bank-account-in")
          @Blocking
          @Transactional
          public void processMessage(NewBankAccount message) {
      
              BankAccount entity = BankAccount.findById(message.getId());
      
              if (entity != null) {
                  entity.profile = message.getBalance() < 100000
                          ? "regular" : "premium";
                  LOGGER.info(
                          "Updated Bank Account - ID: "
                          + message.getId() + " - Type: " + entity.profile
                  );
              } else {
                  LOGGER.info("Bank Account not found!");
              }
          }
      }
    2. Configure the incoming channel for the new bank account messages. Open the src/main/resources/application.properties file, and configure the incoming channel.

      The file content should look like the following:

      ...output omitted...
      
      # TODO: configure the incoming channel
      mp.messaging.incoming.new-bank-account-in.connector = smallrye-kafka
      mp.messaging.incoming.new-bank-account-in.topic = new-bank-account
      mp.messaging.incoming.new-bank-account-in.enable.auto.commit = false
      mp.messaging.incoming.new-bank-account-in.auto.offset.reset = earliest
      mp.messaging.incoming.new-bank-account-in.apicurio.registry.use-specific-avro-reader = true
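
      The apicurio.registry.use-specific-avro-reader option makes the Avro deserializer return instances of the generated NewBankAccount class instead of generic Avro records, which is what the consumer method expects. As with the outgoing channel, Quarkus auto-detects the deserializer. If you prefer to declare it explicitly, the following optional line is a sketch based on the standard SmallRye Kafka connector property naming; it is not required for this exercise.

      mp.messaging.incoming.new-bank-account-in.value.deserializer = io.apicurio.registry.serde.avro.AvroKafkaDeserializer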
  3. Run the application, and test the integration with Kafka and the Service Registry.

    1. Return to the command line terminal, and start the application.

      (.venv) [user@host kafka-schemas]$ ./mvnw clean package quarkus:dev
      ...output omitted... Listening on: http://localhost:8080
      ...output omitted...
    2. Open a new tab in the browser, and navigate to http://localhost:8080.

    3. In the Create a Bank Account area, type 200000 in the Initial Balance field, and click Create. Notice that the front end displays the created account at the bottom of the page without a type.

      The application stores the new bank account in the database, and then sends a message about the creation for asynchronous processing by the consumer that you implemented. For that reason, the bank account type is initially empty.

    4. Return to the command line terminal and verify that the consumer processed the message.

      ...output omitted...
      ...output omitted... New Bank Account - ID: 3, Balance: 200000
      ...output omitted... Updated Bank Account - ID: 3 - Type: premium
    5. Return to the browser window and refresh the page. Notice that the front end now displays the account type. This is because the consumer processed the event and updated the account type in the database.

    6. Return to the Service Registry tab and refresh the page. Notice that there is an artifact named NewBankAccount.

    7. Click NewBankAccount (new-bank-account-value), and then click the Content tab. Verify that the content displayed matches the schema stored in the src/main/avro/new-bank-account.avsc file.
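
      Optionally, from another terminal, you can also retrieve the stored schema with the registry REST API. The following request is a sketch that assumes the default artifact group and the new-bank-account-value artifact ID shown in the web console; the response should match the content of the src/main/avro/new-bank-account.avsc file.

      (.venv) [user@host kafka-schemas]$ curl http://YOUR_SERVICE_REGISTRY_URL/apis/registry/v2/groups/default/artifacts/new-bank-account-value
      ...output omitted...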

    8. Return to the command line, press Ctrl+C to stop the application, and return to the workspace directory.

Finish

Return to your workspace directory, and run the lab command to complete this exercise. This step is important to ensure that resources from previous exercises do not impact upcoming exercises.

(.venv) [user@host AD482]$ lab finish kafka-schemas

This concludes the guided exercise.

Revision: ad482-1.8-cc2ae1c