Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
After completing this section, you should be able to apply stateful transformations to event streams.
In stateless processing, the element that you transform has no dependencies on previous operations. For example, you can filter a stream of numbers without storing any data during processing. In stateful processing, however, you must consider previous operations. The previous operations form a state, in which data generated as a result of previous transformations is stored.
For example, to sum a series of numbers, the sum function might use an accumulator object to store the current sum.
An accumulator is a helper data structure in which you save information that you need while performing a computation.
In this example, the accumulator is the state.
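For illustration, the following is a minimal plain-Java sketch of this idea. The SumAccumulator class and the input numbers are hypothetical, not part of any Kafka API.

```java
import java.util.List;

public class SumExample {

    // The accumulator object is the state that a stateful sum
    // carries across elements.
    static class SumAccumulator {
        long sum = 0;

        void add(long value) {
            sum += value; // data from previous elements is stored here
        }
    }

    public static void main(String[] args) {
        // Stateless: filtering needs no memory of previous elements.
        // Stateful: summing must carry the accumulator across elements.
        SumAccumulator acc = new SumAccumulator();
        List.of(1L, 2L, 3L).forEach(acc::add);
        System.out.println(acc.sum); // 6
    }
}
```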
In event-driven microservice applications, stateful event processing is achieved by subscribing the microservices to the event streams required to execute the microservice business logic. Subscribing to the streams gives the microservices access to the data necessary to perform stateful operations in the application.
You can use Kafka Streams to process event streams in EDAs. Stateful processing can be achieved without Kafka Streams, but if your applications are written in Java or any other JVM language, then Kafka Streams enables you to manage the state easily. There are also community libraries for non-JVM programming languages.
You can use both KStream and KTable to perform stateful operations.
A KStream contains all the records for a specific key and a KTable only contains the latest record for a specific key.
Kafka Streams has another abstraction, a GlobalKTable, which is similar to the KTable object.
- KTable

A KTable object contains records processed by a Kafka Streams node. If you parallelize your Kafka Streams application by deploying multiple application nodes, then each node contains a table built from the records that the node receives. Consequently, the table in each node does not read the records from all partitions.

- GlobalKTable

A GlobalKTable object contains records from all topic partitions. Therefore, if you deploy multiple application nodes, GlobalKTable replicates all records from all partitions on each node. Kafka Streams ensures that the GlobalKTable contains all the records before the stream processing starts.
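For reference, the following is a minimal sketch of how each abstraction is typically created with the StreamsBuilder API. The topic names (events, users, and profiles) are assumptions for illustration.

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// All records of the topic, from the partitions assigned to this node.
KStream<String, String> stream = builder.stream("events");

// Latest record per key, from the partitions assigned to this node.
KTable<String, String> table = builder.table("users");

// Latest record per key, replicated from ALL partitions on every node.
GlobalKTable<String, String> globalTable = builder.globalTable("profiles");
```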
To perform stateful transformations, Kafka Streams needs a place to save data related to the transformations. This is done in a state store, which you can query to retrieve the data saved during transformations.
- Stores
Stores can be in-memory or persistent, depending on your requirements. A persistent state store allows Kafka Streams to resume transformations from the point where they left off in the case of a system failure.
Persistent stores can be saved to the Kafka platform and are fault tolerant. When you work with the DSL by executing joins and aggregations, Kafka Streams automatically manages the state as needed.
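For example, the following sketch materializes an aggregation in a named store, which in the DSL is persistent and backed by a changelog topic by default. The store name word-counts and the topic words are assumptions for illustration.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();

// Count words and materialize the result in a named, persistent
// (and therefore queryable) state store.
builder.<String, String>stream("words")
    .groupBy((key, word) -> word)
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("word-counts")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));
```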
- Interactive Queries
Sometimes, you might need to interact with the state store directly. For this purpose, Kafka Streams allows you to use interactive queries.
You can query the local state store (the store of the application instance that you are running) by using the DSL. If you want to query the local state stores of all the different application instances, then you must add an RPC layer first.
To retrieve data from a store, you should materialize the stream or table. This way, data is retained in the store and you can access it. You can only access the store after the start method has been called on the KafkaStreams class. The following snippet shows how to retrieve data from a store.

```java
KafkaStreams streams = ...output omitted...;

...output omitted...

streams.start();

ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
    StoreQueryParameters.fromNameAndType("numbers",
        QueryableStoreTypes.keyValueStore()));

store.all().forEachRemaining(row -> {
    System.out.println(row.key + " - " + row.value);
});
```
- Record Caches
When aggregations are performed, you can store their result in the state store. Because writing to persistent storage is expensive, Kafka Streams provides a record cache that you can enable.
With the cache, Kafka Streams does not persist every update immediately. Instead, it buffers the updates and writes them to the persistent storage when the commit interval elapses.
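The following is a minimal configuration sketch. The buffer size and interval values are arbitrary assumptions for illustration.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();

// Total memory (in bytes) available for record caching across all threads.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);

// Flush the cache and commit offsets every 5 seconds.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
```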
The Kafka Streams DSL provides several options to perform transformations on streams and tables. Some of these transformations depend on the concept of time. A window is a period of time that limits the records a transformation considers. Without windows, some operations would have to reprocess the whole stream on every new record, causing performance issues.
The concept of joining in Kafka Streams is similar to joining relational database tables.
Given two KStream or KTable instances, a join operation merges the records into one by using a common field in both.
The common field is usually the key of the record.
Joining is supported among KStream, KTable, and GlobalKTable objects.
Important
The Apache Kafka ecosystem does not aim to eliminate a relational database from your projects. You can replace a database with Apache Kafka in specific situations and applications. This, however, requires deep knowledge of distributed systems and relational algebra.
Some joins also require the partitions of the topics involved in the join to be co-partitioned. This means that:

- The streams participating in the join must have the same number of stream partitions. Consequently, the stream source topics must have the same number of partitions.
- All producers writing to the topics must use the same partitioning strategy, so that records with the same key are sent to the same partition number. The default producer partitioning strategy conforms to this requirement.

If your topics do not satisfy these requirements, you can repartition a stream before the join, as the sketch after this list shows.
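A minimal sketch of such a repartition follows. The orders stream, the internal topic name, and the partition count are assumptions; the repartition operation is available in Kafka Streams 2.6 and later.

```java
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

KStream<String, String> orders = ...output omitted...;

// Rewrite the stream through an internal topic with 6 partitions so that
// it matches the partition count of the other side of the join.
KStream<String, String> repartitioned = orders.repartition(
    Repartitioned.<String, String>numberOfPartitions(6)
        .withName("orders-copartitioned"));
```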
Like in relational databases, joins can be inner, left, or outer.
For simplicity, only inner joins are considered in this section.
- KStream - KStream
In this kind of operation, the join is triggered every time that a new element is appended to either of the streams. A window restriction is necessary for efficient joins: if you specify no window, then the application must process the full stream every time a new element is appended. Consequently, windowing influences the application performance.
The result is another KStream object. Co-partitioning is required.

For example, consider two streams: visits, which holds the information of visits to a web page, and clicks, which contains the information for clicks on that website. The key for both streams is the user ID. In a specific period of time, you might want to relate the visits and clicks performed by a user. Considering an interval of 5 seconds, the following diagram illustrates the visits and clicks timeline.

Figure 4.1: KStream - KStream join timeline

The letters (A, B, and C) represent the key (the user identifier, in the example) of the records as they arrive on the timeline.

- At t=0, a new visit of A is received.
- At t=1, a new visit of B is received.
- At t=2, a new visit of C is received. A new click of B is received. Because B has a visit, a join is triggered and both records are retrieved together.
- At t=3, another visit of C is received. Now, there are two visits for the same key. If this was a KTable, only the last one would be considered.
- At t=4, a click of C is received. There are two visits of C, so a join for every visit is triggered.
- At t=6, a click of A is received. Although there is a visit record for A, the click is outside the time window, so no join for A is triggered.

A standard implementation with the Kafka Streams library in Java:
```java
KStream<String, String> left = ...output omitted...;   // (1)
KStream<String, String> right = ...output omitted...;  // (2)

KStream<String, String> joined = left.join(right,      // (3)
    (leftValue, rightValue) ->
        "left=" + leftValue + ", right=" + rightValue, // (4)
    JoinWindows.of(Duration.ofSeconds(5)),             // (5)
    Joined.with(
        Serdes.String(),    // (6)
        Serdes.String(),    // (7)
        Serdes.String()));  // (8)
```

(1) Creation of the left side of the join. In the previous example, the visits.
(2) Creation of the right side of the join. In the previous example, the clicks.
(3) The left stream is joined with the right stream.
(4) The function that is applied when the join occurs. In this example, a new string that contains the value of both records is created.
(5) The window time.
(6) The serde for the key. Because the key must be equal in both streams, only one serde is provided.
(7) The serde for the left stream value.
(8) The serde for the right stream value.
- KTable - KTable
In this kind of operation, the join is triggered every time that a key is updated in any of the tables. Because a KTable holds the last value for a specific key, when the join occurs, the last value of every key is used.

The result is another KTable object. No time window is applied. Data co-partitioning is required.

For example, consider two tables: users, which contains the information for every user, and profiles, which contains the public data of every user. Both tables use the user identifier as a key. A KTable is used for both because you are only concerned with the latest update.

Figure 4.2: KTable - KTable join timeline

- At t=0, user A is updated.
- At t=1, user B is updated.
- At t=2, the profile of user B is updated. Because the information of user B had been updated, a join is triggered for both records.
- At t=3, user A is updated again. The name Sus is replaced by Susan. At the same time, the profile of user C is updated.
- At t=4, the profile of user A is updated. A join is triggered. The last version of both tables for user A is retrieved.
- At t=6, user C is updated. A new join is triggered.

The following is a standard implementation with the Kafka Streams library in Java:
```java
KTable<String, Long> left = ...output omitted...;
KTable<String, Double> right = ...output omitted...;

KTable<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue);
```
- KStream - KTable
In the previous operations, the join was triggered every time a change happened in any of the streams (either left or right side). However, when joining a KStream and a KTable, only appends to the KStream (the left side) trigger a join operation. The table (the right operand) just receives updates, which are used whenever a new matching key arrives on the stream.

The result is another KStream object. No time window is applied. Data co-partitioning is required.

For example, consider two topics, users and orders. Both use the user identifier as a key. The users topic is mapped to a KTable because we just care about the last update of every user. The orders topic is mapped to a KStream because we must consider all orders, not just the last order for every user.

Figure 4.3: KStream - KTable join timeline

- At t=0, user A is updated.
- At t=1, user B is updated.
- At t=2, a new order for user A arrives. A join is triggered.
- At t=3, user B is updated. The email sample1 is replaced by newmail.
- At t=4, a new order for user B arrives. A join is triggered, and the latest user information for B is retrieved.
- At t=5, a new order for user C arrives.
- At t=6, user C is updated. Although an order has been received previously, a join operation is not triggered because only changes to the KStream are considered.

The following is a standard implementation with the Kafka Streams library in Java:
```java
KStream<String, Long> left = ...output omitted...;
KTable<String, Double> right = ...output omitted...;

KStream<String, String> joined = left.join(right,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
    Joined.keySerde(Serdes.String()).withValueSerde(Serdes.Long()));
```
- KStream - GlobalKTable
This kind of join is similar to the KStream - KTable join. However, the GlobalKTable holds all the values from the topic before the stream processing starts.

The result is another KStream object. No time window is applied. Data co-partitioning is not required.

For example, consider the example from the KStream - KTable join, with the users and orders topics. The users topic is now mapped to a GlobalKTable and contains all the values at t=0.

Figure 4.4: KStream - GlobalKTable join timeline

- At t=0, the users GlobalKTable contains all the elements from the topic.
- At t=2, an order for user A arrives. Because A is already in the table, a join is triggered.
- At t=3, user B is updated. The old email sample1 is replaced by newmail.
- At t=4, an order for user B arrives. A join is triggered.
- At t=5, a new order for user C arrives. Because C is already in the table, a join is triggered.

A standard implementation with the Kafka Streams library in Java:
```java
KStream<String, Long> left = ...output omitted...;
GlobalKTable<String, Double> right = ...output omitted...;

KStream<String, String> joined = left.join(right,
    (leftKey, leftValue) -> leftKey, // maps each stream record to the table key to join on
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue);
```
Aggregations allow you to perform different transformations on grouped records.
When you group records of a KStream or KTable, you are creating blocks of records.
Records within a block are related based on some criteria that you provide.
For example, in a stream that contains words, you might choose to group words with the same length.
Consider the stream ["red", "egg", "hat", "black", "blue", "green"].
If you group by word length, the result would be [3 → [red, egg, hat]], [4 → [blue]], [5 → [black, green]], as the following snippet shows.
```java
KStream<String, String> wordsStream = ...output omitted...;  // (1)

KGroupedStream<Integer, String> wordsGrouped = wordsStream
    .groupBy((key, word) -> word.length());  // (2)
```

(1) A stream of words is created.
(2) The stream is grouped by word length. The resulting object is a KGroupedStream.
Grouped streams and grouped tables are represented by KGroupedStream and KGroupedTable objects (respectively).
For every group, you can perform operations such as summing up all values or counting the number of records.
- Aggregate
The aggregate operation allows you to update an accumulator object for every element in the grouped stream. An accumulator is a custom object that you provide to the aggregate method and that you can update during processing.

The result is a KTable. You can use windowing to aggregate elements in a specific time range.

For example, consider a users stream where you want to find the average age of the users. Conceptually, you would have to sum up all the ages and divide the result by the number of users. With the aggregate operation, you have an accumulator object that contains the sum of the ages and the count of the users. For every element in the grouped stream, you update the accumulator. Consider the following Accumulator class.

```java
class Accumulator {
    int sum = 0;
    int count = 0;

    Accumulator addAge(int age) {
        sum += age;
        count++;
        return this; // return the updated accumulator so the aggregator can reuse it
    }
}
```

In the following snippet, the Accumulator class is used to find the age sum.

```java
KStream<Integer, User> usersStream = getUsersStream();

var grouped = usersStream
    .groupByKey()                                  // (1)
    .aggregate(
        () -> new Accumulator(),                   // (2)
        (key, user, accumulator) ->
            accumulator.addAge(user.getAge()),     // (3)
        Materialized
            .<Integer, Accumulator, KeyValueStore<Bytes, byte[]>>as("users-age")  // (4)
            .withKeySerde(intSerde)
            .withValueSerde(accumulatorSerde));
```

(1) Users are grouped by key. In this example, the key is the user ID.
(2) The initial accumulator is created.
(3) For every user, we update the accumulator, adding the age and increasing the counter.
(4) The state store where the data is saved. The Integer type parameter is for the key.

At the end of the operation, the accumulator contains the sum of the ages and the total number of users processed.
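To complete the average-age example, one possible follow-up step, not shown in the snippet above, is to derive the average by mapping the aggregated values:

```java
// Hypothetical continuation of the snippet above: compute the average
// age per key by dividing the accumulated sum by the accumulated count.
KTable<Integer, Double> averageAge = grouped
    .mapValues(accumulator -> (double) accumulator.sum / accumulator.count);
```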
- Reduce
The reduce method is a specific case of the aggregate operation. The reduce method iterates over the elements of the KStream or KTable and, for every element, returns a new element of the same type. The new element is passed to the next iteration, and at the end only one resulting value remains.

The result is a KTable. You can use windowing to reduce elements in a specific time range.

For example, consider a stream of numbers where you want to find the maximum number. For every element in the stream, you verify whether the element is greater than the current maximum. If it is, then you return the element; otherwise, you return the current maximum. The following snippet shows a possible implementation.

```java
KStream<String, Integer> stream = ...output omitted...;

stream.groupByKey()
    .reduce((currentMax, element) -> {
        if (element > currentMax) {
            return element;
        }
        return currentMax;
    });
```

- Count
The count operation allows you to count the number of elements in a grouped KStream or KTable. The result is a KTable. You can use windowing to count elements in a specific time range.

For example, in the previous example of the word grouping, you can count the number of words with the same length.

```java
KStream<String, String> wordsStream = ...output omitted...;

KTable<Integer, Long> wordsCount = wordsStream
    .groupBy((key, word) -> word.length())
    .count();
```
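As a sketch of the windowing option mentioned above, the same stream can be counted per time window, matching the JoinWindows.of style used earlier in this section. The one-minute window size is an arbitrary assumption.

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Count records per key within tumbling 1-minute windows. The result
// key is Windowed<String>, carrying both the key and the window.
KTable<Windowed<String>, Long> countsPerWindow = wordsStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .count();
```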