How to name stateful operations in Kafka Streams

Question:

How can I change the topology of an existing Kafka Streams application while keeping it compatible with the existing one?


Example use case:

You want to add or remove some operations in your Kafka Streams application. In this tutorial we'll name the changelog and repartition topics so that the topology updates don't break compatibility.

Code example:


1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir naming-changelog-repartition-topics && cd naming-changelog-repartition-topics

2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN

And launch it by running:

docker-compose up -d

3
Configure the project

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        jcenter()
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.15.1"
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "1.1.1"
    id "idea"
    id "eclipse"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    jcenter()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.apache.avro:avro:1.9.1"
    implementation "org.slf4j:slf4j-simple:1.7.26"
    implementation "org.apache.kafka:kafka-streams:2.4.1"
    implementation "io.confluent:kafka-streams-avro-serde:5.4.1"
    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.4.1"
    testImplementation "junit:junit:4.12"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.NamingChangelogAndRepartitionTopics"
    )
  }
}

shadowJar {
  archiveBaseName = "naming-changelog-repartition-topics-standalone"
  archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development file at configuration/dev.properties:

application.id=naming-changelog-repartition-topics
bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

join.topic.name=join-topic
join.topic.partitions=1
join.topic.replication.factor=1

4
Create an initial Kafka Streams topology

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

The point of this tutorial is to discuss the importance of naming state stores (hence changelog topics) and repartition topics. In addition to having a more readable topology description, you can make your Kafka Streams application more robust to topology changes.

Let's look at the core logic of the Kafka Streams application:


  KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                  .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));

  KStream<Long, Long> countStream = inputStream.groupByKey().count().toStream();

  KStream<Long, String> joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                              JoinWindows.of(Duration.ofMillis(100)),
                                                              StreamJoined.with(longSerde, stringSerde, longSerde));

In the inputStream there is a selectKey() operation, changing the key of the incoming stream.

As a result, executing the inputStream.groupByKey() operation forces a repartition to make sure the modified keys end up on the correct partition.

Additionally, count() is an aggregation, so Kafka Streams creates a state store plus a changelog topic for fault-tolerance of the state store.
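To see why changing the key forces a repartition, here's a tiny plain-Java sketch of hash-based partition assignment. This is a simplified stand-in, not Kafka's actual partitioner (which uses murmur2 over the serialized key bytes):

```java
public class PartitionSketch {

  // Simplified stand-in for Kafka's default partitioner.
  static int partitionFor(long key, int numPartitions) {
    return Math.floorMod(Long.hashCode(key), numPartitions);
  }

  public static void main(String[] args) {
    // After selectKey() changes a record's key, the record may belong on a
    // different partition, so Kafka Streams writes it through a repartition
    // topic before the grouping/aggregation reads it back.
    System.out.println(partitionFor(0L, 2)); // partition 0
    System.out.println(partitionFor(1L, 2)); // partition 1
  }
}
```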

There are additional state stores and another repartition topic in this topology, but we’ll focus on the countStream to keep things simple. The same principles apply to any state store, changelog and repartition topic.

When using the DSL, Kafka Streams generates the names for each processor, state store, and any required internal topics. To view a textual representation of your topology, you can run Topology.describe().

We won’t show the full output here, but describing this topology indicates the following names for the state store, changelog and repartition topic:

  • state store - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002

  • changelog topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog

  • repartition topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition

Here’s an image of the relevant part of the topology:


(Note that changelog topics don’t show up in Topology.describe())

You’ll notice the number 0000000002 at the end of the names. Kafka Streams appends an incrementing number as part of the name for each part of the topology. Here the state store, changelog topic, and repartition topic share the same number, since by default, they reuse the name of the corresponding state store.
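To make the scheme concrete, here's a small plain-Java sketch of how these names appear to be assembled: a prefix plus a zero-padded, ten-digit index from an incrementing counter, with the internal topic names built from the application ID, the store name, and a suffix. (This is an illustrative model, not the actual Kafka Streams internals.)

```java
public class GeneratedNames {

  // Prefix plus a ten-digit, zero-padded index from an incrementing counter.
  static String processorName(String prefix, int index) {
    return String.format("%s-%010d", prefix, index);
  }

  // Internal topic names: application ID, store name, and a suffix.
  static String changelogTopic(String applicationId, String storeName) {
    return applicationId + "-" + storeName + "-changelog";
  }

  static String repartitionTopic(String applicationId, String storeName) {
    return applicationId + "-" + storeName + "-repartition";
  }

  public static void main(String[] args) {
    String store = processorName("KSTREAM-AGGREGATE-STATE-STORE", 2);
    System.out.println(store);
    System.out.println(changelogTopic("naming-changelog-repartition-topics", store));
    System.out.println(repartitionTopic("naming-changelog-repartition-topics", store));
  }
}
```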

Now go ahead and create the following file at src/main/java/io/confluent/developer/NamingChangelogAndRepartitionTopics.java.

package io.confluent.developer;


import io.confluent.common.utils.TestUtils;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.StreamJoined;

public class NamingChangelogAndRepartitionTopics {


  public Properties buildStreamsProperties(Properties envProps) {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
    props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
    props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

    return props;
  }

  public Topology buildTopology(Properties envProps) {
    final StreamsBuilder builder = new StreamsBuilder();
    final String inputTopic = envProps.getProperty("input.topic.name");
    final String outputTopic = envProps.getProperty("output.topic.name");
    final String joinTopic = envProps.getProperty("join.topic.name");

    final Serde<Long> longSerde = Serdes.Long();
    final Serde<String> stringSerde = Serdes.String();

    final boolean addFilter = Boolean.parseBoolean(envProps.getProperty("add.filter"));
    final boolean addNames = Boolean.parseBoolean(envProps.getProperty("add.names"));

    KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                  .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)));
    if (addFilter) {
      inputStream = inputStream.filter((k, v) -> k != 100L);
    }

    final KStream<Long, String> joinedStream;
    final KStream<Long, Long> countStream;

    if (!addNames) {
         countStream = inputStream.groupByKey(Grouped.with(longSerde, stringSerde))
                                    .count()
                                    .toStream();

        joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                              JoinWindows.of(Duration.ofMillis(100)),
                                                              StreamJoined.with(longSerde, stringSerde, longSerde));
    } else {
        countStream = inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))
                                   .count(Materialized.as("the-counting-store"))
                                   .toStream();

        joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                                              JoinWindows.of(Duration.ofMillis(100)),
                                                              StreamJoined.with(longSerde, stringSerde, longSerde)
                                                                          .withName("join").withStoreName("the-join-store"));
    }

    joinedStream.to(joinTopic, Produced.with(longSerde, stringSerde));
    countStream.map((k,v) -> KeyValue.pair(k.toString(), v.toString())).to(outputTopic, Produced.with(stringSerde, stringSerde));


    return builder.build();
  }

  public void createTopics(final Properties envProps) {
    final Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
    try (final AdminClient client = AdminClient.create(config)) {

      final List<NewTopic> topics = new ArrayList<>();

      topics.add(new NewTopic(
          envProps.getProperty("input.topic.name"),
          Integer.parseInt(envProps.getProperty("input.topic.partitions")),
          Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

      topics.add(new NewTopic(
          envProps.getProperty("output.topic.name"),
          Integer.parseInt(envProps.getProperty("output.topic.partitions")),
          Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));

      topics.add(new NewTopic(
          envProps.getProperty("join.topic.name"),
          Integer.parseInt(envProps.getProperty("join.topic.partitions")),
          Short.parseShort(envProps.getProperty("join.topic.replication.factor"))));

      client.createTopics(topics);
    }
  }

  public Properties loadEnvProperties(String fileName) throws IOException {
    final Properties envProps = new Properties();
    final FileInputStream input = new FileInputStream(fileName);
    envProps.load(input);
    input.close();

    return envProps;
  }

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
      throw new IllegalArgumentException(
          "This program takes one argument: the path to an environment configuration file.");
    }

    final NamingChangelogAndRepartitionTopics instance = new NamingChangelogAndRepartitionTopics();
    final Properties envProps = instance.loadEnvProperties(args[0]);
    if (args.length > 1 ) {
      final String namesAndFilter = args[1];

      if (namesAndFilter.contains("filter")) {
        envProps.put("add.filter", "true");
      }

      if (namesAndFilter.contains("names")) {
        envProps.put("add.names", "true");
      }
    }

    final CountDownLatch latch = new CountDownLatch(1);
    final Properties streamProps = instance.buildStreamsProperties(envProps);
    final Topology topology = instance.buildTopology(envProps);

    instance.createTopics(envProps);

    final KafkaStreams streams = new KafkaStreams(topology, streamProps);

    // Attach shutdown handler to catch Control-C.
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
      @Override
      public void run() {
        streams.close(Duration.ofSeconds(5));
        latch.countDown();
      }
    });

    try {
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }
    System.exit(0);
  }

}

5
Compile and run the Kafka Streams program

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. In most tutorials, the Kafka Streams application runs until you shut it down. In this tutorial, however, you’ll make changes to the topology, which requires restarting the streams application.


java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties

6
Produce sample data to the input topic

Open a new terminal and start the console-producer

docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --broker-list broker:9092

Then copy-paste the following records to send. The data is formatted this way because the Kafka Streams application will create a key from the first character.

1foo
1bar
1baz

After you’ve sent the records, you can shut down the console-producer with a CTRL+C.

7
Consume data from the output topic

Now start the console-consumer. Note that you’re running the consumer with the --from-beginning option, so you’ll get all messages sent to the output topic.

docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator="-"

From your first run you should see the following output:

1-1
1-2
1-3

Note that even though this is the output of an aggregation operation, this tutorial configures the streams application with StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG set to 0, so you’ll see every update from the count() operation.

Since the streams application takes the first character to use as the key, the output of 1-1, 1-2, and 1-3 (key-count) is expected.
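The running counts can be sketched in plain Java: with caching disabled, every input record yields one output update, keyed by the first character, which is exactly the 1-1, 1-2, 1-3 sequence above. (A simplified model of the count() behavior, not the actual Streams code.)

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UncachedCount {

  // One output update per input record, since record caching is disabled.
  static List<String> countUpdates(List<String> records) {
    Map<Character, Long> counts = new HashMap<>();
    List<String> updates = new ArrayList<>();
    for (String record : records) {
      char key = record.charAt(0);              // key = first character
      long count = counts.merge(key, 1L, Long::sum);
      updates.add(key + "-" + count);           // emit every update
    }
    return updates;
  }

  public static void main(String[] args) {
    System.out.println(countUpdates(List.of("1foo", "1bar", "1baz")));
  }
}
```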

You can close this consumer for now with a CTRL+C.

8
Add an operation to the topology

As you may have guessed, adding or removing a processor in your topology changes the names of all processors downstream of the change.

With that in mind, let’s add a filter() processor to the inputStream:


  KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                  .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
                                                  .filter((k, v) -> k != 100L);

To make things a little easier, the code for the tutorial is configured to use feature flags. You’ll use the feature flags by passing different parameters in the command to start the Kafka Streams application.

Since you’ve added the filter() before the aggregation, the name of the state store, changelog topic, and repartition topic will change.

In the new names, the number suffix will go from 0000000002 to 0000000003 like so:

  • state store - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003

  • changelog topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003-changelog

  • repartition topic - <application.id>-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition

Here’s an image of the relevant part of the updated topology:


(Note that changelog topics don’t show up in Topology.describe())

Now, in the terminal running the streams application, do a CTRL+C, then restart the app with the following command:

java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties filter-only

9
Produce some records to the updated topology

Go back to the producer/consumer terminal and start the console-producer again.

docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --broker-list broker:9092

We’ll send the exact same data from before, as we want to update the counts for existing records. Copy and paste the following into the prompt and press enter:

1foo
1bar
1baz

After you’ve sent the records, you can shut down the console-producer with a CTRL+C.

10
Consume the updated records

Now, restart the console-consumer. Remember the consumer is running with the --from-beginning option so you’ll get all messages sent to the output-topic topic.

docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator="-"

In this second run, you should see this output:

1-1
1-2
1-3
1-1
1-2
1-3

Even though the Kafka Streams application counts by key and you sent the same keys, the output is repeated. The application produced the first 3 records in the previous run. So why is the output 1-1, 1-2, 1-3 instead of 1-4, 1-5, 1-6? Adding the new operation incremented the counter used to generate the names of every processor, state store, and internal topic downstream of the new operator.

This renaming means the streams application’s count() processor now uses a new state store, rather than the one created when you first started the application. The situation is the same with an in-memory store, because the changelog topic name changes too: once the name changes, there is nothing to restore from when Kafka Streams rebuilds the in-memory store.

Your original data is still there, but Kafka Streams isn’t using the previously created state store and changelog topic.
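A plain-Java sketch of why the counts restarted: the application resolves its state by store name, so when the generated name shifts, it gets a fresh, empty store even though the old one still exists under the old name. (A hypothetical model for illustration, not the Streams internals.)

```java
import java.util.HashMap;
import java.util.Map;

public class StoreLookup {

  // State stores looked up by name; an unknown name yields an empty store.
  final Map<String, Map<String, Long>> storesByName = new HashMap<>();

  long countWith(String storeName, String key) {
    Map<String, Long> store = storesByName.computeIfAbsent(storeName, n -> new HashMap<>());
    return store.merge(key, 1L, Long::sum);
  }

  public static void main(String[] args) {
    StoreLookup app = new StoreLookup();
    String oldName = "KSTREAM-AGGREGATE-STATE-STORE-0000000002";
    String newName = "KSTREAM-AGGREGATE-STATE-STORE-0000000003";
    app.countWith(oldName, "1");                     // 1
    app.countWith(oldName, "1");                     // 2
    app.countWith(oldName, "1");                     // 3
    // After adding filter(), the generated name shifts, so counting restarts:
    System.out.println(app.countWith(newName, "1")); // 1, not 4
  }
}
```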

You can close this consumer for now with a CTRL+C.

11
Add names to the stateful operators of the topology

Now let’s start over. This time you’ll update the topology and provide names to all stateful operators.


 countStream = inputStream.groupByKey(Grouped.with("count", longSerde, stringSerde))
                                   .count(Materialized.as("the-counting-store"))
                                   .toStream();

 joinedStream = inputStream.join(countStream, (v1, v2) -> v1 + v2.toString(),
                                 JoinWindows.of(Duration.ofMillis(100)),
                                 StreamJoined.with(longSerde, stringSerde, longSerde)
                                             .withName("join").withStoreName("the-join-store"));

Here’s an image of the relevant part of the topology, now with names:


(Note that changelog topics don’t show up in Topology.describe())

Just like before, these changes are made via feature flags, enabled by the parameters you pass when starting the application.

In the terminal running the streams application, do a CTRL+C, then restart the streams application with this command:

java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties names-only

12
Produce records to the named topology

Back in your producer/consumer terminal, start the console-producer again.

docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --broker-list broker:9092

Then copy-paste the following records to send. You’ll use data resulting in different keys this time.

2foo
2bar
2baz

After you’ve sent the records, you can shut down the console-producer with a CTRL+C.

13
Consume records from the named topology

Now let’s start up the console consumer again:

docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator="-"

For this run you should see all six records plus three new records:

1-1
1-2
1-3
1-1
1-2
1-3
2-1
2-2
2-3

Since you used data resulting in new keys, 2-1, 2-2, 2-3 looks correct.

You can close this consumer for now with a CTRL+C.

14
Update the named topology

Now let’s add a new operator (filter()) to the named topology:


  KStream<Long, String> inputStream = builder.stream(inputTopic, Consumed.with(longSerde, stringSerde))
                                                  .selectKey((k, v) -> Long.parseLong(v.substring(0, 1)))
                                                  .filter((k, v) -> k != 100L);

But this time, adding a new processor won’t change the names of the stateful parts of your application, as you explicitly named them in the previous step.

Here’s an image of the relevant part of the updated topology, where the stateful operators are named:


(Note that changelog topics don’t show up in Topology.describe())

You’ll notice the other processor names have shifted, but since those operators are stateless, the shift won’t break topology compatibility.
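The stability of the named parts can be sketched in a line of plain Java: an explicit name, when given, takes precedence over the generated one, so the generated index can shift freely without affecting the store (and hence the changelog and repartition topic) names. (An illustrative model, not the Streams internals.)

```java
public class ExplicitName {

  // An explicit name wins over the generated prefix-plus-index name.
  static String storeName(String explicitName, String generatedPrefix, int index) {
    return explicitName != null ? explicitName
                                : String.format("%s-%010d", generatedPrefix, index);
  }

  public static void main(String[] args) {
    // Named store: unaffected by the counter shifting from 2 to 3.
    System.out.println(storeName("the-counting-store", "KSTREAM-AGGREGATE-STATE-STORE", 3));
    // Unnamed store: the shifted counter produces a different name.
    System.out.println(storeName(null, "KSTREAM-AGGREGATE-STATE-STORE", 3));
  }
}
```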

In the terminal running the streams application, do a CTRL+C, then restart the streams application with this command:


java -jar build/libs/naming-changelog-repartition-topics-standalone-0.0.1.jar configuration/dev.properties filter-with-names

15
Produce records to the updated, named topology

One last time, from your producer/consumer terminal, start a console-producer

docker exec -i broker /usr/bin/kafka-console-producer --topic input-topic --broker-list broker:9092

Then copy-paste the following records to send. Again you’re sending updates with the same keys.

2foo
2bar
2baz

After you’ve sent the records, you can shut down the console-producer with a CTRL+C.

16
Consume latest updates

Finally, start a console-consumer.

docker exec -it broker /usr/bin/kafka-console-consumer --topic output-topic --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator="-"

You should see the following output:

1-1
1-2
1-3
1-1
1-2
1-3
2-1
2-2
2-3
2-4
2-5
2-6

The last three records, 2-4, 2-5, 2-6, show the correct output, as you produced six records with the same key.

You have updated your topology and reused the existing state stores and internal topics!



Some points to remember

  1. Always name stateful operators.

  2. If you haven’t named your stateful operators and you need to update your topology, use the Application Reset Tool to reprocess records.

  • Aggregation repartition topics (if needed)

  1. Use the Grouped.as() method.

  2. Use Grouped to provide repartition Serdes as well, if required.

  3. Kafka Streams appends the text -repartition to the provided name.

  4. If no name is provided, the state store name is used with -repartition appended.

  • Joins

  1. Use the StreamJoined configuration object.

  2. StreamJoined.withName() names the join processors and provides the base name of the repartition topic (if needed).

  3. StreamJoined.withStoreName() names the state stores associated with the join.

  4. If you need to provide Serdes, use StreamJoined for that as well.

  • State stores

  1. Use the Materialized.as() method.

  2. Use Materialized for state store Serdes, if needed.

Deploy on Confluent Cloud

1
Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.

First, create your Kafka cluster in Confluent Cloud. Use the promo code C50INTEG to receive an additional $50 free usage (details).

Next, from the Confluent Cloud UI, click on Tools & client config to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.

Now you’re all set to run your application locally while your Kafka topics and stream processing are backed by your Confluent Cloud instance.