How to implement TTL-based cleanup to expire data in a KTable

Question:

Do you have a requirement to delete KTable data (in the topic and in the state store) based on TTL?

Example use case:

You have a Kafka Streams or ksqlDB application that uses KTables built from a Kafka topic. You may want to purge data that is considered too old, or to manage the size of the topic and the state store. Although the Kafka Streams API does not natively include any notion of a TTL (Time To Live) for KTables, this tutorial shows you how to expire messages by writing tombstones to the topic underlying the KTable, using a state store that tracks the last update time of each key.
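
As a rough mental model (plain Java, no Kafka dependency), a KTable can be pictured as a map that is updated by each record in its topic, where a record with a null value, a tombstone, removes the key. The class and method names below are hypothetical and exist only for illustration; this is a sketch of the idea, not how Kafka Streams stores a table internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of a KTable: the latest value per key,
// where a null value (a tombstone) deletes the key.
class TableModel {
  private final Map<String, String> rows = new LinkedHashMap<>();

  // Apply one record from the table's topic.
  void apply(String key, String value) {
    if (value == null) {
      rows.remove(key);      // tombstone: the key disappears from the table
    } else {
      rows.put(key, value);  // upsert: the newest value wins
    }
  }

  String get(String key) {
    return rows.get(key);
  }

  int size() {
    return rows.size();
  }

  public static void main(String[] args) {
    TableModel table = new TableModel();
    table.apply("foo", "bar");
    table.apply("foo", null); // the TTL mechanism writes records like this one
    System.out.println(table.get("foo")); // prints "null"
  }
}
```

The rest of the tutorial is about deciding when to write such a null-valued record for a key.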

Code example:

Try it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir schedule-ktable-ttl && cd schedule-ktable-ttl

2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.1.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.1.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

And launch it by running:

docker-compose up -d

3
Configure the project

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:6.0.0"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "3.1.1"
    id "idea"
    id "eclipse"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
    mavenCentral()
    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.7.0"
    implementation "io.confluent:common-utils:6.2.0"
    implementation "org.apache.kafka:kafka-clients:2.7.0"
    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.7.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaStreamsKTableTTLExample"
    )
  }
}

shadowJar {
    archiveBaseName = "schedule-ktable-ttl-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development file at configuration/dev.properties:

application.id=schedule-ktable-ttl
bootstrap.servers=localhost:29092

input.topic.name=inputTopicForStream
input.topic.partitions=1
input.topic.replication.factor=1

table.topic.name=inputTopicForTable
table.topic.partitions=1
table.topic.replication.factor=1
table.topic.ttl.store.name=table-purge-store
table.topic.ttl.minutes=1
table.topic.ttl.scan.seconds=5

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1
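
The two table.topic.ttl.* values are plain integers; the application later turns them into java.time.Duration objects. A minimal sketch of that conversion, using only the JDK, is shown below. The class name TtlSettings is hypothetical, and the properties are read from an inline string here instead of from configuration/dev.properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Properties;

// Hypothetical holder for the two TTL settings from the properties file.
class TtlSettings {
  final Duration maxAge;
  final Duration scanFrequency;

  TtlSettings(Properties props) {
    // table.topic.ttl.minutes is the TTL itself, in minutes
    this.maxAge = Duration.ofMinutes(
        Integer.parseInt(props.getProperty("table.topic.ttl.minutes")));
    // table.topic.ttl.scan.seconds is how often expired keys are looked for
    this.scanFrequency = Duration.ofSeconds(
        Integer.parseInt(props.getProperty("table.topic.ttl.scan.seconds")));
  }

  // Parse properties from text; a real application would load them from a file.
  static TtlSettings parse(String text) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(text));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return new TtlSettings(props);
  }

  public static void main(String[] args) {
    TtlSettings s = parse("table.topic.ttl.minutes=1\ntable.topic.ttl.scan.seconds=5\n");
    System.out.println(s.maxAge.toMillis() + " ms TTL, scanned every "
        + s.scanFrequency.toMillis() + " ms");
  }
}
```

With the values above, the TTL is 60000 ms and the scan interval is 5000 ms.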


4
Create the Kafka Streams topology

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Before you create the Kafka Streams application file, let’s go over the key points of the application. This tutorial shows how a KTable loaded from an input topic can have its data periodically purged by a transformer. The example uses a fixed TTL per key, measured from the last update for that key. This may not serve every need, but it is sufficient to illustrate the mechanism for purging data from a KTable. The transformer uses a state store to record the last update time seen for each key and periodically (via a punctuator) scans its keys to see whether any of them have exceeded the configured cutoff period (TTL). If one has, a tombstone is forwarded onwards in the pipeline; when that tombstone flows back through the transformer, the key is removed from the transformer’s own store.
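
The bookkeeping described above can be sketched without any Kafka types. The class below is a hypothetical stand-in (the names PurgeStoreModel, onRecord, and scan are not part of the tutorial code): a sorted map plays the role of the state store, onRecord mirrors the per-record step, and scan mirrors the periodic punctuation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the transformer's bookkeeping; no Kafka types involved.
class PurgeStoreModel {
  private final Map<String, Long> lastUpdated = new TreeMap<>();
  private final long maxAgeMs;

  PurgeStoreModel(long maxAgeMs) {
    this.maxAgeMs = maxAgeMs;
  }

  // Per-record step: remember the timestamp, or forget the key on a tombstone.
  void onRecord(String key, String value, long timestampMs) {
    if (value == null) {
      lastUpdated.remove(key);
    } else {
      lastUpdated.put(key, timestampMs);
    }
  }

  // Periodic scan: return the keys whose last update is older than the cutoff.
  // The keys stay in the map until their tombstones come back through onRecord.
  List<String> scan(long nowMs) {
    long cutoff = nowMs - maxAgeMs;
    List<String> expired = new ArrayList<>();
    for (Map.Entry<String, Long> e : lastUpdated.entrySet()) {
      if (e.getValue() < cutoff) {
        expired.add(e.getKey());
      }
    }
    return expired;
  }

  public static void main(String[] args) {
    PurgeStoreModel store = new PurgeStoreModel(60_000L); // TTL of 1 minute
    store.onRecord("key1", "what a lovely", 0L);
    store.onRecord("key2", "some new data", 65_000L);

    // A scan at 65s finds key1, which was last updated more than 60s earlier.
    List<String> expired = store.scan(65_000L);
    System.out.println(expired); // prints "[key1]"

    // The tombstone written for key1 later arrives as a record and clears its entry.
    for (String key : expired) {
      store.onRecord(key, null, 65_000L);
    }
    System.out.println(store.scan(65_000L)); // prints "[]"
  }
}
```

In the real transformer the scan forwards a tombstone for each expired key instead of returning a list, but the cutoff comparison is the same.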

For a refresh on scheduling logic using a punctuator, have a look at the Scheduling Operations tutorial.

Now let’s take a look at some of the key points from the application.

For context, your application has a simple KStream-KTable join where the output of the join is a trivial concatenation of the left side and the right side if the associated key exists in the KTable. The goal is to purge data in the KTable for which updates have not arrived within a TTL of 1 minute.

The following sections are already included in the application file; we’re just stepping through the code in detail before you create it.

Let’s look at the TTL Emitter transformer that will schedule emitting tombstones past a certain time:

TTLEmitter transformer punctuator
  public TTLEmitter(final Duration maxAge, final Duration scanFrequency,
      final String stateStoreName) { (1)
    this.maxAge = maxAge;
    this.scanFrequency = scanFrequency;
    this.purgeStoreName = stateStoreName;
  }

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
    context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> { (2)
      final long cutoff = timestamp - maxAge.toMillis();

      try (final KeyValueIterator<K, Long> all = stateStore.all()) {
        while (all.hasNext()) {
          final KeyValue<K, Long> record = all.next();
          if (record.value != null && record.value < cutoff) {
            System.out.println("Forwarding Null");
            context.forward(record.key, null); (3)
          }
        }
      }
    });
  }

  @Override
  public R transform(K key, V value) { (4)

    if (value == null) { (5)
      System.out.println("CLEANING key="+key);
      stateStore.delete(key);
    } else {
      System.out.println("UPDATING key="+key);
      stateStore.put(key, context.timestamp());
    }
    return null;
  }
1 Initialize the transformer with maximum age, scan frequency, and state store name
2 Schedule a punctuation, based on stream time, that scans all records and picks out those that have exceeded the TTL. We could switch this to wall-clock time, but then data could be deleted even when there is no activity in the KTable topic, for example because of an upstream failure. If that implication is acceptable for your use case, you can use wall-clock time instead.
3 Forward the tombstone for keys that have not been updated within the maximum age
4 We still need to create a transform() method to handle incoming changes to the KTable
5 Handle tombstones coming from upstream or update the timestamp in the local purge state store
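
To make the stream-time versus wall-clock trade-off in callout 2 concrete, here is a simplified model in plain Java. It assumes that stream time is roughly the highest record timestamp seen so far, which glosses over how Kafka Streams tracks time per task; StreamTimeModel and its methods are hypothetical names.

```java
// Hypothetical, simplified model: stream time only moves when records arrive.
class StreamTimeModel {
  private long streamTimeMs = -1L; // -1 means no record has been seen yet

  // Advance stream time to the record's timestamp if it is newer.
  void observe(long recordTimestampMs) {
    streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
  }

  long streamTime() {
    return streamTimeMs;
  }

  // Same comparison the punctuator uses: last update older than (now - maxAge).
  static boolean expired(long lastUpdateMs, long nowMs, long maxAgeMs) {
    return lastUpdateMs < nowMs - maxAgeMs;
  }

  public static void main(String[] args) {
    long maxAgeMs = 60_000L;
    StreamTimeModel clock = new StreamTimeModel();
    clock.observe(1_000L); // last record before an upstream outage

    // Ten minutes pass on the wall clock, but no new records arrive.
    long wallClockMs = 1_000L + 600_000L;

    System.out.println(expired(1_000L, clock.streamTime(), maxAgeMs)); // prints "false"
    System.out.println(expired(1_000L, wallClockMs, maxAgeMs));        // prints "true"
  }
}
```

Under stream time the key survives the outage because time has not moved; under wall-clock time it would be purged on schedule.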

Next we set up a simple KStream-KTable join so that we have a KTable to attach a TTLEmitter to.

Initializing a simple KStream-KTable join in the Kafka Streams application
// Read the input data.
    final KStream<String, String> stream =
        builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));
    final KTable<String, String> table = builder.table(inputTopicForTable,
        Consumed.with(Serdes.String(), Serdes.String()));


    // Perform the custom join operation.
    final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
      System.out.println("JOINING left="+left+" right="+right);
      if (right != null)
        return left+" "+right; // this is, of course, a completely fake join logic
      return left;
    });
    // Write the join results back to Kafka.
    joined.to(outputTopic,
        Produced.with(Serdes.String(), Serdes.String()));
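
The effect of this left join can be sketched in plain Java with a map standing in for the KTable. LeftJoinModel and process are hypothetical names; join repeats the value-joiner logic from the lambda above, minus the logging.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the join semantics, with a map standing in for the KTable.
class LeftJoinModel {
  // Same logic as the tutorial's value joiner.
  static String join(String left, String right) {
    if (right != null) {
      return left + " " + right;
    }
    return left; // no table entry for this key: only the stream side is emitted
  }

  // Look up the stream record's key in the table and join the two values.
  static String process(Map<String, String> table, String key, String streamValue) {
    return join(streamValue, table.get(key));
  }

  public static void main(String[] args) {
    Map<String, String> table = new HashMap<>();
    table.put("foo", "bar");
    System.out.println(process(table, "foo", "baz")); // prints "baz bar"

    table.remove("foo"); // what a tombstone does to the table
    System.out.println(process(table, "foo", "baz")); // prints "baz"
  }
}
```

This is why purged keys show up later in the tutorial as output records that carry only the stream-side value.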

Next we attach a transformer to the original table in order to do the work of emitting tombstones as appropriate:

Attaching a transformer to the KTable and writing back to the KTable’s input topic
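    final Duration MAX_AGE = Duration.ofMinutes(Integer.parseInt(envProp.getProperty("table.topic.ttl.minutes"))); (1)
    final Duration SCAN_FREQUENCY = Duration.ofSeconds(Integer.parseInt(envProp.getProperty("table.topic.ttl.scan.seconds")));
    final String STATE_STORE_NAME = envProp.getProperty("table.topic.ttl.store.name");

    final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder( (2)
        Stores.persistentKeyValueStore(STATE_STORE_NAME),
        Serdes.String(),
        Serdes.Long()
    );

    builder.addStateStore(storeBuilder); (3)

    // Tap the table to emit tombstones after MAX_AGE (event time). In a separate application,
    // use builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String())) instead.
    table.toStream()
        .transform(() -> new TTLEmitter<String, String, KeyValue<String, String>>(MAX_AGE, (4)
            SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
        .to(inputTopicForTable, Produced.with(Serdes.String(), Serdes.String())); // write the tombstones back to the input topic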
1 Loading some variables from the properties file
2 Initialization of the purge state store builder
3 Tell the topology builder about state store builder
4 Turn the table into a stream and call transform on it with the TTL Emitter

Create the TTLEmitter class by copying the following file to src/main/java/io/confluent/developer/TTLEmitter.java:

package io.confluent.developer;

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

/**
 * A simple transformer maintaining a purge store of keys and the
 * last update time and if a TTL has been exceeded, emits tombstones
 * for those keys
 *
 * @author sarwar
 *
 * @param <K>
 * @param <V>
 * @param <R>
 */
public class TTLEmitter<K, V, R> implements Transformer<K, V, R> {

  private final Duration maxAge;
  private final Duration scanFrequency;
  private final String purgeStoreName;
  private ProcessorContext context;
  private KeyValueStore<K, Long> stateStore;


  public TTLEmitter(final Duration maxAge, final Duration scanFrequency,
      final String stateStoreName) {
    this.maxAge = maxAge;
    this.scanFrequency = scanFrequency;
    this.purgeStoreName = stateStoreName;
  }

  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.stateStore = (KeyValueStore<K, Long>) context.getStateStore(purgeStoreName);
    // This is where the magic happens. This causes Streams to invoke the Punctuator
    // on an interval, using stream time. That is, time is only advanced by the record
    // timestamps
    // that Streams observes. This has several advantages over wall-clock time for this
    // application:
    //
    // It'll produce the exact same sequence of updates given the same sequence of data.
    // This seems nice, since the purpose is to modify the data stream itself, you want to
    // have a clear understanding of when stuff is going to get deleted. For example, if something
    // breaks down upstream for this topic, and it stops getting new data for a while, wall
    // clock time would just keep deleting data on schedule, whereas stream time will wait for
    // new updates to come in.
    //
    // You can change to wall clock time here if that is what is needed
    context.schedule(scanFrequency, PunctuationType.STREAM_TIME, timestamp -> {
      final long cutoff = timestamp - maxAge.toMillis();

      // scan over all the keys in this partition's store
      // this can be optimized, but just keeping it simple.
      // this might take a while, so the Streams timeouts should take this into account
      try (final KeyValueIterator<K, Long> all = stateStore.all()) {
        while (all.hasNext()) {
          final KeyValue<K, Long> record = all.next();
          if (record.value != null && record.value < cutoff) {
            System.out.println("Forwarding Null");
            // if a record's last update was older than our cutoff, emit a tombstone.
            context.forward(record.key, null);
          }
        }
      }
    });
  }

  @Override
  public R transform(K key, V value) {

    // this gets invoked for each new record we consume. If it's a tombstone, delete
    // it from our state store. Otherwise, store the record timestamp.
    if (value == null) {
      System.out.println("CLEANING key="+key);
      stateStore.delete(key);
    } else {
      System.out.println("UPDATING key="+key);
      stateStore.put(key, context.timestamp());
    }
    return null; // no need to return anything here. the punctuator will emit the tombstones
                 // when necessary
  }

  @Override
  public void close() {


  }

}

Create the KafkaStreamsKTableTTLExample class by copying the following file to src/main/java/io/confluent/developer/KafkaStreamsKTableTTLExample.java:

package io.confluent.developer;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import io.confluent.common.utils.TestUtils;

public class KafkaStreamsKTableTTLExample {

  /**
   * This is the main topology showing a very simple kstream-ktable join
   * The KTable here is based on an input topic and not created in the middle
   * of a topology from an aggregation
   *
   * @param envProp
   * @return
   */
  public Topology buildTopology(Properties envProp) {
    final StreamsBuilder builder = new StreamsBuilder();

    String inputTopicForStream = envProp.getProperty("input.topic.name");
    String inputTopicForTable = envProp.getProperty("table.topic.name");
    String outputTopic = envProp.getProperty("output.topic.name");

    // Read the input data.
    final KStream<String, String> stream =
        builder.stream(inputTopicForStream, Consumed.with(Serdes.String(), Serdes.String()));
    final KTable<String, String> table = builder.table(inputTopicForTable,
        Consumed.with(Serdes.String(), Serdes.String()));


    // Perform the custom join operation.
    final KStream<String, String> joined = stream.leftJoin(table, (left, right) -> {
      System.out.println("JOINING left="+left+" right="+right);
      if (right != null)
        return left+" "+right; // this is, of course, a completely fake join logic
      return left;
    });
    // Write the join results back to Kafka.
    joined.to(outputTopic,
        Produced.with(Serdes.String(), Serdes.String()));




    // TTL part of the topology
    // This could be in a separate application
    // Setting tombstones for records seen past a TTL of MAX_AGE
    final Duration MAX_AGE = Duration.ofMinutes(Integer.parseInt(envProp.getProperty("table.topic.ttl.minutes")));
    final Duration SCAN_FREQUENCY = Duration.ofSeconds(Integer.parseInt(envProp.getProperty("table.topic.ttl.scan.seconds")));
    final String STATE_STORE_NAME = envProp.getProperty("table.topic.ttl.store.name");



    // adding a custom state store for the TTL transformer which has a key of type string, and a
    // value of type long
    // which represents the timestamp
    final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STATE_STORE_NAME),
        Serdes.String(),
        Serdes.Long()
    );



    builder.addStateStore(storeBuilder);

    // Tap the table in order to emit a tombstone once a key is older than MAX_AGE (based on event time).
    // table.toStream() is used here because the TTL logic runs in the same topology; in a separate
    // application you could instead start from:
    // builder.stream(inputTopicForTable, Consumed.with(Serdes.String(), Serdes.String()))
    table.toStream()
        .transform(() -> new TTLEmitter<String, String, KeyValue<String, String>>(MAX_AGE,
            SCAN_FREQUENCY, STATE_STORE_NAME), STATE_STORE_NAME)
        // write the tombstones back out to the input topic
        .to(inputTopicForTable, Produced.with(Serdes.String(), Serdes.String()));



    System.out.println(builder.toString());
    return builder.build();
  }




  public Properties getStreamProps(Properties envProp) {
    final Properties streamsConfiguration = new Properties();
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, envProp.get("application.id"));
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProp.get("bootstrap.servers"));
    streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Use a temporary directory for storing state, which will be automatically removed after the
    // test.
    streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
        TestUtils.tempDirectory().getAbsolutePath());
    //streamsConfiguration.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 20000);


    // These two settings are only required in this contrived example so that the
    // streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
    // streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    return streamsConfiguration;
  }

  public void createTopics(final Properties envProps) {
    final Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
    try (final AdminClient client = AdminClient.create(config)) {

      final List<NewTopic> topics = new ArrayList<>();

      topics.add(new NewTopic(envProps.getProperty("input.topic.name"),
          Integer.parseInt(envProps.getProperty("input.topic.partitions")),
          Short.parseShort(envProps.getProperty("input.topic.replication.factor"))));

      topics.add(new NewTopic(envProps.getProperty("table.topic.name"),
          Integer.parseInt(envProps.getProperty("table.topic.partitions")),
          Short.parseShort(envProps.getProperty("table.topic.replication.factor"))));

      topics.add(new NewTopic(envProps.getProperty("output.topic.name"),
          Integer.parseInt(envProps.getProperty("output.topic.partitions")),
          Short.parseShort(envProps.getProperty("output.topic.replication.factor"))));



      client.createTopics(topics);
    }
  }

  public Properties loadEnvProperties(String fileName) throws IOException {
    final Properties envProps = new Properties();
    // try-with-resources closes the file even if load() throws
    try (final FileInputStream input = new FileInputStream(fileName)) {
      envProps.load(input);
    }
    return envProps;
  }

  public static void main(String[] args) throws Exception {

    if (args.length < 1) {
      throw new IllegalArgumentException(
          "This program takes one argument: the path to an environment configuration file.");
    }

    final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
    final Properties envProps = instance.loadEnvProperties(args[0]);

    // Setup the input topic, table topic, and output topic
    instance.createTopics(envProps);

    // Normally these can be run in separate applications but for the purposes of the demo, we
    // just run both streams instances in the same application

    try (final KafkaStreams streams = new KafkaStreams(instance.buildTopology(envProps), instance.getStreamProps(envProps))) {
      final CountDownLatch startLatch = new CountDownLatch(1);
      // Attach shutdown handler to catch Control-C.
      Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
        @Override
        public void run() {
          //streams.cleanUp();
          streams.close(Duration.ofSeconds(5));
          startLatch.countDown();
        }
      });
      // Start the topology.
      streams.start();

      try {
        startLatch.await();
      } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        System.exit(1);
      }
    }
    System.exit(0);
  }
}

5
Compile and run the Kafka Streams program

Now build your application by running:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/schedule-ktable-ttl-standalone-0.0.1.jar configuration/dev.properties

6
Enter some data into KTable

Let us produce some records into the KTable before doing a join.

Open a terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForTable --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

The producer will start and wait for you to enter input. Each line represents one record and to send it you’ll hit the enter key. If you type multiple words and then hit enter, the entire line is considered one record.

Type one line at a time and press Enter after each, or paste all the records at once.

key1:what a lovely
foo:bar
fun:not quarantine
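
With parse.key=true and key.separator=":", each line entered above is split into a key and a value. The sketch below illustrates that split in plain Java, assuming the line is divided at the first occurrence of the separator; KeyedLine is a hypothetical class and is not the console producer's actual implementation.

```java
// Hypothetical illustration of how a "key:value" line is split into key and value.
class KeyedLine {
  final String key;
  final String value;

  KeyedLine(String key, String value) {
    this.key = key;
    this.value = value;
  }

  // Split at the first occurrence of the separator (an assumption of this sketch).
  static KeyedLine parse(String line, String separator) {
    int idx = line.indexOf(separator);
    if (idx < 0) {
      throw new IllegalArgumentException("No key separator found in line: " + line);
    }
    return new KeyedLine(line.substring(0, idx), line.substring(idx + separator.length()));
  }

  public static void main(String[] args) {
    KeyedLine record = parse("key1:what a lovely", ":");
    System.out.println(record.key);   // prints "key1"
    System.out.println(record.value); // prints "what a lovely"
  }
}
```

Everything after the separator becomes the value, including any leading space, so avoid typing a space after the colon unless you want it in the value.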

7
Enter some data into KStream

Let us produce some records into the KStream.

Open another terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForStream --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Type or paste the data into the KStream:

key1:Bunch of coconuts
foo:baz
fun:post quarantine

8
Consume data from the output topic

Now that your Kafka Streams application is running, start a console-consumer to confirm the output:

docker exec -it broker kafka-console-consumer \
 --bootstrap-server broker:9092 \
 --topic output-topic \
 --property print.key=true \
 --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
 --property key.separator=" : "  \
 --from-beginning \
 --max-messages 3

Your results should look something like this:


key1 : Bunch of coconuts what a lovely
foo : baz bar
fun : post quarantine not quarantine

Each output value is the KStream value followed by the matching KTable value, because all three keys were present in the KTable when the stream records arrived.

9
Wait 65 seconds and enter some data into KTable

Let us wait 65 seconds and produce some more records into the KTable. This has the effect of moving the time forward so that the TTL purging kicks in.

Open a terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForTable --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Enter the following data:

key2: some new data

10
Now enter some KStream data with the old keys

Let us produce some records into the KStream using the old keys which should be purged on the KTable side.

Open another terminal window and run the following command to start a console producer in the broker container:

docker exec -i broker kafka-console-producer --topic inputTopicForStream --bootstrap-server broker:9092 \
  --property parse.key=true \
  --property key.separator=":"

Type or paste the data into the KStream:

key1:Bunch of coconuts
foo:baz
fun:post quarantine

11
Consume data from the output topic 2nd time

Start a console-consumer to confirm the output (no join happened in the final 3 lines so only the KStream side data should appear in those lines):

docker exec -it broker kafka-console-consumer \
 --bootstrap-server broker:9092 \
 --topic output-topic \
 --property print.key=true \
 --value-deserializer "org.apache.kafka.common.serialization.StringDeserializer" \
 --property key.separator=" : "  \
 --from-beginning \
 --max-messages 6

Your results should look something like this:


key1 : Bunch of coconuts what a lovely
foo : baz bar
fun : post quarantine not quarantine
key1 : Bunch of coconuts
foo : baz
fun : post quarantine

Test it

1
Create a test configuration file

First, create a test file at configuration/test.properties:

application.id=schedule-ktable-ttl
bootstrap.servers=localhost:29092

input.topic.name=inputTopicForStream
input.topic.partitions=1
input.topic.replication.factor=1

table.topic.name=inputTopicForTable
table.topic.partitions=1
table.topic.replication.factor=1
table.topic.ttl.store.name=test-table-purge-store
table.topic.ttl.minutes=1
table.topic.ttl.scan.seconds=5

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

2
Write a test

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it would otherwise be.

There is only one method in KafkaStreamsKTableTTLExampleTest annotated with @Test, and that is shouldTriggerStreamTableJoinFromTable(). This method actually runs our Streams topology using the TopologyTestDriver and some mocked data that is set up inside the test method.

This test is fairly vanilla, but there is one section we should look into a little more:

      tableTopic.pipeInput("alice", "a", 5);  (1)
      tableTopic.pipeInput("bobby", "b", 10);


      inputTopic.pipeInput("alice", "11", 10); (2)
      inputTopic.pipeInput("bobby", "21", 15);
      inputTopic.pipeInput("alice", "12", 30);


      tableTopic.pipeInput("freddy", "f", 65000);  (3) // punctuate gets called now

      inputTopic.pipeInput("alice", "13", 70006);  (4)
      inputTopic.pipeInput("bobby", "22", 70016);


      final List<KeyValue<String, String>> actualResults = outputTopic.readKeyValuesToList();

      assertThat(actualResults.size(), is(5));  (5)

      assertThat(actualResults.get(0).key, is("alice"));
      assertThat(actualResults.get(0).value, is("11 a"));

      assertThat(actualResults.get(1).key, is("bobby"));
      assertThat(actualResults.get(1).value, is("21 b"));

      assertThat(actualResults.get(2).key, is("alice"));
      assertThat(actualResults.get(2).value, is("12 a"));

      assertThat(actualResults.get(3).key, is("alice"));
      assertThat(actualResults.get(3).value, is("13")); // join didn't match on right side


      assertThat(actualResults.get(4).key, is("bobby"));
      assertThat(actualResults.get(4).value, is("22"));
1 Piping some data into the KTable initially
2 Pipe some data into the KStream to join
3 Move the time forward by publishing something into the KTable with a timestamp greater than the TTL
4 Pipe some more KStream data to fail to find a match
5 Check each join to see where it matched and where it didn’t match (due to the TTLed KTable records)
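
The timestamps in this test can be checked by hand. With a TTL of 1 minute, the punctuation triggered at stream time 65000 uses a cutoff of 65000 - 60000 = 5000, so alice (last updated at 5) and bobby (last updated at 10) are expired while freddy (65000) is not. A small sketch of that arithmetic, using a hypothetical TtlTimeline helper:

```java
import java.time.Duration;

// Hypothetical helper that reproduces the punctuator's cutoff arithmetic.
class TtlTimeline {
  static long cutoff(long punctuateTimestampMs, Duration maxAge) {
    return punctuateTimestampMs - maxAge.toMillis();
  }

  static boolean isExpired(long lastUpdateMs, long cutoffMs) {
    return lastUpdateMs < cutoffMs;
  }

  public static void main(String[] args) {
    long cutoffMs = cutoff(65_000L, Duration.ofMinutes(1)); // 65000 - 60000 = 5000
    System.out.println("cutoff=" + cutoffMs);
    System.out.println("alice expired: " + isExpired(5L, cutoffMs));
    System.out.println("bobby expired: " + isExpired(10L, cutoffMs));
    System.out.println("freddy expired: " + isExpired(65_000L, cutoffMs));
  }
}
```

This is why the last two stream records in the test, for alice and bobby, no longer find a match on the table side.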

Now create the following file at src/test/java/io/confluent/developer/KafkaStreamsKTableTTLExampleTest.java:

package io.confluent.developer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

public class KafkaStreamsKTableTTLExampleTest {

  private final static String TEST_CONFIG_FILE = "configuration/test.properties";

  @Test
  public void shouldTriggerStreamTableJoinFromTable() throws Exception {

    final KafkaStreamsKTableTTLExample instance = new KafkaStreamsKTableTTLExample();
    final Properties envProps = instance.loadEnvProperties(TEST_CONFIG_FILE);

    final Properties streamProps = instance.getStreamProps(envProps);
    final String inputTopicName = envProps.getProperty("input.topic.name");
    final String outputTopicName = envProps.getProperty("output.topic.name");
    final String tableTopicName = envProps.getProperty("table.topic.name");

    final Topology topology = instance.buildTopology(envProps);
    System.out.println(topology);
    try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProps)) {


      final Serializer<String> keySerializer = Serdes.String().serializer();
      final Serializer<String> valueSerializer = Serdes.String().serializer();

      final Deserializer<String> keyDeserializer = Serdes.String().deserializer();
      final Deserializer<String> valueDeserializer = Serdes.String().deserializer();


      final TestInputTopic<String, String>  inputTopic = testDriver.createInputTopic(inputTopicName,
                                                                                        keySerializer,
                                                                                        valueSerializer);
      final TestInputTopic<String, String>  tableTopic = testDriver.createInputTopic(tableTopicName,
          keySerializer,
          valueSerializer);

      final TestOutputTopic<String, String> outputTopic = testDriver.createOutputTopic(outputTopicName, keyDeserializer, valueDeserializer);
      final TestOutputTopic<String, String> outputTableTopic = testDriver.createOutputTopic(tableTopicName, keyDeserializer, valueDeserializer);


      tableTopic.pipeInput("alice", "a", 5);
      tableTopic.pipeInput("bobby", "b", 10);


      inputTopic.pipeInput("alice", "11", 10);
      inputTopic.pipeInput("bobby", "21", 15);
      inputTopic.pipeInput("alice", "12", 30);


      tableTopic.pipeInput("freddy", "f", 65000);  // punctuate gets called now

      inputTopic.pipeInput("alice", "13", 70006);
      inputTopic.pipeInput("bobby", "22", 70016);


      final List<KeyValue<String, String>> actualResults = outputTopic.readKeyValuesToList();

      assertThat(actualResults.size(), is(5));

      assertThat(actualResults.get(0).key, is("alice"));
      assertThat(actualResults.get(0).value, is("11 a"));

      assertThat(actualResults.get(1).key, is("bobby"));
      assertThat(actualResults.get(1).value, is("21 b"));

      assertThat(actualResults.get(2).key, is("alice"));
      assertThat(actualResults.get(2).value, is("12 a"));

      assertThat(actualResults.get(3).key, is("alice"));
      assertThat(actualResults.get(3).value, is("13")); // join didn't match on right side


      assertThat(actualResults.get(4).key, is("bobby"));
      assertThat(actualResults.get(4).value, is("22"));


    }

  }

}

3
Invoke the tests

Now run the test, which is as simple as:

./gradlew test

Take it to production

1
Create a production configuration file

First, create a new configuration file at configuration/prod.properties with the following content. Be sure to fill in the addresses of your production hosts and change any other parameters that make sense for your setup.

application.id=schedule-ktable-ttl
bootstrap.servers=<<FILL ME IN>>

input.topic.name=inputTopicForStream
input.topic.partitions=1
input.topic.replication.factor=1

table.topic.name=inputTopicForTable
table.topic.partitions=1
table.topic.replication.factor=1
table.topic.ttl.store.name=table-purge-store
table.topic.ttl.minutes=1
table.topic.ttl.scan.seconds=5

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

2
Build a Docker image

In your terminal, execute the following to invoke the Jib plugin to build an image:

gradle jibDockerBuild --image=io.confluent.developer/schedule-ktable-ttl-join:0.0.1

3
Launch the container

Finally, launch the container using your preferred container orchestration service. If you want to run it locally, you can execute the following:

docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/schedule-ktable-ttl-join:0.0.1 config.properties

Deploy on Confluent Cloud

1
Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.

First, create your Kafka cluster in Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 free usage.

Next, from the Confluent Cloud UI, click on Tools & client config to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. For this tutorial, add the following properties to the client application’s input properties file, replacing each placeholder in curly braces with your Confluent Cloud value.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.
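
One thing to watch: the getStreamProps method shown earlier copies only application.id and bootstrap.servers from the properties file, so the security settings above would also need to be handed to the Kafka Streams and admin client configuration. The sketch below shows one possible way to select them; ConfigPassThrough and clientProps are hypothetical names, and the prefix filter is an assumption about which keys belong to this tutorial rather than to the Kafka clients.

```java
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper: forward every property that is not one of the
// tutorial's own topic settings to the Kafka client configuration.
class ConfigPassThrough {
  static Properties clientProps(Properties envProps) {
    Properties out = new Properties();
    for (String name : envProps.stringPropertyNames()) {
      boolean tutorialOnly = name.startsWith("input.topic.")
          || name.startsWith("table.topic.")
          || name.startsWith("output.topic.");
      if (!tutorialOnly) {
        out.setProperty(name, envProps.getProperty(name));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Properties env = new Properties();
    env.setProperty("bootstrap.servers", "{{ BROKER_ENDPOINT }}");
    env.setProperty("security.protocol", "SASL_SSL");
    env.setProperty("sasl.mechanism", "PLAIN");
    env.setProperty("table.topic.ttl.minutes", "1");

    // Sorted only to make the printed order predictable.
    System.out.println(new TreeSet<>(clientProps(env).stringPropertyNames()));
    // prints "[bootstrap.servers, sasl.mechanism, security.protocol]"
  }
}
```

The returned properties could then be added to the Streams configuration with putAll before the settings that getStreamProps already applies, and used for the admin client in createTopics.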