Describing Kafka Streams Architecture

Objectives

After completing this section, you should be able to describe Kafka Streams architecture and concepts.

Explaining Kafka Streams Architecture

Kafka Streams uses existing Apache Kafka concepts and APIs. You can think of a Kafka Streams application as a series of processing nodes that use Kafka topics as input and output.

Figure 3.1: Kafka Streams application architecture

You can use Kafka Connect to transform data from external systems to records in a topic. A Kafka Streams application defines a set of nodes that process records from a set of topics. Each node outputs the resulting records into a Kafka topic. For example, you can inject data from the GitHub API as records into a Kafka topic. Next, you can apply filters and transformations to calculate repository statistics, and write the output into another topic.

In Kafka Streams, you define processing nodes. These nodes read input records, transform them, and output the resulting records to other Kafka topics or applications, or to external systems, for example by using Kafka Connect. Within a Kafka Streams application, processing nodes form a topology. A Kafka Streams topology is a directed acyclic graph (DAG) of processors, where each processor declares a function to apply to each record. In a topology, the edges that connect processors represent streams.

Figure 3.2: An example Kafka Streams topology

Defining Kafka Stream Processor Nodes

A processor is a topology node that receives a stream of records as its input. Each processor defines a function that is applied to every record in a stream. Subsequently, Kafka passes processed records to processors further down in the topology.

In a topology, a processor can have an additional function:

Source Processor

A source stream processor creates an entry point to your Kafka Streams application. A source processor reads a topic and turns it into a continuous stream of data.

Sink Processor

A sink stream processor creates an endpoint of your Kafka Streams application. A sink processor writes processed records to a Kafka topic.

Topologies require source and sink processors to feed data into the topology and write the results. Without source and sink processors, topologies are just specifications, without data flowing through the nodes.

A processor can be stateful or stateless. Stateless processors apply functions that do not require a state store, such as the map or filter functions. Stateful processors apply functions that require one or more state stores, such as the join, groupBy, or other aggregation functions. A state store enables a processor to keep previous state for later use, such as interactive querying, or creating data views by using data windows.
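
For example, the following is a minimal sketch using the Streams DSL (covered in the next topic) that contrasts a stateless filter processor with a stateful count processor. The clicks topic name and the String serdes are assumptions for illustration.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
// class omitted

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> clicks =
    builder.stream("clicks", Consumed.with(Serdes.String(), Serdes.String()));

// Stateless: the filter predicate needs no state store.
clicks.filter((key, value) -> value != null);

// Stateful: count requires a state store to keep a running total per key.
clicks.groupByKey().count();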

Implementing Kafka Streams Topologies with Streams DSL

Kafka Streams provides a StreamsBuilder object that enables you to define a topology by using a domain-specific language (DSL). You then execute the topology by using the KafkaStreams class.

The following simplified Streams application applies the System.out.println function to the value of each record.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
// class omitted

final StreamsBuilder builder = new StreamsBuilder();

builder.stream("input-topic")
  .foreach((key, value) -> System.out.println(value));

final Topology topology = builder.build();
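
// You can inspect the generated topology description, for example:
// System.out.println(topology.describe());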

// Property configuration omitted
final KafkaStreams streams = new KafkaStreams(topology, getProperties());
streams.start();

Each additional DSL function creates one or more processors in the application topology. You can query the Topology object by using the Topology#describe function to get a topology description. The previous code sample creates the following topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
      --> KSTREAM-FOREACH-0000000001

The following is a visualization of the topology description:

Figure 3.3: Stateless Kafka Streams topology

You can also create stateful topologies, for example:

new StreamsBuilder().stream("input-topic")
  .groupBy(
      (key, value) -> value,
      Grouped.with(Serdes.String(), Serdes.String())
  ).count();

In the preceding example, the groupBy DSL function selects a new key based on the record value, and filters out records with a null key. Then, the count function requires a state store to keep track of the number of records in each category. Consequently, the preceding code generates the following topology:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000001
    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)
      <-- KSTREAM-FILTER-0000000005

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> none
      <-- KSTREAM-SOURCE-0000000006

Figure 3.4: Stateful Kafka Streams topology

See the Streams API documentation for further details about the generated topology.

In the preceding stateful example, consider that input-topic contains (userID, "link") records. These records log which links a user clicks on a web page:

1, "/technologies"
1, "/our-code-is-open"
2, "/technologies/linux/platform"
5, "/technologies/cloud-computing/openshift"
5, "/technologies"
5, "/our-code-is-open"

The preceding topology counts how many times any user clicked a particular link. Instead of grouping the records by the user ID, the Kafka Streams application groups the records by URL to count records with the same URL value:

"/technologies", 2 // clicked by user IDs 1 and 5
"/our-code-is-open", 2
"/technologies/linux/platform", 1
...

Because the grouping is by URL, and to parallelize the count operation, Kafka Streams sends records with the same URL to the same processing node. Because the Kafka Streams scalability model is based on Kafka partitions, the grouping operation sends such records to the same partition. In the preceding example, Kafka Streams sends records with the value /technologies to the same partition, regardless of the different user ID values.

This operation is called repartitioning. The generated topology first selects a new key for each record based on the URL value. Then, based on the new key, Kafka Streams repartitions the records, which ensures that each record with the same URL is counted by the same node with the same local state.

Note

The following chapters explore in greater depth the partition-record relation and the use of co-partitioning and repartitioning in a Kafka Streams application.

The Kafka Streams DSL provides a number of built-in methods that often correspond to higher-order functions, for example:

  • filter

  • map

  • foreach

  • groupBy

  • peek

Each of the DSL methods adds one or more processors to the topology. If you require greater control over the higher-order functions, then you can implement custom processor classes and include them in the topology. This is out of scope for this course.
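
The following is a minimal sketch that chains several of these DSL methods in one topology. As in the previous examples, the topic names are placeholders, and the String serdes are an assumption for illustration (imports omitted).

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> stream =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

stream
  .filter((key, value) -> value != null)             // filter processor
  .mapValues(value -> value.toUpperCase())           // map processor
  .peek((key, value) -> System.out.println(value))   // peek processor
  .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));  // sink processor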

Defining the Stream-Table Relationship

Kafka Streams applications operate on streams of records. You can understand a stream as a distributed change log of events. As the application processes each record, it can be useful to map the stream of records to a database table.

Consider the following stream records, where each key maps to a (symbol, price) tuple of stock values:

(1, ("VMW", 15.33))
(2, ("AAPL", 151.88))
(3, ("IBM", 138.33))
(4, ("IBM", 139.01))

As each record comes in, you can have a consumer application that either appends a new row to a database table, or changes the existing value.

Figure 3.5: Transforming a record stream to a table

Similarly, consider the opposite interaction. You have a database table. Every time you append a new row, or change a row value, you can have a producer application that emits a new record to a stream:

Figure 3.6: Transforming a database table to a record stream

This relationship between a table and a stream illustrates the stream-table duality. You can map a stream of records to a database table-like structure, and you can map a table-like structure to a stream of records.
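
In the Streams DSL, this duality corresponds to the KStream and KTable abstractions. The following is a minimal sketch, assuming a hypothetical stock-prices topic keyed by stock symbol with Double price values (imports omitted).

// Read the topic as a table: each new record updates the row for its key.
final KTable<String, Double> prices =
    builder.table("stock-prices", Consumed.with(Serdes.String(), Serdes.Double()));

// Convert the table back into a stream of update records.
prices.toStream()
  .to("stock-price-updates", Produced.with(Serdes.String(), Serdes.Double()));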

Warning

The Apache Kafka ecosystem does not aim to eliminate a relational database from your projects. You can replace a database with Apache Kafka in specific situations and applications. However, this requires deep knowledge of distributed systems and relational algebra.

Using Windowing and Aggregations

The stream-table relationship is useful for performing database operations on streams of data. The following operations are common transformations that Kafka Streams enables you to perform by default:

Aggregations

Aggregation operations, such as groupBy, enable you to treat a stream of records as a table. Consequently, you can perform stateful operations on a stream, such as count or join.

Windowing

Windowing enables you to perform operations on a subset of your stream based on time. For example, you can calculate the average value of records within the last day.
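
For example, the following sketch extends the earlier click-counting example to count clicks per URL within one-day windows. The topic name and serdes are assumptions, and the available TimeWindows factory methods vary between Kafka Streams versions (imports omitted).

// Count how many times each URL is clicked within each one-day window.
builder.stream("clicks", Consumed.with(Serdes.Integer(), Serdes.String()))
  .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
  .windowedBy(TimeWindows.of(Duration.ofDays(1)))
  .count();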

Explaining Time in Kafka Streams

Kafka applications associate a timestamp with each record. The record timestamp differs based on what time it represents, for example:

  • Event time: time when the event happened.

  • Processing time: time when a Kafka Streams application processed the event.

  • Ingestion time: time when a Kafka broker received the event.

Kafka Streams applications use the timestamp of each record to determine the stream time. Stream time is the concept of time within a stream. Stream time advances only when new records flow into the stream.

Stream time is data-driven time that Kafka Streams applications use for specific operations, such as windowing or time-dependent joins.

Explaining Parallelism in Kafka Streams

Kafka Streams applications subscribe to the horizontal scalability paradigm of the Apache Kafka ecosystem. Apache Kafka uses topic partitions to scale out. Similarly, Kafka Streams uses stream partitions to parallelize the workload. Each stream partition in Kafka Streams is associated with a Kafka topic partition.

Stream partitions are ordered sequences of records. A record key determines the stream partition to which a record is assigned.

Kafka Streams assigns each stream partition to a stream task. Each task then instantiates a topology and processes messages independently from other instances of the application.

For example, consider an input topic with seven partitions. This means a Kafka Streams application can create at most seven stream partitions. Consequently, you can scale out your Kafka Streams application to seven nodes. If you scale out your Kafka Streams application to more nodes, additional nodes remain idle until Kafka Streams can create more stream partitions.

You can also scale vertically, or scale up, by using multi-threaded nodes. That means one application node can process multiple stream tasks. However, this does not change the maximum parallelism factor. For example, if the hardware that executes your Kafka Streams application has eight or more CPU threads, you can configure parallelism by using seven threads. Any additional workers or threads remain idle.
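
For example, the following is a minimal configuration sketch. The application ID and bootstrap server values are placeholders, and the num.stream.threads property sets how many stream threads each application instance runs.

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "clicks-counter");    // placeholder ID
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
// Seven threads match the seven input partitions; extra threads would remain idle.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 7);

final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();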

 

References

Apache Kafka Streams Core Concepts

Streams API documentation

KStream API documentation

KTable API documentation

Bejeck, Bill. (2018). Kafka Streams in Action: Real-time apps and microservices with the Kafka Streams API.
