Using Basic Kafka Streams Objects

Objectives

After completing this section, you should be able to use KTables, KStreams and other DSL objects to manage data streams.

Listing Kafka Streams APIs

Apache Kafka Streams uses stream processing topologies to process data streams. These topologies are made of nodes called processors. Each processor represents an operation to apply to the stream.

There are two ways to build topologies in Kafka Streams.

Kafka Streams DSL

A domain-specific language, intended to speed up and simplify the development of streaming applications. The DSL provides abstractions that represent streams, tables, and common data transformations, such as map, reduce, and groupBy. The DSL also hides the low-level details of building processors and topologies.

Processor API

A low-level API to define processors and topologies. The processor API might require more development effort than the DSL. However, the processor API provides finer control and flexibility to handle use cases and operations not covered by the DSL. This API requires a clear understanding of the Kafka Streams architecture.

This course covers the use of the DSL. The Processor API is outside the scope of this course.

Serializing and Deserializing Streams

Kafka Streams utilizes the Kafka client library. This means that you must indicate how to serialize and deserialize records to consume and produce them in Kafka topics, just like you would with the Kafka client library.

Kafka applies the notion of SerDe. A serde is an object that implements both serialization and deserialization capabilities.

In the Kafka client library, this abstraction is defined by the org.apache.kafka.common.serialization.Serde interface. Kafka Streams requires you to implement the Serde interface for your object (de)serialization.

(De)Serializing Basic Types

The org.apache.kafka.common.serialization package provides concrete serdes for primitive and basic types, such as Serdes.IntegerSerde or Serdes.StringSerde.
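
For example, the Serdes factory class provides helper methods that return ready-to-use serdes for these types:

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

// ...class omitted...

// Serdes for common basic types.
Serde<String> stringSerde = Serdes.String();
Serde<Integer> integerSerde = Serdes.Integer();
Serde<Long> longSerde = Serdes.Long();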

(De)Serializing Complex Objects

There are multiple ways to (de)serialize complex objects, including the following methods:

  • Use the Serdes#serdeFrom method to create a serde from a serializer and a deserializer, for example to build a JSON serde (see the sketch after this list).

  • Use Apache Avro to define serdes.

  • Implement the Serde interface with the logic to serialize and deserialize your messages.

  • Use the ObjectMapperSerde class, which is included in the Quarkus Kafka client, to create a JSON serde. ObjectMapperSerde relies on the Jackson library for JSON (de)serialization.

    import io.quarkus.kafka.client.serialization.ObjectMapperSerde;
    
    // ...class omitted...
    
    ObjectMapperSerde<YourClass> yourSerde = new ObjectMapperSerde<>(
        YourClass.class
    );
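
For example, the following is a minimal sketch of the Serdes#serdeFrom approach, using the Jackson ObjectMapper to build a JSON serde for the Price class used later in this section. It assumes a Kafka client version in which Serializer and Deserializer can be implemented as lambdas, and the error handling is simplified for illustration.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

// ...class omitted...

ObjectMapper mapper = new ObjectMapper();

// Serialize a Price object to JSON bytes.
Serializer<Price> priceSerializer = (topic, price) -> {
    try {
        return mapper.writeValueAsBytes(price);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
};

// Deserialize JSON bytes back into a Price object.
Deserializer<Price> priceDeserializer = (topic, bytes) -> {
    try {
        return mapper.readValue(bytes, Price.class);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
};

Serde<Price> priceSerde = Serdes.serdeFrom(priceSerializer, priceDeserializer);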

Reading Streaming Data from Kafka

The Kafka Streams DSL provides two main abstractions to handle streaming data: KStream and KTable. You can interpret a set of records as a stream or as a table, you can map a stream to a table, and you can map a table to a stream. This is known as the table-stream duality.

Note

Stream and table abstractions, such as KStream and KTable, are only available in the DSL.

The DSL also facilitates the creation of these objects with the StreamsBuilder class. This class allows you to read data from a Kafka topic as a stream or as a table, hiding the details of how processors and topologies are created.

To build a topology, start by creating a StreamsBuilder object.

import org.apache.kafka.streams.StreamsBuilder;

// ...class omitted...

StreamsBuilder builder = new StreamsBuilder();

KStream

The KStream interface represents a data stream. Use this object to read data from a Kafka topic as a stream, to apply transformations to the stream, and to send the result to another Kafka topic.

To create a new KStream object that reads from a Kafka topic, call the StreamsBuilder#stream method. Specify the topic name, and the key and value serdes.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import io.quarkus.kafka.client.serialization.ObjectMapperSerde;

// ...class omitted...

Serde<String> keySerde = Serdes.String();
ObjectMapperSerde<Price> valueSerde = new ObjectMapperSerde<>(Price.class);

KStream<String, Price> prices = builder.stream(
    "prices",
    Consumed.with(keySerde, valueSerde)
);

The resulting KStream instance creates a source processor in the application topology graph. When reading a topic, Kafka Streams distributes the Kafka topic partitions as stream partitions across your application replicas to achieve scalability. Consequently, a KStream object running on a particular application instance only keeps track of data from one or more stream partitions.

Note

A stream partition is a Kafka Streams concept that represents a subset of the data. Each stream partition maps to a Kafka topic partition.

After you have created a stream, you can start specifying stream transformations.
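
For example, a minimal sketch of chaining a transformation on the prices stream created above (the filter condition is only illustrative):

// Transformations return a new KStream; the prices stream itself is not modified.
KStream<String, Price> nonNullPrices = prices.filter(
    (key, price) -> price != null
);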

KTable

KTable objects allow you to interpret a data stream as the change log of a table. From this perspective, each record read from the stream represents a change in the table. Kafka Streams interprets the stream as follows:

  • The first value of a given key inserts the record in the table.

  • Subsequent non-null values of the same key update the record.

  • A null value of the same key deletes the record.
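
For example, a minimal illustration of how Kafka Streams builds up the table state from a sequence of changelog records (the keys and values are hypothetical):

// Incoming record (key, value)              Resulting table state
// ("p1", 10)   -> insert                    { p1 -> 10 }
// ("p2", 20)   -> insert                    { p1 -> 10, p2 -> 20 }
// ("p1", 15)   -> update                    { p1 -> 15, p2 -> 20 }
// ("p2", null) -> delete                    { p1 -> 15 }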

Tables in Kafka Streams are particularly useful for stateful operations, for example:

  • Perform a query, such as listing all records from orders where product.id is 1.

  • Query the current value of streaming records, such as looking up the current user rating for a product.

  • Store the result of a cumulative function, such as querying the delta value from the previous stock tick in a stock market application to determine if the stock price is rising or falling.

  • Enrich a stream, such as joining a stream of vehicle positions with a vehicle profiles table.

KTable objects manage an internal state store. Consequently, every node of a Kafka Streams application that uses a table contains a local state store. The same applies to GlobalKTable objects, which are covered later in this section.

Note

Stateful operations on KStream objects, such as join, also use internal state stores.

With other messaging solutions, you might have to use an external database, or additional logic, to implement the preceding examples. With a KTable object, you can easily perform such operations in a streaming fashion, and benefit from built-in fault-tolerance capabilities. These state stores are fault-tolerant, because Kafka Streams replicates them into Kafka topics.

Note

You can implement similar functionality with the Processor API, by creating and managing state stores. This gives you more power in configuring internal storage details, such as change logs, and fault tolerance, but might require more maintenance.

To read a Kafka topic as a table, use the StreamsBuilder#table method. Specify the topic name, the key and value serdes, and, optionally, the name of the table store.

import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.common.utils.Bytes;

// ...class and serdes omitted...

KTable<String, Price> pricesTable = builder.table(
    "prices",
    Consumed.with(keySerde, valueSerde),
    Materialized.<String, Price, KeyValueStore<Bytes, byte[]>>as("pricesTableStore")
        .withKeySerde(keySerde)
        .withValueSerde(valueSerde)
);

The resulting KTable instance creates a source processor in the application topology graph. Kafka Streams also distributes partitions when reading a topic into a table. Consequently, a KTable object running on a particular application instance only keeps track of a subset of all the stream partitions.

The third argument specifies the name of the table store and the custom serdes configurations to materialize the table to the store. You must use this argument if you intend to use the interactive queries feature to query the table. If you do not specify a store name, then Kafka Streams assigns an internal, non-queryable store to the table. Interactive queries are covered later in the course.

Important

If you remove the third argument, then the KTable instance uses the default serdes configuration, which is Serdes.ByteArraySerde.class.getName(). This means that a KTable uses ByteArray (de)serialization by default.

You can override this configuration by setting the serdes property parameters in the Kafka Streams default configuration. This is outside the scope of this course.
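
For reference, a minimal sketch of how such an override might look with plain Kafka Streams properties. The property names come from StreamsConfig; in a Quarkus application you typically provide equivalent configuration properties instead.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

// ...class omitted...

// Use the String serde as the default key and value serde,
// instead of the ByteArray serde.
Properties config = new Properties();
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());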

GlobalKTable

Sometimes, you need every node of your application to have a full copy of the data, instead of a subset of the partitions. For example, you might want to do the following:

  • Expose a full view of a topic in all the nodes of a web application.

  • Join a stream and a table, such as a stream of user actions and a table of user profiles, without meeting co-partitioning requirements. Co-partitioning means that Kafka topics share the same partition count and strategy. This is a requirement for KTable stateful transformations.

For these cases, Kafka Streams provides the GlobalKTable interface. Unlike KTable, which partitions the stream across the application instances, GlobalKTable objects are not partitioned.

For example, assume that your application reads a topic as a table change log. The topic uses three partitions and the application runs on three nodes.

  • If you use a KTable object, then each node loads just one partition.

  • If you use a GlobalKTable object, then each node loads all three partitions.

Loading the full set of partitions on each node allows you to join records by non-key values, and makes join co-partitioning unnecessary. Joins are covered later in the course.

To create a new GlobalKTable object, use the StreamsBuilder#globalTable method.

import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.common.utils.Bytes;

// ...class and serdes omitted...

GlobalKTable<String, Price> pricesGlobalTable = builder.globalTable(
    "prices",
    Consumed.with(keySerde, valueSerde),
    Materialized.<String, Price, KeyValueStore<Bytes, byte[]>>as("pricesGlobalTableStore")
        .withKeySerde(keySerde)
        .withValueSerde(valueSerde)
);

Note

Global tables, although more convenient for joins or complete snapshots of the application state, have performance implications. The network and storage consumption increases, because the global tables must read all partitions in a single application instance.

Transforming a KStream into a KTable

Kafka Streams allows you to transform a stream into a table. This can be useful, for example, to view the state of a data stream at a specific moment.

Transforming a KStream into a KTable is a two-step process.

  • First, group the stream records by key. This transformation returns a KGroupedStream object.

  • Next, apply one of the aggregations available in the KGroupedStream object, such as count or reduce. The aggregation returns a KTable.

Figure 3.7: Transforming a KStream into a KTable by grouping and counting the records

Grouping a Stream

You can group a stream either by the Kafka topic key or by a new key.

  • groupBy: Group the stream records by using a new key.

  • groupByKey: Group the stream records by using the current key of the stream.

Both operations return a KGroupedStream. For example:

import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Grouped;

// ...class and serdes omitted...

// Assumes a KStream<String, Purchase> named stream, where Purchase is a sample
// value class with a productId field.
KGroupedStream<String, Purchase> purchasesByProduct = stream.groupBy(
    (key, value) -> value.productId, // 1
    Grouped.with( // 2
        keySerde,
        valueSerde
    )
);

1. Function to select the key to group by. This example groups the stream by productId.

2. Provide serdes to perform the grouping.

Grouping a stream might cause data repartitioning. In particular, groupBy always repartitions the data, because it creates a grouped stream with a new key.
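
By contrast, the following is a minimal sketch of groupByKey, which keeps the existing key and therefore only repartitions the data if an earlier operation changed the key. It assumes the same stream and serdes as the previous example.

// Group by the existing record key; no new key is selected.
KGroupedStream<String, Purchase> purchasesByKey = stream.groupByKey(
    Grouped.with(keySerde, valueSerde)
);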

Aggregating a Grouped Stream

After grouping the stream, you must aggregate the data to create a KTable object. The KGroupedStream interface provides the following aggregation methods:

  • count

  • reduce

  • aggregate

These aggregations return a KTable object. You can also use them in a windowed form, so the aggregation only computes records within a given time period.

The following is a basic example of grouping and aggregating a stream into a table:

import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.common.serialization.Serdes;

// ...class and serdes omitted...

KTable<Integer, Long> positionsReportedByVehicleType = vehiclePositions // 1
    .groupBy(
        (key, value) -> value.vehicleType, // 2
        Grouped.with(Serdes.Integer(), vehicleSerde) // 3
    )
    .count(); // 4

1. Given a stream of vehicle positions.

2. Group the positions by vehicle type. Kafka Streams repartitions the data because records use a different key. A record in the resulting grouped stream might be assigned to a partition different from the partition the record used in the vehiclePositions stream.

3. Specify serdes required to produce the grouped stream.

4. Call KGroupedStream#count to get the number of positions reported by each vehicle type.
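
The windowed form mentioned earlier follows the same pattern. The following is a minimal sketch that counts the positions reported by each vehicle type in five-minute windows, assuming Kafka Streams 3.0 or later for the TimeWindows factory method.

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// ...class and serdes omitted...

// Count the positions reported by each vehicle type within five-minute windows.
KTable<Windowed<Integer>, Long> positionsPerWindow = vehiclePositions
    .groupBy(
        (key, value) -> value.vehicleType,
        Grouped.with(Serdes.Integer(), vehicleSerde)
    )
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();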

Transforming a KTable into a KStream

When representing a table as a stream, Kafka Streams interprets each table record as a stream record.

Figure 3.8: Transforming a KTable into a KStream

Use the KTable#toStream method to transform a table into a KStream.

KStream<Integer, Long> positionCountStream = positionsReportedByVehicleType.toStream();

Writing a Stream into a Kafka Topic

To send the data from a stream to a Kafka topic, use the KStream#to method. This operation is also called materialization of the stream. Just like the previous calls, the to method also requires serdes to write the stream to the topic.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Produced;

// ...class omitted...

stream.to(
    "output-topic",
    Produced.with(Serdes.Integer(), Serdes.Double())
);

The Kafka topic must exist before writing the stream into the topic.
