Receiving Data with Consumers

Objectives

After completing this section, you should be able to consume data from topics.

Describing Consumers

Apache Kafka allows you to read data from topics in a flexible way by scaling horizontally when needed.

An application reading data from a Kafka topic is called a consumer. From a publish/subscribe perspective, consumers act as subscribers or readers. In particular, consumers read from specific partitions, which are the structures keeping the records.

Consumer applications receive and process records from a set of topics. A consumer can read from several topics at the same time.

To achieve scalability, it is common to have several consumers reading from the same topic. Every consumer is assigned partitions to read from, which means that the reading workload is distributed among several consumers. For example, the following diagram displays a topic with three partitions. An external accounts application reads transactions from the topic with two consumers: Consumer 0 reads from Partition 1, and Consumer 1 reads from both Partition 0 and Partition 2. Both consumers belong to the same consumer group.

Figure 2.9: Two consumers reading from a topic with three partitions

Describing Consumer Groups

The previous approach allows different consumers to process records from different partitions within the same application logic. In the previous banking diagram, consider another application (for example, a payments application) that needs to process the transactions independently. This means that its consumers read all transactions in the topic regardless of other applications, so its consumers are independent of the consumers of any other application.

To support multiple sets of consumers that work together internally, but independently of each other, Kafka introduces the concept of a consumer group. A consumer group contains several consumers reading together from a topic, and every consumer in the group is assigned one or more partitions to read from. The following diagram displays a banking scenario with two applications: accounts and payments. Both applications read the records in the topic independently.

Figure 2.10: Consumers in different consumer groups reading from a topic

The main way to scale reading data in Kafka is by adding more consumers to a consumer group. If you include a new consumer, then Kafka redistributes the partitions assigned to each consumer. This process is known as a rebalance, and it happens whenever a consumer joins or leaves the consumer group. Kafka distributes partitions among the consumers in the group based on the partition assignment strategy provided for the group.

Two consumers within the same consumer group cannot read from the same partition. Therefore, if there are more consumers in a group than partitions in the topic, then some consumers are left unassigned and serve as a fallback in case an assigned consumer crashes. For example, if 5 consumers read from a topic that has only 3 partitions, then Kafka leaves 2 consumers unassigned.

Every consumer group has a designated Kafka broker called the group coordinator. The group coordinator broker is responsible for receiving the heartbeats of every consumer. A heartbeat is a request that the consumer application sends regularly to the group coordinator broker to notify it that the consumer is still alive. If the group coordinator broker stops receiving heartbeats for long enough, then the consumer is considered dead and the group coordinator triggers a rebalance. The partitions of the dead consumer are assigned to other consumers in the group.
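The heartbeat behavior is configurable through standard consumer properties. The following is a minimal sketch using the Java client; the group name and the specific values are illustrative assumptions, not values mandated by this course.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// Consumers that share the same group.id belong to the same consumer group.
props.put(ConsumerConfig.GROUP_ID_CONFIG, "accounts");
// How often the consumer sends heartbeats to the group coordinator.
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
// How long the coordinator waits without heartbeats before it
// considers the consumer dead and triggers a rebalance.
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");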

Describing Partition Assignment Strategies

Partition assignment happens when a consumer joins the group or when a consumer leaves the group. There are several strategies that Kafka can use to assign partitions to consumers.

RangeAssignor

In this strategy, Kafka tries to assign the same partition number in every topic to a specific consumer. To achieve this, Kafka places consumers in lexicographic order by using the member_id parameter assigned by the broker (from 0 to N - 1, where N is the number of consumers in the consumer group). Then, it places topics, and the partitions within each topic, in numeric order. Kafka tries to assign matching numbers in the form Partition N - Consumer N. For example, consider the following diagram.

Figure 2.11: RangeAssignor partition assignment strategy

Kafka has assigned:

  • Partition 0 in Topic A to Consumer 0

  • Partition 1 in Topic A to Consumer 1

  • Partition 0 in Topic B to Consumer 0

  • Partition 1 in Topic B to Consumer 1

Partitions with number 0 are assigned to consumers with number 0, and partitions with number 1 are assigned to consumers with number 1. This is a useful approach if the records in Partition 0 of both Topic A and Topic B are related. For example, records with identical keys can be sent to the same partition number within multiple topics, which allows you to perform aggregations in a specific consumer.

However, this approach does not properly balance the partitions across consumers. Consumer 2 in the previous diagram has no partitions assigned although there are four partitions to read from.

This is the default strategy.

RoundRobin

The aim of this strategy is to distribute the partitions evenly across the different consumers in the consumer group. Kafka orders partitions and consumers in the same way as the RangeAssignor strategy, but it assigns each successive partition to the next consumer. When every consumer has received a partition, the assignment wraps around to the first consumer. This achieves an even workload distribution among consumers.

Figure 2.12: RoundRobin partition assignment strategy

This is a useful approach if you want all consumers to be used. When a consumer leaves the group, however, the partition assignment algorithm runs again. This means that Kafka can move partitions from one healthy consumer to another. For example, consider the following diagram, where Consumer 1 leaves the consumer group.

Figure 2.13: Consumer leaving consumer group in RoundRobin partition assignment strategy

Partition 1 in Topic B was previously assigned to Consumer 0. However, because Consumer 1 left the group, the algorithm ran again without Consumer 1, and it now assigns that partition to Consumer 2. This reassignment was unnecessary, because Partition 1 in Topic B was initially consumed by Consumer 0, which did not leave the group. This strategy does not avoid unnecessary partition reassignments among consumers.

StickyAssignor

This strategy is similar to the RoundRobin strategy, but it minimizes partition reassignments among consumers. For example, in the previous scenario, only the partitions assigned to Consumer 1 (the consumer leaving the group) are moved, and they are redistributed evenly among the remaining consumers.

Figure 2.14: StickyAssignor partition assignment strategy
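You select a strategy by setting the partition.assignment.strategy consumer property, which accepts fully qualified assignor class names. A minimal sketch with the Java client, assuming that you want the StickyAssignor strategy:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// Fully qualified class name of the assignor to use.
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    "org.apache.kafka.clients.consumer.StickyAssignor");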

Describing Polling

Consumers actively request data from the corresponding broker. Consumers receive records in batches. The maximum number of records in the batch can be configured by setting the max.poll.records consumer property. The following snippet shows a typical implementation of a consumer in the Java programming language.

import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Consumer<Void,Integer> consumer = new KafkaConsumer<>(configureProperties());
consumer.subscribe(Collections.singletonList("topic"));

while (true) { 1
    System.out.println("Waiting for events...");

    ConsumerRecords<Void, Integer> records =
        consumer.poll(Duration.ofMillis(10000)); 2

    for (ConsumerRecord<Void, Integer> record : records) { 3
        System.out.println("Record received: " + record.value());
    }
}

1

An infinite loop is used to keep polling data from Kafka indefinitely.

2

The poll method is responsible for retrieving the records from the assigned topic partitions. If records are available, then the poll method returns immediately. If no records are available, then the poll method waits up to the provided timeout and then returns an empty batch.

3

You can loop over the records and process them appropriately.
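The configureProperties() method in the snippet is not shown in the example. The following is a minimal sketch of what such a method could return, assuming a local broker and an illustrative group name; the deserializers must match the Consumer<Void, Integer> type parameters.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

private static Properties configureProperties() {
    Properties props = new Properties();
    // Broker to bootstrap from (assumed local for illustration).
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Consumer group that this consumer joins.
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
    // Deserializers matching the Void key and Integer value types.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.VoidDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.IntegerDeserializer");
    // Maximum number of records returned by a single poll call.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
    return props;
}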

Describing Commit Strategies

For every partition and every consumer group, Kafka keeps a consumer offset. This offset identifies the last record read by the group in that partition. The offset is tracked independently for every consumer group, which allows every consumer group to read the records at its own pace: Kafka stores different offset values for two different consumer groups reading the same partition.

Keeping this offset is useful when a partition rebalance is triggered. For example, consider two consumers, A and B, in the same consumer group. A is reading from Partition 0 and B is reading from Partition 1. If a rebalance occurs and A is assigned Partition 1, then A starts reading from the offset that was last committed for Partition 1. The consumer offset avoids reading duplicate records.
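The Java client lets you inspect the committed offset of a partition. The following sketch assumes an existing consumer instance and a hypothetical transactions topic:

import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

TopicPartition partition = new TopicPartition("transactions", 1);
// Last offset committed by this consumer group for the given partition.
Map<TopicPartition, OffsetAndMetadata> committed =
    consumer.committed(Set.of(partition));
System.out.println(committed.get(partition));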

The consumer is responsible for sending an offset commit to Kafka. Committing means acknowledging that the records have been received successfully. The consumer sends the offset of the last record that has been received correctly.

You can enable automatic commits by setting the enable.auto.commit=true property. With this approach, the consumer periodically sends a request to Kafka with the offset of the last record received. The default interval is five seconds.

Using an automatic commit, however, might cause race condition issues. For example, a consumer can take longer than five seconds to process a record. If the consumer throws an exception after six seconds, the record has already been committed. Data duplication and data loss are covered later in this course.
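To avoid committing records that have not been processed yet, you can disable automatic commits and commit manually after processing. A minimal sketch, assuming the consumer from the previous polling example and enable.auto.commit=false in its configuration:

while (true) {
    ConsumerRecords<Void, Integer> records =
        consumer.poll(Duration.ofMillis(10000));

    for (ConsumerRecord<Void, Integer> record : records) {
        // Process the record before committing its offset.
        System.out.println("Record received: " + record.value());
    }

    // Commit the offsets of the polled records only after
    // all of them have been processed successfully.
    consumer.commitSync();
}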

Reading Data with Consumers

There are a variety of ways to implement a Kafka consumer.

The official Kafka Java client

By using the official supported Java client, which is developed as part of the Apache Kafka main project, you can create Consumer objects and poll data from Kafka.

This method has been used in the previous examples of the lecture and is used in the corresponding Guided Exercise.

Other open-source clients

Although Java is the main programming language supported by the Kafka project, there are several programming languages that you can use to implement a consumer.

Go, Python, Node.js, and other languages are supported.

Higher-level messaging frameworks

Aside from plain clients, Kafka can also be integrated with frameworks. By using a framework, you can integrate Kafka with other features that frameworks offer (database connections, authentication, and so on).

The framework used throughout the exercises in this course is Quarkus. The Quarkus Extension for Apache Kafka is built on top of SmallRye Reactive Messaging, a library that also supports other messaging systems.

There are also other frameworks that support Kafka, such as Spring or Micronaut in the Java ecosystem.

The kafka-console-consumer CLI

Kafka provides a built-in consumer shell script that is located in the bin directory of every broker machine installed in the cluster.

Although this method is not usually used in production, it is a convenient way to debug your Kafka application because it does not require any additional setup.
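For example, the following invocation reads all records from a topic, assuming a broker listening on localhost:9092 and a hypothetical store-prices topic:

bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic store-prices \
  --group prices \
  --from-beginning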

The Strimzi Kafka CLI

The Strimzi Kafka CLI is a command-line interface that enables both Apache Kafka developers and administrators to adopt Strimzi, an Apache Kafka operator for Kubernetes.

The Strimzi Kafka CLI provides a command-line utility to easily execute the shell scripts that Kafka brokers provide in the bin directory (kafka-console-producer.sh, kafka-console-consumer.sh, and so on).
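For example, the kfk command wraps the console consumer script. The following sketch assumes a hypothetical my-topic topic in a Strimzi cluster named my-cluster in the kafka namespace; the exact flags can differ between versions of the CLI:

kfk console-consumer --topic my-topic -c my-cluster -n kafka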

Introducing the Quarkus Kafka extension

By using the Quarkus Kafka extension, you do not have to take care of creating the Consumer object or declaring the poll loop. Instead, on application start-up, the extension will:

  1. Read the application.properties file in the resources directory. Here, you declare the consumer configuration. The configuration relies on incoming channels.

    mp.messaging.incoming.channel.property=value

    The channel parameter identifies the incoming channel used. The property parameter is replaced by the property name of the configuration that you want to set for the incoming channel.

  2. Create a consumer under the hood based on the configuration properties provided.

  3. Send the consumed records to the method annotated with @Incoming. In this method, you receive every record individually.

The following snippet shows a method annotated with @Incoming responsible for processing records of double type.

import org.eclipse.microprofile.reactive.messaging.Incoming;
import javax.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class PriceConsumer {

    @Incoming("prices") 1
    public void consume(double price) {
        // Process record
    }
}

1

The channel name provided in the configuration is used as a parameter. In this example, the prices channel is used.

A possible configuration for the previous snippet:

kafka.bootstrap.servers=my-cluster-kafka-bootstrap-youruser-kafka-cluster.apps.cluster.example.com:443
kafka.security.protocol=SSL
kafka.ssl.truststore.location=/ABSOLUTE_PATH_TO_YOUR_WORKSPACE_FOLDER/truststore.jks
kafka.ssl.truststore.password=password

mp.messaging.incoming.prices.topic=store-prices 1
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.DoubleDeserializer 2
mp.messaging.incoming.prices.group.id=prices 3

1

The topic name to read from.

2

The deserializer class for the values. In this example, the double deserializer is used for prices.

3

The consumer group to which the consumer belongs.
