Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
One of the main operations in any messaging system is sending messages.
In Apache Kafka, sending records is achieved by using a producer.
Producers are applications or fragments of code that send records to topics.
For example, if you have a website and you want to count the number of visits to each page, a producer would send a record for every page visit to a Kafka topic named visits for later processing.
Kafka provides a Producer API for this purpose. You can access this API by using a variety of clients, but the Java client is the only one maintained as part of the main Kafka project.
In Java, a record is represented by the class ProducerRecord.
This class contains the following fields:
- Topic
The topic where the record is sent.
- Partition (optional)
The partition where the record is sent. If not provided, then the key field is used to choose the partition.
- Key (optional)
The identifier of the record. The key is used to choose the partition where the record is sent. Records with the same key are sent to the same partition. If neither the partition nor the key field is provided, then the partition is assigned in a round-robin fashion.
- Value
The value or payload. In event-driven architectures, this is usually the event.
- Timestamp (optional)
Represents the creation time of the record. If not provided, then the producer uses the current time.
The timestamp used by Kafka depends on the topic configuration:
If the topic is configured to use CreateTime, then the timestamp provided by the producer is used (this field takes effect).
If the topic is configured to use LogAppendTime, then the current time of the broker when the record is appended to the partition is used (this field is overwritten).
- Headers (optional)
A list of key/value objects that might contain additional information relevant for your application.
A constructor for the ProducerRecord class has the following signature:
ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers)
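For illustration, the following short sketch uses two of the other available constructors; the visits topic name and the values are placeholders:

// Topic and value only; the partition is chosen in a round-robin fashion.
ProducerRecord<String, String> valueOnly =
        new ProducerRecord<>("visits", "/index.html");

// Topic, key, and value; records with the same key go to the same partition.
ProducerRecord<String, String> keyed =
        new ProducerRecord<>("visits", "page-42", "/index.html");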
In Kafka, the serialization process transforms an object, or a data structure, into a stream of bytes. This is necessary because Kafka messages are required to be an array of bytes. You serialize when you produce messages and you deserialize (the opposite operation) when you consume messages.
Kafka includes built-in serializers for simple data types (integer, string, float, long…). However, you usually want to serialize custom objects because simple data types do not allow you to represent complex data models. For this purpose, there are several serialization libraries available (for example, Avro, which is covered later in this course).
The idea behind serialization is to provide Kafka with an array of bytes because that is the format that messages require. This array needs to have a fixed format so it can be converted back into an object whenever the record is consumed (this process is called deserialization).
For example, let's say you have the following class:
public class Transaction {
    private int id;
    private int amount;
}

You can convert it into an array of bytes by appending the id and amount fields (the first bytes are used by id and the following ones by amount).
Considering that an int holds 4 bytes, the resulting array will have 8 bytes (the first 4 bytes for id and the last 4 bytes for amount).
Whenever the consumer has to deserialize the array of bytes, it uses the same format.
Serializers and deserializers implement the org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer interfaces (respectively).
Creating custom serializers is covered later in this course.
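As a preview, the following is a minimal sketch of what such a serializer might look like for the Transaction class above, assuming hypothetical getId and getAmount accessors that the class snippet does not show:

import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Serializer;

public class TransactionSerializer implements Serializer<Transaction> {
    @Override
    public byte[] serialize(String topic, Transaction data) {
        if (data == null) {
            return null;
        }
        // Fixed 8-byte layout: the first 4 bytes hold id, the last 4 bytes hold amount.
        ByteBuffer buffer = ByteBuffer.allocate(8);
        buffer.putInt(data.getId());
        buffer.putInt(data.getAmount());
        return buffer.array();
    }
}

A matching deserializer reads the bytes back in the same order to rebuild the object.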
Producers expect the configuration properties key.serializer and value.serializer to be set to the serializer classes used for the record key and value.
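The following is a minimal sketch of this configuration with the Java client; the broker address is a placeholder:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

// Minimal producer setup; "localhost:9092" is a placeholder broker address.
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);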
Although sending a record to a Kafka topic is a simple concept, many processes, methods, and configurations are involved.
The producer creates a record with at least a topic and a value.
The producer serializes the record key and value.
The producer sends the record to a partitioner. A partitioner is a component of the producer responsible for choosing the partition where the record is sent. If the partition field in the ProducerRecord is provided, then the partitioner uses that partition. If it is not provided, then a partition is selected based on the key of the record. If no key is provided, then a round-robin strategy is used to select the partition.

The producer does not directly send records to the Kafka broker. Instead, the producer groups records going to the same partition into batches. A batch is a group of records that are sent together to the Kafka broker. Sending individual records to Kafka would lead to a lot of networking traffic and overhead. Batches are also usually compressed for more efficient data transfer and storage.
Another thread sends the batches of records to the appropriate Kafka brokers.
The broker receives the messages and sends back a response. If the records were written successfully, then the broker sends a response containing the topic, partition, and offset. Because several records were sent, the offset refers to the last record written into the topic.
If the records are not written successfully, then the broker sends an error. On errors, the producer might retry sending the records if the error is transient and the producer is configured to retry.
Depending on the type of messages that you are sending to Kafka and your application's needs, there are three ways of producing records.
- Fire-and-forget
By using this method, you do not wait for the broker to send back a response. The message is sent to the broker and your application continues processing the code after sending the message. In case of an error, your application is not notified.
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

In the previous code snippet, you create a record without a key. The send method returns a Future object, which is ignored to avoid waiting for a response, ensuring a fire-and-forget pattern. Although you cannot capture broker errors with this approach, the try/catch block can capture other exceptions (for example, serialization or timeout issues).

Not waiting for a response from the broker might increase the performance of your producer application. This is a useful approach if each individual message is not critical. For example, in an application that sends messages to track web page traffic, sending messages quickly is a higher priority than ensuring that each message is delivered. Even with this approach, Kafka is highly available, and it is not common for messages to be dropped.
- Synchronous
By using this method, the thread where you execute the action waits for the broker response. This leads to poor performance if the brokers are busy because the thread is blocked until the request is completed.
Using this approach, however, allows you to capture errors from the broker. Retriable errors are caused by temporary issues (for example, a failed network connection), and the producer can automatically retry them. Other errors are not caused by temporary issues (for example, a message that exceeds the maximum allowed size), so retrying them does not help.
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

In the previous snippet, you execute get on the Future object returned by the send method. This blocks the current thread until the producer receives a response from the broker.
- Asynchronous
By using this method, the thread does not wait for a response from the broker. Instead, you define a callback to be executed when the response is ready. The producer thread continues sending messages and executes the callback whenever a response arrives.
private class ProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record = new ProducerRecord<>("topic", "value");

producer.send(record, new ProducerCallback());

The previous snippet defines a callback that implements the org.apache.kafka.clients.producer.Callback interface. If an error occurred, then the exception passed to the onCompletion method is not null. You should not include any heavy tasks inside the callback because they block the producer thread from sending messages or processing other callbacks. If long tasks are required, then you should use a separate thread.
This approach is a mix of the previous methods and it is commonly used in production environments.
When records are sent to Kafka, the leader partition receives those records first and then replicates them among the other replicas.
Kafka allows you to configure when a producer should consider that a record has been successfully written into a topic.
The producer has a configuration property named acks, which you can set to any of the following values:
- acks=0
The producer does not wait for a response from the broker. The message is marked as written as soon as it is sent. If an error occurs (for example, the leader is not available), then the producer does not receive a response. Not waiting for a response means that the producer is available to send new messages faster.
- acks=1 (default)
The producer receives a response as soon as the leader receives the message. The producer does not wait for the message to be written to the other replicas. If the leader crashes and another replica without the message is elected, then the message is lost.
- acks=all
The producer receives a response when the message is written to all in-sync replicas. This ensures that the message is replicated and is not lost if the leader crashes and a new leader election is needed.
If the leader is the only replica, however, then Kafka relies on the min.insync.replicas property. This property states how many replicas must be in sync (with the same replicated data) before a producer request with acks=all is allowed. For example, if min.insync.replicas=2 and the leader is the only replica available, then the request is rejected. Otherwise, the behavior is the same as acks=1, and losing messages without replication is possible.
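For example, a minimal sketch of setting this property with the Java client, extending the producer configuration shown earlier:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// Wait until the message is written to all in-sync replicas before the
// send is considered successful.
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Optionally retry transient errors a limited number of times.
props.put(ProducerConfig.RETRIES_CONFIG, 3);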
Throughout this course, most activities use the Quarkus framework to produce and consume data from Kafka. The Quarkus Kafka extension is built on top of the SmallRye Reactive Messaging framework, which allows you to connect to several messaging platforms (Kafka, Apache Camel, RabbitMQ…). For this reason, the names of the abstractions used by SmallRye are generic, and they are intended to match several messaging systems. The following are the main concepts used by SmallRye and their equivalents in Kafka.
- Messages
A message is the abstraction of a Kafka record.
- Channel
An abstraction for a stream of records in Kafka. Channels are mapped to Kafka topics.
There are outgoing and incoming channels. An outgoing channel represents a Kafka producer and is used to send messages to the mapped topic. An incoming channel represents a Kafka consumer and is used to read messages from the mapped topic.
- Connectors
A connector is a component that binds a channel to a remote broker (Kafka, RabbitMQ…). In this course, the connector is always the SmallRye Kafka connector.
The Quarkus Kafka extension relies on configuration and annotations to create producers on start-up.
A method annotated with @Outgoing that returns a Multi<T> object is considered a producer.
The Multi class implements the Publisher interface and it is used to send messages to Kafka.
The following snippet shows a producer sending a random double number every second.
import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;
import java.util.Random;
@ApplicationScoped
public class KafkaPriceProducer {
private final Random random = new Random();
@Outgoing("prices")
public Multi<Double> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> random.nextDouble());
}
}
The previous code needs some configuration to work.
This configuration is placed in the application.properties file in the resources directory.
The properties are specified by channel.
Therefore, they are of the form:
mp.messaging.outgoing.CHANNEL.PROPERTY=VALUE
In the following snippet, you set several properties for the prices channel.
In this case, the channel is outgoing.
kafka.bootstrap.servers=my-cluster-kafka-bootstrap-youruser-kafka-cluster.apps.cluster.example.com:443
kafka.security.protocol=SSL
kafka.ssl.truststore.location=ABSOLUTE_PATH/truststore.jks
kafka.ssl.truststore.password=password

mp.messaging.outgoing.prices.key.serializer = org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.prices.value.serializer = org.apache.kafka.common.serialization.DoubleSerializer
mp.messaging.outgoing.prices.connector = smallrye-kafka
mp.messaging.outgoing.prices.topic = store-prices