Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
- Implementing Stateless Event Processing
- Guided Exercise: Implementing Stateless Event Processing
- Implementing Stateful Event Processing
- Guided Exercise: Implementing Stateful Event Processing
- Partitioning Stream Processing for Scalability
- Guided Exercise: Partitioning Stream Processing for Scalability
- Lab: Creating Asynchronous Services with Event Collaboration
- Summary
Abstract
| Goal | Create asynchronous services using the event collaboration pattern. |
| Sections | Implementing Stateless Event Processing (and Guided Exercise); Implementing Stateful Event Processing (and Guided Exercise); Partitioning Stream Processing for Scalability (and Guided Exercise) |
| Lab | Creating Asynchronous Services with Event Collaboration |
After completing this section, you should be able to apply stateless transformations to event streams.
An event is an immutable record of something that happened within the scope of the business domain. Events communicate a circumstance that is important to your business. Each event must contain the necessary information to precisely describe what happened.
For example, in the retail business, an event can communicate a sale, or an update in the stock levels of a product.
```
ProductStockWasIncreased {
    "id": 12345678,
    "increase": 1000
}
```

The preceding example represents an event in the retail business domain. It communicates an increase of 1000 units in the stock level of the product with ID 12345678.
Events are generally structured using a key/value format.
- Key: optional field used for identification, routing, or partitioning purposes.
- Value: field that contains the event details.
There are some good practices to follow when you design the events that define the communication structure of your application. The most widespread good practices among developers are:

- Follow a standard naming pattern when naming events, including using past tense to indicate what happened in your business domain. For example, the name `UserCreated` states what happened to the user.
- Use narrow data types to avoid ambiguity in the meaning of the data. For example, if you have an integer field, always use an integer type instead of a string type.
- Create single-purpose events to avoid overloading an event type with different business logic or meanings.
- Use one logical event definition per stream to avoid confusing what the event is with what the event stream represents. This means that no stream should contain two types of events.
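As a sketch of these practices, a single-purpose event with a past-tense name and narrow data types could be modeled in plain Java. The class and field names here are illustrative, not part of any Kafka API:

```java
// Hypothetical event type for illustration: past-tense name, a single
// purpose, and narrow data types (numeric fields instead of strings).
public class ProductStockWasIncreased {

    private final long productId;  // key candidate: identifies the product
    private final int increase;    // value detail: units added to stock

    public ProductStockWasIncreased(long productId, int increase) {
        this.productId = productId;
        this.increase = increase;
    }

    public long getProductId() { return productId; }

    public int getIncrease() { return increase; }
}
```

Because the fields are numeric, a consumer cannot misread the product ID or the increase as free-form text, which is the ambiguity the narrow-types practice avoids.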
Event-driven architectures use events as the communication channel for the different parts of the application. For that reason, a common task is to process streams of events.
In stateless event processing, you process each event independently of any previous events. There is no stored knowledge of past transactions and the stream processor applies the exact same transformation every time, regardless of previously processed events. This lack of state knowledge simplifies the processing logic and makes the stream processing easier to scale up.
For example, consider a stream of numbers `numbersStream = [1, 9, 100, 25, 8, 45]`.
If you only want to keep the numbers below 25, you would:
1. Create another stream of numbers below 25.
2. Iterate over every element of the `numbersStream` stream.
3. For every element, compare whether it is lower than 25. If it is lower, then send it to the new stream. If it is greater than or equal to 25, then discard it. This comparison does not depend on any previous element or external data.
Because every event is processed independently, this is a stateless transformation.
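These steps can be sketched in plain Java, outside Kafka, to show that the decision for each element is self-contained. The class and method names are illustrative:

```java
import java.util.List;
import java.util.stream.Collectors;

public class StatelessFilterExample {

    // Stateless transformation: each element is evaluated on its own,
    // with no knowledge of previously processed elements.
    static List<Integer> keepBelow(List<Integer> numbers, int threshold) {
        return numbers.stream()
                .filter(n -> n < threshold)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbersStream = List.of(1, 9, 100, 25, 8, 45);
        System.out.println(keepBelow(numbersStream, 25)); // prints [1, 9, 8]
    }
}
```

In Kafka Streams, the same per-element logic maps directly to the `filter` transformation covered later in this section.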
In the Apache Kafka ecosystem, there are two ways of implementing stateless processing logic: by using regular consumers and producers, or by using Kafka Streams.
Some of the most common stateless transformations with the Kafka Streams DSL are the following:
- Branch
Splits a `KStream` into one or more branches, based on the provided predicates. Each branch is a `KStream` object. The stream processor node evaluates the predicates in order, and places each record only on the first branch with a matching predicate. If there is no match, then the stream processor node drops the record. You use this method if you want to apply different processing to the records based on certain criteria. For example, in a stream of payments, you might need to process payments greater than 1000 USD in a different way than payments below that number.

```java
orders
    .split()                                      // stateless transformation method
    .branch(                                      // branch definition enclosure
        (key, order) -> order.amount < 100,       // predicate evaluated for each record
        Branched.withConsumer(this::doSomething)  // optional branch processor
    )
    .branch(
        (key, order) -> order.amount > 9000,
        Branched.withConsumer(this::doAnotherThing)
    );
```

A branch processor is an operation that accepts a `KStream` as a parameter and returns no result.

The preceding example splits the `orders` stream into two branches based on the value of the `amount` field. The first branch contains all the orders with an `amount` value lower than 100, and uses the `doSomething` method to process the branch. The second branch contains all the orders with an `amount` value greater than 9000, and uses the `doAnotherThing` method to process the branch. The stream processor drops any order with an `amount` value outside the defined ranges.
- Filter
Evaluates a predicate for each element of the stream, and creates a new `KStream` with all the elements that satisfy the predicate. You use this transformation when you want to discard some stream elements based on a certain condition. For example, if you want to keep only the numbers greater than 99999 in a stream of numbers.

```java
orders.filter((key, order) -> order.amount > 99999);
```

The preceding example creates a new stream that contains all the orders with an `amount` value greater than 99999.
- Inverse Filter
Evaluates a predicate for each element of the stream, and creates a new `KStream` with all the elements that do not satisfy the predicate.

```java
orders.filterNot((key, order) -> order.amount > 99999);
```

The preceding example creates a new stream that contains all the orders with an `amount` value lower than or equal to 99999.
- Foreach
A terminal operation that allows you to perform an action on each record of the stream.
```java
orders.foreach((key, order) -> {
    log(order);
});
```

The preceding example loops through the records in the `orders` stream, and executes the `log` method on each one of the events.
- Map
Creates a new `KStream` in which the elements are transformations of each one of the records of another stream. This operation can change the keys, and marks the stream for data repartitioning: the next grouping or join operation repartitions the records. The `mapValues` operation does not change keys, and therefore does not cause data repartitioning.

```java
orders.map((key, order) ->
    new KeyValue<>(
        order.id,
        new OrderWasCreatedInEur(order.id, order.amount * 1.15)
    )
);
```

The preceding example transforms each record in the `orders` stream into a new `OrderWasCreatedInEur` event.
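To see why changing keys forces repartitioning, recall that a record's partition is derived from its key. The following plain-Java sketch (not the Kafka Streams API; the hash-based assignment is a simplification of Kafka's default partitioner) shows that a value-only, `mapValues`-style transformation leaves every record in its original partition:

```java
import java.util.Map;
import java.util.stream.Collectors;

public class ValueOnlyTransform {

    // Simplified stand-in for key-based partition assignment.
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // mapValues-style transformation: values change, keys do not.
    static Map<String, Double> toEur(Map<String, Double> ordersUsd) {
        return ordersUsd.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                                          e -> e.getValue() * 1.15));
    }

    public static void main(String[] args) {
        Map<String, Double> orders = Map.of("order-1", 100.0, "order-2", 50.0);
        Map<String, Double> inEur = toEur(orders);

        // The key set is unchanged, so the key-derived partition of each
        // record is also unchanged: no repartitioning is required.
        System.out.println(inEur.keySet().equals(orders.keySet())); // prints true
    }
}
```

A key-changing `map`, by contrast, can move a record to a different `partitionFor` result, which is why Kafka Streams marks the stream for repartitioning before the next grouping or join.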