Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise you will produce and consume messages that use schemas as contracts.
The application used in this exercise simulates the intranet of a bank. In this application you can create bank accounts with an initial balance, and operate on those balances. It is a simple CRUD application built with Quarkus that uses Panache to persist the data in an in-memory H2 database.
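Later steps in this exercise access the application's BankAccount entity through its public id, balance, and profile fields. As a rough, hypothetical stand-in (the real class is a Panache entity persisted in the H2 database, and is not shown in this exercise), the data it carries looks like this:

```java
// Hypothetical, simplified stand-in for the application's BankAccount
// Panache entity. Public fields, as used by the exercise code.
public class BankAccount {
    public Long id;        // primary key, assigned by the database
    public Long balance;   // current balance of the account
    public String profile; // account type ("regular" or "premium"), set later

    public BankAccount(Long id, Long balance) {
        this.id = id;
        this.balance = balance;
    }
}
```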
During this exercise you will use messages to extend the application features. You will assign a type to each bank account based on the current balance.
You will find the solution files for this exercise in the AD482-apps repository, within the kafka-schemas/solutions directory.
Notice that you might need to replace some strings in the application.properties file.
Outcomes
You should be able to produce and consume messages that use schemas as contracts.
To perform this exercise, ensure you have the following:
Access to a configured and running OpenShift cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Use the lab command to prepare your system for this exercise.
(.venv) [user@host AD482]$ lab start kafka-schemas
...output omitted...
Copy the Service Registry URL value.
You will use this value to connect to the Red Hat Integration Service Registry.
The lab command copies the exercise files from the AD482-apps/kafka-schemas/apps/red-hat-bank directory, which is in your local Git repository, into the kafka-schemas directory, which is in your workspace.
It also copies the .env file in your workspace to the kafka-schemas directory.
Procedure 2.4. Instructions
From your workspace, move to the kafka-schemas directory, which contains the source code of the application. Produce messages to the new-bank-account topic when you create new bank accounts. Use a schema registry to decouple the data schemas from the application.
Navigate to the kafka-schemas directory.
Open a web browser, and navigate to the Service Registry URL prompted by the lab start command. Notice that the Service Registry has no artifacts stored.
Leave this browser tab open.
Note
The Service Registry might take some time to be available on the provided Service Registry URL.
By using your editor of choice, open the pom.xml file and add the quarkus-apicurio-registry-avro dependency.
The file content should look like the following:
...output omitted...
<dependencies>
    <!-- TODO: Add the registry dependency -->
    <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-apicurio-registry-avro</artifactId>
    </dependency>
...output omitted...
Define the AVRO schema for a NewBankAccount message. This message must include the bank account ID and the current balance. Both fields must use a Long type.
Create the src/main/avro/new-bank-account.avsc file, and add the AVRO schema.
The file content should look like the following:
{
    "namespace": "com.redhat.training.bank.message",
    "type": "record",
    "name": "NewBankAccount",
    "fields": [
        { "name": "id", "type": "long" },
        { "name": "balance", "type": "long" }
    ]
}
During the compilation phase, Quarkus transforms the AVRO schema into a Java class, and stores it in the target/generated-sources/avsc directory. For this exercise, Quarkus generates the target/generated-sources/avsc/com/redhat/training/bank/message/NewBankAccount.java file at compilation time.
You use this class to publish a notification message when you create a new bank account.
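The exercise only relies on a small part of the generated class: a constructor that takes both fields, and the getId and getBalance accessors. A simplified, hand-written stand-in for that surface might look like the sketch below; the real generated class additionally extends Avro's SpecificRecordBase and carries serialization metadata, which is omitted here.

```java
// Simplified, hypothetical stand-in for the Avro-generated NewBankAccount
// class. Only the surface used by this exercise is shown.
public class NewBankAccount {
    private Long id;
    private Long balance;

    // Avro-generated classes also provide a no-argument constructor.
    public NewBankAccount() { }

    public NewBankAccount(Long id, Long balance) {
        this.id = id;
        this.balance = balance;
    }

    public Long getId() { return id; }
    public Long getBalance() { return balance; }
}
```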
Open the src/main/java/com/redhat/training/bank/resource/AccountResource.java file, and add an import for the generated Java class.
The file content should look like the following:
...output omitted...
// TODO: Import the NewBankAccount message
import com.redhat.training.bank.message.NewBankAccount;
import com.redhat.training.bank.model.BankAccount;
...output omitted...
In the same file, add an Emitter for the new-bank-account-out channel. The Emitter must send a NewBankAccount message.
The file content should look like the following:
...output omitted...
// TODO: Add an emitter for the bank account creation messages
@Inject
@Channel("new-bank-account-out")
Emitter<NewBankAccount> emitter;
...output omitted...
Implement the sendMessageAboutNewBankAccount method in the AccountResource class to send NewBankAccount messages to Kafka.
The method should look like the following:
private void sendMessageAboutNewBankAccount(BankAccount bankAccount) {
    LOGGER.info(
        "New Bank Account - ID: " + bankAccount.id
        + ", Balance: " + bankAccount.balance
    );

    // TODO: Send a message about the new bank account
    emitter.send(
        new NewBankAccount(bankAccount.id, bankAccount.balance)
    );
}
Configure the outgoing channel for the new bank account messages. Open the src/main/resources/application.properties file, and configure the outgoing channel to use a service registry.
The file content should look like the following:
...output omitted...
# TODO: configure the outgoing channel
mp.messaging.connector.smallrye-kafka.apicurio.registry.url = http://YOUR_SERVICE_REGISTRY_URL/apis/registry/v2
mp.messaging.outgoing.new-bank-account-out.connector = smallrye-kafka
mp.messaging.outgoing.new-bank-account-out.topic = new-bank-account
mp.messaging.outgoing.new-bank-account-out.apicurio.registry.auto-register = true
The apicurio.registry.url property sets the URL of the Red Hat Integration Service Registry. The apicurio.registry.auto-register option enables the publishing of artifacts in the Service Registry.
Note
Quarkus automatically detects the appropriate serializer based on the @Channel declaration, the structure of the message, and the presence of the Registry libraries. In this case, the selected serializer for the outgoing channel is io.apicurio.registry.serde.avro.AvroKafkaSerializer.
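If you prefer not to rely on autodetection, the smallrye-kafka connector also accepts an explicit serializer per channel. The following is a hedged sketch, not part of the exercise: it assumes the connector's value.serializer attribute, so verify the key name against your Quarkus and SmallRye versions before using it.

```
# Optional, explicit alternative to serializer autodetection
# (assumption: verify the attribute name for your Quarkus version)
mp.messaging.outgoing.new-bank-account-out.value.serializer = io.apicurio.registry.serde.avro.AvroKafkaSerializer
```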
Consume the AVRO serialized messages from the new-bank-account topic, and update the account type. Use a schema registry to decouple the data schemas from the application.
Process the messages, and update the bank account records stored in the database.
Find the database record based on the ID sent in the event. If there is no database record with the specified ID, then log a message with the error.
On found database records, if the balance is lower than 100000, then assign the value regular to the profile field. Otherwise, assign the value premium.
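The account-type rule above has one boundary case worth noting: a balance of exactly 100000 is not lower than 100000, so it becomes premium. The rule can be isolated as a tiny helper; this is an illustrative sketch, and the class and method names are not part of the exercise code.

```java
// Illustrative helper for the account-type rule described above.
public class ProfileRule {
    // Balances below 100000 are "regular"; 100000 and above are "premium".
    public static String profileFor(long balance) {
        return balance < 100000 ? "regular" : "premium";
    }

    public static void main(String[] args) {
        System.out.println(profileFor(99999));  // regular
        System.out.println(profileFor(200000)); // premium
    }
}
```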
Open the src/main/java/com/redhat/training/bank/consumer/NewBankAccountConsumer.java file and implement the consumer.
The file content should look like the following:
...output omitted...
@ApplicationScoped
public class NewBankAccountConsumer {

    private static final Logger LOGGER = Logger.getLogger(NewBankAccountConsumer.class);

    // TODO: Create the consumer implementation
    @Incoming("new-bank-account-in")
    @Blocking
    @Transactional
    public void processMessage(NewBankAccount message) {
        BankAccount entity = BankAccount.findById(message.getId());

        if (entity != null) {
            entity.profile = message.getBalance() < 100000 ? "regular" : "premium";

            LOGGER.info(
                "Updated Bank Account - ID: " + message.getId()
                + " - Type: " + entity.profile
            );
        } else {
            LOGGER.info("Bank Account not found!");
        }
    }
}
Configure the incoming channel for the new bank account messages. Open the src/main/resources/application.properties file, and configure the incoming channel.
...output omitted...
# TODO: configure the incoming channel
mp.messaging.incoming.new-bank-account-in.connector = smallrye-kafka
mp.messaging.incoming.new-bank-account-in.topic = new-bank-account
mp.messaging.incoming.new-bank-account-in.enable.auto.commit = false
mp.messaging.incoming.new-bank-account-in.auto.offset.reset = earliest
mp.messaging.incoming.new-bank-account-in.apicurio.registry.use-specific-avro-reader = true
Run the application and test the integration with Kafka and the Service Registry.
Return to the command line terminal, and start the application.
(.venv) [user@host kafka-schemas]$ ./mvnw clean package quarkus:dev
...output omitted...
Listening on: http://localhost:8080
...output omitted...
Open a new tab in the browser, and navigate to http://localhost:8080.
In the account creation area, type 200000 as the initial balance, and create the account. Notice that the front end displays the created account at the bottom of the page without a type.
The application stores the new bank account in the database, and then sends a message about the creation for later processing by the consumer that you implemented. For that reason, the bank account type is initially empty.
Return to the command line terminal and verify that the consumer processed the message.
...output omitted...
New Bank Account - ID: 3, Balance: 200000
...output omitted...
Updated Bank Account - ID: 3 - Type: premium
Return to the browser window and refresh the page. Notice that the front end now displays the account type. This is because the application consumed the event and updated the account type in the database.
Return to the Service Registry tab and refresh the page. Notice that there is an artifact named NewBankAccount.
Click the NewBankAccount artifact, and then open the tab that displays its content. Verify that the displayed content matches the schema stored in the src/main/avro/new-bank-account.avsc file.
Return to the command line, press Ctrl+C to stop the application, and return to the workspace directory.
This concludes the guided exercise.