Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
After completing this section, you should be able to define data contracts and integrate schema registries.
To send records to Kafka brokers, producers must choose a serializer for both the key and value record fields. Similarly, consumers must choose deserializers to parse records.
The serializer-deserializer classes create a data type contract between consumers and producers.
Kafka provides serializer and deserializer implementations in the org.apache.kafka.common.serialization package, such as:
LongSerializer and LongDeserializer
IntegerSerializer and IntegerDeserializer
StringSerializer and StringDeserializer
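For example, the following minimal sketch configures a producer with the built-in StringSerializer and LongSerializer classes. The broker address and topic name are illustrative assumptions, not values from this course:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class BuiltInSerializerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The broker address is an assumption for this sketch
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Built-in serializers for the record key and value
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());

        try (KafkaProducer<String, Long> producer = new KafkaProducer<>(props)) {
            // The "accounts" topic name is an assumption
            producer.send(new ProducerRecord<>("accounts", "account-1", 42L));
        }
    }
}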
You can create custom (de)serializer classes by implementing the org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer interfaces.
In addition, some Kafka components, such as Kafka Streams, define SerDe classes. A SerDe class is a wrapper class that contains both a serializer and a deserializer.
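For example, the following sketch wraps the built-in String serializer and deserializer in a single SerDe by using the org.apache.kafka.common.serialization.Serdes factory class. The topic name is an illustrative assumption:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SerdeExample {
    public static void main(String[] args) {
        // A SerDe bundles a serializer and a deserializer for the same type
        Serde<String> stringSerde =
                Serdes.serdeFrom(new StringSerializer(), new StringDeserializer());

        byte[] bytes = stringSerde.serializer().serialize("accounts", "hello");
        String value = stringSerde.deserializer().deserialize("accounts", bytes);
        System.out.println(value);
    }
}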
The key and value (de)serializer architecture has a number of benefits, such as:
Developers can send custom classes and binary values as the record payload.
Developers can implement custom (de)serializers for higher efficiency.
Enforcing a typed contract creates a type-safe environment, which results in fewer runtime errors.
Kafka brokers do not parse records in any way. This enables high throughput and scalability.
Note
Sending large binary values in Kafka records can have a performance impact.
Kafka brokers reject records larger than 1MB by default.
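As a sketch, the broker-side limit is controlled by the message.max.bytes broker property; the value shown here is illustrative:

# server.properties (broker configuration)
message.max.bytes=1048576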
As the number of producers and consumers grows, however, a SerDe can become difficult to manage:
Each consumer must keep track of the (de)serializer contract with its producers. Changing the producer contract implies changing the consumer contracts.
Versioning of (de)serializer classes can become difficult.
Record validation can become difficult.
The producers and consumers must have the same understanding about the meaning of the data. A data contract is a formal agreement between a producer and a consumer that describes the exchanged data. For example, if a producer sends a JSON record, then a consumer that expects a numerical record value fails to parse such a record.
Consider the following simplified NewAccountEvent class:
public class NewAccountEvent implements Serializable {

    private int accountId;

    // getters, setters, and constructors omitted for brevity
}

You can implement a custom NewAccountEventSerializer serializer class:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.redhat.model.NewAccountEvent;

import org.apache.kafka.common.serialization.Serializer;

public class NewAccountEventSerializer implements Serializer<NewAccountEvent> {

    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, NewAccountEvent data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}
Note the dependency on the Jackson ObjectMapper class, which converts the object into JSON bytes. The class implements the Kafka Serializer interface.
When the producer uses the preceding serializer, Kafka receives JSON events.
For example, a NewAccountEvent instance with the accountId set to 1 is {"accountId":1}.
Note that a serializer can be generic.
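For example, a minimal producer sketch that registers the custom serializer might look like the following. The broker address, topic name, and the setAccountId setter are assumptions based on the simplified model class:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import com.redhat.model.NewAccountEvent;

public class NewAccountEventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the custom serializer for the record value
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, NewAccountEventSerializer.class.getName());

        try (KafkaProducer<String, NewAccountEvent> producer = new KafkaProducer<>(props)) {
            NewAccountEvent event = new NewAccountEvent();
            event.setAccountId(1); // setter omitted from the simplified model, assumed here
            // The "new-accounts" topic name is an assumption
            producer.send(new ProducerRecord<>("new-accounts", "account-1", event));
        }
    }
}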
The accompanying custom NewAccountEventDeserializer class implementation is similar:
// Import statements omitted for brevity

public class NewAccountEventDeserializer implements Deserializer<NewAccountEvent> {

    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public NewAccountEvent deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(new String(data, "UTF-8"),
                    NewAccountEvent.class);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

Note that the deserializer must be class specific.
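Similarly, a minimal consumer sketch might register this deserializer as follows. The broker address, consumer group ID, and topic name are assumptions:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.redhat.model.NewAccountEvent;

public class NewAccountEventConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "new-account-processor");   // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Register the custom deserializer for the record value
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NewAccountEventDeserializer.class.getName());

        try (KafkaConsumer<String, NewAccountEvent> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("new-accounts")); // topic name is an assumption
            for (ConsumerRecord<String, NewAccountEvent> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(record.value().getAccountId());
            }
        }
    }
}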
Both the producer and consumer applications must contain references to:
The (de)serializer implementation classes.
The model classes, such as NewAccountEvent in the preceding example.
Commonly, issues arise when the model classes change. The deserializer classes might use outdated models. Similarly, the producer application might use the latest version of the model classes while a subset of consumers has not yet been updated to use the latest classes.
The Quarkus project enables you to reduce the boilerplate (de)serialization code with the ObjectMapperSerializer and ObjectMapperDeserializer classes in the io.quarkus.kafka.client.serialization package.
For example, the following is a NewAccountEventDeserializer deserializer implementation:
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
public class NewAccountEventDeserializer extends ObjectMapperDeserializer<NewAccountEvent> {

    public NewAccountEventDeserializer() {
        super(NewAccountEvent.class);
    }
}

Because serializer classes can be generic, you can use the io.quarkus.kafka.client.serialization.ObjectMapperSerializer class directly.
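For example, a SmallRye Reactive Messaging configuration might reference these classes as follows. The channel names and the deserializer package are assumptions for this sketch:

# application.properties (channel names are assumptions)
mp.messaging.outgoing.new-accounts-out.connector = smallrye-kafka
mp.messaging.outgoing.new-accounts-out.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.incoming.new-accounts-in.connector = smallrye-kafka
mp.messaging.incoming.new-accounts-in.value.deserializer = com.redhat.model.NewAccountEventDeserializer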
Apache Avro is a data serialization system that attempts to resolve the tight coupling between models and (de)serialization while preserving type-safety.
Apache Avro enables you to create self-describing data by decoupling the format of the data from the content. Avro achieves this by defining schemas that describe the data format.
Consider the following CustomClass.avsc schema:
{
"namespace": "com.redhat.training",
"type": "record",
"name": "CustomClass",
"fields": [
{"name": "firstName", "type": "string"},
{"name": "middleNames", "type": ["null", "string"], "default": "null"},
{"name": "lastName", "type": "string"}
]
}

In the preceding schema:

The namespace is a name qualifier, such as a Java package.
The type is an Avro data type.
The name is the name of this data structure.
The firstName entry defines a required string field.
The middleNames entry defines an optional string field that defaults to null.
Apache Avro provides a number of primitive data types, such as int or string.
Additionally, you can use a number of default complex data types, such as record, enum, or array.
Consult the Apache Avro schema specification document in the reference section for more information.
Avro enables developers to generate classes based on the schema files.
For example, Avro generates a com.redhat.training.CustomClass class from the preceding schema.
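A minimal sketch of using the generated class and its builder follows. The field values are illustrative, and the builder methods follow Avro's standard generated-code conventions:

import com.redhat.training.CustomClass;

public class CustomClassExample {
    public static void main(String[] args) {
        CustomClass person = CustomClass.newBuilder()
                .setFirstName("Ada")        // required string field
                .setLastName("Lovelace")    // required string field
                .build();                   // middleNames is optional and defaults to null
        System.out.println(person);
    }
}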
Finally, Avro provides the binary and JSON encodings for data (de)serialization. The binary serialization encoding tends to be faster, and results in less data transfer. However, the JSON format is useful for debugging. Additionally, some applications can benefit from consuming the JSON data directly.
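The following sketch compares the two encodings for a single CustomClass instance. It assumes the generated class from the preceding schema is on the classpath:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

import com.redhat.training.CustomClass;

public class EncodingComparison {
    public static void main(String[] args) throws IOException {
        CustomClass person = CustomClass.newBuilder()
                .setFirstName("Ada")
                .setLastName("Lovelace")
                .build();

        DatumWriter<CustomClass> writer = new SpecificDatumWriter<>(CustomClass.class);

        // Binary encoding: compact and faster to process
        ByteArrayOutputStream binaryOut = new ByteArrayOutputStream();
        Encoder binaryEncoder = EncoderFactory.get().binaryEncoder(binaryOut, null);
        writer.write(person, binaryEncoder);
        binaryEncoder.flush();

        // JSON encoding: human readable, useful for debugging
        ByteArrayOutputStream jsonOut = new ByteArrayOutputStream();
        Encoder jsonEncoder = EncoderFactory.get().jsonEncoder(CustomClass.getClassSchema(), jsonOut);
        writer.write(person, jsonEncoder);
        jsonEncoder.flush();

        System.out.println("Binary size: " + binaryOut.size() + " bytes");
        System.out.println("JSON: " + jsonOut.toString("UTF-8"));
    }
}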
A schema registry is a system that stores schema artifacts. The Apache Avro format decouples the data content from the data schema. Kafka producers, however, must provide a schema for every record sent to Kafka brokers. Similarly, Kafka consumers must read a schema to deserialize every record received from Kafka brokers. Because the schemas tend to be larger than the actual record data, including the schema with each message is impractical.
Schema registry enables you to decouple the record schemas from the producers and consumers. Red Hat enables developers to use the Service Registry product, which is based on the Apicurio Registry project.
Administrators can deploy Service Registry to the Red Hat OpenShift Container Platform and use it for storing various artifacts, such as:
Apache Avro
Google protocol buffers (protobuf)
JSON Schema
Kafka Connect schema
Service Registry enables developers to:
Validate data based on schemas.
For example, when a producer attempts to send invalid data, due to a partial network partition, the serializer can refuse to send the record.
Version record schemas.
Developers can create backwards-compatible record schemas. Because Service Registry versions record schemas, consumers can choose which schema to use. Consequently, a subset of consumers can use a different record schema than other consumers.
Validate new schema compatibility.
Service Registry contains rules for forward and backward compatibility checking that control each schema artifact. Developers can optionally turn on the compatibility checks.
Manage schemas programmatically.
Service Registry provides the io.apicurio:apicurio-registry-maven-plugin Maven plug-in that enables developers to programmatically download and upload schema files.
In the Kafka ecosystem, producers upload the schema used for serialization of the record to the Service Registry. Consumers then look up the schema based on a schema lookup strategy, and deserialize the message.
In a Quarkus project, the io.quarkus:quarkus-apicurio-registry-avro dependency provides the Avro (de)serializer classes and code generation functionality.
Quarkus transforms Avro schema files in the src/main/avro or src/test/avro directories into Java classes.
Execute the Maven compile phase to generate Java classes from the Avro schemas. Quarkus places the generated classes in the target/generated-sources/avsc/ directory.
Quarkus applications that use Service Registry commonly define the following Service Registry properties:
- Registry URL
Configure the registry URL by using the apicurio.registry.url property. For example:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url = http://service-registry.stage.com/apis/registry/v2
- (De)serializer Implementation (optional)
The Apicurio Registry provides the io.apicurio.registry.serde.avro.AvroKafkaSerializer and io.apicurio.registry.serde.avro.AvroKafkaDeserializer classes. Use the standard value.serializer and value.deserializer properties to set the (de)serializer classes.

For example, to set the serializer for the example channel, use the following property:

mp.messaging.outgoing.example.value.serializer = io.apicurio.registry.serde.avro.AvroKafkaSerializer
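Similarly, to set the deserializer for an incoming channel named example (the channel name is an assumption), use the following property:

mp.messaging.incoming.example.value.deserializer = io.apicurio.registry.serde.avro.AvroKafkaDeserializer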
- Schema Lookup Strategy (optional)
Producers and consumers can look up what schema is registered with each record. Service Registry provides several strategies to resolve the schema-record mapping. Consult the Apicurio Lookup Strategies document in the references section for more information about each strategy.
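For example, assuming the Apicurio Registry 2.x serde module, a topic-based strategy can be configured with a property similar to the following. Verify the exact property name and strategy class against the Apicurio documentation:

mp.messaging.outgoing.example.apicurio.registry.artifact-resolver-strategy = io.apicurio.registry.serde.strategy.TopicIdStrategy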
References
For more information about the Service Registry, refer to the Service Registry User Guide documentation at https://access.redhat.com/documentation/en-us/red_hat_integration/2021.q3/html-single/service_registry_user_guide/index#managing-registry-artifacts-maven