Implementing Stateful Event Processing

Objectives

After completing this section, you should be able to apply stateful transformations to event streams.

Understanding Stateful Processing

In stateless processing, the element that you transform has no dependencies on previous operations. For example, you can filter a stream of numbers without storing any intermediate data. In stateful processing, however, you must consider previous operations. The previous operations form a state, where data generated as a result of previous transformations is stored.

For example, to sum a series of numbers, the sum function might use an accumulator object to store the current sum. An accumulator is a helper data structure where you save information that you need while performing a computation. In this example, the accumulator is the state.
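As an illustration, consider the following plain Java sketch, which sums a series of numbers by carrying the state in an accumulator variable. The sample values are hypothetical.

int sum = 0; // the accumulator: state carried across iterations

for (int number : new int[] {3, 1, 4}) {
    sum += number; // each step depends on the result of the previous steps
}

System.out.println(sum); // prints 8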

In event-driven microservice applications, stateful event processing is achieved by subscribing the microservices to the event streams required to execute the microservice business logic. Subscribing to the streams gives the microservices access to the data necessary to perform stateful operations in the application.

Understanding Stateful Processing in Kafka Streams

You can use Kafka Streams to process event streams in event-driven architectures (EDAs). Stateful processing can be achieved without Kafka Streams, but if your applications are written in Java or any other JVM language, then Kafka Streams enables you to manage the state easily. There are also community libraries for non-JVM programming languages.

Capturing State With Kafka Streams Tables

You can use both KStream and KTable to perform stateful operations. A KStream contains all the records for a specific key, whereas a KTable contains only the latest record for a specific key. Kafka Streams has another abstraction, a GlobalKTable, which is similar to the KTable object.

KTable

A KTable object contains records processed by a Kafka Streams node. If you parallelize your Kafka Streams application by deploying multiple application nodes, then each node contains a table built from the records that the node receives. Consequently, the table in each node does not contain the records from all partitions.

GlobalKTable

A GlobalKTable object contains records from all topic partitions. Therefore, if you deploy multiple application nodes, GlobalKTable replicates all records from all partitions on each node. Kafka Streams ensures that the GlobalKTable contains all the records before the stream processing starts.
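The following sketch shows one possible way to create both kinds of tables with a StreamsBuilder. The topic names and serdes are assumptions for illustration.

StreamsBuilder builder = new StreamsBuilder();

// Each application node builds this table only from the topic
// partitions assigned to that node.
KTable<String, String> users = builder.table(
        "users", Consumed.with(Serdes.String(), Serdes.String()));

// Every application node replicates all the partitions of this topic,
// and the table is fully populated before the processing starts.
GlobalKTable<String, String> profiles = builder.globalTable(
        "profiles", Consumed.with(Serdes.String(), Serdes.String()));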

Managing State

To perform stateful transformations, Kafka Streams needs a place to save data related to the transformations. This is done in a state store, which you can query to retrieve the data saved during transformations.

Stores

Stores can be in-memory or persistent, depending on your requirements. A persistent state store allows Kafka Streams to recover transformations from the point where they were left in the case of system failure.

Persistent stores can be saved into the Kafka platform and are fault-tolerant. When you work with the DSL by executing joins and aggregations, Kafka Streams automatically takes care of managing the state as needed.
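For example, you can select the store type explicitly when materializing an aggregation by using the Stores factory class. The store names and the stream are assumptions for illustration.

// A persistent store survives restarts, and Kafka Streams can restore
// it from its changelog topic after a failure.
stream.groupByKey()
      .count(Materialized.as(Stores.persistentKeyValueStore("counts-persistent")));

// An in-memory store is faster, but it must be fully rebuilt from the
// changelog topic when the application restarts.
stream.groupByKey()
      .count(Materialized.as(Stores.inMemoryKeyValueStore("counts-in-memory")));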

Interactive Queries

Sometimes, you might need to interact with the state store directly. For this purpose, Kafka Streams allows you to use interactive queries.

You can query the local state store (the store of the application instance that you are running) by using the DSL. If you want to query the local state stores of all the different application instances, then you must add an RPC layer first.
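Recent Kafka Streams versions provide metadata methods that such an RPC layer can use to locate the instance hosting a given key. A sketch, assuming a running KafkaStreams instance named streams and a store named numbers, as in the snippet later in this section:

Integer key = 42; // hypothetical key to locate

// Find the application instance that hosts the key in its local store.
KeyQueryMetadata metadata = streams.queryMetadataForKey(
        "numbers", key, Serdes.Integer().serializer());

// Host and port of that instance; the RPC layer can forward the
// query there.
HostInfo activeHost = metadata.activeHost();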

To retrieve data from a store, you must materialize the stream or table. This way, data is retained in the store and you can access it. You can only access the store after calling the start method on the KafkaStreams object.

The following snippet shows how to retrieve data from a store.

KafkaStreams streams = ...output omitted...;
...output omitted...
streams.start();

ReadOnlyKeyValueStore<Integer, Long> store = streams
            .store(StoreQueryParameters.fromNameAndType( 1
                "numbers", 2
                QueryableStoreTypes.keyValueStore() 3
            ));

store.all()
     .forEachRemaining(row -> {
          System.out.println(row.key + " - " + row.value); 4
       });

1

The store method allows you to retrieve a store by name and type.

2

Name of the store that you provided when you materialized the stream or table.

3

The type of store.

4

For every element, print key and value.
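The numbers store queried in the previous snippet must be materialized earlier in the topology. The following sketch shows one possible way, assuming a stream keyed by integers.

KStream<Integer, String> numbers = ...output omitted...;

// Counting records per key materializes a queryable key-value store
// named "numbers" that holds Integer keys and Long counts.
numbers.groupByKey()
       .count(Materialized.as("numbers"));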

Record Caches

When aggregations are performed, you can store their result in the state store. Because writing to persistent storage is expensive, Kafka Streams provides a record cache that you can enable.

With the cache, Kafka Streams buffers writes instead of sending each one to persistent storage, and waits for a configurable commit interval to elapse before persisting the data.
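For example, the cache size and the commit interval are standard Kafka Streams configuration properties. The values in this sketch are assumptions for illustration.

Properties props = new Properties();
...output omitted...

// Allow up to 10 MiB of record cache, shared across all threads.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);

// Flush the cache and persist the data every 30 seconds.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000);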

Describing Stateful Transformations

The Kafka Streams DSL provides several options to perform transformations on streams and tables. Some of these transformations depend on the concept of time. A window is a period of time that limits the records a transformation considers, which avoids the performance issues of processing an unbounded stream.
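For example, the following sketch counts the records per key that fall within 5-minute windows. The stream and its types are assumptions for illustration.

KStream<String, String> events = ...output omitted...;

// Only the records inside each 5-minute window are considered together.
KTable<Windowed<String>, Long> counts = events
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count();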

Introducing Joins

The concept of joining in Kafka Streams is similar to joining relational database tables. Given two KStream or KTable instances, a join operation merges the records into one by using a common field in both. The common field is usually the key of the record. Joining is supported among KStream, KTable, and GlobalKTable objects.

Important

The Apache Kafka ecosystem does not aim to eliminate a relational database from your projects. You can replace a database with Apache Kafka in specific situations and applications. This, however, requires deep knowledge of distributed systems and relational algebra.

Some joins also require partitions of topics involved in the join to be co-partitioned. This means that:

  • The streams participating in the join must have the same number of stream partitions. Consequently, the stream source topics must have the same number of partitions.

  • All producers writing to the topics must have the same partition strategy, so records with the same key are sent to the same partition number. The default producer partition strategy conforms to this requirement.

Like in relational databases, joins can be inner, left, or outer. For simplicity, only inner joins are considered in this section.

KStream - KStream

In this kind of operation, the join is triggered every time that a new element is appended to either of the streams. The window restriction is necessary for efficient joins: if you specify no window, then the application must process the full stream every time a new element is appended. Consequently, windowing influences the application performance.

The result is another KStream object. Co-partitioning is required.

For example, consider two streams: visits, which holds the information of visits to a web page, and clicks, which contains the information for clicks on that website. The key for both streams is the user ID. In a specific period of time, you might want to relate the visits and clicks performed by a user. Considering an interval of 5 seconds, the following diagram illustrates the visits and clicks timeline.

Figure 4.1: KStream - KStream join timeline

The letters (A, B, and C) represent the key (the user identifier, in the example) of the records as they arrive on the timeline.

  • At t=0, a new visit of A is received.

  • At t=1, a new visit of B is received.

  • At t=2, a new visit of C is received. A new click of B is received. Because B has a visit, a join is triggered and both records are retrieved together.

  • At t=3, another visit of C is received. Now, there are two visits for the same key. If this were a KTable, only the last one would be considered.

  • At t=4, a click of C is received. There are two visits of C, so a join for every visit is triggered.

  • At t=6, a click of A is received. Although there is a visit record for A, the click is out of the time window, so no join for A is triggered.

    A standard implementation with the Kafka Streams library in Java:

    KStream<String, String> left = ...output omitted...; 1
    KStream<String, String> right = ...output omitted...; 2
    
    KStream<String, String> joined = left.join(right, 3
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, 4
        JoinWindows.of(Duration.ofSeconds(5)), 5
        Joined.with(
          Serdes.String(), 6
          Serdes.String(), 7
          Serdes.String()) 8
      );

    1

    Creation of the left side of the join. In the previous example, the visits.

    2

    Creation of the right side of the join. In the previous example, the clicks.

    3

    The left stream is joined with the right stream.

    4

    The function that is applied when the join occurs. In this example, a new string that contains the value of both records is created.

    5

    The window time.

    6

    The serde for the key. Because the key type is the same in both streams, only one key serde is provided.

    7

    The serde for the left stream value.

    8

    The serde for the right stream value.

KTable - KTable

In this kind of operation, the join is triggered every time that a key is updated in either of the tables. Because a KTable holds the last value for a specific key, when the join occurs, the last value of every key is used.

The result is another KTable object. No time window is applied. Data co-partitioning is required.

For example, consider two tables: users, which contains the information for every user, and profiles, which contains the public data of every user. Both tables use the user identifier as a key. A KTable is used for both because you are only concerned with the latest update.

Figure 4.2: KTable - KTable join timeline
  • At t=0, the user A is updated.

  • At t=1, the user B is updated.

  • At t=2, the profile of user B is updated. Because the information of user B had been updated, a join is triggered for both records.

  • At t=3, the user A is updated again. The name Sus is replaced by Susan. At the same time, the profile of user C is updated.

  • At t=4, the profile of user A is updated. A join is triggered. The last version of both tables for user A is retrieved.

  • At t=6, the user C is updated. A new join is triggered.

    The following is a standard implementation with the Kafka Streams library in Java:

    KTable<String, Long> left = ...output omitted...;
    KTable<String, Double> right = ...output omitted...;
    
    KTable<String, String> joined = left.join(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue
      );
KStream - KTable

In the previous operations, the join was triggered every time a change happened in either of the streams (the left or the right side). However, when joining a KStream and a KTable, only appends to the KStream (the left side) trigger a join operation. The table (the right operand) just receives updates, which are used whenever a new matching key arrives in the stream.

The result is another KStream object. No time window is applied. Data co-partitioning is required.

For example, consider two topics, users and orders. Both use the user identifier as a key. The users topic is mapped to a KTable because only the last update of every user matters. The orders topic is mapped to a KStream because all orders must be considered, not just the last order for every user.

Figure 4.3: KStream - KTable join timeline
  • At t=0, user A is updated.

  • At t=1, user B is updated.

  • At t=2, a new order for user A arrives. A join is triggered.

  • At t=3, user B is updated. The email sample1 is replaced by newmail.

  • At t=4, a new order for user B arrives. A join is triggered, and the latest user information for B is retrieved.

  • At t=5, a new order for user C arrives.

  • At t=6, user C is updated. Although an order has been received previously, a join operation is not triggered because only changes to the KStream are considered.

    The following is a standard implementation with the Kafka Streams library in Java:

    KStream<String, Long> left = ...output omitted...;
    KTable<String, Double> right = ...output omitted...;
    
    KStream<String, String> joined = left.join(right,
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
        Joined.keySerde(Serdes.String()) 1
          .withValueSerde(Serdes.Long()) 2
      );

    1

    The serde for the key.

    2

    The serde for the left stream value.

KStream - GlobalKTable

This kind of join is similar to the KStream - KTable join. However, the GlobalKTable holds all the values from the topic before the stream processing starts.

The result is another KStream object. No time window is applied. Data co-partitioning is not required.

For example, consider the example from the KStream - KTable join, users and orders. The users topic is now mapped to a GlobalKTable and contains all the values at t=0.

Figure 4.4: KStream - GlobalKTable join timeline
  • At t=0, the users GlobalKTable contains all the elements from the topic.

  • At t=2, an order for user A arrives. Because A is already in the table, a join is triggered.

  • At t=3, user B is updated. The old email sample1 is replaced by newmail.

  • At t=4, an order for user B arrives. A join is triggered.

  • At t=5, a new order for user C arrives. Because C is already in the table, a join is triggered.

    A standard implementation with the Kafka Streams library in Java:

    KStream<String, Long> left = ...output omitted...;
    GlobalKTable<String, Double> right = ...output omitted...;
    
    KStream<String, String> joined = left.join(right,
        (leftKey, leftValue) -> leftKey, 1
        (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue
      );

    1

    For this kind of join, you can choose to use the key or another field from the record to perform the join. In this example, the key is used.

Introducing Aggregations

Aggregations allow you to perform different transformations on grouped records. When you group records of a KStream or KTable, you are creating blocks of records. Records within a block are related based on some criteria that you provide.

For example, in a stream that contains words, you might choose to group words with the same length. Consider the stream ["red", "egg", "hat", "black", "blue", "green"]. If you group by word length, the result would be [3 → [red, egg, hat]], [4 → [blue]], [5 → [black, green]].

KStream<String, String> wordsStream = ...output omitted...; 1

KGroupedStream<Integer, String> wordsGrouped = wordsStream
    .groupBy((key, word) -> word.length()); 2

1

A stream of words is created.

2

The stream is grouped by word length. The resulting object is KGroupedStream<Integer, String> because words are grouped by an integer (the length).

Grouped streams and grouped tables are represented by KGroupedStream and KGroupedTable objects (respectively). For every group, you can perform operations such as summing up all values or counting the number of records.

Aggregate

The aggregate operation allows you to update an accumulator object for every element in the grouped stream. An accumulator is a custom object that you provide to the aggregate method and that you update during processing.

The result is a KTable. You can use windowing to aggregate elements in a specific time range.

For example, consider a users stream where you want to find the average age of the users. Conceptually, you would have to sum up all the ages and divide the result by the number of users. With the aggregate operation, you use an accumulator object that contains the sum of the ages and the count of the users. For every element in the grouped stream, you update the accumulator. Consider the following Accumulator class.

class Accumulator {
    int sum = 0;
    int count = 0;

    // Return the accumulator so that the aggregator can pass it on.
    Accumulator addAge(int age) {
        sum += age;
        count++;
        return this;
    }
}

In the following snippet, the Accumulator class is used to compute the sum of the ages and the count of the users.

KStream<Integer, User> usersStream = getUsersStream();

var grouped = usersStream.groupByKey().aggregate( 1
            () -> new Accumulator(), 2
            (key, user, accumulator) -> accumulator.addAge(user.getAge()), 3
            Materialized.<Integer, Accumulator,
                        KeyValueStore<Bytes, byte[]>>as("users-age") 4
                    .withKeySerde(intSerde)
                    .withValueSerde(accumulatorSerde)
        );

1

Users are grouped by key. In this example, the key is the user ID.

2

The initial accumulator is created.

3

For every user, the aggregator adds the age, increases the counter, and returns the updated accumulator.

4

The state store where the data is saved. The Integer type parameter corresponds to the key.

At the end of the operation, the accumulator contains the sum of the ages and the total number of users processed.
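To obtain the average age, you might then divide the sum by the count for every accumulator, for example by mapping the values of the table produced by the previous snippet.

KTable<Integer, Double> averageAge = grouped.mapValues(
        accumulator -> (double) accumulator.sum / accumulator.count);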

Reduce

The reduce method is a specific case of the aggregate operation. The reduce method iterates over the elements of the KStream or KTable and, for every element, combines the current result with the element to return a new value of the same type. The new value is passed to the next iteration, and at the end, only one resulting value remains.

The result is a KTable. You can use windowing to reduce elements in a specific time range.

For example, consider a stream of numbers where you want to find the maximum number. For every element in the stream, you check whether the element is greater than the current maximum. If it is, then you return the element; otherwise, you return the current maximum. The following snippet shows a possible implementation.

KStream<String, Integer> stream = ...output omitted...;

stream.groupByKey()
       .reduce(
            (currentMax, element) -> {
                if (element > currentMax) {
                    return element;
                }

                return currentMax;
            }
        );
Count

The count operation allows you to count the number of elements in a grouped KStream or KTable. The result is a KTable. You can use windowing to count elements in a specific time range.

For example, in the previous example of the word grouping, you can count the number of words with the same length.

KStream<String, String> wordsStream = ...output omitted...;

KTable<Integer, Long> wordsCount = wordsStream
    .groupBy((key, word) -> word.length())
    .count();