Developing Event-driven Applications with Apache Kafka and Red Hat AMQ Streams
In this exercise, you will create, alter, and describe a Kafka topic, observe partitions, and verify message ordering in topics and partitions.
Outcomes
You should be able to:
Create and alter a Kafka topic.
Describe a Kafka topic to see its partition, replication factor, and other configuration details.
Verify the partition distribution, and identify the leader/follower brokers.
Understand the record ordering in topics and partitions.
To perform this exercise, ensure you have the following:
Access to a configured and running Red Hat OpenShift Container Platform cluster.
Access to an installed and running Kafka instance in the OpenShift cluster.
Installed the oc command-line binary.
A configured Python virtual environment, including the grading scripts for this course.
The JDK installed.
From your workspace directory, activate the Python virtual environment.
[user@host AD482]$ source .venv/bin/activate
Important
On Windows, use the Activate.ps1 script to activate your Python virtual environment.
PS C:\Users\user\AD482> ./.venv/Scripts/Activate.ps1
Use the lab command to prepare your system for this exercise.
(.venv) [user@host AD482]$ lab start kafka-topics
...output omitted...
· Verifying your Kafka settings ...output omitted...
- Bootstrap Server: my-cluster-kafka-cluster.apps.cluster.example.com
- Bootstrap Port: 443
- Cluster Namespace: your-username-kafka-cluster
...output omitted...
Copy the Bootstrap Server, Bootstrap Port, and Cluster Namespace values.
You will use these values to connect to the Kafka cluster.
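Optionally, you can keep these values at hand in environment variables for later use. The following lines are only an illustration; the variable names are not defined by the exercise, and the values shown are the example output above:
(.venv) [user@host AD482]$ export KAFKA_BOOTSTRAP_SERVER=my-cluster-kafka-cluster.apps.cluster.example.com
(.venv) [user@host AD482]$ export KAFKA_BOOTSTRAP_PORT=443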
The lab command copies the exercise files from the AD482-apps/kafka-topics/apps directory of your local Git repository into the kafka-topics directory at the root of your workspace.
Procedure 2.1. Instructions
Create a topic named call-detail-records.
Verify that your project is set to RHT_OCP4_DEV_USER-kafka-cluster.
(.venv) [user@host AD482]$ oc project
Using project "RHT_OCP4_DEV_USER-kafka-cluster" on server ...output omitted...
Navigate to the kafka-topics/resources directory.
Create a call-detail-records-topic.yaml file that creates a Kafka topic with the following parameters:
Topic name: call-detail-records
Partition number: 12
Replication factor: 2
Cluster name: my-cluster
Record retention period: 604800000 (default value of 7 days)
Segment file size: 1073741824 (default value of 1 GiB)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: call-detail-records
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 2
  config:
    retention.ms: 604800000
    segment.bytes: 1073741824
Create the topic.
(.venv) [user@host resources]$ oc create -f call-detail-records-topic.yaml
kafkatopic.kafka.strimzi.io/call-detail-records created
Verify the topic is in the Ready state:
(.venv) [user@host resources]$ oc get kafkatopics call-detail-records
NAME                  CLUSTER      PARTITIONS   REPLICATION FACTOR   READY
call-detail-records   my-cluster   12           2                    True
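If the READY column is not yet True, you can optionally inspect the full resource, including its status conditions, with a standard oc query; this step is not required by the exercise:
(.venv) [user@host resources]$ oc get kafkatopic call-detail-records -o yaml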
Produce and receive records by using the call-detail-records topic.
Use the kafka-console-consumer.sh and kafka-console-producer.sh scripts. The producer and consumer Kafka scripts are available on every Kafka broker node.
Use the my-cluster-kafka-brokers Kubernetes service on port 9092 to discover the Kafka broker nodes.
Run the following command to receive records from the call-detail-records topic.
(.venv) [user@host resources]$ oc exec -it my-cluster-kafka-0 \
-- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic call-detail-records
Because the topic is empty, the output should be empty. Leave the terminal window open and the console consumer working.
Open a new terminal window and start a producer for the call-detail-records topic. Enter text after the > input indicator, and press Enter to send each line as a separate record.
(.venv) [user@host]$ oc exec -it my-cluster-kafka-0 \
-- bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic call-detail-records
>Call Detail Record 1
>Call Detail Record 2
>Call Detail Record 3
Observe that the consumer receives the records:
Call Detail Record 1
Call Detail Record 2
Call Detail Record 3
Terminate the consumer and producer applications. Then, close the extra terminal window.
Alter the call-detail-records topic.
Copy the kafka-topics/resources/call-detail-records-topic.yaml file that you created in the preceding steps and rename it call-detail-records-topic_altered.yaml.
In the call-detail-records-topic_altered.yaml file, change the partition number from 12 to 24 and retention.ms from 604800000 to 2629800000 (1 month).
The final file should have the following content:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: call-detail-records
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 24
  replicas: 2
  config:
    retention.ms: 2629800000
    segment.bytes: 1073741824
Alter the call-detail-records topic by using the oc apply command.
(.venv) [user@host resources]$ oc apply \
-f call-detail-records-topic_altered.yaml
kafkatopic.kafka.strimzi.io/call-detail-records configured
Use the oc describe command to verify the state of the call-detail-records topic.
(.venv) [user@host resources]$ oc describe kafkatopic call-detail-records
...output omitted...
Spec:
  Config:
    retention.ms:   2629800000
    segment.bytes:  1073741824
  Partitions:  24
  Replicas:    2
Status:
  Conditions:
    Last Transition Time:  2021-08-22T10:36:59.892248Z
    Status:                True
    Type:                  Ready
...output omitted...
Verify that the topic operator modified the topic.
Describe the call-detail-records topic by using the Kafka kafka-topics.sh shell script. Run the following command to see the change details of the topic.
(.venv) [user@host resources]$ oc exec -it my-cluster-kafka-0 \
-- bin/kafka-topics.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--describe --topic call-detail-records
Topic: call-detail-records  PartitionCount: 24  ReplicationFactor: 2  Configs: segment.bytes=1073741824,retention.ms=2629800000,message.format.version=2.8-IV2
    Topic: call-detail-records  Partition: 0   Leader: 1  Replicas: 1,0  Isr: 1,0
...output omitted...
    Topic: call-detail-records  Partition: 23  Leader: 0  Replicas: 0,1  Isr: 0,1
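In this output, Leader is the ID of the broker that currently serves reads and writes for the partition, Replicas lists the brokers that store a copy of it, and Isr lists the replicas that are in sync with the leader. As an optional health check, not part of this exercise, the same script can list partitions whose followers have fallen behind; the sketch below assumes the same broker pod and service names used above:
(.venv) [user@host resources]$ oc exec -it my-cluster-kafka-0 \
-- bin/kafka-topics.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--describe --topic call-detail-records --under-replicated-partitions
Empty output means that every follower replica is in sync with its partition leader.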
Verify that there is no record ordering guarantee in a topic with multiple partitions. Then, verify that the record order is preserved in a one-partition topic.
Restart the consumer to receive records from call-detail-records.
Add the --from-beginning flag to the consumer command to receive already sent records.
(.venv) [user@host resources]$ oc exec -it my-cluster-kafka-0 \
-- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic call-detail-records --from-beginning
Call Detail Record 2
Call Detail Record 1
Call Detail Record 3
Note that your output order might differ from the previous output.
Notice that the consumer does not maintain the producer record order. Re-execute the command multiple times to verify there is no guaranteed order at the topic level.
Stop the receiver process in the terminal.
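Before moving on, note that Kafka guarantees record order only within a single partition. When related records must stay in order in a multi-partition topic, a common approach is to give them the same key, because the default partitioner sends records with the same key to the same partition. The following optional sketch uses the key-related properties of the console scripts (parse.key, key.separator, and print.key); the key phone-1234 is only an illustrative value and is not part of this exercise:
(.venv) [user@host resources]$ oc exec -it my-cluster-kafka-0 \
-- bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic call-detail-records \
--property parse.key=true --property key.separator=:
>phone-1234:Call started
>phone-1234:Call ended
A consumer started with --property print.key=true shows these two records in the order they were produced, because they share a partition.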
Create a topic named one-partition-topic. Use the kafka-topics/resources/one-partition-topic.yaml file.
(.venv) [user@host resources]$ oc create -f one-partition-topic.yaml
kafkatopic.kafka.strimzi.io/one-partition-topic created
Send records to the one-partition-topic topic.
(.venv) [user@host resources]$ oc exec -it my-cluster-kafka-0 \
-- bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic one-partition-topic
>Message 1
>Message 2
>Message 3
Terminate the producer.
Receive the records from the one-partition-topic topic by using the --from-beginning flag to verify the record ordering.
(.venv) [user@host resources]$ oc exec -it my-cluster-kafka-0 \
-- bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-brokers:9092 \
--topic one-partition-topic --from-beginning
Message 1
Message 2
Message 3
Run this command again and check that the message order is kept.
This concludes the guided exercise.