How to implement TTL-based cleanup to expire data in a KTable

Question:

How can you delete KTable data (in the topic and in the state store) based on TTL?

Example use case:

You have a Kafka Streams application or ksqlDB application that uses KTables backed by a topic in Kafka. You want to purge data that is considered too old, or to manage the size of the topic and the state store. Although the Kafka Streams API does not natively include any notion of a TTL (Time To Live) for KTables, this tutorial shows you how to expire messages by making clever use of tombstones, writing them out to the topic underlying the KTable with the help of a state store that tracks the last update time for each key.

Hands-on code example:

Run it

Prerequisites

1

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

2

To get started, make a new directory anywhere you’d like for this project:

mkdir schedule-ktable-ttl && cd schedule-ktable-ttl

Get Confluent Platform

3

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

And launch it by running:

docker compose up -d

Configure the project

4

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"

    id "idea"
    id "eclipse"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:2.0.7"

    implementation "org.apache.kafka:kafka-streams:3.1.0"
    implementation "io.confluent:common-utils:7.3.3"
    implementation "org.apache.kafka:kafka-clients:3.1.0"
    testImplementation "org.apache.kafka:kafka-streams-test-utils:3.1.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaStreamsKTableTTLExample"
    )
  }
}

shadowJar {
    archiveBaseName = "schedule-ktable-ttl-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development file at configuration/dev.properties:

application.id=schedule-ktable-ttl
bootstrap.servers=localhost:29092

input.topic.name=inputTopicForStream
input.topic.partitions=1
input.topic.replication.factor=1

table.topic.name=inputTopicForTable
table.topic.partitions=1
table.topic.replication.factor=1
table.topic.ttl.store.name=table-purge-store
table.topic.ttl.minutes=1
table.topic.ttl.scan.seconds=5

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1


Create the Kafka Streams topology

5

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Before you create the Kafka Streams application file, let’s go over the key points of the application. In this tutorial we want to show how a KTable loaded from an input topic can have its data periodically purged via a transformer. The example applies a fixed TTL per key, based on the last update for that key. This may not serve every need, but it is sufficient to illustrate the mechanism by which we can purge data from a KTable. The transformer uses a state store to record the last update time seen for each key and periodically (via a punctuator) scans its keys to see if any of them have exceeded the configured cutoff period (TTL). If they have, a tombstone is forwarded onwards in the pipeline and the key is removed from the transformer’s own internal store.

For a refresh on scheduling logic using a punctuator, have a look at the Scheduling Operations tutorial.

Now let’s take a look at some of the key points from the application.

For context, your application has a simple KStream-KTable join where the output of the join is a trivial concatenation of the left side and the right side if the associated key exists in the KTable. The goal is to purge data in the KTable for which updates have not arrived within a TTL of 1 minute.

The following sections are already included in the application file; we’re just stepping through the key parts of the code before you create it.

Let’s look at the TTLEmitter transformer, which schedules the emission of tombstones for records older than a certain age:

TTLEmitter transformer punctuator
  public TTLEmitter(final Duration maxAge, final Duration scanFrequency,
      final String stateStoreName) { (1)
    this.maxAge = maxAge;
    this.scanFrequency = scanFrequency;
    this.purgeStoreName = stateStoreName;
  }

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
    context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> { (2)
      final long cutoff = timestamp - maxAge.toMillis();

     try (final KeyValueIterator<K, Long> all = stateStore.all()) {
        while (all.hasNext()) {
          final KeyValue<K, Long> record = all.next();
          if (record.value != null && record.value < cutoff) {
            System.out.println("Forwarding Null");
            context.forward(record.key, null); (3)
          }
        }
      }
    });
  }

  @Override
  public R transform(K key, V value) { (4)

    if (value == null) { (5)
      System.out.println("CLEANING key="+key);
      stateStore.delete(key);
    } else {
      System.out.println("UPDATING key="+key);
      stateStore.put(key, context.timestamp());
    }
    return null;
  }
1 Initialize the transformer with maximum age, scan frequency, and state store name
2 Schedule the punctuation (based on stream time) to scan all records and pick out the ones that have exceeded the TTL. We could change this to wall-clock time, but then data could be deleted even when there is no activity in the KTable topic, for example after an upstream failure. If the use case understands the implications of using wall-clock time, it can be used instead (see the snippet after this list).
3 Forward the tombstone for keys that have not been updated within the maximum age
4 We still need to create a transform() method to handle incoming changes to the KTable
5 Handle tombstones coming from upstream or update the timestamp in the local purge state store
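
If wall-clock semantics are acceptable for your use case (see callout 2 above), the only change is the PunctuationType passed to context.schedule. Here is a minimal sketch of that variant inside init(), with the scan body unchanged:

    context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
      // timestamp is now system time, so keys can expire even when no new
      // records arrive in the KTable topic
      final long cutoff = timestamp - maxAge.toMillis();
      try (final KeyValueIterator<K, Long> all = stateStore.all()) {
        while (all.hasNext()) {
          final KeyValue<K, Long> record = all.next();
          if (record.value != null && record.value < cutoff) {
            context.forward(record.key, null);
          }
        }
      }
    });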

Next we set up a simple KStream-KTable join so that we have a KTable to attach a TTLEmitter to.

Initializing a simple KStream-KTable join in the Kafka Streams application
// Read the input data.
    final KStream<String, String> stream =
        builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));
    final KTable<String, String> table = builder.table(inputTopicForTable,
        Consumed.with(Serdes.String(), Serdes.String()));


    // Perform the custom join operation.
    final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
      System.out.println("JOINING left="+left+" right="+right);
      if (right != null)
        return left+" "+right; // this is, of course, a completely fake join logic
      return left;
    });
    // Write the join results back to Kafka.
    joined.to(outputTopic,
        Produced.with(Serdes.String(), Serdes.String()));

Next we attach a transformer to the original table in order to do the work of emitting tombstones as appropriate:

Attaching a transformer to the KTable and writing back to the KTable’s input topic
    // tap the table topic in order to insert a tombstone after MAX_AGE based on event time
    //builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()))
    table.toStream()  // convert the table to a stream so the transformer can run in this same topology; in a separate app you could read the topic directly, as in the commented-out line above
        .transform(() -> new TTLEmitter<String, String, KeyValue<String, String>>(MAX_AGE, (1)
            SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
        .to(inputTopicForTable, Produced.with(Serdes.String(), Serdes.String())); // write the
                                                                                // tombstones back
                                                                                // out to the input
                                                                                // topic
1 Turn the table into a stream and call transform on it with the TTL Emitter
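
The commented-out builder.stream(...) line above hints at running the TTL logic as a separate application against the same table topic. A minimal sketch of that variant, assuming the same store builder, topic name, and constants used elsewhere in this tutorial:

    // In a separate Streams application: read the table's input topic directly,
    // attach the TTLEmitter, and write the resulting tombstones back to that topic.
    // Incoming records are never re-emitted (transform returns null), so this does not loop.
    builder.addStateStore(storeBuilder);
    builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()))
        .transform(() -> new TTLEmitter<String, String, KeyValue<String, String>>(MAX_AGE,
            SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
        .to(inputTopicForTable, Produced.with(Serdes.String(), Serdes.String()));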

Create the TTLEmitter class by copying the following file to src/main/java/io/confluent/developer/TTLEmitter.java:

package io.confluent.developer;

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * A simple transformer maintaining a purge store of keys and the
 * last update time and if a TTL has been exceeded, emits tombstones
 * for those keys
 *
 * @author sarwar
 *
 * @param <K>
 * @param <V>
 * @param <R>
 */
public class TTLEmitter<K, V, R> implements Transformer<K, V, R> {

  private final Duration maxAge;
  private final Duration scanFrequency;
  private final String purgeStoreName;
  private ProcessorContext context;
  private KeyValueStore<K, Long> stateStore;


  public TTLEmitter(final Duration maxAge, final Duration scanFrequency,
      final String stateStoreName) {
    this.maxAge = maxAge;
    this.scanFrequency = scanFrequency;
    this.purgeStoreName = stateStoreName;
  }

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
    // This is where the magic happens. This causes Streams to invoke the Punctuator
    // on an interval, using stream time. That is, time is only advanced by the record
    // timestamps
    // that Streams observes. This has several advantages over wall-clock time for this
    // application:
    //
    // It'll produce the exact same sequence of updates given the same sequence of data.
    // This seems nice, since the purpose is to modify the data stream itself, you want to
    // have a clear understanding of when stuff is going to get deleted. For example, if something
    // breaks down upstream for this topic, and it stops getting new data for a while, wall
    // clock time would just keep deleting data on schedule, whereas stream time will wait for
    // new updates to come in.
    //
    // You can change to wall clock time here if that is what is needed
    context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> {
      final long cutoff = timestamp - maxAge.toMillis();

      // scan over all the keys in this partition's store
      // this can be optimized, but just keeping it simple.
      // this might take a while, so the Streams timeouts should take this into account
      try (final KeyValueIterator<K, Long> all = stateStore.all()) {
        while (all.hasNext()) {
          final KeyValue<K, Long> record = all.next();
          if (record.value != null && record.value < cutoff) {
            System.out.println("Forwarding Null");
            // if a record's last update was older than our cutoff, emit a tombstone.
            context.forward(record.key, null);
          }
        }
      }
    });
  }

  @Override
  public R transform(K key, V value) {

    // this gets invoked for each new record we consume. If it's a tombstone, delete
    // it from our state store. Otherwise, store the record timestamp.
    if (value == null) {
      System.out.println("CLEANING key="+key);
      stateStore.delete(key);
    } else {
      System.out.println("UPDATING key="+key);
      stateStore.put(key, context.timestamp());
    }
    return null; // no need to return anything here. the punctuator will emit the tombstones
                 // when necessary
  }

  @Override
  public void close() {


  }

}
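
A side note: the Transformer interface used above is deprecated in newer Kafka Streams releases. If you move past the 3.1.0 dependency pinned in this tutorial (to 3.3 or later), the same logic can be expressed with the Processor API. The following is a rough sketch of that alternative, not part of the tutorial’s code:

package io.confluent.developer;

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * Sketch of the same TTL logic using the (non-deprecated) Processor API,
 * assuming Kafka Streams 3.3 or later.
 */
public class TTLEmitterProcessor<K, V> implements Processor<K, V, K, V> {

  private final Duration maxAge;
  private final Duration scanFrequency;
  private final String purgeStoreName;
  private ProcessorContext<K, V> context;
  private KeyValueStore<K, Long> stateStore;

  public TTLEmitterProcessor(final Duration maxAge, final Duration scanFrequency,
      final String purgeStoreName) {
    this.maxAge = maxAge;
    this.scanFrequency = scanFrequency;
    this.purgeStoreName = purgeStoreName;
  }

  @Override
  public void init(final ProcessorContext<K, V> context) {
    this.context = context;
    this.stateStore = context.getStateStore(purgeStoreName);
    context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> {
      final long cutoff = timestamp - maxAge.toMillis();
      try (final KeyValueIterator<K, Long> all = stateStore.all()) {
        while (all.hasNext()) {
          final KeyValue<K, Long> record = all.next();
          if (record.value != null && record.value < cutoff) {
            // emit a tombstone for keys whose last update is older than the cutoff
            context.forward(new Record<K, V>(record.key, null, timestamp));
          }
        }
      }
    });
  }

  @Override
  public void process(final Record<K, V> record) {
    // tombstones from upstream remove the key; anything else refreshes its timestamp
    if (record.value() == null) {
      stateStore.delete(record.key());
    } else {
      stateStore.put(record.key(), record.timestamp());
    }
  }
}

You would wire it in via table.toStream().process(...) with the store name, since KStream#process returns a KStream from 3.3 onward.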

Create the KafkaStreamsKTableTTLExample class by copying the following file to src/main/java/io/confluent/developer/KafkaStreamsKTableTTLExample.java:

package io.confluent.developer;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import io.confluent.common.utils.TestUtils;

public class KafkaStreamsKTableTTLExample {

  /**
   * This is the main topology showing a very simple kstream-ktable join
   * The KTable here is based on an input topic and not created in the middle
   * of a topology from an aggregation
   *
   * @param envProp
   * @return
   */
  public Topology buildTopology(Properties envProp) {
    final StreamsBuilder builder = new StreamsBuilder();

    String inputTopicForStream = envProp.getProperty("input.topic.name");
    String inputTopicForTable = envProp.getProperty("table.topic.name");
    String outputTopic = envProp.getProperty("output.topic.name");

    // Read the input data.
    final KStream<String, String> stream =
        builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));
    final KTable<String, String> table = builder.table(inputTopicForTable,
        Consumed.with(Serdes.String(), Serdes.String()));


    // Perform the custom join operation.
    final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
      System.out.println("JOINING left="+left+" right="+right);
      if (right != null)
        return left+" "+right; // this is, of course, a completely fake join logic
      return left;
    });
    // Write the join results back to Kafka.
    joined.to(outputTopic,
        Produced.with(Serdes.String(), Serdes.String()));




    // TTL part of the topology
    // This could be in a separate application
    // Setting tombstones for records seen past a TTL of MAX_AGE
    final Duration MAX_AGE = Duration.ofMinutes(Integer.parseInt(envProp.getProperty("table.topic.ttl.minutes")));
    final Duration SCAN_FREQUENCY = Duration.ofSeconds(Integer.parseInt(envProp.getProperty("table.topic.ttl.scan.seconds")));
    final String STATE_STORE_NAME = envProp.getProperty("table.topic.ttl.store.name");



    // adding a custom state store for the TTL transformer which has a key of type string, and a
    // value of type long
    // which represents the timestamp
    final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STATE_STORE_NAME),
        Serdes.String(),
        Serdes.Long()
    );



    builder.addStateStore(storeBuilder);

    // tap the table topic in order to insert a tombstone after MAX_AGE based on event time
    //builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()))
    table.toStream()  // convert the table to a stream so the transformer can run in this same topology; in a separate app you could read the topic directly, as in the commented-out line above
        .transform(() -> new TTLEmitter<String, String, KeyValue<String, String>>(MAX_AGE,
            SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
        .to(inputTopicForTable, Produced.with(Serdes.String(), Serdes.String())); // write the
                                                                                // tombstones back
                                                                                // out to the input
                                                                                // topic



    System.out.println(builder.toString());
    return builder.build();
  }




  public Properties getStreamProps(Properties envProp) {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, envProp.get("application.id"));
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProp.get("bootstrap.servers"));
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Use a temporary directory for storing state, which will be automatically removed after the
    // test.
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
        TestUtils.tempDirectory().getAbsolutePath());
    //streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 20000);


    // These two settings are only required in this contrived example so that the
    // join output shows up immediately (no commit batching or record caching):
    // streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
    // streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
    return streamsConfiguration;
  }

  public void createTopics(final Properties envProps) {
    final Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
    try (final AdminClient client = AdminClient.create(config)) {

      final List<NewTopic> topics = new ArrayList<>();

      topics.add(new NewTopic(envProps.getProperty("input.topic.name"),
          Integer.parseInt(envProps.getProperty("input.topic.partitions")),
          Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

      topics.add(new NewTopic(envProps.getProperty("table.topic.name"),
          Integer.parseInt(envProps.getProperty("table.topic.partitions")),
          Short.parseShort(envProps.getProperty("table.topic.replication.factor"))));

      topics.add(new NewTopic(envProps.getProperty("output.topic.name"),
          Integer.parseInt(envProps.getProperty("output.topic.partitions")),
          Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));



      client.createTopics(topics);
    }
  }

  public Properties loadEnvProperties(String fileName) throws IOException {
    final Properties envProps = new Properties();
    final FileInputStream input = new FileInputStream(fileName);
    envProps.load(input);
    input.close();



    return envProps;
  }

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
      throw new IllegalArgumentException(
          "This program takes one argument: the path to an environment configuration file.");
    }

    final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
    final Properties envProps = instance.loadEnvProperties(args[0]);

    // Setup the input topic, table topic, and output topic
    instance.createTopics(envProps);

    // Normally these can be run in separate applications but for the purposes of the demo, we
    // just run both streams instances in the same application

    try (final KafkaStreams streams = new KafkaStreams(instance.buildTopology(envProps), instance.getStreamProps(envProps))) {
     final CountDownLatch startLatch = new CountDownLatch(1);
     // Attach shutdown handler to catch Control-C.
     Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
         @Override
         public void run() {
             //streams.cleanUp();
             streams.close(Duration.ofSeconds(5));
             startLatch.countDown();
         }
     });
     // Start the topology.
     streams.start();

     try {
       startLatch.await();
     } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
       System.exit(1);
     }
    }
    System.exit(0);
  }
}

Compile and run the Kafka Streams program

6

Now let’s build your application by running:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/schedule-ktable-ttl-standalone-0.0.1.jar configuration/dev.properties

Enter some data into KTable

7

Let us produce some records into the KTable before doing a join.

Open a terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForTable --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

The producer will start and wait for your input. Each line represents one record; press Enter to send it. If you type multiple words and then hit Enter, the entire line is treated as one record.

Try typing one line at a time, hit Enter, and then look for the output in the console consumer window. Alternatively, you can paste in all the records and send them at once.

key1:what a lovely
foo:bar
fun:programming

Enter some data into KStream

8

Let us produce some records into the KStream.

Open another terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForStream --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Type or paste the data into the KStream:

key1:Bunch of coconuts
foo:baz
fun:post quarantine

Consume data from the output topic

9

Now that your Kafka Streams application is running, start a console-consumer to confirm the output:

docker exec -it broker kafka-console-consumer \
 --bootstrap-server broker:9092 \
 --topic output-topic \
 --property print.key=true \
 --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
 --property key.separator=" : "  \
 --from-beginning \
 --max-messages 3

Your results should look something like this:


key1 : Bunch of coconuts what a lovely
foo : baz bar
fun : post quarantine programming


Wait 65 seconds and enter some data into KTable

10

Let us wait 65 seconds and then produce some more records into the KTable. Because the console producer stamps records with the current time, this moves stream time forward past the TTL so that the purging kicks in.

Open a terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForTable --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Enter the following data:

key2: some new data

Now enter some KStream data with the old keys

11

Let us produce some records into the KStream using the old keys which should be purged on the KTable side.

Open another terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForStream --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Type or paste the data into the KStream:

key1:Bunch of coconuts
foo:baz
fun:post quarantine

Consume data from the output topic 2nd time

12

Start a console-consumer to confirm the output (no join happened in the final 3 lines so only the KStream side data should appear in those lines):

docker exec -it broker kafka-console-consumer \
 --bootstrap-server broker:9092 \
 --topic output-topic \
 --property print.key=true \
 --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
 --property key.separator=" : "  \
 --from-beginning \
 --max-messages 6

Your results should look something like this:


key1 : Bunch of coconuts what a lovely
foo : baz bar
fun : post quarantine programming
key1 : Bunch of coconuts
foo : baz
fun : post quarantine

Test it

Create a test configuration file

1

First, create a test file at configuration/test.properties:

application.id=schedule-ktable-ttl
bootstrap.servers=localhost:29092

input.topic.name=inputTopicForStream
input.topic.partitions=1
input.topic.replication.factor=1

table.topic.name=inputTopicForTable
table.topic.partitions=1
table.topic.replication.factor=1
table.topic.ttl.store.name=test-table-purge-store
table.topic.ttl.minutes=1
table.topic.ttl.scan.seconds=5

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

Write a test

2

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it would otherwise be.

There is only one method in KafkaStreamsKTableTTLExampleTest annotated with @Test, and that is shouldTriggerStreamTableJoinFromTable(). This method actually runs our Streams topology using the TopologyTestDriver and some mocked data that is set up inside the test method.

This test is fairly vanilla, but there is one section we should look into a little more closely:

      tableTopic.pipeInput("alice", "a", 5);  (1)
      tableTopic.pipeInput("bobby", "b", 10);


      inputTopic.pipeInput("alice", "11", 10); (2)
      inputTopic.pipeInput("bobby", "21", 15);
      inputTopic.pipeInput("alice", "12", 30);


      tableTopic.pipeInput("freddy", "f", 65000);  (3)
      // punctuate gets called now

      inputTopic.pipeInput("alice", "13", 70006);  (4)
      inputTopic.pipeInput("bobby", "22", 70016);


      final List<KeyValue<String, String>> actualResults = outputTopic.readKeyValuesToList();

      assertThat(actualResults.size(), is(5));  (5)

      assertThat(actualResults.get(0).key, is("alice"));
      assertThat(actualResults.get(0).value, is("11 a"));

      assertThat(actualResults.get(1).key, is("bobby"));
      assertThat(actualResults.get(1).value, is("21 b"));

      assertThat(actualResults.get(2).key, is("alice"));
      assertThat(actualResults.get(2).value, is("12 a"));

      assertThat(actualResults.get(3).key, is("alice"));
      assertThat(actualResults.get(3).value, is("13")); // join didn't match on right side


      assertThat(actualResults.get(4).key, is("bobby"));
      assertThat(actualResults.get(4).value, is("22"));
1 Piping some data into the KTable initially
2 Pipe some data into the KStream to join
3 Move the time forward by publishing something into the KTable with a timestamp greater than the TTL
4 Pipe some more KStream data to fail to find a match
5 Check each join to see where it matched and where it didn’t match (due to the TTLed KTable records)
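
Note that this test drives the punctuator purely with record timestamps, because the topology schedules on stream time. If you experiment with the wall-clock variant mentioned earlier, TopologyTestDriver can fire those punctuations as well; a minimal sketch, assuming the TTLEmitter were changed to PunctuationType.WALL_CLOCK_TIME (requires importing java.time.Duration in the test):

      // only relevant for a WALL_CLOCK_TIME punctuator: advancing the driver's
      // mocked wall-clock time triggers any punctuations that have come due
      testDriver.advanceWallClockTime(Duration.ofSeconds(65));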

Now create the following file at src/test/java/io/confluent/developer/KafkaStreamsKTableTTLExampleTest.java.

package io.confluent.developer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

public class KafkaStreamsKTableTTLExampleTest {

  private final static String TEST_CONFIG_FILE = "configuration/test.properties";

  @Test
  public void shouldTriggerStreamTableJoinFromTable() throws Exception {

    final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
    final Properties envProps = instance.loadEnvProperties(TEST_CONFIG_FILE);

    final Properties streamProps = instance.getStreamProps(envProps);
    final String inputTopicName = envProps.getProperty("input.topic.name");
    final String outputTopicName = envProps.getProperty("output.topic.name");
    final String tableTopicName = envProps.getProperty("table.topic.name");

    final Topology topology = instance.buildTopology(envProps);
    System.out.println(topology);
    try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProps)) {


      final Serializer<String> keySerializer = Serdes.String().serializer();
      final Serializer<String> valueSerializer = Serdes.String().serializer();

      final Deserializer<String> keyDeserializer = Serdes.String().deserializer();
      final Deserializer<String> valueDeserializer = Serdes.String().deserializer();


      final TestInputTopic<String, String>  inputTopic = testDriver.createInputTopic(inputTopicName,
                                                                                        keySerializer,
                                                                                        valueSerializer);
      final TestInputTopic<String, String>  tableTopic = testDriver.createInputTopic(tableTopicName,
          keySerializer,
          valueSerializer);

      final TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic(outputTopicName, keyDeserializer, valueDeserializer);
      final TestOutputTopic<String, String> outputTableTopic = testDriver.createOutputTopic(tableTopicName, keyDeserializer, valueDeserializer);


      tableTopic.pipeInput("alice", "a", 5);
      tableTopic.pipeInput("bobby", "b", 10);


      inputTopic.pipeInput("alice", "11", 10);
      inputTopic.pipeInput("bobby", "21", 15);
      inputTopic.pipeInput("alice", "12", 30);


      tableTopic.pipeInput("freddy", "f", 65000);  // punctuate gets called now

      inputTopic.pipeInput("alice", "13", 70006);
      inputTopic.pipeInput("bobby", "22", 70016);


      final List<KeyValue<String, String>> actualResults = outputTopic.readKeyValuesToList();

      assertThat(actualResults.size(), is(5));

      assertThat(actualResults.get(0).key, is("alice"));
      assertThat(actualResults.get(0).value, is("11 a"));

      assertThat(actualResults.get(1).key, is("bobby"));
      assertThat(actualResults.get(1).value, is("21 b"));

      assertThat(actualResults.get(2).key, is("alice"));
      assertThat(actualResults.get(2).value, is("12 a"));

      assertThat(actualResults.get(3).key, is("alice"));
      assertThat(actualResults.get(3).value, is("13")); // join didn't match on right side


      assertThat(actualResults.get(4).key, is("bobby"));
      assertThat(actualResults.get(4).value, is("22"));


    }
  }
}

Invoke the tests

3

Now run the test, which is as simple as:

./gradlew test

Deploy on Confluent Cloud

Run your app with Confluent Cloud

1

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Confluent Cloud

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.
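
One thing to watch out for: the tutorial’s getStreamProps only copies application.id and bootstrap.servers into the Streams configuration, so the security settings above would not reach the Kafka clients on their own. A minimal sketch of one way to pass them through, assuming you append the Confluent Cloud properties to the same configuration/dev.properties file used by the application:

  public Properties getStreamProps(Properties envProp) {
    final Properties streamsConfiguration = new Properties();
    // Copy every entry from the properties file so that security settings such as
    // security.protocol, sasl.mechanism, and sasl.jaas.config reach the clients.
    // Unrecognized keys (e.g. input.topic.name) are ignored by the clients with a warning.
    streamsConfiguration.putAll(envProp);
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, envProp.get("application.id"));
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProp.get("bootstrap.servers"));
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return streamsConfiguration;
  }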