Exploring Kafka Streams Partitioning, Scaling, and Fault Tolerance

Intro

This post is about the inner workings of a few core characteristics of Kafka Streams.

There is certainly some excellent conceptual documentation on the topics of repartitioning, workload distribution, and state stores. Still, while reading through it, I felt I needed some deeper context.

That’s how I decided to dive into the matter and share the results in this article.

Concretely, here are the main steps we’ll go through:

  1. Create a simple Kafka Streams app whose topology includes stateful aggregation and repartitioning.
  2. Discuss how the records are repartitioned and review the contents of the repartition topic.
  3. Read the data from the RocksDB state store containing disk-persisted aggregation results.
  4. Inspect the internal changelog Kafka topic that backs the state store for fault tolerance.
  5. Run multiple instances of the streaming application and track the work distribution among them.

You can check the code on GitHub.

This article explores the Kafka Streams framework in C# (via Streamiz.Kafka.Net). Judging by the Java documentation, the implementation there looks nearly identical, although I haven’t researched every platform in depth. Still, I believe that even if there are minor differences, the core design concepts should stay the same, so my hope is that JVM users will find this post helpful as well.

Overview of the Demo App

The sample application we’re about to review is pretty straightforward.

It processes Order entities flowing in through a Kafka orders topic. Each Order includes a ProductType. Our Kafka Streams pipeline then groups the orders by product type and sums up the total count for each product.

The Order class is quite simple:

public class Order
{
    public string Id { get; set; }
    public ProductType ProductType { get; set; }
}

where the ProductType is just an enum with two values:

public enum ProductType
{
    Jackets,
    Shirts
}

This simplicity enables us to focus on the research topics without spending too much time on the actual functionality.

Dataflow Diagram

Visually, here’s the data flow for a sample stream of data:

Note that, for the orders and repartition topics, the diagram represents only the IDs of the values (Product Type/Order ID) so that we can easily follow the data repartitioning. In reality, each value in these topics contains a complete order entity.

Don’t worry if that doesn’t make a lot of sense for now. We’ll talk about each of the components in detail later on.

For now, let’s just give a quick overview.

Orders Topic – the source Kafka topic partitioned by the order ID.

Repartition Topic – this is an internal topic created by Kafka Streams that contains the same orders as the orders topic but partitioned by product type.

RocksDB State Store – an embedded database that keeps the result of the aggregation.

Changelog Topic – this topic acts as a central backup of the RocksDB store. It allows the framework to rebuild the state in case of service crashes where we might lose the data stored locally on disk.

Running the Demo

Let’s go through the main modules of the sample system, see how to run them, and look at what’s produced behind the scenes.

Setting Up Kafka

To run everything locally, you need a Kafka cluster with two brokers. One way to set that up is with the following docker-compose file:

---
version: '3.8'
services:
  zookeeper-1:
    image: confluentinc/cp-zookeeper:6.1.9
    ports:
      - '32181:32181'
    restart: always
    environment:
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_TICK_TIME: 2000


  kafka-1:
    image: confluentinc/cp-kafka:6.1.9
    ports:
      - '9092:9092'
    restart: always  
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2


  kafka-2:
    image: confluentinc/cp-kafka:6.1.9
    ports:
      - '9093:9093'
    restart: always  
    depends_on:
      - zookeeper-1
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:32181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
      KAFKA_DEFAULT_REPLICATION_FACTOR: 2
      KAFKA_NUM_PARTITIONS: 2
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2

The Orders Producer

This simple console app creates the orders topic with two partitions and pushes orders into it every second.

Here’s a simplified sketch of the producing logic (the repo contains the full version):

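// A sketch of the producer console app. The topic/broker names match the demo setup,
// but treat this as an approximation of the code in the repo rather than a copy of it.
using System.Text.Json;
using Confluent.Kafka;
using Confluent.Kafka.Admin;

const string bootstrapServers = "localhost:9092,localhost:9093";

// Create the orders topic with two partitions (ignore the error if it already exists).
using (var admin = new AdminClientBuilder(
    new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
{
    try
    {
        await admin.CreateTopicsAsync(new[]
        {
            new TopicSpecification { Name = "orders", NumPartitions = 2, ReplicationFactor = 2 }
        });
    }
    catch (CreateTopicsException) { /* the topic is already there */ }
}

using var producer = new ProducerBuilder<string, string>(
    new ProducerConfig { BootstrapServers = bootstrapServers }).Build();

var orderId = 0;
while (true)
{
    var order = new Order
    {
        // The key format is order_<int> - used purely for readability in the topic viewer.
        Id = $"order_{orderId}",
        // Alternate between the two product types so each gets an equal share (orderId % 2).
        ProductType = (ProductType)(orderId % 2)
    };

    // The order ID is the message key; the JSON-serialized order is the value.
    await producer.ProduceAsync("orders", new Message<string, string>
    {
        Key = order.Id,
        Value = JsonSerializer.Serialize(order)
    });

    Console.WriteLine($"Produced {order.Id} ({order.ProductType})");
    orderId++;
    await Task.Delay(TimeSpan.FromSeconds(1));
}
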
A few points here:

  1. The format of the key is order_<int>. I’m using that just for clarity when reviewing the topic later.
  2. We get an equal number of orders for each product – handled by taking the order ID modulo the number of product types (orderId % 2).
  3. For simplicity, we use JSON for serializing/deserializing the messages.

Running the App

Run dotnet run to start the producer. The result should look like this:

The Orders Topic

To review the contents of the Kafka topics, I’ll use the Offset Explorer tool, as it gives a decent visual experience. Another helpful tool I often use is kafkacat – you might want to look into it as well.

After we run all of the app’s components, these are the topics we’ll see created:

For now, let’s check the records of the orders topic:

The entries are distributed between the two partitions based on the default Kafka partitioning algorithm.

The Orders Aggregator

This program contains the actual Kafka Streams aggregation logic. The core fragment looks roughly like this (a simplified sketch – the full version is in the repo):

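// A sketch of the aggregation topology built with Streamiz.Kafka.Net. The serdes and
// store-helper names here (JsonSerDes<Order>, RocksDb<,>.As, the Table namespace) are
// approximations of the library's API and of the repo's code, so expect small differences.
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Table; // materialized state store helpers

var config = new StreamConfig<StringSerDes, StringSerDes>
{
    ApplicationId = "orders-aggregator",
    BootstrapServers = "localhost:9092,localhost:9093"
};

var builder = new StreamBuilder();

builder
    // Source: the orders topic, keyed by order ID.
    .Stream<string, Order, StringSerDes, JsonSerDes<Order>>("orders")
    // No functional purpose - just prints every incoming order to the console.
    .Peek((orderId, order) => Console.WriteLine($"Processing {orderId} ({order.ProductType})"))
    // Re-key the stream by product type; this is what triggers the repartition topic.
    .GroupBy((orderId, order) => order.ProductType)
    // Sum the orders per product type, materialized in a RocksDB-backed state store
    // named orders-product-quantities (serdes configuration omitted for brevity).
    .Aggregate(
        () => 0L,
        (productType, order, total) => total + 1,
        RocksDb<ProductType, long>.As("orders-product-quantities"));

var topology = builder.Build();
var stream = new KafkaStream(topology, config);
await stream.StartAsync();
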
The code should be relatively straightforward to follow: we group the orders by product type and sum up the total number of orders for each product in the Aggregate step.

Notice the Peek operator – it doesn’t serve any functional purpose. It’s there only for the side effect of printing the current item to the console.

Let’s describe a couple of important points here:

  1. The GroupBy causes the data to be repartitioned by ProductType. In simple terms, repartitioning here means that Kafka redistributes the orders to a new internal topic, using the ProductType as the partition key instead of the order ID. The ProductType is an enum – a named integer – so Kafka will use the integer value for the partition key (there’s a small conceptual sketch of the partition selection right after this list). We’ll review the repartition topic later in the article.
  2. We configure the Aggregate operator to store the accumulated state in an embedded RocksDB database on disk. The framework also backs this disk storage with an internal changelog topic for fault tolerance (in case the service crashes). We’ll inspect both of these stores later on.
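
To make the repartitioning step a bit more tangible, here’s a conceptual sketch of how a record ends up in a particular partition of the repartition topic: the new key is serialized and hashed, and the hash modulo the partition count picks the partition. The hash below is purely illustrative – the real one is whatever the producer’s default partitioner uses.

// Conceptual only: illustrates "hash the serialized key, then take it modulo the partition count".
static int PickPartition(byte[] serializedKey, int partitionCount)
{
    var hash = 0;
    foreach (var b in serializedKey)
        hash = hash * 31 + b;              // any deterministic hash works for the illustration
    return (hash & 0x7fffffff) % partitionCount;
}

The key takeaway: records that share a key always hash to the same value, so all orders of a given product type land in the same partition and, therefore, on the same task.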

Running the App

It’s best to run four instances of the Aggregator app. This nicely demonstrates how the work gets distributed among them.

Why is that?

Kafka Streams splits the workload into “tasks” – the smallest units of execution. The number of tasks depends on the number of partitions in the topology. In our example, the orders topic has two partitions, and the repartition topic has another two. That’s four partitions in total, so the framework spins up one task for each.

Let’s run four instances of the Aggregator and see what output we’re getting.

First, you’ll notice that every instance initially prints the full topology:

The data flow is split into two sub-topologies:

  • Sub-topology 0 with Source being the orders topic and Sink the orders-products-quantities-repartition topic.
  • Sub-topology 1 with Source being the orders-products-quantities-repartition topic and then aggregating the data into the orders-products-quantities store.

After the app prints the topology, it starts processing the data. Here’s a screenshot of the four instances running in parallel:

Every instance is assigned to a specific task. Let’s describe these assignments:

Top left instance – Task “0-0” – Handling Partition 0 of the orders topic.

Bottom right instance – Task “0-1” – Handling Partition 1 of the orders topic.

Top right instance – Task “1-0” – Handling Partition 0 of the repartition topic.

Bottom left instance – Task “1-1” – Handling Partition 1 of the repartition topic.

The Repartition Topic

Let’s look at the records of the repartition topic:

Nothing surprising – all the orders for ProductType 0 (Jackets) are assigned to Partition 0, while all the records for ProductType 1 (Shirts) are sent to Partition 1.

The RocksDB Data

RocksDB is an embedded key-value database. We can instruct Kafka Streams to use it for persisting data on disk for stateful operators like Aggregate.

I wanted to inspect the stored data, which wasn’t effortless, because the persistence logic is more or less an implementation detail of the framework. It has its own custom format and binary serializer, making the task a little tricky.

Still, scanning through the Kafka Streams source code, I managed to accomplish this, so I wrote a small program that prints out the contents of the RocksDB database. Stripped down, the idea looks like this (the full version in the repo also decodes the framework’s binary format):

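// A bare-bones version of the idea, using the RocksDbSharp package: open one task's
// store directory and dump the raw key/value bytes. Treat it as a sketch - the actual
// program in the repo goes further and decodes the binary key/value format.
using RocksDbSharp;

// Pass the path to a task's store as an argument, e.g. the directory ending in
// .../1-0/rocksdb/orders-product-quantities under the application's local state dir.
var path = args[0];

using var db = RocksDb.OpenReadOnly(new DbOptions(), path, false);
using var iterator = db.NewIterator();

for (iterator.SeekToFirst(); iterator.Valid(); iterator.Next())
{
    Console.WriteLine(
        $"key: {BitConverter.ToString(iterator.Key())} | value: {BitConverter.ToString(iterator.Value())}");
}
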
The full program prints a result like this:

I won’t get into the details here, so feel free to explore the implementation yourself.

The important thing to understand is that each task works with its own RocksDB instance.

In our case, we have two tasks that aggregate data from the repartition topic (tasks 1-0 and 1-1), so we get two RocksDB instances. You can see them in a local directory:

We can check the RocksDB content for each of them:

Note that the name of the state store orders-product-quantities is something we set in code.

The Changelog Topic

The Kafka Architecture document gives a pretty good overview of the changelog topic:

Kafka Streams makes sure that the local state stores are robust to failures, too. For each state store, it maintains a replicated changelog Kafka topic in which it tracks any state updates. These changelog topics are partitioned as well so that each local state store instance, and hence the task accessing the store, has its own dedicated changelog topic partition. Log compaction is enabled on the changelog topics so that old data can be purged safely to prevent the topics from growing indefinitely. If tasks run on a machine that fails and are restarted on another machine, Kafka Streams guarantees to restore their associated state stores to the content before the failure by replaying the corresponding changelog topics prior to resuming the processing on the newly started tasks. As a result, failure handling is completely transparent to the end user.

Here’s the content of the changelog topic for our use case:

As expected, Partition 0 and Partition 1 store the values produced by tasks 1-0 and 1-1, respectively.

The accumulated sum for Product 0 is 10, and for Product 1, it’s 9. This matches the values returned from the RocksDB database.

Summary

I hope this article unraveled some of the mysteries behind Kafka Streams and gives you the knowledge and confidence you need when developing your own streaming pipelines.

Thanks for reading, and see you next time!

Resources

  1. GitHub Repo
  2. Streamiz.Kafka.Net
  3. Kafka Streams Overview
