Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
Apache Kafka allows you to read data from topics in a flexible way, scaling horizontally when needed.
An application that reads data from a Kafka topic is called a consumer. From a publish/subscribe perspective, consumers act as subscribers or readers. In particular, consumers read from specific partitions, which are the structures that store the records.
Consumer applications receive and process records from a set of topics. A consumer can read from several topics at the same time.
To achieve scalability, it is common to have several consumers reading from the same topic.
Every consumer is assigned partitions to read from, which means that the reading workload is distributed among several consumers.
For example, in the following diagram, a topic with three partitions is displayed.
An external accounts application is reading transactions from the topic with two consumers.
Consumer 0 is reading from Partition 1 and Consumer 1 is reading from both Partition 0 and Partition 2.
Both consumers belong to the same consumer group.
The previous approach allows different consumers to process records from different partitions within the same application logic. In the previous banking diagram, consider another application (for example, a payments application) that needs to process the transactions independently. This means that its consumers read all transactions in the topic regardless of other applications, so its consumers are independent of the accounts application's consumers.
To achieve several consumers working together independently, Kafka introduces the concept of a consumer group.
A consumer group contains several consumers reading together from a topic.
Every consumer in the group is assigned one or more partitions to read from.
The following diagram displays a banking diagram with two applications: accounts and payments.
Both applications read the records in the topic independently.
The main way to scale reading data in Kafka is by adding more consumers to a consumer group. If you include a new consumer, then Kafka redistributes the partitions assigned to each consumer. This process is known as rebalance and happens whenever a new consumer joins or leaves the consumer group. Kafka distributes partitions among consumers in the group based on the provided partition assignment strategy for the group.
Two consumers within the same consumer group cannot read from the same partition. Therefore, if more consumers read from a topic than there are partitions in that topic, then some consumers are left unassigned and serve as a fallback in case an assigned consumer crashes. For example, if 5 consumers read from a topic that has only 3 partitions, then Kafka leaves 2 consumers unassigned.
Every consumer group has a designated Kafka broker called the group coordinator. The group coordinator broker is responsible for receiving the heartbeats of every consumer. A heartbeat is a request that the consumer application sends regularly to the group coordinator broker to notify that the consumer is still alive. If the group coordinator broker stops receiving heartbeats for a long enough time, then it considers the consumer dead. In this case, the group coordinator triggers a rebalance, and the partitions of the dead consumer are assigned to other consumers in the group.
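The heartbeat frequency and the time window after which the group coordinator considers a consumer dead are controlled by consumer properties. A minimal sketch of these settings, with illustrative values rather than recommendations:

# Interval between heartbeats sent to the group coordinator (illustrative value)
heartbeat.interval.ms=3000
# Time without heartbeats after which the consumer is considered dead (illustrative value)
session.timeout.ms=30000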
The partition assignment happens when the consumer joins the group or when another consumer leaves the group. There are several ways in which Kafka can assign partitions to consumers.
- RangeAssignor
In this strategy, Kafka tries to assign the same partition number in every topic to a specific consumer. To achieve this, Kafka places consumers in lexicographic order by using the member_id parameter assigned by the broker (from 0 to N - 1, where N is the number of consumers in the consumer group). Then, it places topics and the partitions within a topic in numeric order. Kafka tries to assign matching numbers in the form Partition N - Consumer N. For example, consider the following diagram.

Figure 2.11: RangeAssignor partition assignment strategy

Kafka has assigned:

Partition 0 in Topic A to Consumer 0
Partition 1 in Topic A to Consumer 1
Partition 0 in Topic B to Consumer 0
Partition 1 in Topic B to Consumer 1

Partitions with number 0 are assigned to consumers with number 0. Partitions with number 1 are assigned to consumers with number 1. This is a useful approach if the records in Partition 0 of both the Topic A and Topic B topics are related. For example, records with identical keys can be sent to the same partition number within multiple topics. This allows you to perform aggregations in a specific consumer.

However, this approach does not properly balance the partitions across consumers. Consumer 2 in the previous diagram has not been assigned any partition, although there are four partitions to read from.

This is the default strategy.
- RoundRobin
The aim of this strategy is to distribute the partitions evenly across the different consumers in the consumer group. Kafka places partitions and consumers in the same order as the RangeAssignor strategy, but it assigns every partition to a different consumer. When all consumers have received a partition, the process starts again from the beginning. This achieves an even workload distribution among consumers.

Figure 2.12: RoundRobin partition assignment strategy

This is a useful approach if you want all consumers to be used. When a consumer leaves the group, however, the partition assignment algorithm runs again. This means that Kafka can move partitions from a healthy consumer to another consumer. For example, consider the following diagram, where Consumer 1 leaves the consumer group.

Figure 2.13: Consumer leaving consumer group in RoundRobin partition assignment strategy

Partition 1 in Topic B was previously assigned to Consumer 0. However, because Consumer 1 left the group, the algorithm was run again without Consumer 1, and now the consumer assigned by the algorithm is Consumer 2. This reassignment was not necessary, because Partition 1 in Topic B was initially consumed by Consumer 0, which did not leave the group. This strategy does not avoid unnecessary partition reassignments among consumers.

- StickyAssignor

This strategy is similar to the RoundRobin strategy, but it takes the existing partition assignments into account. For example, considering the previous example, only the partitions assigned to Consumer 1 (the consumer leaving the group) are moved, and they are distributed evenly among the remaining consumers.

Figure 2.14: StickyAssignor partition assignment strategy
Consumers actively request data from the corresponding broker.
Consumers receive records in batches.
The maximum number of records in the batch can be configured by setting the max.poll.records consumer property.
The following snippet shows a typical implementation of a consumer in the Java programming language.
import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Consumer<Void, Integer> consumer = new KafkaConsumer<>(configureProperties());
consumer.subscribe(Collections.singletonList("topic"));

while (true) {
    System.out.println("Waiting for events...");
    // Poll Kafka, waiting up to 10 seconds for new records
    ConsumerRecords<Void, Integer> records =
            consumer.poll(Duration.ofMillis(10000));
    for (ConsumerRecord<Void, Integer> record : records) {
        System.out.println("Record received: " + record.value());
    }
}

An infinite loop is used to keep polling data from Kafka indefinitely. The poll method fetches a batch of records, waiting up to the given duration if no records are available. You can loop over the returned records and process them appropriately.
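The previous snippet assumes a configureProperties() helper that builds the consumer configuration. A minimal sketch of such a method follows; the bootstrap server and group ID are illustrative values, and the deserializers match the Consumer<Void, Integer> type parameters.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.VoidDeserializer;

Properties configureProperties() {
    Properties props = new Properties();
    // Placeholder connection and group values for illustration
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
    // Deserializers for the Void key and Integer value
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, VoidDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
    // Optional: cap the number of records returned by a single poll
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
    return props;
}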
For every partition and every consumer group, Kafka keeps a consumer offset. This offset is used to determine the last record read by a consumer. The offset is independent for every consumer group, which allows every consumer group to read records independently: for two different consumer groups, Kafka stores two different offset values for the same partition.
Keeping this offset is useful when a partition rebalance is triggered.
For example, consider two consumers, A and B in the same consumer group.
A is reading from Partition 0 and B is reading from Partition 1.
If a rebalance occurs and A is assigned Partition 1, then A starts reading from the offset that B reached for Partition 1.
The consumer offset thus avoids reading duplicate records.
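Because offsets are stored per consumer group, you can inspect the committed offsets of a group with the kafka-consumer-groups.sh script provided in the Kafka bin directory. A sketch of such an invocation, assuming a broker listening at localhost:9092 and a group named accounts:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group accounts

The output lists, for every partition assigned to the group, the current committed offset and the lag with respect to the end of the partition.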
The consumer is responsible for sending an offset commit to Kafka. Committing means acknowledging that the records have been received successfully. The consumer sends the offset of the last record that has been received correctly.
You can send an automatic commit by setting the enable.auto.commit=true property.
With this approach, the consumer periodically sends a request to Kafka with the offset of the last record received.
The default time is five seconds.
Using an automatic commit, however, might cause race condition issues. For example, a consumer can take longer than five seconds to process a record. If it throws an exception after six seconds, the record has already been committed. Data duplication and data loss are covered later in this course.
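To avoid such issues, you can disable automatic commits and commit manually after processing the records, for example with the commitSync method of the client. The following is a minimal sketch, reusing the consumer from the earlier snippet, assuming enable.auto.commit=false in the configuration and a hypothetical process method.

// Assumes enable.auto.commit=false in the consumer configuration
while (true) {
    ConsumerRecords<Void, Integer> records = consumer.poll(Duration.ofMillis(10000));
    for (ConsumerRecord<Void, Integer> record : records) {
        process(record); // hypothetical processing logic
    }
    // Commit the offsets only after all polled records are processed
    consumer.commitSync();
}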
There are a variety of ways to implement a Kafka consumer.
- The official Kafka Java client
By using the officially supported Java client, which is developed as part of the Apache Kafka main project, you can create Consumer objects and poll data from Kafka.

This method has been used in the previous examples of the lecture and is used in the corresponding Guided Exercise.
- Other open-source clients
Although Java is the main programming language supported by the Kafka project, there are several programming languages that you can use to implement a consumer.
Go, Python, Node.js and other languages are supported.
- Higher-level messaging frameworks
Aside from plain clients, Kafka can also be integrated with frameworks. By using a framework, you can combine Kafka with other features that the framework offers (database connections, authentication…).
The framework used throughout the exercises in this course is Quarkus. The Quarkus Extension for Apache Kafka is built on top of SmallRye Reactive Messaging. The SmallRye Reactive Messaging library also supports other messaging systems.
There are also other frameworks that support Kafka, such as Spring or Micronaut in the Java ecosystem.
- The kafka-console-consumer CLI
Kafka provides a built-in consumer shell script that is located in the bin directory of every broker machine installed in the cluster.

Although this method is not usually used in production, it is a way to debug your Kafka application because it does not require any additional setup; see the example after this list.
- The Strimzi Kafka CLI
The Strimzi Kafka CLI is a command-line interface that enables both Apache Kafka developers and administrators to adopt Strimzi, which is an Apache Kafka Kubernetes operator.
Strimzi Kafka CLI provides a command-line utility to easily execute the shell scripts that Kafka brokers provide in the bin directory (kafka-console-producer.sh, kafka-console-consumer.sh…).
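As an illustration of the kafka-console-consumer.sh script mentioned above, the following invocation reads all records of a topic from the beginning; the broker address and topic name are placeholder values.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic transactions --from-beginning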
By using the Quarkus Kafka extension, you do not have to take care of creating the Consumer object or declaring the poll loop.
Instead, on application start-up, the extension will:

- Read the application.properties file in the resources directory. Here, you declare the consumer configuration. The configuration relies on incoming channels.

mp.messaging.incoming.channel.property=value

The channel parameter identifies the incoming channel used. The property parameter is replaced by the name of the configuration property that you want to set for the incoming channel.

- Create a consumer under the hood based on the configuration properties provided.

- Send the consumed records to the method annotated with @Incoming. In this method, you receive every record individually.
The following snippet shows a method annotated with @Incoming responsible for processing records of double type.
import org.eclipse.microprofile.reactive.messaging.Incoming;

import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices")
    public void consume(double price) {
        // Process the received record
    }
}

The channel name provided in the configuration is used as the annotation parameter. In this example, the prices channel is used.
A possible configuration for the previous snippet:
kafka.bootstrap.servers=my-cluster-kafka-bootstrap-youruser-kafka-cluster.apps.cluster.example.com:443
kafka.security.protocol=SSL
kafka.ssl.truststore.location=/ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks
kafka.ssl.truststore.password=password
mp.messaging.incoming.prices.topic=store-prices
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer
mp.messaging.incoming.prices.group.id=prices