Partitioning Stream Processing for Scalability

Objectives

After completing this section, you should be able to repartition an event stream to scale Streams applications.

Scaling Apache Kafka Streams

When the amount of data that your application processes begins to grow, it is often necessary to add processing resources. Kafka Streams scales by balancing the workload across multiple execution threads or instances of the same application.

The Streams engine parallelizes the topology execution by splitting it into tasks. Each task executes independently of the others and processes events from a subset of the partitions. The Streams engine determines the number of tasks based on the number of partitions of the topics that the application processes.

Increasing Processing Rate

You can increase the processing rate (that is, the amount of data that your application can process over time) of a Kafka Streams topology by adding more threads or new instances of your stream processing application. When you add more instances of your application, Kafka Streams redistributes some of the running tasks to the new instances. Therefore, to achieve scalability in Kafka Streams, it is important to know the number of partitions of the topics that the application processes.
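For example, the following minimal sketch increases the number of processing threads in a single instance through the num.stream.threads property. The application ID, broker address, and thread count are assumptions for illustration, not values from this course.

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "users-app");         // hypothetical application ID
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);               // run four processing threads in this instance

// Kafka Streams distributes the tasks among the threads of this instance.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();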

You can have at most as many tasks as there are partitions in the processed topics. If you start more instances than the number of partitions, then the redundant instances do not process any data. However, if any of the active instances crashes, Kafka uses the redundant instances as a fallback. To have more active instances than the number of partitions in the topic, you can use Kafka Streams to move the records to another topic with more partitions, and use that topic only for the Kafka Streams processing.
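The following sketch shows one way to do this with the repartition operation, available in Kafka Streams 2.6 and later. The topic name and serdes match the example used later in this section; the partition count of eight is an arbitrary assumption.

KStream<Integer, User> users = builder.stream(
        "users",
        Consumed.with(intSerde, userSerde)
    );

// Move the records to an internal topic with eight partitions, so that up to
// eight tasks (and therefore up to eight active instances) can process them.
KStream<Integer, User> scaledUsers = users.repartition(
        Repartitioned.<Integer, User>with(intSerde, userSerde)
            .withNumberOfPartitions(8)
    );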

For example, consider a topic with four partitions and only one application instance. The single instance reads all the data from all partitions, as the following image illustrates.

Figure 4.5: Single Kafka Streams instance reading from four partitions

If you start another application instance, however, then the reading workload is distributed across the different instances, as the following image illustrates.

Figure 4.6: Two Kafka Streams instances reading from four partitions

Decreasing Processing Rate

You can decrease your Kafka Streams application processing rate by removing threads or application instances. The tasks assigned to the stopped instances are distributed across the remaining instances. The state of the stopped instances is also migrated to the remaining instances, which allows them to resume the transformations.

Repartitioning Event Streams

In Kafka Streams, repartitioning means writing the records of a stream to another topic, which then acts as the source for subsequent transformations.

Understanding Repartitioning

For example, consider two Kafka Streams instances reading from a topic called users that has two partitions, where records are keyed by user ID. Each instance reads from one partition, as the following image illustrates.

Figure 4.7: Users' records keyed by ID

Consider that you must count the number of users that belong to the same country. To group the users by the country field, you use the groupBy method of the KStream object. Then, you use the count aggregation.

// Consume the users topic, deserializing keys as integers and values as User objects
KStream<Integer, User> userStreams = builder.stream(
        "users",
        Consumed.with(intSerde, userSerde)
    );

// Select the country field as the new key, then count the users per country
userStreams.groupBy((key, value) -> value.country).count();

However, users with the same country are read by different Kafka Streams instances, which means that it is not possible to group them because no single instance has all the records. To overcome this problem, Kafka Streams creates another topic that uses the country field as key, instead of id. This way, users with the same key (the same country) go to the same partition and a single instance reads them. This is an example of repartitioning.

Figure 4.8: Users' records keyed by country as a result of groupBy

These kinds of topics are called repartition topics. They are internal to Kafka Streams and do not modify the original topic.
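If the default serdes do not match the new key and value types, you can pass explicit serdes for the repartition topic and publish the resulting counts. The following is a minimal sketch; the stringSerde variable and the users-per-country output topic are assumptions, not part of the course example.

// Group by country with explicit serdes for the internal repartition topic,
// then count the users per country.
KTable<String, Long> usersPerCountry = userStreams
        .groupBy((key, value) -> value.country, Grouped.with(stringSerde, userSerde))
        .count();

// Write the per-country counts to a hypothetical output topic.
usersPerCountry.toStream().to(
        "users-per-country",
        Produced.with(stringSerde, Serdes.Long())
    );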

Technically, when Kafka Streams performs repartitioning, it splits your topology into sub-topologies. If you describe the topology from the previous example, the result should be similar to the following output.

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [users]) 1
      --> KSTREAM-KEY-SELECT-0000000001
    Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
      --> KSTREAM-FILTER-0000000005
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-FILTER-0000000005 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-KEY-SELECT-0000000001
    Sink: KSTREAM-SINK-0000000004 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition) 2
      <-- KSTREAM-FILTER-0000000005

  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition]) 3
      --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])
      --> none
      <-- KSTREAM-SOURCE-0000000006

1. The topic users is the source processor of the first sub-topology.

2. The repartition topic, where users are keyed by country, is the sink processor of the first sub-topology.

3. The repartition topic is the source processor of the second sub-topology.
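You can generate this description yourself by building the topology and printing it. The following is a minimal sketch that assumes the builder variable from the earlier example.

// Build the topology and print its sub-topologies, processors, and repartition topics
Topology topology = builder.build();
System.out.println(topology.describe());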

Transformations That Trigger Repartitioning

Both stateless and stateful transformations can cause repartitioning. Repartitioning is usually triggered when you change the key of a record, so any transformation that can modify keys can cause repartitioning. Some transformations only mark the stream for repartitioning, which means that Kafka Streams performs the repartitioning only if a subsequent transformation requires it.

For example, the map and flatMap transformations mark the stream for repartitioning because they allow you to modify the key of records. If you then apply a grouping or join transformation, the data is repartitioned.

The groupBy transformation always triggers repartitioning.
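The following sketch contrasts a transformation that marks the stream for repartitioning with one that does not. It reuses the userStreams variable and serdes from the earlier example and assumes a stringSerde for country values.

// map can change the key, so the stream is marked for repartitioning;
// the grouping that follows then creates a repartition topic.
userStreams
        .map((key, value) -> KeyValue.pair(value.country, value))
        .groupByKey(Grouped.with(stringSerde, userSerde))
        .count();

// mapValues cannot change the key, so no repartitioning is needed
// and the records stay in their original partitions.
userStreams
        .mapValues(value -> value.country)
        .peek((key, country) -> System.out.println(key + " -> " + country));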

Co-partitioning Event Streams

The concept of co-partitioning was introduced in previous sections of this course because it is a requirement for certain operations, such as joins.

Two streams are co-partitioned if they:

  • Have the same number of partitions.

  • Use the same partitioning strategy, so that producers send records to partitions based on the same criteria.

Most joins require streams to be co-partitioned because it is the only way to ensure that records with the same key are sent to the same partition in both topics. Every Kafka Streams instance reads from its own set of partitions, so if records with the same key were sent to different partitions, Kafka Streams would need a complex mechanism to bring them together to perform the join.
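For example, the following is a minimal sketch of a join, assuming a hypothetical clicks topic that is keyed by the same user ID and created with the same number of partitions as users, so that the two streams are co-partitioned.

KStream<Integer, String> clicks = builder.stream(
        "clicks",
        Consumed.with(intSerde, Serdes.String())
    );

// Because the streams are co-partitioned, the matching records for any user ID
// live in the same partition number and are processed by the same task.
KStream<Integer, String> userClicks = userStreams.join(
        clicks,
        (user, click) -> user.country + ":" + click,
        JoinWindows.of(Duration.ofMinutes(5)),
        StreamJoined.with(intSerde, userSerde, Serdes.String())
    );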

 

