Chapter 3. Building Applications with the Streams API

Abstract

Goal: Leverage the Streams API to create data streaming applications.
Objectives
  • Process a basic data stream with Kafka Streams.

  • Describe Kafka Streams architecture and concepts.

  • Use KTables, KStreams, and other DSL objects to manage data streams.

Sections
  • Processing Data Streams (and Guided Exercise)

  • Describing Kafka Streams Architecture (and Quiz)

  • Using Basic Kafka Streams Objects (and Guided Exercise)

Lab

Building Applications with the Streams API

Processing Data Streams

Objectives

After completing this section, you should be able to process a basic data stream with Kafka Streams.

Understanding Modern Data Processing Needs

The era of mobile devices, edge computing, and big data has brought ever-increasing data generation rates, as well as a demand to process these large volumes promptly and efficiently. To be successful, modern applications must handle large volumes of data while maintaining acceptable throughput levels and minimizing latency. Traditional database-centered data processing models, however, are becoming insufficient to handle these requirements.

As a result of these demands, a new data processing technique has emerged: data stream processing.

What is a Data Stream?

A data stream is a continuous flow of data without known limits on its properties, such as the total size of the data or the number of events. Data streams are continuous and never ending.

Data streams are common in real-time or data-intensive applications, which are becoming increasingly popular. In these applications, users, stakeholders, or external providers generate data at fairly high rates.

The following scenarios are examples of real-world data streams:

  • Messages posted in a social network

  • Application logs

  • Currency exchange rates

  • User authentication events in a web application. In this case, the authentication mechanism is modeled as a stream of events.

Describing Stream Processing

Stream processing is the action of reading data from a source as it is produced, running computations on this data, and returning the result. When compared with database-focused batch processing, stream processing presents important differences:

Table 3.1. Batch Processing Versus Stream Processing

Batch processing | Stream processing
Database-oriented processing. | Operates directly on the data stream, minimizing storage needs.
The system processes each record multiple times. | The system processes each message only once.
The data set is first collected, and then processed. | Each message is processed immediately.
Known data set size. | The data stream does not have known limits.
Scheduled background computation. | On-demand, real-time computation.
Prioritizes computation accuracy. | Prioritizes throughput and minimizes latency.

Operations

Because data streams are sequences, developers process them by applying well-known operations on sequential data structures, such as map, forEach, and reduce, among others.
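
For example, the following minimal sketch applies these operations by using the standard java.util.stream API on a bounded collection. The list of payment amounts is invented for illustration:

import java.util.List;

List<Integer> amounts = List.of(10, -3, 25, 7); // hypothetical payment amounts

int total = amounts.stream()
    .filter(amount -> amount > 0)   // keep only positive amounts
    .map(amount -> amount * 2)      // transform each element
    .reduce(0, Integer::sum);       // aggregate the stream into a single value

amounts.forEach(System.out::println); // run a side effect for each element

Kafka Streams offers equivalent operations, such as map, filter, and foreach, that work on unbounded streams of Kafka messages.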

Performance

In real-time stream processing, high throughput and responsiveness are essential. Therefore, stream processing applications must meet the following requirements:

  • Process real-time data in a timely manner

  • Be fault-tolerant

  • Be scalable

  • Guarantee processing, avoiding duplicates and data loss

Applications

Applications processing data streams are usually called data streaming applications or streaming applications.

If the data stream contains events occurring in an application, then this is an event streaming application. Event streaming applications are a popular choice to implement the event-driven architecture (EDA) pattern.

Apache Kafka for Data Streams

Kafka is a distributed data streaming platform. You can use it to take care of data stream transportation in a scalable, fault-tolerant, and secure way. Also, Kafka client APIs enable developers to easily create data streams by producing and consuming messages.

The Kafka server, however, along with its producer and consumer libraries, does not provide features to execute distributed computation logic on the data consumed from a stream. For example, consider that you want to process a Kafka topic that contains a stream of payments by using only the producer and consumer libraries:

  • For each payment, you run a fraud detection algorithm and send detected fraudulent payments to the potential-frauds topic for later investigation.

  • You send each legitimate payment to the validated-payments topic.

  • You process each payment in the validated-payments topic.

  • If the application processes the payment successfully, you send a confirmation to the confirmed-payments topic.

In that application, you have to take care of the following responsibilities:

  • Creating, maintaining, and configuring consumers and consumer groups to read from the topics used in the application.

  • Creating, maintaining, and configuring producers to publish each transformation of the event stream to its destination topic.

  • Guaranteeing processing, avoiding duplicates or data loss, in case of application failures or network issues.

  • Developing mechanisms to distribute the computation to scale payment processing.

The Kafka Streams library from the Apache Kafka ecosystem is designed to handle such responsibilities.
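
To appreciate what the library handles for you, the following is a minimal sketch of the manual approach, using only the producer and consumer client APIs. The payments topic name and the isFraudulent() helper are hypothetical:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties consumerProps = new Properties();
Properties producerProps = new Properties();
// ...bootstrap servers, (de)serializers, and consumer group configuration omitted...

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

consumer.subscribe(List.of("payments"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // isFraudulent() is a hypothetical fraud detection helper
        String destination = isFraudulent(record.value())
            ? "potential-frauds"
            : "validated-payments";
        producer.send(new ProducerRecord<>(destination, record.key(), record.value()));
    }
    // Offset management, error handling, and scaling remain your responsibility
}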

Note

In addition to data stream storage and transportation, computation should also be scalable, fault-tolerant, and secure.

Introducing Kafka Streams

Apache Kafka Streams is a library for developing distributed stream processing applications. The library is part of the Apache Kafka ecosystem, and is built on top of the Kafka producer and consumer APIs, providing abstractions that represent data streams, tables, and transformations. These abstractions allow developers to build stream processing models with built-in scalability, elasticity, fault tolerance, and security.

Some of its distinguishing characteristics are the following:

  • Provides an expressive domain-specific language (DSL) to produce, consume, store, and process data streams.

  • Built on Kafka, which acts as the message broker.

  • Depends on the Kafka client libraries to produce and consume streams.

  • Leverages fault-tolerant features from Kafka and adds additional processing guarantees, such as exactly-once processing (see the configuration sketch after this list).

  • Processes data with a collection of stream and table abstractions, and stateless and stateful data transformations.

  • Defines data processing and computational models, called topologies. A topology is a set of connected data processing nodes forming a directed acyclic graph (DAG), which executes logic and transformations on the data.

  • Distributes the execution of topologies to scale out.

  • Provides state storage and query mechanisms.
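
For example, exactly-once processing is requested through a single configuration parameter. The following is a minimal sketch; StreamsConfig.EXACTLY_ONCE is the constant available in Kafka 2.x releases, such as 2.8:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Request exactly-once processing guarantees from Kafka Streams
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);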

Important

In this course:

  • Kafka refers to the distributed message broker server, which allows you to store and distribute data. Red Hat AMQ Streams delivers and manages the Kafka server and its components as a cluster in OpenShift.

  • The Kafka producer and consumer APIs are client libraries. You can install these libraries in your applications to interact with Kafka, producing and consuming messages.

  • Kafka Streams is an additional library for the development of streaming applications. This library wraps the producer and consumer APIs, providing a higher level of abstraction that represents stream and table structures. It allows developers to process Kafka messages in real time and distribute the computational load.

Installing Kafka Streams

Kafka Streams applications require the following libraries from the org.apache.kafka group:

  • kafka-streams: Kafka Streams API

  • kafka-clients: Kafka producer and consumer APIs, required to interact with Kafka

To install Kafka Streams in a Maven project, you must add the kafka-streams dependency to your pom.xml file.

<dependencies>
    ...output omitted...
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.8.0</version>
    </dependency>
</dependencies>

kafka-streams depends on kafka-clients, so you do not need to explicitly include the kafka-clients Maven dependency.

Quarkus Extension

In Quarkus projects, the preferred way to use Kafka Streams is the kafka-streams Quarkus extension.

[user@host ~]$ mvn quarkus:add-extension -Dextensions="kafka-streams"

Installing this extension adds the following Maven dependency to your project's pom.xml file:

<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kafka-streams</artifactId>
</dependency>

Creating a Basic Stream Processing Pipeline

Kafka Streams applications are organized according to the following logical steps:

  1. Set the Kafka and Kafka Streams configuration parameters.

  2. Specify the processing topology.

  3. Build the topology.

  4. Initialize and start the Kafka Streams client.

The following is an example of a very basic Kafka Streams application, illustrating the preceding steps:

...output omitted...

import java.util.Properties;

import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

...output omitted...

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "your-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server-host:kafka-server-port");
...more props omitted...
StreamsConfig config = new StreamsConfig(props); 1

StreamsBuilder builder = new StreamsBuilder(); 2

builder 3
    .stream(
        "topic-name",
        Consumed.with(keySerde, valueSerde)
    ) 4
    .peek((key, value) ->
        System.out.println("Received payment: " + value)
    ) 5
    .to(
        "output-topic",
        Produced.with(keySerde, valueSerde)
    ); 6

Topology topology = builder.build(); 7

KafkaStreams streams = new KafkaStreams(topology, config); 8

streams.start(); 9

  1. Define the Kafka Streams configuration parameters. The StreamsConfig object contains the definition of all configuration parameters. application.id and bootstrap.servers are required. For more details, refer to the Configuration Parameter Reference.

  2. Create an instance of StreamsBuilder.

  3. Use the StreamsBuilder instance to start building the processing topology.

  4. Specify the Kafka topic to consume the stream, and the required deserializers to consume the key and the value of each message.

  5. Process the stream. This particular example uses the peek transformation to print the value of each message. Unlike foreach, peek forwards each message downstream, which allows the chain to continue.

  6. Send the result to another Kafka topic.

  7. Build the topology.

  8. Create the Kafka Streams client with the topology and the configuration properties.

  9. Start the Kafka Streams client.

The Kafka Streams DSL offers a variety of abstractions to represent data streams and tables, as well as different operations. These abstractions and operations are covered later in the course.

Kafka Streams Quarkus Extension

If you use the Quarkus Kafka Streams extension, then you do not need to take care of the KafkaStreams client.

Quarkus handles the following for you:

  • Initialization, creation, and stopping of the KafkaStreams client.

  • Reading the configuration in the application.properties file and injecting configuration parameters into the KafkaStreams client.

To create a processing pipeline, you must create a CDI producer method that returns a topology, as the following example shows:

@Produces
public Topology buildTopology() {

    StreamsBuilder builder = new StreamsBuilder();

    builder
        .stream(
            "input-topic",
            Consumed.with(keySerde, valueSerde)
        )
        .filter((key, value) -> value > 0)
        .to(
            "output-topic",
            Produced.with(keySerde, valueSerde)
        );

    Topology topology = builder.build();
    return topology;
}

Finally, to set the Kafka Streams configuration, set the following configuration values in the application.properties file of your Quarkus application.

quarkus.kafka-streams.application-id=your-app-id
quarkus.kafka-streams.bootstrap-servers=kafka-server-host:kafka-server-port
quarkus.kafka-streams.topics=input-topic,output-topic

...output omitted...

The quarkus.kafka-streams.application-id and the quarkus.kafka-streams.bootstrap-servers parameters are not required. If not set, then they default to the Quarkus application name and localhost:9092, respectively.

The quarkus.kafka-streams.topics parameter is required and prevents Quarkus from starting the Kafka Streams engine if any of the specified Kafka topics are not ready.

Most of the configuration parameters of the Kafka Streams Quarkus extension are mapped from the Kafka Streams native parameters. For more information, refer to the Quarkus Kafka Streams Extension - Configuration Guide.
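
For example, assuming the pass-through behavior that the extension documents, you can set a native Kafka Streams parameter in application.properties by prefixing it with kafka-streams. (the parameter value below is an arbitrary illustration):

# Pass a native Kafka Streams parameter through to the Kafka Streams engine
kafka-streams.cache.max.bytes.buffering=10240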
