Sending Data with Producers

Objectives

After completing this section, you should be able to send data with producers to topics.

Defining Kafka Producers

One of the main operations in any messaging system is sending messages. In Apache Kafka, sending records is achieved by using a producer. Producers are applications or fragments of code that send records to topics. For example, if you have a website and you want to count the number of visits that every page has had, a producer would send a record for every page visit to a Kafka topic named visits for later processing.

Kafka provides a Producer API for this purpose. You can access this API by using a variety of clients, but the Java client is the only one maintained as part of the main Kafka project.

The Record Abstraction

In Java, a record is represented by the class ProducerRecord. This class contains the following fields:

Topic

The topic where the record is sent.

Partition (optional)

The partition where the record is sent.

If not provided, then the key field is used to choose the partition.

Key (optional)

The identifier of the record. The key is used to choose the partition where the record is sent. Records with the same key are sent to the same partition.

If neither the partition nor the key field is provided, then the partition is assigned in a round-robin fashion.

Value

The value or payload. In event-driven architectures, this is usually the event.

Timestamp (optional)

Represents the creation time of the record. If not provided, then the producer uses the current time.

The timestamp used by Kafka depends on the topic configuration:

  • If the topic is configured to use CreateTime, then the timestamp provided by the producer is used (this field takes effect).

  • If the topic is configured to use LogAppendTime, then the current time of the broker when the record is appended to the partition is used (this field is overwritten).

Headers (optional)

A list of key/value objects that might contain additional information relevant for your application.

A constructor for the ProducerRecord class has the following signature:

ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
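
For example, the following minimal sketch creates a record for the visits topic mentioned earlier, with an explicit key and a custom header. The header name and the values are illustrative.

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

// The partition is left null, so it is derived from the key.
List<Header> headers = List.of(
        new RecordHeader("source", "web".getBytes(StandardCharsets.UTF_8)));

ProducerRecord<String, String> record = new ProducerRecord<>(
        "visits", null, "page-1", "1", headers);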

Defining Serializers

In Kafka, the serialization process transforms an object, or a data structure, into a stream of bytes. This is necessary because Kafka messages are required to be an array of bytes. You serialize when you produce messages and you deserialize (the opposite operation) when you consume messages.

Kafka includes built-in serializers for simple data types (integer, string, float, long, and so on). However, you usually want to serialize custom objects because simple data types do not allow you to represent complex data models. For this purpose, there are several serialization libraries available (for example, Avro, which is covered later in this course).

The idea behind serialization is to provide Kafka with an array of bytes because that is the format that messages require. This array needs to have a fixed format so it can be converted back into an object whenever the record is consumed (this process is called deserialization).

For example, let's say you have the following class:

public class Transaction {
    private int id;
    private int amount;
}

You can convert it into an array of bytes by appending the id and amount fields (the first bytes are used by id and the following ones by amount). Considering that an int holds 4 bytes, the resulting array will have 8 bytes (the first 4 bytes for id and the last 4 bytes for amount). Whenever the consumer has to deserialize the array of bytes, it uses the same format.

Serializers and deserializers implement the org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer interfaces (respectively). Creating custom serializers is covered later in this course.
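
As an illustration only, the following sketch implements the byte layout described above for the Transaction class. It assumes hypothetical getId and getAmount getters on Transaction.

import java.nio.ByteBuffer;

import org.apache.kafka.common.serialization.Serializer;

public class TransactionSerializer implements Serializer<Transaction> {
    @Override
    public byte[] serialize(String topic, Transaction data) {
        if (data == null) {
            return null;
        }
        // First 4 bytes for id, last 4 bytes for amount.
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putInt(data.getId());     // hypothetical getter
        buffer.putInt(data.getAmount()); // hypothetical getter
        return buffer.array();
    }
}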

Producers expect the key.serializer and value.serializer configuration properties to be set to the serializer classes used to send record keys and values.
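
For example, the following minimal sketch creates a producer with both serializers set. The bootstrap server address is a placeholder.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
// Placeholder address of your Kafka cluster.
props.put("bootstrap.servers", "localhost:9092");
// Serializers for the record keys and values.
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

Producer<String, String> producer = new KafkaProducer<>(props);

The snippets in the rest of this section assume a producer variable created in a similar way.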

Sending Records to Kafka

Although sending a record to a Kafka topic is a simple concept, many processes, methods, and configurations are involved.

Describing the Process

  1. The producer creates a record with at least a topic and a value.

  2. The producer serializes the record key and value.

  3. The producer sends the record to a partitioner. A partitioner is a component of the producer responsible for choosing the partition where the record is sent. If the partition field in the ProducerRecord is provided, then the partitioner uses that partition. If it is not provided, then a partition is selected based on the key of the record. If no key is provided, then a round-robin strategy is used to select the partition.

  4. The producer does not directly send records to the Kafka broker. Instead, the producer groups records going to the same partition into batches. A batch is a group of records that are sent together to the Kafka broker. Sending individual records to Kafka would lead to a lot of network traffic and overhead. Batches are also usually compressed for more efficient data transfer and storage (see the configuration sketch after the process figure).

  5. Another thread sends the batches of records to the appropriate Kafka brokers.

  6. The broker receives the messages and sends back a response. If the records were written successfully, then the broker sends a response containing the topic, partition, and offset. Because several records were sent, the assigned offsets continue from the last record previously written to the partition.

    If the records are not written successfully, then the broker sends an error. On errors, the producer might retry sending the records if the error is transient and the producer is configured to retry.

Figure 2.8: A producer sending records to Kafka
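
As a sketch, the batching behavior described in the process above can be tuned with producer configuration properties. The values shown are illustrative, and the properties are added to the same configuration used to create the producer.

import java.util.Properties;

Properties props = new Properties();
// Accumulate up to 32 KB of records per partition before sending a batch.
props.put("batch.size", 32768);
// Wait up to 10 ms for more records before sending a partially full batch.
props.put("linger.ms", 10);
// Compress batches for more efficient data transfer and storage.
props.put("compression.type", "gzip");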

Ways of Producing Records

Depending on the type of messages that you are sending to Kafka and your application's needs, there are three ways of producing records.

Fire-and-forget

By using this method, you do not wait for the broker to send back a response. The message is sent to the broker, and your application continues executing after sending the message. In case of an error, your application is not notified.

ProducerRecord<String, String> record = new ProducerRecord<>(
        "topic",
        "value"
    );

try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

In the previous code snippet, you create a record without a key. The send method returns a Future object, which is ignored to avoid waiting for a response, following the fire-and-forget pattern. Although you cannot capture broker errors with this approach, the try/catch block can capture other exceptions (for example, serialization or timeout issues).

Not waiting for a response from the broker might increase the performance of your producer application. This approach is useful when individual messages are not critical. For example, in an application that sends messages to track web page traffic, sending messages quickly is a higher priority than ensuring that each message is delivered. Even with this approach, Kafka is highly available, and dropped messages are uncommon.

Synchronous

By using this method, the thread where you execute the action waits for the broker response. This leads to poor performance if the brokers are busy because the thread is blocked until the request is completed.

Using this approach, however, allows you to capture errors from the broker. Retriable errors are caused by temporary issues (for example, a failed network connection), and the producer can retry them automatically. Other errors are not retriable (for example, when the message size is too large).

ProducerRecord<String, String> record = new ProducerRecord<>(
        "topic",
        "value"
    );

try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

In the previous snippet, you execute get on the Future object returned by the send method. This blocks the current thread until the producer receives a response from the broker.
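
If you need the broker response, you can capture the returned metadata, which contains the topic, partition, and offset. The following sketch reuses the producer and record from the previous snippet.

import org.apache.kafka.clients.producer.RecordMetadata;

try {
    // Blocks until the broker responds with the assigned metadata.
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Written to %s-%d at offset %d%n",
            metadata.topic(), metadata.partition(), metadata.offset());
} catch (Exception e) {
    e.printStackTrace();
}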

Asynchronous

By using this method, the thread does not wait for a response from the broker. Instead, you define a callback that is executed when the response is ready. The producer thread continues sending messages and executes the callback as soon as the broker response arrives.

private class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record = new ProducerRecord<>(
        "topic",
        "value"
    );

producer.send(record, new ProducerCallback());

The previous snippet defines a callback that implements the org.apache.kafka.clients.producer.Callback interface. If an error occurred, then the exception passed to the onCompletion method is not null.

You should not include any heavy tasks inside the callback because they block the producer thread from sending messages or processing other callbacks. If long tasks are required, then you should run them in a separate thread.
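
For example, the following minimal sketch hands the work off to a thread pool. The handleError method is a hypothetical part of your application, and the producer and record variables come from the previous snippets.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.Callback;

ExecutorService executor = Executors.newSingleThreadExecutor();

// Callback is a functional interface, so a lambda can be used.
Callback callback = (metadata, exception) -> {
    if (exception != null) {
        // Run the long task on another thread so the producer
        // thread is not blocked.
        executor.submit(() -> handleError(exception)); // handleError is hypothetical
    }
};

producer.send(record, callback);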

The asynchronous approach is a mix of the previous methods, and it is commonly used in production environments.

Acknowledging Records

When records are sent to Kafka, the leader replica of the partition receives those records first, and the records are then replicated to the other replicas. Kafka allows you to configure when a producer should consider that a record has been successfully written into a topic. The producer has a configuration property named acks, which you can set to any of the following values:

acks=0

The producer does not wait for a response from the broker. The message is considered written as soon as it is sent. If an error occurs (for example, the leader is not available), then the producer is not notified. Not waiting for a response means that the producer is available to send new messages sooner.

acks=1 (default)

The producer receives a response as soon as the leader receives the message. The producer does not wait for the message to be written in other replicas. If the leader crashes and another replica without the message is elected, then the message is lost.

acks=all

The producer receives a response whenever the message is written to all in-sync replicas. This ensures that the message is replicated and is not lost if the leader crashes and a new leader election is needed.

If the leader is the only replica, however, then Kafka relies on the min.insync.replicas property. This property states how many replicas must be in sync (with the same replicated data) for a producer request with acks=all to be accepted. For example, if min.insync.replicas=2 and the leader is the only available replica, then the request is rejected. Otherwise, the behavior is the same as acks=1, and losing unreplicated messages is possible.
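
As a sketch, the acknowledgment level is set as a producer configuration property. Note that min.insync.replicas is a topic or broker setting, not a producer property. The retries value below is illustrative.

import java.util.Properties;

Properties props = new Properties();
// Wait until the record is written to all in-sync replicas.
props.put("acks", "all");
// Retry transient (retriable) errors automatically.
props.put("retries", 3);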

Introducing the Quarkus Extension

Throughout this course, most activities use the Quarkus framework to produce and consume data from Kafka. The Quarkus Kafka extension is built on top of the SmallRye Reactive Messaging framework, which allows you to connect to several messaging platforms (Kafka, Apache Camel, RabbitMQ, and others). For this reason, the names of the abstractions used by SmallRye are generic, intended to fit several messaging systems. The following are the main concepts used by SmallRye and their equivalents in Kafka.

Messages

A message is the abstraction of a Kafka record.

Channel

An abstraction for a stream of records in Kafka. Channels are mapped to Kafka topics.

There are outgoing and incoming channels. An outgoing channel represents a Kafka producer and is used to send messages to the mapped topic. An incoming channel represents a Kafka consumer and is used to read messages from the mapped topic.

Connectors

A connector is a component that binds a channel to a remote broker (Kafka, RabbitMQ, and others). In this course, the connector is always SmallRye Kafka.

Defining a Producer

The Quarkus Kafka extension relies on configuration and annotations to create producers on start-up. A method annotated with @Outgoing that returns a Multi<T> object is considered a producer. The Multi class implements the Publisher interface and it is used to send messages to Kafka. The following snippet shows a producer sending a random double number every second.

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;

@ApplicationScoped
public class KafkaPriceProducer {

    private final Random random = new Random();

    @Outgoing("prices") 1
    public Multi<Double> generate() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .map(x -> random.nextDouble());
    }

}

1

@Outgoing requires the name of the channel where the messages are sent. Note that the channel is mapped to a Kafka topic.

The previous code needs some configuration to work. This configuration is placed in the application.properties file in the resources directory. The properties are specified by channel. Therefore, they are of the form:

mp.messaging.outgoing.CHANNEL.PROPERTY=VALUE

In the following snippet you set several properties for the prices channel. In this case, the channel is outgoing.

kafka.bootstrap.servers=my-cluster-kafka-bootstrap-youruser-kafka-cluster.apps.cluster.example.com:443
kafka.security.protocol=SSL
kafka.ssl.truststore.location=ABSOLUTE_PATH/truststore.jks
kafka.ssl.truststore.password=password

mp.messaging.outgoing.prices.key.serializer = org.apache.kafka.common.serialization.StringSerializer 1
mp.messaging.outgoing.prices.value.serializer = org.apache.kafka.common.serialization.DoubleSerializer 2
mp.messaging.outgoing.prices.connector = smallrye-kafka 3
mp.messaging.outgoing.prices.topic = store-prices 4

1

Serializer for the record keys.

2

Serializer for the record values.

3

The connector used by the channel. In this case, the connector for Kafka. If not specified, then an in-memory solution is used.

4

The name of the topic used to send messages. If not specified, then the name of the channel is used.


References

Producer Javadoc

Producer Record Javadoc

Producer Partitioner Javadoc

Quarkus - Sending Messages

Neha Narkhede, Gwen Shapira, Todd Palino. (2017) Kafka: The Definitive Guide. O'Reilly. ISBN 978-1-491-99065-0.

Adam Bellemare. (2020) Building Event-Driven Microservices. O'Reilly. ISBN 978-1-492-05789-5.
