Defining Data Formats and Structures

Objectives

After completing this section, you should be able to define data contracts and integrate schema registries.

Defining Data Contracts

To send records to Kafka brokers, producers must choose a serializer for both the key and value record fields. Similarly, consumers must choose deserializers to parse records.

The serializer-deserializer classes create a data type contract between consumers and producers.

Kafka provides serializer and deserializer implementations in the org.apache.kafka.common.serialization package, such as:

  • LongSerializer and LongDeserializer

  • IntegerSerializer and IntegerDeserializer

  • StringSerializer and StringDeserializer

You can create custom (de)serializer classes by implementing the org.apache.kafka.common.serialization.Serializer and org.apache.kafka.common.serialization.Deserializer interfaces. In addition, some Kafka components, such as Kafka Streams, define SerDe classes. A SerDe class is a wrapper class that contains both a serializer and a deserializer.
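
For example, the following sketch configures a producer with the StringSerializer and IntegerSerializer classes that Kafka provides. The broker address, topic name, and record values are placeholders.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSerializerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keys are serialized as strings, values as integers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());

        try (KafkaProducer<String, Integer> producer = new KafkaProducer<>(props)) {
            // Placeholder topic, key, and value
            producer.send(new ProducerRecord<>("example-topic", "account-1", 42));
        }
    }
}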

The key and value (de)serializer architecture has a number of benefits, such as:

  • Developers can send custom classes and binary values as the record payload.

  • Developers can implement custom (de)serializers for higher efficiency.

  • Enforcing a typed contract creates a type-safe environment, which results in fewer runtime errors.

  • Kafka brokers do not parse records in any way. This enables high throughput and scalability.

Note

Sending large binary values in Kafka records can have a performance impact.

Kafka brokers reject records larger than 1 MB by default.

As the number of producers and consumers grows, however, a SerDe-based contract can become difficult to manage:

  • Each consumer must keep track of the (de)serializer contract with its producers. Changing the producer contract implies changing the consumer contracts.

  • Versioning of (de)serializer classes can become difficult.

  • Record validation can become difficult.

Producers and consumers must share the same understanding of the meaning of the data. A data contract is a formal agreement between a producer and a consumer that describes the exchanged data. For example, if a producer sends a JSON record, then a consumer that expects a numerical record value fails to parse the record.

Implementing Custom (De)Serializer Classes

Consider the following simplified NewAccountEvent class:

import java.io.Serializable;

public class NewAccountEvent implements Serializable {
    private int accountId;
    // getters, setters, and constructors omitted for brevity
}

You can implement a custom NewAccountEventSerializer serializer class:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; 1
import com.redhat.model.NewAccountEvent;

import org.apache.kafka.common.serialization.Serializer;


public class NewAccountEventSerializer implements Serializer<NewAccountEvent> { 2

    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, NewAccountEvent data) {
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            return null;
        }
    }
}

1

Note the dependency on the Jackson library. Consequently, a producer that uses this serializer sends records in JSON format.

2

This class implements the Kafka Serializer interface.

When a producer uses the preceding serializer, Kafka receives JSON events. For example, a NewAccountEvent instance with accountId set to 1 is serialized as {"accountId":1}. Note that such a serializer can be generic.
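
The following is a minimal sketch of such a generic Jackson-based serializer. The class name is illustrative.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Illustrative generic serializer that works for any Jackson-serializable type
public class JsonSerializer<T> implements Serializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize record value", e);
        }
    }
}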

The accompanying custom NewAccountEventDeserializer class implementation is similar:

// Import statements omitted for brevity

public class NewAccountEventDeserializer implements Deserializer<NewAccountEvent> {

    private ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public NewAccountEvent deserialize(String topic, byte[] data) {
        try {
            return objectMapper.readValue(new String(data, "UTF-8"),
                NewAccountEvent.class);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

Note that, unlike the serializer, the deserializer must be specific to the target class, because it must know which class to instantiate.
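
For example, a consumer might reference the custom deserializer by configuration as follows. This is a sketch: the broker address, group ID, and topic name are placeholders, and it assumes the deserializer class is on the consumer's classpath.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import com.redhat.model.NewAccountEvent;

public class NewAccountEventConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "new-accounts-group"); // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The custom deserializer from the preceding example
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NewAccountEventDeserializer.class.getName());

        try (KafkaConsumer<String, NewAccountEvent> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("new-accounts")); // placeholder topic
            for (ConsumerRecord<String, NewAccountEvent> record : consumer.poll(Duration.ofSeconds(1))) {
                System.out.println(record.value());
            }
        }
    }
}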

Both the producer and consumer applications must contain references to:

  • The (de)serializer implementation classes.

  • The model classes, such as NewAccountEvent in the preceding example.

Commonly, issues arise when the model classes change. The deserializer classes might use outdated models. Similarly, the producer application might use the latest version of the model classes while a subset of consumers has not been updated to the latest class version.

Implementing Custom (De)Serializer Classes with Quarkus

The Quarkus project enables you to reduce the boilerplate (de)serialization code with the ObjectMapperSerializer and ObjectMapperDeserializer classes in the io.quarkus.kafka.client.serialization package.

For example, the following is a NewAccountEventDeserializer deserializer implementation:

import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class NewAccountEventDeserializer extends ObjectMapperDeserializer<NewAccountEvent> {
    public NewAccountEventDeserializer() {
        super(NewAccountEvent.class);
    }
}

Because serializer classes can be generic, you can use the io.quarkus.kafka.client.serialization.ObjectMapperSerializer class directly.
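
For example, assuming outgoing and incoming channels named new-accounts-out and new-accounts-in, the channel configuration might reference these classes as follows. The channel names and the deserializer package are illustrative.

mp.messaging.outgoing.new-accounts-out.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.new-accounts-in.value.deserializer = com.redhat.model.NewAccountEventDeserializer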

Introducing Apache Avro

Apache Avro is a data serialization system that attempts to resolve the tight coupling between models and (de)serialization while preserving type-safety.

Apache Avro enables you to create self-describing data by decoupling the format of the data from its content. Avro achieves this by defining schemas that describe the data format.

Consider the following CustomClass.avsc schema:

{
    "namespace": "com.redhat.training", 1
    "type": "record", 2
    "name": "CustomClass", 3
    "fields": [
        {"name": "firstName", "type": "string"}, 4
        {"name": "middleNames", "type": ["null", "string"], "default": "null"}, 5
        {"name": "lastName", "type": "string"}
    ]
}

1

A name qualifier, such as a Java package.

2

An Avro data type.

3

Name of this data structure.

4

Defines the firstName string field.

5

Defines the optional middleNames string field, which defaults to null if not provided.

Apache Avro provides a number of primitive data types, such as int or string. Additionally, you can use a number of default complex data types, such as record, enum, or array. Consult the Apache Avro schema specification document in the reference section for more information.

Avro enables developers to generate classes based on the schema files. For example, Avro generates a com.redhat.training.CustomClass class from the preceding schema.
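
The following sketch builds an instance of the generated class by using the builder API that Avro adds to generated specific record classes. The field values are placeholders.

import com.redhat.training.CustomClass;

public class CustomClassExample {
    public static void main(String[] args) {
        // The optional middleNames field can be left unset, so it defaults to null
        CustomClass person = CustomClass.newBuilder()
                .setFirstName("Jane")
                .setLastName("Doe")
                .build();

        System.out.println(person);
    }
}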

Finally, Avro provides the binary and JSON encodings for data (de)serialization. The binary serialization encoding tends to be faster, and results in less data transfer. However, the JSON format is useful for debugging. Additionally, some applications can benefit from consuming the JSON data directly.

Defining Schema Registries

A schema registry is a system that stores schema artifacts. The Apache Avro format decouples the data content from the data schema. Kafka producers, however, must provide a schema for every record sent to Kafka brokers. Similarly, Kafka consumers must read a schema to deserialize every record received from Kafka brokers. Because the schemas tend to be larger than the actual record data, including the schema with each message is impractical.

A schema registry enables you to decouple the record schemas from the producers and consumers. Red Hat provides the Service Registry product, which is based on the Apicurio Registry project.

Administrators can deploy Service Registry to the Red Hat OpenShift Container Platform and use it for storing various artifacts, such as:

  • Apache Avro

  • Google protocol buffers (protobuf)

  • JSON Schema

  • Kafka Connect schema

Service Registry enables developers to:

  • Validate data based on schemas.

    For example, when a producer attempts to send invalid data, such as a record that does not match the registered schema, the serializer can refuse to send the record.

  • Version record schemas.

    Developers can create backward-compatible record schemas. Because Service Registry versions record schemas, consumers can choose which schema version to use. Consequently, a subset of consumers can use a different record schema from other consumers.

  • Validate new schema compatibility.

    Service Registry contains rules for forward and backward compatibility checking that control each schema artifact. Developers can optionally turn on the compatibility checks.

  • Manage schemas programmatically.

    Service Registry provides the io.apicurio:apicurio-registry-maven-plugin Maven plug-in that enables developers to programmatically download and upload schema files.

(De)serializing Avro Records with Service Registry and Quarkus

In the Kafka ecosystem, producers upload the record schema to Service Registry. Consumers then look up the schema by using a schema lookup strategy and deserialize the message.

Figure 2.15: Service Registry SerDe workflow

In a Quarkus project, the io.quarkus:quarkus-apicurio-registry-avro dependency provides the Avro (de)serializer classes and code generation functionality. Quarkus transforms Avro schema files in the src/main/avro or src/test/avro directories into Java classes. Execute the compile Maven goal to generate the Java classes; Quarkus places the generated classes in the target/generated-sources/avsc/ directory.

Quarkus applications that use Service Registry commonly define the following Service Registry properties:

Registry URL

Configure the registry URL by using the apicurio.registry.url property. For example:

mp.messaging.connector.smallrye-kafka.apicurio.registry.url = http://service-registry.stage.com/apis/registry/v2
(De)serializer Implementation (optional)

The Apicurio Registry provides the io.apicurio.registry.serde.avro.AvroKafkaSerializer and io.apicurio.registry.serde.avro.AvroKafkaDeserializer classes. Use the standard value.serializer and value.deserializer properties to set the (de)serializer classes.

For example, to set the serializer for the example channel, use the following property:

mp.messaging.outgoing.example.value.serializer = io.apicurio.registry.serde.avro.AvroKafkaSerializer
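Similarly, a consumer might set the deserializer for an incoming channel named example with the following property. The channel name is illustrative.

mp.messaging.incoming.example.value.deserializer = io.apicurio.registry.serde.avro.AvroKafkaDeserializer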
Schema Lookup Strategy (optional)

Producers and consumers look up which schema is registered for each record. Service Registry provides several strategies to resolve the schema-to-record mapping. Consult the Apicurio Lookup Strategies document in the references section for more information about each strategy.
