Chapter 4. Creating Asynchronous Services with Event Collaboration

Abstract

Goal: Create asynchronous services using the event collaboration pattern.
Objectives
  • Apply stateless transformations to event streams.

  • Apply stateful transformations to event streams.

  • Repartition an event stream to scale Streams applications.

Sections
  • Implementing Stateless Event Processing (and Guided Exercise)

  • Implementing Stateful Event Processing (and Guided Exercise)

  • Partitioning Stream Processing for Scalability (and Guided Exercise)

Lab

Creating Asynchronous Services with Event Collaboration

Implementing Stateless Event Processing

Objectives

After completing this section, you should be able to apply stateless transformations to event streams.

Defining Events

An event is an immutable record of something that happened within the scope of the business domain. Events communicate a circumstance that is important to your business. Each event must contain the necessary information to precisely describe what happened.

For example, in the retail business, an event can communicate a sale, or an update in the stock levels of a product.

ProductStockWasIncreased {
    "id": 12345678,
    "increase": 1000
}

The preceding example represents an event in the retail business domain. It communicates an increase of 1000 units on the stock level of the product with ID 12345678.

Events are generally structured using a key/value format.

  • Key: an optional field used for identification, routing, or partitioning purposes.

  • Value: field that contains the event details.
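The key/value structure can be sketched in plain Java as follows. This is a minimal illustration, not a Kafka API: the class name EventKeyValueSketch and the keyed helper are hypothetical, and choosing the product ID as the key is an assumption made for the example.

```java
import java.util.Map;

// Hypothetical plain-Java model of the event shown earlier; not a Kafka API.
final class EventKeyValueSketch {

    // The value carries the event details. Records are immutable, like events.
    record ProductStockWasIncreased(long id, int increase) {}

    // Pairs the event with a key taken from the product ID (an assumption),
    // so that all events for one product could be routed together.
    static Map.Entry<Long, ProductStockWasIncreased> keyed(ProductStockWasIncreased event) {
        return Map.entry(event.id(), event);
    }

    public static void main(String[] args) {
        var entry = keyed(new ProductStockWasIncreased(12345678L, 1000));
        System.out.println("key=" + entry.getKey() + " increase=" + entry.getValue().increase());
    }
}
```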

Good Practices for Designing Events

Follow these widely adopted practices when you design the events that define the communication structure of your application:

  • Follow a standard naming pattern when naming events, including using past tense to indicate what happened in your business domain. For example, the name UserCreated states what happened to the user.

  • Use narrow data types to avoid ambiguity in the meaning of the data. For example, if you have an integer field, always use an integer type instead of a string type.

  • Create single-purpose events to avoid overloading an event type with different business logic or meanings.

  • Use one logical event definition per stream to avoid confusing what the event is and what the event stream represents. This means that no stream should contain two types of events.
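As a rough illustration of these practices, the following sketch defines two single-purpose events with past-tense names and narrow field types. UserCreated comes from the text; UserEmailChanged and every field are hypothetical and exist only for this example.

```java
import java.time.Instant;

// Hypothetical event definitions that illustrate the practices listed above.
final class EventDesignSketch {

    // Past-tense name and narrow types: a long ID and an Instant timestamp
    // instead of free-form strings.
    record UserCreated(long userId, String email, Instant createdAt) {}

    // A separate event for a separate business fact, instead of one generic
    // event that carries an "action" string with several meanings.
    record UserEmailChanged(long userId, String newEmail, Instant changedAt) {}

    public static void main(String[] args) {
        var created = new UserCreated(7L, "user@example.com", Instant.now());
        var changed = new UserEmailChanged(7L, "new@example.com", Instant.now());
        System.out.println(created.userId() + " " + changed.newEmail());
    }
}
```

Following the last practice, each of these event types would be published to its own stream.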

Performing Stateless Transformations on Events

Event-driven architectures use events as the communication channel for the different parts of the application. For that reason, a common task is to process streams of events.

In stateless event processing, you process each event independently of any previous events. There is no stored knowledge of past transactions and the stream processor applies the exact same transformation every time, regardless of previously processed events. This lack of state knowledge simplifies the processing logic and makes the stream processing easier to scale up.

For example, consider a stream of numbers numbersStream = [1, 9, 100, 25, 8, 45]. If you only want to keep the numbers below 25, you would:

  • Create another stream of numbers below 25.

  • Iterate over every element of the numbersStream stream.

  • For every element, check whether it is lower than 25. If it is lower, then send it to the new stream. If it is greater than or equal to 25, then discard it. This comparison does not depend on any previous element or external data.

Because every event is processed independently, this is a stateless transformation.
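The steps above can be sketched with the standard Java Stream API, which is used here only as a stand-in for an event stream. The class name StatelessFilterSketch and the below helper are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java sketch of the numbersStream example; no Kafka involved.
final class StatelessFilterSketch {

    // Keeps the numbers lower than the limit. Each element is tested on its
    // own, so no state from earlier elements is needed.
    static List<Integer> below(List<Integer> numbersStream, int limit) {
        return numbersStream.stream()
            .filter(number -> number < limit)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbersStream = List.of(1, 9, 100, 25, 8, 45);
        System.out.println(below(numbersStream, 25)); // prints [1, 9, 8]
    }
}
```

Note that 25 itself is discarded, because the comparison is strictly lower than 25.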

In the Apache Kafka ecosystem, there are two ways to implement stateless processing logic: by using regular consumers and producers, or by using Kafka Streams.

Stateless Transformations with the Kafka Streams DSL

Some of the most common stateless transformations with the Kafka Streams DSL are the following:

Branch

Splits a KStream into one or more branches, based on the provided predicates. Each branch is a KStream object. The stream processor node evaluates the predicates in order and places each record only on the first branch whose predicate matches. If no predicate matches, then the stream processor node drops the record. Use this method to apply different processing to records based on certain criteria. For example, in a stream of payments, you might need to process payments greater than 1000 USD differently from payments below that amount.

orders.split() // (1)
    .branch( // (2)
        (key, order) -> order.amount < 100, // (3)
        Branched.withConsumer( // (4)
            this::doSomething
        )
    )
    .branch(
        (key, order) -> order.amount > 9000,
        Branched.withConsumer(
            this::doAnotherThing
        )
    );

(1) Stateless transformation method

(2) Branch definition enclosure

(3) Predicate against which each record is evaluated

(4) Optional parameter that defines the branch processor. A branch processor is an operation that accepts a KStream as a parameter and returns no result.

The preceding example splits the orders stream into two branches based on the value of the amount field. The first branch contains all the orders with an amount value lower than 100, and uses the doSomething method to process the branch. The second branch contains all the orders with an amount value greater than 9000, and uses the doAnotherThing method to process the branch. The stream processor drops any order with an amount value outside the defined ranges, that is, from 100 to 9000 inclusive.
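The first-match and drop behavior described for branching can be modeled in plain Java. The following sketch is not the Kafka Streams implementation: the BranchSemanticsSketch class, the Order record, the branch names, and the split helper are all hypothetical and only mimic the routing rules.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java model of first-match branching; not the Kafka Streams API.
final class BranchSemanticsSketch {

    record Order(long id, double amount) {}

    // The two predicates from the example, kept in insertion order.
    static Map<String, Predicate<Order>> exampleBranches() {
        Map<String, Predicate<Order>> branches = new LinkedHashMap<>();
        branches.put("small", order -> order.amount() < 100);
        branches.put("large", order -> order.amount() > 9000);
        return branches;
    }

    // Routes each order to the first branch whose predicate matches.
    // The branches map must iterate in definition order (LinkedHashMap).
    static Map<String, List<Order>> split(List<Order> orders,
                                          Map<String, Predicate<Order>> branches) {
        Map<String, List<Order>> result = new LinkedHashMap<>();
        branches.keySet().forEach(name -> result.put(name, new ArrayList<>()));
        for (Order order : orders) {
            for (Map.Entry<String, Predicate<Order>> branch : branches.entrySet()) {
                if (branch.getValue().test(order)) {
                    result.get(branch.getKey()).add(order);
                    break; // first match wins; later predicates are skipped
                }
            }
            // An order that matches no predicate is dropped.
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(new Order(1, 50), new Order(2, 500), new Order(3, 10000));
        Map<String, List<Order>> result = split(orders, exampleBranches());
        System.out.println(result.get("small").size() + " small, "
            + result.get("large").size() + " large"); // prints 1 small, 1 large
    }
}
```

In this run, the order with an amount of 500 matches neither predicate, so it does not appear in any branch.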

Filter

Evaluates a predicate for each element of the stream, and creates a new KStream with all the elements that satisfy the predicate. Use this transformation when you want to discard some stream elements based on a certain condition. For example, you might want to keep only the numbers greater than 99999 in a stream of numbers.

orders.filter( // (1)
    (key, order) -> order.amount > 99999 // (2)
);

(1) Stateless transformation method

(2) Predicate against which each record is evaluated

The preceding example creates a new stream that contains all the orders with an amount value greater than 99999.

Inverse Filter

Evaluates a predicate for each element of the stream, and creates a new KStream with all the elements that do not satisfy the predicate.

orders.filterNot( // (1)
    (key, order) -> order.amount > 99999 // (2)
);

(1) Stateless transformation method

(2) Predicate against which each record is evaluated

The preceding example creates a new stream that contains all the orders with an amount value lower than or equal to 99999.
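Because the filter and inverse filter examples use the same predicate, their results are complementary. The following plain-Java sketch models that relationship with Predicate.negate(); the FilterNotSketch class, the Order record, and both helper methods are hypothetical and are not part of Kafka Streams.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Plain-Java model of filter and filterNot; not the Kafka Streams API.
final class FilterNotSketch {

    record Order(long id, double amount) {}

    // The predicate used by both examples.
    static final Predicate<Order> LARGE = order -> order.amount() > 99999;

    // Keeps the orders that satisfy the predicate.
    static List<Order> filter(List<Order> orders, Predicate<Order> predicate) {
        return orders.stream().filter(predicate).collect(Collectors.toList());
    }

    // Keeps the orders that do not satisfy the predicate.
    static List<Order> filterNot(List<Order> orders, Predicate<Order> predicate) {
        return filter(orders, predicate.negate());
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(new Order(1, 99999), new Order(2, 100000));
        System.out.println(filter(orders, LARGE).size() + " kept, "
            + filterNot(orders, LARGE).size() + " kept by the inverse"); // prints 1 kept, 1 kept by the inverse
    }
}
```

The order with an amount of exactly 99999 does not satisfy the predicate, so only the inverse filter keeps it.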

Foreach

A terminal operation that allows you to perform an action on each record of the stream.

orders.foreach( // (1)
    (key, order) -> {
        log(order); // (2)
    }
);

(1) Stateless transformation method

(2) Action performed on each record

The preceding example loops through the records in the orders stream, and executes the log method on each one of the events.

Map

Creates a new KStream in which each element is a transformation of a record from another stream. Because this operation can change the keys, it marks the stream for data repartitioning. That means that a subsequent grouping or join operation repartitions the records. The mapValues operation changes only the values, so it does not cause data repartitioning.

orders.map( // (1)
    (key, order) -> { // (2)
        return new KeyValue<>(
            order.id,
            new OrderWasCreatedInEur(order.id, order.amount * 1.15)
        );
    }
);

(1) Stateless transformation method

(2) Transformation applied to each record

The preceding example transforms each record in the orders stream into a new OrderWasCreatedInEur event, and uses the order ID as the key of the new record.
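The difference between map and mapValues can be modeled in plain Java. The following sketch is not the Kafka Streams API: the MapRekeySketch class, the record fields, the customer-style original key, and both helper methods are hypothetical. The 1.15 factor is copied from the example, and treating it as a currency conversion rate is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of map versus mapValues; not the Kafka Streams API.
final class MapRekeySketch {

    record Order(long id, double amount) {}
    record OrderWasCreatedInEur(long id, double amountInEur) {}

    // Factor taken from the example; its meaning as a rate is an assumption.
    static final double RATE = 1.15;

    static OrderWasCreatedInEur convert(Order order) {
        return new OrderWasCreatedInEur(order.id(), order.amount() * RATE);
    }

    // Like map: produces a new key (the order ID) and a new value.
    static List<Map.Entry<Long, OrderWasCreatedInEur>> map(List<Map.Entry<String, Order>> records) {
        List<Map.Entry<Long, OrderWasCreatedInEur>> result = new ArrayList<>();
        for (Map.Entry<String, Order> record : records) {
            Order order = record.getValue();
            result.add(Map.entry(order.id(), convert(order)));
        }
        return result;
    }

    // Like mapValues: keeps the original key and replaces only the value.
    static List<Map.Entry<String, OrderWasCreatedInEur>> mapValues(List<Map.Entry<String, Order>> records) {
        List<Map.Entry<String, OrderWasCreatedInEur>> result = new ArrayList<>();
        for (Map.Entry<String, Order> record : records) {
            result.add(Map.entry(record.getKey(), convert(record.getValue())));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Order>> records = List.of(Map.entry("customer-7", new Order(42L, 100.0)));
        System.out.println("map key: " + map(records).get(0).getKey());             // prints map key: 42
        System.out.println("mapValues key: " + mapValues(records).get(0).getKey()); // prints mapValues key: customer-7
    }
}
```

In Kafka Streams, a changed key might belong to a different partition, which is why map marks the stream for repartitioning and mapValues does not.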

Revision: ad482-1.8-cc2ae1c