Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
After completing this section, you should be able to capture change event data with Debezium.
Distributed applications such as microservices interact with each other with minimal coupling. Whether these interactions occur as synchronous, direct HTTP requests or asynchronously, microservices should use a common set of data. This common set of data usually represents a business model that the microservices share.
Microservices can establish a common set of data by using a shared database for all the services.
Modern microservices, especially those that use an event-driven architecture (EDA), should be loosely coupled and as independent as possible.
Each microservice can use a separate database to have the least coupling with other microservices.
This database-per-service approach not only decreases the coupling between services, but also provides encapsulation.
Note
Encapsulation (also known as data encapsulation) is a mechanism whereby the implementation details of the data are kept hidden from the user. The user cannot directly modify the data, but must use a public entry point that is provided for external interaction. This entry point can be either a public method at the programming language level, or a REST API at the web API level.
Microservices exchange data due to particular business needs. For example, a microservice might require data from other services, or update a set of data that affects the data of another microservice. This means that a set of data that represents a business model can concurrently exist in more than one database.
This coexistence is likely to cause inconsistency issues. One of the common inconsistency problems is Dual Writes.
A dual write describes the situation in which an application changes the data that represents a particular business model in two different systems without an additional layer that ensures data consistency.
Consider a microservice that provides order management on a system. When a user creates an order, the order microservice stores the data in a database and indexes the data in Elasticsearch.
The order microservice should use a common transaction for both operations to ensure data consistency between them. However, not every system supports distributed (XA) transactions, and Elasticsearch is one such system. This means that even if the order microservice runs both operations in a single transaction, a failed Elasticsearch index operation does not cause the database transaction to roll back. Using separate transactions makes no difference, because they provide no consistency guarantee between the two systems either.
In this scenario, when both updates are successful, the data stays consistent.
If the first operation fails, then the data is still consistent because the second operation will not run.
However, if the first operation is successful but the second one fails, then the first system contains the data but not the second one. This means the data becomes inconsistent. This is how dual write situations can lead to data inconsistencies.
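To make the failure mode concrete, the following minimal sketch shows the order example as plain Java. The OrderRepository and SearchIndexClient types are hypothetical placeholders for the database and Elasticsearch clients; they are not part of any specific framework or of the course application.

// Hypothetical sketch of a dual write: two systems are updated
// without a shared transaction or any other consistency layer.
interface OrderRepository { void save(String orderJson); }      // relational database access (assumed)
interface SearchIndexClient { void index(String orderJson); }   // Elasticsearch client wrapper (assumed)

public class OrderService {

    private final OrderRepository repository;
    private final SearchIndexClient searchIndex;

    public OrderService(OrderRepository repository, SearchIndexClient searchIndex) {
        this.repository = repository;
        this.searchIndex = searchIndex;
    }

    public void createOrder(String orderJson) {
        // First write: committed in the database.
        repository.save(orderJson);

        // Second write: if this call fails, the database write is not rolled back,
        // so the two systems end up in an inconsistent state.
        searchIndex.index(orderJson);
    }
}

Retrying the second call inside createOrder does not solve the problem, because the service can still crash between the two writes.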
One way to prevent dual writes in a system is to use the Change Data Capture (CDC) pattern.
Change Data Capture (CDC) is a data integration pattern that enables tracking and monitoring the data changes in databases. CDC tracks data changes, and then alerts other systems to respond to those changes. You can push the captured data change to any durable pub-sub system. Other systems consume the changes and determine the necessary action.
These actions can affect more than one system at the same time because these systems can subscribe to the data that the CDC system captures and publishes. The data might represent the same data model to be updated on each system. Change data capture helps maintain consistency across these systems, which prevents the dual-write issues that cause inconsistent data states.
There are two CDC approaches.
- Query-based (Polling-based) CDC
Query-based CDC repeatedly queries the tables to retrieve any newly inserted or updated rows.
- Log-based CDC
Log-based CDC works by reacting to any changes recorded in the database's log files, such as the MySQL binary log or the MongoDB operations log (oplog).
Log-based CDC has several advantages. It captures all data changes, including delete actions, and it can also capture old record states and additional metadata.
One of the implementations of a log-based CDC is Debezium.
Debezium is an open-source project that provides a set of distributed services to capture changes in databases. It records all row-level changes within each database table in a change event stream. External systems read these change events in the same order in which they occurred.
Debezium uses Apache Kafka and provides Kafka Connect-compatible, database-specific source connectors.
In AMQ Streams on the Red Hat OpenShift Container Platform (RHOCP), you can define the source connectors by using the KafkaConnector custom resources.
With properly configured connector plug-ins in a Kafka Connect cluster on the RHOCP, Debezium can capture data from a database and write it to the topics that AMQ Streams on the RHOCP provides.
Debezium provides connector implementations for the following databases:
DB2
MongoDB
MySQL
Oracle
PostgreSQL
SQL Server
Each connector might require different configuration options.
The following is a sample of a Debezium MySQL connector, which is defined as a KafkaConnector custom resource.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: mysql-debezium-connector
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.hostname: mysql
    database.port: 3306
    database.user: dbuser
    database.password: dbpassword
    database.dbname: neverendingblog
    database.server.name: db
    table.include.list: neverendingblog.posts
    database.history.kafka.topic: dbhistory.posts
    database.history.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092'
  tasksMax: 1
In this configuration:
- class: Debezium MySQL connector class that enables the MySQL connection for CDC.
- database.hostname: MySQL server host address.
- database.port: MySQL server port number.
- database.user: MySQL user with the appropriate privileges.
- database.password: Password of the MySQL user.
- database.dbname: MySQL database that includes the relevant tables to capture.
- database.server.name: Logical name of the MySQL server or cluster.
- table.include.list: Table in the database whose data Debezium captures.
- database.history.kafka.topic: Kafka topic that the connector uses to write and recover Data Definition Language (DDL) statements, known as the database history topic.
- database.history.kafka.bootstrap.servers: List of Kafka brokers that the connector uses to write and recover DDL statements to the database history topic.
- tasksMax: Maximum number of tasks for the connector.
Debezium enables applications to respond to data changes in databases.
The applications can react to insert, update, and delete events in several ways:
Use the events to manage cache entries.
Update search indexes with the data.
Update a derived data store with the same information to implement Command Query Responsibility Separation (CQRS).
Aggregate the changes and produce a stream of patches for entities.
Drive streaming queries, for example with Apache Flink or Kafka Streams.
Propagate data between microservices, for example by using the Outbox pattern.
Debezium, as a CDC implementation, can be used with the Outbox pattern to create reliable event exchange between microservices.
The Outbox pattern is a design pattern that describes writing data, such as change events, to an outbox. This ensures reliable, at-least-once delivery to other services. Instead of directly publishing the change events to these services, the application writes them to a temporary storage, such as a database table. While the actual data operation happens in its own table, the application writes the change events to the outbox table, and both operations share the same transaction.
Using the Outbox pattern with the CDC is efficient because the CDC framework can capture the stored event data in the outbox table and publish it to any system.
In the order management example, the order microservice had a data consistency problem because of the dual write situation: it must keep a PostgreSQL database and the Elasticsearch index of the search service consistent with each other.
By implementing the Outbox pattern with CDC, the order microservice creates change events in the outbox table while operating on the actual table that holds the order data. The application handles both operations in a single transaction, and the captured change events are later published to a durable pub-sub system, which prevents the dual-write problem.
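As a rough sketch of the write side of this approach, the following example inserts the order row and the corresponding outbox row in a single JDBC transaction. The orders table and the column names are assumptions for illustration; they are not taken from the course application.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class OrderOutboxWriter {

    // Hypothetical sketch: write the business data and the change event
    // to the outbox table in the same local database transaction.
    public void createOrder(Connection connection, long orderId, String orderJson) throws SQLException {
        connection.setAutoCommit(false);
        try (PreparedStatement insertOrder = connection.prepareStatement(
                     "INSERT INTO orders (id, data) VALUES (?, ?)");
             PreparedStatement insertOutbox = connection.prepareStatement(
                     "INSERT INTO outboxevent (aggregatetype, aggregateid, type, payload) "
                             + "VALUES (?, ?, ?, ?)")) {

            // 1. The business data goes to its own table.
            insertOrder.setLong(1, orderId);
            insertOrder.setString(2, orderJson);
            insertOrder.executeUpdate();

            // 2. The change event goes to the outbox table in the same transaction.
            insertOutbox.setString(1, "Order");
            insertOutbox.setString(2, String.valueOf(orderId));
            insertOutbox.setString(3, "OrderCreated");
            insertOutbox.setString(4, orderJson);
            insertOutbox.executeUpdate();

            connection.commit(); // both rows commit together, or neither does
        } catch (SQLException e) {
            connection.rollback();
            throw e;
        }
    }
}

Because the application writes only to its own database, a failure at any point leaves either both rows or neither row committed, which removes the dual-write problem.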
Debezium, as a CDC implementation, can capture the change event data from the PostgreSQL outbox table and publish them to Kafka to be consumed by other application services.
Debezium provides components that enable you to implement the Outbox pattern, such as:
- Outbox Event Router
Outbox Event Router is a Single Message Transformation (SMT) that supports the Outbox pattern implementation with Debezium. When the Debezium connector captures the event data from the outbox table, the Outbox Event Router SMT transforms the data and routes it to the appropriate topic.
The following example shows a Kafka Connect connector definition that uses the Outbox Event Router SMT.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
...omitted...
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    ...omitted...
    table.include.list: outboxevent
    tombstones.on.delete: false
    transforms: EventRouter
    transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter
    transforms.EventRouter.table.fields.additional.placement: type:header:eventType
    transforms.EventRouter.route.topic.replacement: ${routedByValue}.events
    ...omitted...
In this connector definition:
- table.include.list: The outbox table whose data Debezium captures.
- tombstones.on.delete: Setting this value to false means that Debezium does not emit tombstone records for deletes on the captured outbox table. An outbox table is not designed to keep data permanently, so the application can delete the captured event data afterward.
- transforms: SMT definition for EventRouter.
- transforms.EventRouter.type: Type of the Outbox Event Router SMT.
- transforms.EventRouter.table.fields.additional.placement: Places the value of the type column from the outbox table into the eventType header of the emitted message.
- transforms.EventRouter.route.topic.replacement: Name of the topic that Debezium emits the events to. By default it is outbox.event.${routedByValue}, but in this case it is set to ${routedByValue}.events. The ${routedByValue} variable resolves to the aggregatetype column in the outbox table.
- Outbox Quarkus Extension
The Outbox Quarkus Extension helps you to create the event model and entity by providing the ExportedEvent interface. A Quarkus application that uses JPA with Hibernate or Panache also enables the automatic creation of the outbox table with specific column names. The Outbox Event Router applies transformations by using these column names. The following example shows an order event that implements the ExportedEvent interface of the Outbox Quarkus Extension.

public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> {

    ...omitted...

    @Override
    public String getAggregateId() {
        return String.valueOf(orderId);
    }

    @Override
    public String getAggregateType() {
        return "Order";
    }

    @Override
    public JsonNode getPayload() {
        return payload;
    }

    @Override
    public String getType() {
        return "OrderCreated";
    }

    @Override
    public Instant getTimestamp() {
        return created;
    }
}

In this class:
- OrderCreatedEvent implements the ExportedEvent interface with the JsonNode payload type. This means that the event payload that the application creates on the outbox table is in JSON format.
- getAggregateId: Aggregate ID of the event in the outbox table. In this case, it is the order ID.
- getAggregateType: Aggregate type of the event. This can be set to anything related to the event. The Outbox Event Router uses this value through the ${routedByValue} parameter, which generally defines a part of the topic name.
- getPayload: Payload of the event, in JSON format.
- getType: Event type. This can be set to anything that describes what the event is for.
- getTimestamp: Event creation time.
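The following is a minimal sketch of how an application service might fire such an event with the Outbox Quarkus Extension. It assumes the extension's CDI-based firing mechanism (the io.debezium.outbox.quarkus.ExportedEvent interface and an injected javax.enterprise.event.Event), a Quarkus version that still uses the javax namespace, and an OrderCreatedEvent constructor that takes the order ID and payload, which the excerpt above omits.

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Event;
import javax.inject.Inject;
import javax.transaction.Transactional;

import com.fasterxml.jackson.databind.JsonNode;

import io.debezium.outbox.quarkus.ExportedEvent;

@ApplicationScoped
public class OrderService {

    // CDI event that the Outbox Quarkus Extension observes.
    @Inject
    Event<ExportedEvent<?, ?>> exportedEvent;

    @Transactional // the order insert and the outbox insert share this transaction
    public void createOrder(long orderId, JsonNode payload) {
        // ...persist the order entity with Panache or the EntityManager (omitted)...

        // Firing the event makes the extension insert a row into the outbox table
        // within the current transaction; Debezium captures that row afterward.
        // The OrderCreatedEvent constructor shown here is a hypothetical signature.
        exportedEvent.fire(new OrderCreatedEvent(orderId, payload));
    }
}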
Both components are optional.
You can create your own entity that conforms to an outbox table, and the Outbox Event Router can perform the related transformations on it.
Alternatively, you can implement custom transformations instead of using the Outbox Event Router.
Debezium provides these components to make it easier to implement the Outbox pattern with CDC.