Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In an Apache Kafka cluster, producers and consumers communicate by sending messages to broker nodes. In Kafka terminology, the terms record and message can have different meanings.
A record is an event that a producer sends to a Kafka cluster, and a consumer retrieves from a Kafka cluster. A message can be synonymous with a record. However, a message can also mean a generic concept of sending or receiving information. For example, when a consumer polls Kafka for records, it receives a batched message with multiple records.
In this course, to prevent confusion, a record is always synonymous with a Kafka event.
Producers group related records into topics. Consumers consume records from a set of one or more topics.
You can think of a topic as a database table. Each topic contains records, which are analogous to individual rows in a database table.
A Kafka record is an event with metadata, for example:
- (Optional) Key
The record key is an opaque byte array. Producers can use the record key for partition assignment within a topic. For example, producers can consistently send events with the same key to the same partition.
Kafka can also use record keys for pruning old events. For example, consider a partition that receives the following records:
- Key: id=1, value: balance: $500
- Key: id=1, value: balance: $450
- Key: id=1, value: balance: $300
Kafka can compact the log by removing preceding records with the id=1 key. This strategy is called log compaction. Because keys increase the size of a record, they can be null.
- Value
The record value, sometimes called a payload, is an opaque byte array. The value of a record is an event recorded by producers.
- Timestamp
Kafka uses the record timestamp field to order events.
A record contains other metadata, such as the offset and checksum.
See the Javadoc for the org.apache.kafka.clients.producer.ProducerRecord class for a full list of metadata associated with a record.
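For example, the following minimal sketch uses the Java producer API to construct and send records. The broker address localhost:9092 and the example-com-clicks topic are assumptions for this sketch:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClickProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The broker address is a placeholder for this sketch.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializers turn the key and the value into the opaque byte arrays that Kafka stores.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Records with the same key are consistently assigned to the same partition.
            ProducerRecord<String, String> keyed =
                    new ProducerRecord<>("example-com-clicks", "id=1", "balance: $500");
            // A null key lets the producer distribute records across partitions.
            ProducerRecord<String, String> unkeyed =
                    new ProducerRecord<>("example-com-clicks", null, "balance: $450");
            producer.send(keyed);
            producer.send(unkeyed);
        }
    }
}

If you do not set a timestamp explicitly, the producer assigns the current time to the record by default.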
Kafka brokers use topics to categorize records.
A topic is a logical group for related records.
For example, you can group records that represent a user clicking on a web page in the example-com-clicks topic.
Each topic has one or more partitions.
In Kafka, topics have the following properties:
- Multi-subscriber and Multi-producer
Multiple independent producers can send records to one topic.
Similarly, multiple independent consumers can consume the same records. Consumers specify which records to read based on the record offset.
- Durable
In Kafka, topics are durable. Kafka brokers store the records for a specified duration (7 days, or 604800000 ms, by default). Consequently, when a consumer fails, it can use several error recovery strategies, such as the following, which the consumer sketch after this list also demonstrates:
- Process all of the records in the topic again.
- Process records from the last processed record.
- Process records from a specified offset, for example when you manage record offsets by using custom strategies.
You can specify a custom record retention time for each topic.
- Unordered
Because you can split topics into multiple partitions, Kafka does not guarantee record order within a topic. Order is preserved only within each partition, so consumers should not rely on record order across a topic.
- Pull-only
Producers push records to Kafka brokers. Consumers poll records from each partition of a topic. Consequently, both producers and consumers initiate connections with Kafka brokers; brokers never push records to consumers. This design lowers the per-record overhead on brokers and enables Kafka to withstand producer and consumer scaling.
- Append-only
Producers can only append new records to a topic. Because Kafka brokers do not parse records in a topic, brokers cannot perform any functions on a record, such as delete or modify. Brokers delete records only after the configured retention period for a record expires. You can structure a stream of records as a table, which enables you to effectively modify and delete records. However, the topic still retains all of the records. Structuring a stream as a table is a subject for later chapters.
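The following consumer sketch demonstrates the pull-only model and the recovery strategies from the Durable property. The broker address, group ID, topic name, and offset value are placeholders:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OrderConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition partition = new TopicPartition("orders", 0);
            consumer.assign(List.of(partition));

            // Recovery strategy: process all of the records in the partition again.
            consumer.seekToBeginning(List.of(partition));
            // Alternative recovery strategy: continue from an offset that you manage
            // yourself. This call overrides the previous seek; 42 is a placeholder.
            consumer.seek(partition, 42L);

            while (true) {
                // The consumer pulls records from the broker; brokers never push.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}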
The AMQ Streams Kafka distribution provides a topic operator for managing Kafka topics. The topic operator enables you to create and delete Kafka topics by using custom resource (CR) YAML files.
You can create a topic by using the KafkaTopic kind, for example:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: "example-topic"
  labels:
    strimzi.io/cluster: "example-cluster"
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 604800000 # 7 days
    segment.bytes: 1073741824

The topic operator communicates with the Kafka cluster deployed on RHOCP and manages the topic based on the YAML file descriptor.
The topic operator enables you to create, update, and delete Kafka topics by using the RHOCP API.
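For example, assuming that you save the preceding manifest as example-topic.yaml (the file name is arbitrary), you can create the topic by running oc apply -f example-topic.yaml, and remove it again by deleting the KafkaTopic resource with oc delete -f example-topic.yaml.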
Each topic is split into one or more partitions. Kafka distributes partitions to brokers. Partitions physically store records on the file system. Each partition can be replicated to multiple brokers to gain resiliency against broker failure.
For example, consider the following orders topic split into three partitions, where each partition has one replica:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: "orders"
  labels:
    strimzi.io/cluster: "example-cluster"
spec:
  partitions: 3
  replicas: 1
  config:
    retention.ms: 604800000
    segment.bytes: 1073741824
In this example, the producer sends order records with IDs from 1 to 9. The producer distributes the records across the partitions, sending each record to one of the three partitions. Note that each partition retains the order of its records.
The consumer subscribes to the orders topic.
This means the consumer polls all three brokers that contain the topic partitions for records.
Because the consumer polls partitions in random order, it receives records in random order.
Each topic can specify a replication factor. When you configure a replication factor, one broker node becomes the leader for each of the partitions in the topic.
The partition leader replicates the partition into a specified number of peer broker nodes. The peers that contain a replica of the partition are called followers.
When followers contain the same partition records as the leader, they become in-sync replicas (ISR).
For example, consider the preceding topic with replication factor 3:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: "orders"
  labels:
    strimzi.io/cluster: "example-cluster"
spec:
  partitions: 3
  replicas: 3
  config:
    retention.ms: 604800000
    segment.bytes: 1073741824

This means that each of the three partitions is replicated on three nodes.
In the preceding example, the ISR count is 3 because three replicas contain the same partition data.
Warning
Do not select a replication factor greater than the number of brokers in your cluster. One broker cannot replicate the same partition multiple times.
Consequently, selecting a replication factor greater than the number of brokers means that the topic is not created until the number of available brokers increases.
When a replica fails to commit a record, for example due to space constraints, the ISR count drops to 2 and the replica set becomes unhealthy.
In such a case, brokers 2 and 4 can still commit new records and serve them to consumers. However, the replica set stays unhealthy until a third replica replicates the full commit log of records.
A replication factor of N means that a topic can overcome N-1 broker failures for each partition. For example, if you configure the replication factor to 4, then N=4 and your Kafka cluster is resilient to up to three replica failures for that topic. This is known as N-1 resiliency.
Producers send records to partition leaders. Partition leaders then propagate the record to each replica. Consumers poll records from the partition leaders by default.
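The acks producer setting controls how a partition leader acknowledges these writes. The following fragment is a sketch that extends the Properties object from the earlier producer example; acks=all is not a required setting, but it makes the leader wait for its replicas:

// Extends the Properties from the earlier producer sketch.
// acks=0: the producer does not wait for any acknowledgment.
// acks=1: the leader acknowledges after writing the record locally.
// acks=all: the leader waits until all in-sync replicas have replicated the record.
props.put(ProducerConfig.ACKS_CONFIG, "all");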
Separating each topic into independent partitions is crucial for the scalability of individual topics. Creating multiple partitions can bring benefits such as:
- Increasing topic scalability, because topic load is distributed among multiple brokers.
- Increasing topic failure resiliency, because a broker failure influences a smaller portion of the topic.
- Increasing topic throughput, because multiple instances of one consumer can connect to a subset of partitions from one topic.
Additionally, you can scale each partition individually by changing the replication factor.
However, a high number of partitions comes with a number of tradeoffs, such as:
- Increasing ordering complexity, because consumers poll partitions in random order.
- Increasing management overhead, because consumers and producers must keep track of all of the partitions. Additionally, ZooKeeper maintains information about the brokers for each partition, and leader election for each partition can become costly.
The Kafka cluster also maintains information about each partition, such as the record offset for each consumer in each partition.
Note that a topic with a single partition provides global topic ordering. Because consumers poll only a single broker node, the record order is preserved.
A single partition, however, also means that a topic is less resilient and scalable. Additionally, because you cannot further divide a partition, the whole topic must fit on a broker node. This might restrict which nodes are available for serving such a partition.
While this is a special case, a single-partition topic might be useful for specific use cases. For example, preserving global order can be useful in applications that track the change from a previous value, such as determining whether a stock is rising or falling in a stock market application.
Additionally, you might remove the dependency on globally ordered records by using record keys and a suitable partitioning strategy in your producer application. For example, if your application must process records related to specific stock ticks in order, you can use the stock ID as the record key to ensure that the partitioner always sends records with the same ID to the same topic partition. In that case, the topic does not preserve global order, but each consumer receives the records for a particular stock in order.
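As a sketch of this strategy, the following fragment reuses the producer instance from the earlier example; the stock-ticks topic name, the RHT key, and the price values are placeholders:

// Both records share the key "RHT", so the default partitioner hashes the key
// and assigns both records to the same partition, preserving their relative order.
producer.send(new ProducerRecord<>("stock-ticks", "RHT", "price: 101.50"));
producer.send(new ProducerRecord<>("stock-ticks", "RHT", "price: 101.75"));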
References
For more information about AMQ Streams on Red Hat OpenShift Container Platform, refer to the Using AMQ Streams on OpenShift documentation at https://access.redhat.com/documentation/en-us/red_hat_amq/2021.q2/html-single/using_amq_streams_on_openshift/index