Capturing Change Event Data with Debezium

Objectives

After completing this section, you should be able to capture change event data with Debezium.

Defining Data Inconsistencies in Distributed Systems

Distributed applications, such as microservices, interact with each other with minimal coupling. Whether these interactions occur synchronously, as direct HTTP requests, or asynchronously, microservices should use a common set of data. This common set of data usually represents a business model that the microservices share.

Microservices can establish a common set of data by using a shared database for all the services.

Modern microservices, especially those that use an event-driven architecture (EDA), should aim for less coupling and more independence. Each microservice can use a separate database to minimize coupling with other microservices. This database-per-service approach not only decreases the coupling between services, but also provides encapsulation.

Note

Encapsulation (also known as data encapsulation) is a mechanism whereby the implementation details of the data are kept hidden from the user. The user cannot modify the data directly, but must use a public entry point that the encapsulating component provides for external interaction. This entry point can be a public method at the programming-language level, or a REST API at the web API level.

Figure 5.9: Database per service

Microservices exchange data because of particular business needs. For example, a microservice might require data from other services, or might update a set of data that affects the data of another microservice. This means that a set of data that represents a business model can exist concurrently in more than one database.

This coexistence is likely to cause inconsistency issues. One of the most common inconsistency problems is the dual write.

Describing Dual Writes

A dual write describes the situation in which an application changes data that represents a particular business model in two different systems, without any additional layer that ensures data consistency.

Consider a microservice that provides order management on a system. When a user creates an order, the order microservice stores the data in a database and indexes the data in Elasticsearch.

The order microservice should use a common transaction for both operations to ensure data consistency between them. However, not every system supports distributed (XA) transactions; Elasticsearch, for example, does not. This means that even if the order microservice runs both operations in a single transaction, a failed Elasticsearch index operation does not trigger the transaction to roll back. Using separate transactions makes no difference, because that approach provides no data consistency either.

In this scenario, when both updates are successful, the data stays consistent.

Figure 5.10: Successful transaction with consistent data

If the first operation fails, then the data is still consistent because the second operation will not run.

Figure 5.11: Failing transaction with consistent data

However, if the first operation succeeds but the second one fails, then the first system contains the data and the second one does not. The data becomes inconsistent. This is how dual-write situations lead to data inconsistencies.

Figure 5.12: Successful transaction with inconsistent data
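The failure mode above can be sketched in a few lines of plain Java. The class, the maps, and the method names below are hypothetical stand-ins for the real database and Elasticsearch, not part of any actual API:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the dual-write problem: two independent stores,
// no shared transaction. All names here are hypothetical.
public class DualWriteSketch {
    static Map<String, String> database = new HashMap<>();    // stands in for the SQL database
    static Map<String, String> searchIndex = new HashMap<>(); // stands in for Elasticsearch

    static void createOrder(String id, String payload, boolean indexFails) {
        database.put(id, payload);            // first write succeeds
        if (indexFails) {
            // the index call fails, but the database write is not rolled back
            throw new RuntimeException("index unavailable");
        }
        searchIndex.put(id, payload);         // second write never happens
    }

    public static void main(String[] args) {
        try {
            createOrder("order-1", "{\"total\": 42}", true);
        } catch (RuntimeException e) {
            // swallowed, as in an application without compensation logic
        }
        // The two systems now disagree: the order exists only in the database.
        System.out.println("in database: " + database.containsKey("order-1")); // prints true
        System.out.println("in index:    " + searchIndex.containsKey("order-1")); // prints false
    }
}
```

Because there is no transactional boundary spanning both stores, no amount of retry ordering removes the window in which the two systems disagree.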

One way to prevent dual writes in a system is to use the Change Data Capture (CDC) pattern.

Defining Change Data Capture (CDC)

Change Data Capture (CDC) is a data integration pattern for tracking and monitoring data changes in databases. CDC captures the data changes and then alerts other systems so that they can respond to those changes. You can push each captured change to any durable pub-sub system, where other systems consume the changes and determine the necessary action.

Figure 5.13: Change data capture

These actions can affect more than one system at the same time, because multiple systems can subscribe to the data that the CDC system captures and publishes. The data might represent the same data model to be updated on each system. Change data capture helps each system maintain consistency, which prevents the dual-write issues that cause inconsistent data states.

There are two CDC approaches.

Query-based (Polling-based) CDC

Query-based CDC repeatedly runs queries to retrieve any newly inserted or updated rows from the tables.
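A polling cycle like this can be sketched in plain Java. The table layout, the version column, and the class names below are hypothetical, for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of query-based (polling) CDC: repeatedly query for rows
// whose version is above the last position the poller has seen.
public class PollingCdcSketch {
    static class Row {
        final long version;
        final String data;
        Row(long version, String data) { this.version = version; this.data = data; }
    }

    static final List<Row> table = new ArrayList<>();
    static long lastSeen = 0;

    // One polling cycle, roughly equivalent to:
    //   SELECT * FROM posts WHERE version > :lastSeen
    static List<Row> poll() {
        List<Row> changes = new ArrayList<>();
        for (Row r : table) {
            if (r.version > lastSeen) {
                changes.add(r);
            }
        }
        for (Row r : changes) {
            lastSeen = Math.max(lastSeen, r.version); // advance the cursor
        }
        return changes;
    }

    public static void main(String[] args) {
        table.add(new Row(1, "first post"));
        table.add(new Row(2, "second post"));
        System.out.println(poll().size()); // prints 2: both rows are new
        System.out.println(poll().size()); // prints 0: nothing changed since the last poll
    }
}
```

Note that a poller like this misses rows that are deleted between two cycles, which is one reason the log-based approach described next is usually preferred.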

Log-based CDC

Log-based CDC works by reacting to changes in the database's transaction log, such as the MySQL binary log or the MongoDB operations log (oplog).

Log-based CDC has several advantages. It captures all data changes, including delete operations, and it can also capture old record states and further metadata.

One of the implementations of a log-based CDC is Debezium.

Describing CDC with Debezium

Debezium is an open-source project that provides a set of distributed services to capture changes in databases. It records all row-level changes within each database table in a change event stream. External systems read these change events in the same order in which they occurred.

Debezium uses Apache Kafka and provides Kafka Connect compatible database-specific source connectors.

Figure 5.14: Change data capture with Debezium

In AMQ Streams on the Red Hat OpenShift Container Platform (RHOCP), you can define the source connectors by using the KafkaConnector custom resources. With properly configured connector plug-ins in a Kafka Connect cluster on the RHOCP, Debezium can capture data from a database and write it to the topics that AMQ Streams on the RHOCP provides.

Debezium provides connector implementations for the following databases:

  • DB2

  • MongoDB

  • MySQL

  • Oracle

  • PostgreSQL

  • SQL Server

Each connector might require different configuration options. The following is a sample Debezium MySQL connector, defined as a KafkaConnector custom resource.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: my-connect-cluster
  name: mysql-debezium-connector
spec:
  class: io.debezium.connector.mysql.MySqlConnector 1
  config:
    database.hostname: mysql 2
    database.port: 3306 3
    database.user: dbuser 4
    database.password: dbpassword 5
    database.include.list: neverendingblog 6
    database.server.name: db 7
    table.include.list: neverendingblog.posts 8
    database.history.kafka.topic: dbhistory.posts 9
    database.history.kafka.bootstrap.servers: 'my-cluster-kafka-bootstrap:9092' 10
  tasksMax: 1

1

Debezium MySQL connector class that enables the MySQL connection for CDC.

2

MySQL server host address.

3

MySQL server port number.

4

MySQL user with the appropriate privileges.

5

MySQL user’s password.

6

MySQL database that contains the relevant tables whose changes are captured.

7

Logical name of the MySQL server or cluster.

8

Table name in the database whose data is to be captured by Debezium.

9

Name of the Kafka topic in which the connector stores the database schema history. The connector writes and recovers Data Definition Language (DDL) statements to and from this topic.

10

List of Kafka brokers that the connector uses to write and recover DDL statements to the database history topic.

Debezium enables applications to respond to data changes in databases. The applications can react to insert, update, and delete events, such as:

  • Use the events to manage cache entries.

  • Update search indexes with the data.

  • Update a derived data store with the same information to implement Command Query Responsibility Separation (CQRS).

  • Aggregate the changes and produce a stream of patches for entities.

  • Drive streaming queries e.g. with Apache Flink or Kafka Streams.

  • Propagate data between microservices, e.g. using the Outbox pattern.

Debezium, as a CDC implementation, can be used with the Outbox pattern to create reliable event exchange between microservices.

Defining the Outbox Pattern

The Outbox pattern is a design pattern in which an application writes data (such as change events) to an outbox, ensuring reliable, at-least-once delivery to other services. Instead of publishing the change events directly to those services, the application writes them to temporary storage, such as a database table. While the actual data operation happens in its own table, the application writes the change events to the outbox table, and both operations share the same transaction.

Figure 5.15: Outbox pattern

Using the Outbox pattern with CDC is efficient because the CDC framework can capture the stored event data in the outbox table and publish it to any system.

In the order management example, the order microservice had a data consistency problem because of the dual-write situation: it must update both a PostgreSQL database and the Elasticsearch index for the search service in a consistent manner.

By implementing the Outbox pattern with CDC, the order microservice can create change events in the outbox table while operating on the actual table that holds the order data. The application handles both operations in a single transaction, which prevents dual writes, and the captured change events are then published to a durable pub-sub system.

Figure 5.16: Outbox pattern with CDC
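The single-transaction guarantee can be sketched in plain Java. The staged-list "transaction" below stands in for a real database transaction, and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the outbox idea: the order row and its change event are
// written in one atomic unit, so either both become visible or neither does.
public class OutboxSketch {
    static List<String> ordersTable = new ArrayList<>();
    static List<String> outboxTable = new ArrayList<>();

    static void placeOrder(String order, boolean fail) {
        // Stage both writes, as a database transaction would.
        List<String> stagedOrders = new ArrayList<>(ordersTable);
        List<String> stagedOutbox = new ArrayList<>(outboxTable);
        stagedOrders.add(order);
        stagedOutbox.add("OrderCreated:" + order);
        if (fail) {
            return; // rollback: discard both staged writes
        }
        // commit: both writes become visible together
        ordersTable = stagedOrders;
        outboxTable = stagedOutbox;
    }

    public static void main(String[] args) {
        placeOrder("order-1", false); // committed: row and event both exist
        placeOrder("order-2", true);  // rolled back: neither exists
        System.out.println(ordersTable); // prints [order-1]
        System.out.println(outboxTable); // prints [OrderCreated:order-1]
    }
}
```

A CDC connector then reads only committed outbox rows from the transaction log and publishes them, so no second, non-transactional write to an external system is needed.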

Debezium, as a CDC implementation, can capture the change event data from the PostgreSQL outbox table and publish it to Kafka, where other application services consume it.

Implementing the Outbox Pattern with Debezium

Debezium provides components that enable you to implement the Outbox pattern, such as:

Outbox Event Router

Outbox Event Router is a Single Message Transformation (SMT) that supports implementing the Outbox pattern with Debezium. When the Debezium connector captures event data from the outbox table, the Outbox Event Router SMT applies the relevant transformation to the data and then routes it to the appropriate destination.

The following example shows a Kafka Connect connector definition that uses the Outbox Event Router SMT.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
...omitted...
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  config:
    ...omitted...
    table.include.list: outboxevent 1
    tombstones.on.delete: false 2
    transforms: EventRouter 3
    transforms.EventRouter.type: io.debezium.transforms.outbox.EventRouter 4
    transforms.EventRouter.table.fields.additional.placement: type:header:eventType 5
    transforms.EventRouter.route.topic.replacement: ${routedByValue}.events 6
...omitted...

1

The outbox table whose data is to be captured by Debezium.

2

Setting this value to false prevents Debezium from emitting a tombstone record after each delete event on the captured outbox table. Outbox tables are not designed to keep data permanently, so the application typically deletes the captured event data afterward, and these deletes should not produce tombstones in the event stream.

3

SMT definition for EventRouter.

4

Type of the Outbox Event Router SMT.

5

Gets the type value from the header of the outbox message that is captured from the outbox table.

6

Replaces the name of the topic that Debezium emits events to. By default, the topic name is outbox.event.${routedByValue}; in this case, you set it to ${routedByValue}.events. The ${routedByValue} variable holds the value of the aggregatetype column in the outbox table.
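The substitution that this option performs can be illustrated in a few lines of Java. The routeTopic helper below is hypothetical, not part of the Debezium API:

```java
// Illustrates how the Outbox Event Router derives the destination topic name:
// it substitutes the aggregatetype column value for the ${routedByValue}
// placeholder in the configured template.
public class TopicRouting {
    static String routeTopic(String template, String aggregateType) {
        // String.replace performs a literal (non-regex) substitution
        return template.replace("${routedByValue}", aggregateType);
    }

    public static void main(String[] args) {
        // Default template vs. the template from the connector example above,
        // for an outbox row whose aggregatetype column contains "Order".
        System.out.println(routeTopic("outbox.event.${routedByValue}", "Order")); // prints outbox.event.Order
        System.out.println(routeTopic("${routedByValue}.events", "Order"));       // prints Order.events
    }
}
```

With the configuration shown above, events for an "Order" aggregate therefore land on the Order.events topic.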

Outbox Quarkus Extension

The Outbox Quarkus Extension helps you create the event model and entity by providing the ExportedEvent interface. In a Quarkus application that uses JPA with Hibernate or Panache, the extension also creates the outbox table automatically, with the specific column names that the Outbox Event Router uses when applying transformations. The following example shows an order event that implements the ExportedEvent interface of the Outbox Quarkus Extension.

public class OrderCreatedEvent implements ExportedEvent<String, JsonNode> { 1

    ...omitted...

    @Override
    public String getAggregateId() { 2
        return String.valueOf(orderId);
    }

    @Override
    public String getAggregateType() { 3
        return "Order";
    }

    @Override
    public JsonNode getPayload() { 4
        return payload;
    }

    @Override
    public String getType() { 5
        return "OrderCreated";
    }

    @Override
    public Instant getTimestamp() { 6
        return created;
    }
}

1

OrderCreatedEvent implements ExportedEvent with the JsonNode payload type. This means that the event payload that the application writes to the outbox table is in JSON format.

2

Aggregate ID, which identifies the aggregate that the event belongs to in the outbox table. In this case, it is the order ID.

3

Aggregate type of the event. You can set this to anything related to the event. The Outbox Event Router uses this value through the ${routedByValue} parameter, which generally defines part of the topic name.

4

Payload of the event which will be in JSON format.

5

Event type. You can set this to anything that describes what the event is for.

6

Event creation time.

Both components are optional. You can create your own entity that conforms to an outbox table, on which the Outbox Event Router can perform the related transformations, or you can implement custom transformations instead of using the Outbox Event Router. Debezium provides these components to make implementing the Outbox pattern with CDC easier.

Revision: ad482-1.8-cc2ae1c