Handling uncaught exceptions

Question:

How do I handle uncaught exceptions?


Example use case:

You have an event streaming application and you want to make sure it's robust in the face of unexpected errors. Depending on the situation, you'll want the application to either continue running or shut down. In this tutorial you'll learn how to use the `StreamsUncaughtExceptionHandler` to provide this functionality.


Short Answer

To handle uncaught exceptions, use the KafkaStreams.setUncaughtExceptionHandler method.

KafkaStreams kafkaStreams = new KafkaStreams(topologyBuilder.build(), properties);

// Using a lambda, take a static approach to errors regardless of the exception
kafkaStreams.setUncaughtExceptionHandler((exception) -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);

// Using a concrete implementation
kafkaStreams.setUncaughtExceptionHandler(new MyExceptionHandler());

The StreamsUncaughtExceptionHandler interface gives you an opportunity to respond to exceptions not handled by Kafka Streams. It has one method, handle, and it returns an enum of type StreamThreadExceptionResponse which provides you the opportunity to instruct Kafka Streams how to respond to the exception. There are three possible values: REPLACE_THREAD, SHUTDOWN_CLIENT, or SHUTDOWN_APPLICATION.

Try it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir error-handling && cd error-handling

Next, create a directory for configuration data:

mkdir configuration

2
Sign up for Confluent Cloud and provision resources

Sign up for Confluent Cloud, a fully-managed Apache Kafka service. Then provision your resources:

  1. After you log in to Confluent Cloud, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

3
Create a properties file with Confluent Cloud information

From the Confluent Cloud UI, navigate to your Kafka cluster and click on Clients and then select Java.

Create new credentials for your Kafka cluster and Schema Registry, and then Confluent Cloud will show a configuration similar to below with your new credentials automatically populated (make sure show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='{{ CLUSTER_API_KEY }}'   password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the UI so that it includes your Confluent Cloud information and credentials.

4
Download and set up the Confluent Cloud CLI

Instructions for installing the Confluent Cloud CLI and configuring it for your Confluent Cloud environment are available from within the Confluent Cloud UI. Navigate to your Kafka cluster, click on the CLI and tools section, and run through the steps in the CCloud CLI tab.

5
Configure the project

Create the following Gradle build file, named build.gradle, for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.22.0"
        classpath "com.github.jengelman.gradle.plugins:shadow:6.0.0"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "2.8.0"
    id "idea"
    id "eclipse"
}

sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.apache.avro:avro:1.10.2"
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.8.0"
    implementation "org.apache.kafka:kafka-clients:2.8.0"

    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.8.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.StreamsUncaughtExceptionHandling"
    )
  }
}

shadowJar {
    archiveBaseName = "error-handling-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Then create a development file at configuration/dev.properties:

application.id=error-handling
replication.factor=3

input.topic.name=input-topic
input.topic.partitions=6
input.topic.replication.factor=3

output.topic.name=output-topic
output.topic.partitions=6
output.topic.replication.factor=3

max.failures=3
max.time.millis=3600000

6
Update the properties file with Confluent Cloud information

Using the command below, append the contents of configuration/ccloud.properties (with your Confluent Cloud configuration) to configuration/dev.properties (with the application properties).

cat configuration/ccloud.properties >> configuration/dev.properties
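After the append, configuration/dev.properties should hold both the application settings and the connection settings. The sketch below is one way to sanity-check such a file before starting the application. The class name `DevPropertiesCheck`, the `REQUIRED_KEYS` list, and the placeholder broker address are illustrative assumptions rather than part of the tutorial code, and the sample is loaded from an inline string so the block runs without any files.

```java
import java.io.IOException;
import java.io.StringReader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class DevPropertiesCheck {

    // Hypothetical list: keys the tutorial application reads, plus the broker address.
    static final String[] REQUIRED_KEYS = {
        "application.id", "bootstrap.servers",
        "input.topic.name", "output.topic.name",
        "max.failures", "max.time.millis"
    };

    /** Returns the required keys that are absent from the given properties. */
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            if (props.getProperty(key) == null) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Parses max.time.millis into a Duration so the window size is easy to read. */
    static Duration failureWindow(Properties props) {
        return Duration.ofMillis(Long.parseLong(props.getProperty("max.time.millis")));
    }

    public static void main(String[] args) throws IOException {
        // Inline stand-in for configuration/dev.properties; the broker address is a placeholder.
        String sample = String.join("\n",
            "application.id=error-handling",
            "input.topic.name=input-topic",
            "output.topic.name=output-topic",
            "max.failures=3",
            "max.time.millis=3600000",
            "bootstrap.servers=broker.example.com:9092");
        Properties props = new Properties();
        props.load(new StringReader(sample));

        System.out.println("Missing keys: " + missingKeys(props));
        System.out.println("Failure window in hours: " + failureWindow(props).toHours());
    }
}
```

To check your real file, you could load it with a FileInputStream instead of the inline string. An empty list of missing keys means the append worked.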

7
Create an exception handler implementation

First, create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Before you create the Kafka Streams application you’ll need to create an instance of a StreamsUncaughtExceptionHandler. For more information you can read KIP-671 which introduced the new functionality.

Before we dive into the code, let’s briefly cover a few points about the StreamsUncaughtExceptionHandler.

Keep in mind that the exception handler does not apply to all exceptions, just those not directly handled by Kafka Streams. An example of an exception that Kafka Streams handles is the ProducerFencedException. But any exceptions related to your business logic are not dealt with and bubble all the way up to the StreamThread, leaving the application no choice but to shut down. So the StreamsUncaughtExceptionHandler gives you a mechanism to take different actions in the case of a thrown exception.

The StreamsUncaughtExceptionHandler has one method handle, and it returns an enum of type StreamThreadExceptionResponse which provides you the opportunity to instruct Kafka Streams how to respond to the exception. The possible return values are:

  • REPLACE_THREAD - Replaces the thread receiving the exception and processing continues with the same number of configured threads. (Note: this can result in duplicate records depending on the application’s processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)

  • SHUTDOWN_CLIENT - Shut down the individual instance of the Kafka Streams application experiencing the exception. (This is the previous behavior and the current default behavior if you don’t provide a StreamsUncaughtExceptionHandler)

  • SHUTDOWN_APPLICATION - Shut down all instances of a Kafka Streams application with the same application-id. Kafka Streams uses a rebalance to instruct all application instances to shut down, so even those running on another machine will receive the signal and exit.
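One way to use these three responses is to choose among them based on the type of the exception. The sketch below illustrates that idea under stated assumptions: the `Response` enum is a local stand-in for StreamThreadExceptionResponse so the block compiles without Kafka on the classpath, and the policy itself (which exception types count as fatal) is hypothetical. It also walks the cause chain, since the topology test later in this tutorial expects a StreamsException, which suggests the original exception can arrive wrapped.

```java
class ExceptionTypeDecision {

    // Stand-in for StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse,
    // declared locally so this sketch compiles without Kafka on the classpath.
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    /**
     * Hypothetical policy: treat likely programming errors as fatal for the
     * whole application, and anything else as worth a thread replacement.
     */
    static Response decide(Throwable throwable) {
        // Inspect the cause chain too, in case the original exception was wrapped.
        for (Throwable t = throwable; t != null; t = t.getCause()) {
            if (t instanceof IllegalStateException || t instanceof NullPointerException) {
                return Response.SHUTDOWN_APPLICATION;
            }
        }
        return Response.REPLACE_THREAD;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("processing failed",
                new IllegalStateException("It works on my box!!!"));
        System.out.println(decide(wrapped));                                   // SHUTDOWN_APPLICATION
        System.out.println(decide(new RuntimeException("transient glitch")));  // REPLACE_THREAD
    }
}
```

In a real handler, the same decision would live in the handle method and return the Kafka Streams enum values directly.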

Your implementation of the StreamsUncaughtExceptionHandler will keep track of the number of errors that occur within a given time frame. If the number of errors exceeds the threshold within the provided time frame, then the entire application shuts down. While you could put the exception handling code in a lambda statement, having a separate concrete implementation is better for testing.

Here’s the constructor where you provide the max number of failures and the timeframe:

public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
    this.maxFailures = maxFailures;   (1)
    this.maxTimeIntervalMillis = maxTimeIntervalMillis;  (2)
}
1 The max number of failures your application will tolerate within a given timeframe
2 The max total time allowed for observing the failures

This is probably best understood by taking a look at the core logic:

 if (currentFailureCount >= maxFailures) {  (1)
    if (millisBetweenFailure <= maxTimeIntervalMillis) { (2)
        return SHUTDOWN_APPLICATION;
    } else {
        currentFailureCount = 0;  (3)
        previousErrorTime = null;
    }
}
return REPLACE_THREAD;  (4)
1 Checking if the current number of failures equals or exceeds the maximum
2 Checking if the max failures occurred within the given time window; if so, shut down
3 If you’ve reached the max number but the failures are spread out, reset
4 The default behavior here is to replace the thread

The idea here is that a couple of errors spread out are OK, so processing continues. But a bunch of errors within a small window of time could indicate a bigger issue, so it’s better to shut down. While the code doesn’t inspect the type of the exception, that’s another valid approach as well.

The above code is just an example of what you could do and definitely not tested in a production setting. The main point here is while it’s a good idea to keep processing with a small number of errors, it’s not a good idea to continually replace the thread with sustained errors. It’s better to have some "guard rails" in place to make sure your application is robust, but won’t continue on when it shouldn’t.
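To see how the count-and-window check behaves, it can help to replay it with explicit timestamps instead of the wall clock. The sketch below mirrors the core logic shown earlier, but takes the current time as a parameter and returns a boolean in place of the Kafka Streams enum. The class name `FailureWindowTrace` and the method `shouldShutDown` are illustrative assumptions, and timestamps are assumed to be non-negative.

```java
class FailureWindowTrace {

    private final int maxFailures;
    private final long maxTimeIntervalMillis;
    private long firstErrorMillis = -1;  // -1 means "no failure recorded in the current window"
    private int currentFailureCount;

    FailureWindowTrace(int maxFailures, long maxTimeIntervalMillis) {
        this.maxFailures = maxFailures;
        this.maxTimeIntervalMillis = maxTimeIntervalMillis;
    }

    /** Returns true when the tutorial's logic would answer SHUTDOWN_APPLICATION. */
    boolean shouldShutDown(long nowMillis) {
        currentFailureCount++;
        if (firstErrorMillis < 0) {
            firstErrorMillis = nowMillis;
        }
        long millisSinceFirstFailure = nowMillis - firstErrorMillis;
        if (currentFailureCount >= maxFailures) {
            if (millisSinceFirstFailure <= maxTimeIntervalMillis) {
                return true;
            }
            // Failures were spread out: start counting again.
            currentFailureCount = 0;
            firstErrorMillis = -1;
        }
        return false;
    }

    public static void main(String[] args) {
        long oneHour = 3_600_000L;

        // Three failures one second apart: the third one trips the threshold.
        FailureWindowTrace burst = new FailureWindowTrace(3, oneHour);
        System.out.println(burst.shouldShutDown(0));      // false
        System.out.println(burst.shouldShutDown(1_000));  // false
        System.out.println(burst.shouldShutDown(2_000));  // true

        // Three failures two hours apart: the third one resets the count instead.
        FailureWindowTrace sparse = new FailureWindowTrace(3, oneHour);
        System.out.println(sparse.shouldShutDown(0));            // false
        System.out.println(sparse.shouldShutDown(2 * oneHour));  // false
        System.out.println(sparse.shouldShutDown(4 * oneHour));  // false
    }
}
```

Note that the elapsed time is measured from the first failure in the current counting window, not from the most recent failure. Passing the time in explicitly is a design choice that also lets tests avoid Thread.sleep.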

Now create the following file at src/main/java/io/confluent/developer/MaxFailuresUncaughtExceptionHandler.java:

package io.confluent.developer;

import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

import java.time.Instant;
import java.time.temporal.ChronoUnit;

import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*;


public class MaxFailuresUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {

    final int maxFailures;
    final long maxTimeIntervalMillis;
    private Instant previousErrorTime;
    private int currentFailureCount;


    public MaxFailuresUncaughtExceptionHandler(final int maxFailures, final long maxTimeIntervalMillis) {
        this.maxFailures = maxFailures;
        this.maxTimeIntervalMillis = maxTimeIntervalMillis;
    }

    @Override
    public StreamThreadExceptionResponse handle(final Throwable throwable) {
        currentFailureCount++;
        Instant currentErrorTime = Instant.now();

        if (previousErrorTime == null) {
            previousErrorTime = currentErrorTime;
        }

        long millisBetweenFailure = ChronoUnit.MILLIS.between(previousErrorTime, currentErrorTime);

        if (currentFailureCount >= maxFailures) {
            if (millisBetweenFailure <= maxTimeIntervalMillis) {
                return SHUTDOWN_APPLICATION;
            } else {
                currentFailureCount = 0;
                previousErrorTime = null;
            }
        }
        return REPLACE_THREAD;
    }
}

You’ll add the StreamsUncaughtExceptionHandler to your Kafka Streams application in the next step.

There is an older, deprecated version of KafkaStreams.setUncaughtExceptionHandler that takes an instance of a java.lang.Thread.UncaughtExceptionHandler. You should migrate to the newer method.
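For context on why the newer method is preferable, consider the contract of java.lang.Thread.UncaughtExceptionHandler: its callback returns void, so it can observe a failure but cannot tell the caller what to do next. The JDK-only sketch below shows that contract in isolation. It does not use Kafka Streams at all, and the class name `LegacyThreadHandlerDemo` and the thread name are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicReference;

class LegacyThreadHandlerDemo {

    /** Runs a thread that fails and returns what the JDK-level handler observed. */
    static Throwable runAndCapture() {
        AtomicReference<Throwable> seen = new AtomicReference<>();

        Thread worker = new Thread(() -> {
            throw new IllegalStateException("It works on my box!!!");
        }, "demo-worker");

        // The callback returns void: it can log or record the failure,
        // but it has no way to request a replacement thread or a shutdown.
        worker.setUncaughtExceptionHandler((thread, throwable) -> seen.set(throwable));

        worker.start();
        try {
            worker.join();  // the handler runs on the dying thread before it terminates
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable observed = runAndCapture();
        System.out.println("Observed: " + observed.getMessage());
    }
}
```

By contrast, the handle method of StreamsUncaughtExceptionHandler returns a StreamThreadExceptionResponse, which is what lets you pick between replacing the thread and shutting down.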

8
Create the Kafka Streams topology

Here is the code we’ll use to drive our tutorial:

   builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(value -> {
                    counter++;
                    if (counter == 2 || counter == 8 || counter == 15) { (1)
                        throw new IllegalStateException("It works on my box!!!");
                    }
                    return value.toUpperCase();
                })
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

    // Details left out for clarity
    .......

    // In the main method

    final KafkaStreams streams = new KafkaStreams(topology, streamProps);
    final MaxFailuresUncaughtExceptionHandler exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeInterval); (2)
    streams.setUncaughtExceptionHandler(exceptionHandler); (3)
1 Simulating an error depending on the value of a counter (which gets incremented with every record)
2 Instantiating the exception handler, the maxFailures (3) and maxTimeInterval (3600000 ms == 1 hour) variables get their values from the configuration files
3 Adding the handler to Kafka Streams

This code ensures that the rate of errors (3 within a 1 hour window) meets the criteria for shutting down the application.

Now create the following file at src/main/java/io/confluent/developer/StreamsUncaughtExceptionHandling.java:

package io.confluent.developer;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class StreamsUncaughtExceptionHandling {

    int counter = 0;

    public Topology buildTopology(Properties allProps) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String inputTopic = allProps.getProperty("input.topic.name");
        final String outputTopic = allProps.getProperty("output.topic.name");

        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(value -> {
                    counter++;
                    if (counter == 2 || counter == 8 || counter == 15) {
                        throw new IllegalStateException("It works on my box!!!");
                    }
                    return value.toUpperCase();
                })
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public void createTopics(Properties allProps) {
        try (AdminClient client = AdminClient.create(allProps)) {
            List<NewTopic> topicList = new ArrayList<>();

            NewTopic sessionInput = new NewTopic(allProps.getProperty("input.topic.name"),
                    Integer.parseInt(allProps.getProperty("input.topic.partitions")),
                    Short.parseShort(allProps.getProperty("input.topic.replication.factor")));
            topicList.add(sessionInput);

            NewTopic counts = new NewTopic(allProps.getProperty("output.topic.name"),
                    Integer.parseInt(allProps.getProperty("output.topic.partitions")),
                    Short.parseShort(allProps.getProperty("output.topic.replication.factor")));

            topicList.add(counts);
            client.createTopics(topicList);
        }
    }

    public Properties loadEnvProperties(String fileName) throws IOException {
        Properties allProps = new Properties();
        // try-with-resources closes the stream even if load() throws
        try (FileInputStream input = new FileInputStream(fileName)) {
            allProps.load(input);
        }

        return allProps;
    }

    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to an environment configuration file.");
        }

        StreamsUncaughtExceptionHandling tw = new StreamsUncaughtExceptionHandling();
        Properties allProps = tw.loadEnvProperties(args[0]);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Change this to StreamsConfig.EXACTLY_ONCE to eliminate duplicates
        allProps.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);
        Topology topology = tw.buildTopology(allProps);

        tw.createTopics(allProps);
        TutorialDataGenerator dataGenerator = new TutorialDataGenerator(allProps);
        dataGenerator.generate();

        final int maxFailures = Integer.parseInt(allProps.getProperty("max.failures"));
        final long maxTimeInterval = Long.parseLong(allProps.getProperty("max.time.millis"));
        final KafkaStreams streams = new KafkaStreams(topology, allProps);
        final MaxFailuresUncaughtExceptionHandler exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeInterval);
        streams.setUncaughtExceptionHandler(exceptionHandler);

        // Attach shutdown handler to catch Control-C.
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
            }
        });

        try {
            streams.cleanUp();
            streams.start();
        } catch (Throwable e) {
            System.exit(1);
        }
    }

    static class TutorialDataGenerator {
        final Properties properties;


        public TutorialDataGenerator(final Properties properties) {
            this.properties = properties;
        }

        public void generate() {
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
                String topic = properties.getProperty("input.topic.name");
                List<String> messages = Arrays.asList("All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit");


                messages.forEach(message -> producer.send(new ProducerRecord<>(topic, message), (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace(System.out);
                        } else {
                            System.out.printf("Produced record at offset %d to topic %s %n", metadata.offset(), metadata.topic());
                        }
                }));
            }
        }
    }
}

9
Compile and run the Kafka Streams program

Now that the application code, including its built-in data generator, is in place, build the application by running:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally.

The application for this tutorial includes a record generator to populate the topic data. Here is the list of records produced:

"All", "streams", "lead", "to", "Confluent", "Go", "to", "Kafka", "Summit"

Since we force some exceptions at different intervals while the streams application runs, you should see some stack traces in the console indicating an error, but the application will continue running. However, when the application encounters an error that meets the threshold for max errors, it will shut down.

Now run the following command, watch the logs in the console, and let the application run for a few seconds.

java -jar build/libs/error-handling-standalone-0.0.1.jar configuration/dev.properties

You should observe it shutting down and see something similar to this in the console:

INFO org.apache.kafka.streams.KafkaStreams - stream-client [error-handling-5c246409-ae84-4bbd-af85-c4e8d1d556d9] State transition from PENDING_ERROR to ERROR
INFO org.apache.kafka.streams.KafkaStreams - stream-client [error-handling-5c246409-ae84-4bbd-af85-c4e8d1d556d9] Streams client stopped to ERROR completely

10
Consume data from the output topic

Now that you’ve run the Kafka Streams application, it should have shut itself down due to reaching the max-error threshold.

Let’s now run the Confluent Cloud CLI to confirm the output:

ccloud kafka topic consume output-topic --from-beginning

Your results should look something like this:


ALL
ALL
STREAMS
LEAD
TO
CONFLUENT
ALL
STREAMS
LEAD
TO
CONFLUENT
GO

You’ll notice there are some duplicated values in the output. This duplication is to be expected, as the streams application is running with the default processing mode of AT_LEAST_ONCE. Duplicate values are one thing to consider when using REPLACE_THREAD with the StreamsUncaughtExceptionHandler, since this is analogous to using retries with the KafkaProducer. If you don’t want duplicate values, you should consider running with the processing mode of EXACTLY_ONCE.

Enter Ctrl+C to exit.
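To build intuition for where the duplicates come from, here is a deliberately simplified toy model, not Kafka Streams' actual mechanics. It assumes a single partition, a commit after every `commitEvery` records, and a failure on one chosen processing attempt, after which processing resumes from the last committed position. All names (`AtLeastOnceToyModel`, `run`, `commitEvery`, `failOnAttempt`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AtLeastOnceToyModel {

    /**
     * Processes records in order and "commits" progress after every commitEvery
     * records. When the attempt counter reaches failOnAttempt, processing fails
     * and resumes from the last committed position, as a replaced thread would.
     */
    static List<String> run(List<String> input, int commitEvery, int failOnAttempt) {
        List<String> output = new ArrayList<>();
        int committed = 0;  // position that a restart resumes from
        int position = 0;   // next record to process
        int attempts = 0;   // counts every attempt, like the tutorial's counter field

        while (position < input.size()) {
            attempts++;
            if (attempts == failOnAttempt) {
                position = committed;  // uncommitted work is processed again
                continue;
            }
            output.add(input.get(position).toUpperCase());  // already visible downstream
            position++;
            if (position - committed >= commitEvery) {
                committed = position;
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("All", "streams", "lead");
        // Fail on the second attempt, before anything has been committed.
        System.out.println(run(input, 3, 2));  // [ALL, ALL, STREAMS, LEAD]
    }
}
```

In this model, committing after every record (commitEvery set to 1) removes the duplicate, which hints at why output written between a commit and a failure is what gets repeated. The real output also depends on partitioning and commit timing, so treat this only as an illustration.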

11
Teardown Confluent Cloud resources

You may try another Kafka tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud UI or CLI to destroy all the resources you created. Verify they are destroyed to avoid unexpected charges.

Test it

1
Create a test configuration file

First, create a test file at configuration/test.properties:

application.id=error-handling-test
bootstrap.servers=localhost:29092

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

max.failures=3
max.time.millis=120000

2
Write a test

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it would otherwise be.

The test for our streams application is simple, but we have two scenarios to cover. The first is the simulated failure: the topology throws on the second record it processes, so we expect piping two values to raise an exception. The second case is the happy path where a single value is processed as we expect:

@Test
public void shouldThrowException() {
    assertThrows(org.apache.kafka.streams.errors.StreamsException.class, () -> inputTopic.pipeValueList(Arrays.asList("foo", "bar"))); (1)
}

@Test
public void shouldProcessValues() {  (2)
    List<String> validMessages = Collections.singletonList("foo");
    List<String> expectedMessages = validMessages.stream().map(String::toUpperCase).collect(Collectors.toList());
    inputTopic.pipeValueList(validMessages);
    List<String> actualResults = outputTopic.readValuesToList();
    assertEquals(expectedMessages, actualResults);
}
1 Test verifying that the simulated failure on the second record surfaces as a StreamsException
2 Test validating the expected processing

We also have logic in the MaxFailuresUncaughtExceptionHandler that needs testing. Just like the streams application test, we have two scenarios to verify:

  • The case when errors are spread out so the exception handler should return REPLACE_THREAD

  • The case when the errors occur within our window and we expect the handler to return a SHUTDOWN_APPLICATION

@Test
public void shouldReplaceThreadWhenErrorsNotWithinMaxTime() throws Exception {  (1)
    for (int i = 0; i < 10; i++) {
        assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
        Thread.sleep(200);
    }
}
@Test
public void shouldShutdownApplicationWhenErrorsOccurWithinMaxTime() throws Exception { (2)
    assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
    Thread.sleep(50);
    assertEquals(SHUTDOWN_APPLICATION, exceptionHandler.handle(worksOnMyBoxException));
}
1 Test validating errors spread out result in replacing the thread
2 This test validates that a bunch of errors in a small timeframe result in a shutdown

With the brief testing discussion done, let’s create our two test files.

First create the topology test file at src/test/java/io/confluent/developer/StreamsUncaughtExceptionHandlingTest.java.

package io.confluent.developer;


import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertThrows;


public class StreamsUncaughtExceptionHandlingTest {

    private final static String TEST_CONFIG_FILE = "configuration/test.properties";

    private TestInputTopic<String, String> inputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private TopologyTestDriver testDriver;


    @Before
    public void setUp() throws IOException {
        final StreamsUncaughtExceptionHandling instance = new StreamsUncaughtExceptionHandling();
        final Properties allProps = instance.loadEnvProperties(TEST_CONFIG_FILE);
        allProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        allProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final String sessionDataInputTopic = allProps.getProperty("input.topic.name");
        final String outputTopicName = allProps.getProperty("output.topic.name");

        final Topology topology = instance.buildTopology(allProps);
        testDriver = new TopologyTestDriver(topology, allProps);
        final Serializer<String> keySerializer = Serdes.String().serializer();
        final Serializer<String> exampleSerializer = Serdes.String().serializer();
        final Deserializer<String> valueDeserializer = Serdes.String().deserializer();
        final Deserializer<String> keyDeserializer = Serdes.String().deserializer();

        inputTopic = testDriver.createInputTopic(sessionDataInputTopic, keySerializer, exampleSerializer);
        outputTopic = testDriver.createOutputTopic(outputTopicName, keyDeserializer, valueDeserializer);
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void shouldThrowException() {
        assertThrows(org.apache.kafka.streams.errors.StreamsException.class, () -> inputTopic.pipeValueList(Arrays.asList("foo", "bar")));
    }

    @Test
    public void shouldProcessValues() {
        List<String> validMessages =  Collections.singletonList("foo");
        List<String> expectedMessages = validMessages.stream().map(String::toUpperCase).collect(Collectors.toList());
        inputTopic.pipeValueList(validMessages);
        List<String> actualResults = outputTopic.readValuesToList();
        assertEquals(expectedMessages, actualResults);
    }

}

Then create the handler test file at src/test/java/io/confluent/developer/MaxFailuresUncaughtExceptionHandlerTest.java.

package io.confluent.developer;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.*;

public class MaxFailuresUncaughtExceptionHandlerTest {

    private MaxFailuresUncaughtExceptionHandler exceptionHandler;
    private final IllegalStateException worksOnMyBoxException = new IllegalStateException("Strange, It worked on my box");

    @Before
    public void setUp() {
        long maxTimeMillis = 100;
        int maxFailures = 2;
        exceptionHandler = new MaxFailuresUncaughtExceptionHandler(maxFailures, maxTimeMillis);
    }

    @Test
    public void shouldReplaceThreadWhenErrorsNotWithinMaxTime() throws Exception {
        for (int i = 0; i < 10; i++) {
            assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
            Thread.sleep(200);
        }
    }

    @Test
    public void shouldShutdownApplicationWhenErrorsOccurWithinMaxTime() throws Exception {
        assertEquals(REPLACE_THREAD, exceptionHandler.handle(worksOnMyBoxException));
        Thread.sleep(50);
        assertEquals(SHUTDOWN_APPLICATION, exceptionHandler.handle(worksOnMyBoxException));
    }
}

3
Invoke the tests

Now run the tests, which is as simple as:

./gradlew test

Take it to production

1
Create a production configuration file

First, create a new configuration file at configuration/prod.properties with the following content. Be sure to fill in the addresses of your production hosts and change any other parameters that make sense for your setup.

application.id=error-handling
bootstrap.servers=<<FILL ME IN>>
schema.registry.url=<<FILL ME IN>>

input.topic.name=temp-readings
input.topic.partitions=<<FILL ME IN>>
input.topic.replication.factor=<<FILL ME IN>>

output.topic.name=output-topic
output.topic.partitions=<<FILL ME IN>>
output.topic.replication.factor=<<FILL ME IN>>

max.failures=<<FILL ME IN>>
max.time.millis=<<FILL ME IN>>

2
Build a Docker image

In your terminal, execute the following to invoke the Jib plugin to build an image:

gradle jibDockerBuild --image=io.confluent.developer/error-handling-join:0.0.1

3
Launch the container

Finally, launch the container using your preferred container orchestration service. If you want to run it locally, you can execute the following:

docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/error-handling-join:0.0.1 config.properties