How to build your first Apache Kafka Streams application

Question:

How do I get started in building my first Kafka Streams application?

Example use case:

You'd like to get started with Kafka Streams, but you're not sure where to start. In this tutorial you'll build a small stream processing application and produce some sample data to test it. You can use the code in this tutorial as an example of how to use Kafka Streams. After you complete this tutorial, you can go more in-depth in the Kafka Streams 101 Course.

Hands-on code example:

Short Answer

Using the Apache Kafka Streams DSL, create a stream processing topology to define your business logic. The example below reads events from the input topic using the stream function, processes events using the mapValues transformation, allows for debugging with peek, and writes the transformed events to an output topic using to.

static Topology buildTopology(String inputTopic, String outputTopic) {
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k,v) -> logger.info("Observed event: {}", v))
        .mapValues(s -> s.toUpperCase())
        .peek((k,v) -> logger.info("Transformed event: {}", v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}

Run it

1
Provision your Kafka cluster

This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started for free is with Confluent Cloud, which provides Kafka as a fully managed service. First, sign up for Confluent Cloud.

  1. After you log in to Confluent Cloud, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

2
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir creating-first-apache-kafka-streams-application && cd creating-first-apache-kafka-streams-application

Next, create a directory for configuration data:

mkdir configuration

3
Save cloud configuration values to a local file

From the Confluent Cloud Console, navigate to your Kafka cluster. From the Clients view, get the connection information customized to your cluster (select Java).

Create new credentials for your Kafka cluster and Schema Registry, and then Confluent Cloud will show a configuration similar to below with your new credentials automatically populated (make sure show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.

4
Download and set up the Confluent CLI

This tutorial has some steps for Kafka topic management and for reading from or writing to Kafka topics, for which you can use the Confluent Cloud Console or install the Confluent CLI. Instructions for installing the Confluent CLI and configuring it for your Confluent Cloud environment are available from within the Confluent Cloud Console: navigate to your Kafka cluster, click on the CLI and tools link, and run through the steps in the Confluent CLI tab.

The CLI clients for Confluent Cloud (ccloud) and Confluent Platform (confluent v1.0) have been unified into a single client, the Confluent CLI (confluent v2.0), which this tutorial uses. The ccloud client will continue to work until sunset on May 9, 2022, and you can read the migration instructions for the unified confluent CLI at https://docs.confluent.io/confluent-cli/current/migrate.html.

5
Configure the project

Create the following Gradle build file, named build.gradle, for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "3.1.1"
    id "idea"
    id "eclipse"
    id "application"
}

version = "0.0.1"
sourceCompatibility = "1.8"
targetCompatibility = "1.8"
application {
  mainClassName = "io.confluent.developer.KafkaStreamsApplication"
}

repositories {
    mavenCentral()


    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.8.1"
    implementation 'com.github.javafaker:javafaker:1.0.2'
    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.8.1"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaStreamsApplication"
    )
  }
}

shadowJar {
    archivesBaseName = "creating-first-apache-kafka-streams-application-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Then create a configuration file for development at configuration/dev.properties:

application.id=kafka-streams-101

input.topic.name=random-strings
output.topic.name=tall-random-strings

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

6
Update the properties file with Confluent Cloud information

Using the command below, append the contents of configuration/ccloud.properties (with your Confluent Cloud configuration) to configuration/dev.properties (with the application properties).

cat configuration/ccloud.properties >> configuration/dev.properties
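
The append works because a properties file is a flat list of key=value lines, so the combined file carries both the application settings and the connection settings. As a rough illustration only, the sketch below performs the same merge in memory with java.util.Properties and reports any keys the application expects but cannot find. The MergedConfig class, its method names, and the sample values are hypothetical and not part of the tutorial code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the tutorial code.
final class MergedConfig {

    // Keys the tutorial application needs once the two files are combined.
    static final List<String> REQUIRED_KEYS = Arrays.asList(
            "application.id",
            "bootstrap.servers",
            "input.topic.name",
            "output.topic.name");

    // Loads each properties text, in order, into one Properties object.
    // If a key appears more than once, the later occurrence wins.
    static Properties merge(String... texts) {
        Properties merged = new Properties();
        for (String text : texts) {
            try {
                merged.load(new StringReader(text));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return merged;
    }

    // Returns the required keys that are absent or blank.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String dev = "application.id=kafka-streams-101\n"
                + "input.topic.name=random-strings\n"
                + "output.topic.name=tall-random-strings\n";
        // Placeholder value; the real one comes from the Confluent Cloud Console.
        String cloud = "bootstrap.servers=example-host:9092\n";

        System.out.println("Before merge, missing: " + missingKeys(merge(dev)));
        System.out.println("After merge, missing: " + missingKeys(merge(dev, cloud)));
    }
}
```

Before the merge the only missing key is bootstrap.servers; after it, nothing is missing. A check like this at startup can make a forgotten append step easier to spot than a connection error would.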

7
Create a Utility class

First, create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Now create a utility class that provides functions to support our tutorial. You may decide not to include these types of functions in the production version of your application, but they are useful for getting started quickly. This utility class includes functions to create our Kafka topics and to generate sample event data that we can use to exercise our Kafka Streams topology.

Create the following file at src/main/java/io/confluent/developer/Util.java.

package io.confluent.developer;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class Util implements AutoCloseable {

    private final Logger logger = LoggerFactory.getLogger(Util.class);
    private ExecutorService executorService = Executors.newFixedThreadPool(1);

    public class Randomizer implements AutoCloseable, Runnable {
        private final Properties props;
        private final String topic;
        // Written by the thread that calls close() and read by the executor thread, hence volatile
        private volatile boolean closed;

        public Randomizer(Properties producerProps, String topic) {
            this.closed = false;
            this.topic = topic;
            this.props = producerProps;
            this.props.setProperty("client.id", "faker");
        }

        public void run() {
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                Faker faker = new Faker();
                while (!closed) {
                    try {
                        // Block until the send is acknowledged, then pause before the next event
                        producer.send(new ProducerRecord<>(
                                this.topic,
                                faker.chuckNorris().fact())).get();
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        // Interrupted by executorService.shutdownNow(): stop producing
                        closed = true;
                    }
                }
            } catch (Exception ex) {
                logger.error(ex.toString());
            }
        }
        public void close()  {
            closed = true;
        }
    }

    public Randomizer startNewRandomizer(Properties producerProps, String topic) {
        Randomizer rv = new Randomizer(producerProps, topic);
        executorService.submit(rv);
        return rv;
    }

    public void createTopics(final Properties allProps, List<NewTopic> topics)
            throws InterruptedException, ExecutionException, TimeoutException {
        try (final AdminClient client = AdminClient.create(allProps)) {
            logger.info("Creating topics");

            client.createTopics(topics).values().forEach( (topic, future) -> {
                try {
                    future.get();
                } catch (Exception ex) {
                    logger.info(ex.toString());
                }
            });

            Collection<String> topicNames = topics
                .stream()
                .map(t -> t.name())
                .collect(Collectors.toCollection(LinkedList::new));

            logger.info("Asking cluster for topic descriptions");
            client
                .describeTopics(topicNames)
                .all()
                .get(10, TimeUnit.SECONDS)
                .forEach((name, description) -> logger.info("Topic Description: {}", description.toString()));
        }
    }

    public void close() {
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
    }
}
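
The core idea behind Randomizer, a background task that emits a value at a fixed interval until it is closed, can be tried without a Kafka cluster. The sketch below is a Kafka-free approximation under stated assumptions: the Emitter class and the emitAtLeast helper are hypothetical, a Supplier stands in for Faker, and a Consumer stands in for producer.send.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class EmitterSketch {

    // Hypothetical, Kafka-free counterpart of Randomizer.
    static final class Emitter implements Runnable, AutoCloseable {
        private final Supplier<String> source;   // stands in for Faker
        private final Consumer<String> sink;     // stands in for producer.send
        private final long intervalMillis;
        private volatile boolean closed;         // set from another thread, so volatile

        Emitter(Supplier<String> source, Consumer<String> sink, long intervalMillis) {
            this.source = source;
            this.sink = sink;
            this.intervalMillis = intervalMillis;
        }

        @Override
        public void run() {
            while (!closed) {
                sink.accept(source.get());
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    closed = true; // interrupted by shutdownNow(): stop emitting
                }
            }
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Runs an Emitter on a single-thread executor until at least `count`
    // values have arrived (or five seconds pass), then stops it.
    static List<String> emitAtLeast(int count, long intervalMillis) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch enough = new CountDownLatch(count);
        AtomicInteger sequence = new AtomicInteger();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (Emitter emitter = new Emitter(
                () -> "event-" + sequence.incrementAndGet(),
                value -> { received.add(value); enough.countDown(); },
                intervalMillis)) {
            executor.submit(emitter);
            enough.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(emitAtLeast(3, 10));
    }
}
```

The returned list may hold more than the requested number of values, because the emitter can produce one more before it notices that it was closed. The same is true of any generator that is stopped with a flag rather than at a fixed count.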

8
Create the Kafka Streams topology

Kafka Streams applications define their logic in a processor topology, which is a graph of stream processors (nodes) and streams (edges). There are two methods for defining these components in your Kafka Streams application, the Streams DSL and the Processor API. The Streams DSL provides built-in abstractions for common event stream processing concepts like streams, tables, and transformations, while the Processor API can be used for advanced cases not supported by the DSL.

The Streams DSL is recommended for most use cases and this tutorial will use it to define a basic text processing application. To get started, let’s focus on the important bits of Kafka Streams application code, highlighting the DSL usage.

static Topology buildTopology(String inputTopic, String outputTopic) {
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k,v) -> logger.info("Observed event: {}", v))
        .mapValues(s -> s.toUpperCase())
        .peek((k,v) -> logger.info("Transformed event: {}", v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}

In the code above, the StreamsBuilder class is used to construct the topology. The DSL lets you build your application by chaining together the desired behaviors through a fluent API.

A typical topology follows a common pattern:

  • Consume one or more input streams using the stream function, which accepts the names of the Kafka topics to consume from along with the deserializers required to decode the data.

  • Transform events by chaining together one or more transformations. In our example, we use mapValues to convert incoming String events to their upper case value.

  • Produce the transformed events as the output of the topology using the to function, which takes a destination topic as well as the serializers required to encode the data.

The peek function allows you to observe and act on events as they flow through the topology stages. In our example, it is used to debug the topology by logging events as they pass through.
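
For intuition only, the same consume, observe, transform, and produce shape can be mimicked with the JDK's java.util.stream API on an in-memory list. This is an analogy and not Kafka Streams: there are no topics, no serdes, and no continuous processing. The PipelineAnalogy class and its process method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// In-memory analogy only; this is java.util.stream, not Kafka Streams.
final class PipelineAnalogy {

    // Upper-cases each input value and records what each peek stage observes in `log`.
    static List<String> process(List<String> input, List<String> log) {
        return input.stream()                                   // plays the role of stream(inputTopic, ...)
                .peek(v -> log.add("Observed event: " + v))     // observes the value, passes it on unchanged
                .map(String::toUpperCase)                       // plays the role of mapValues
                .peek(v -> log.add("Transformed event: " + v))  // observes the transformed value
                .collect(Collectors.toList());                  // plays the role of to(outputTopic, ...)
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<String> output = process(Arrays.asList("first event", "second event"), log);
        log.forEach(System.out::println);
        System.out.println("Output: " + output);
    }
}
```

In this sketch each element passes through every stage before the next element enters, so the log alternates between "Observed event" and "Transformed event" lines.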

Once the topology is defined within the builder, the buildTopology function returns an instance of the Topology created by builder.build(). Keeping the construction of the Topology in its own function is useful for testing, as the Test It section of this tutorial shows.

Now go ahead and create the full Java source file at src/main/java/io/confluent/developer/KafkaStreamsApplication.java.

package io.confluent.developer;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class KafkaStreamsApplication {

    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApplication.class);

    static void runKafkaStreams(final KafkaStreams streams) {
        final CountDownLatch latch = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.RUNNING && newState != KafkaStreams.State.RUNNING) {
                latch.countDown();
            }
        });

        streams.start();

        try {
            latch.await();
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }

        logger.info("Streams Closed");
    }
    static Topology buildTopology(String inputTopic, String outputTopic) {
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();

        builder
            .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
            .peek((k,v) -> logger.info("Observed event: {}", v))
            .mapValues(s -> s.toUpperCase())
            .peek((k,v) -> logger.info("Transformed event: {}", v))
            .to(outputTopic, Produced.with(stringSerde, stringSerde));

        return builder.build();
    }
    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to a configuration file.");
        }

        Properties props = new Properties();
        try (InputStream inputStream = new FileInputStream(args[0])) {
            props.load(inputStream);
        }

        final String inputTopic = props.getProperty("input.topic.name");
        final String outputTopic = props.getProperty("output.topic.name");

        try (Util utility = new Util()) {

            utility.createTopics(
                    props,
                    Arrays.asList(
                            new NewTopic(inputTopic, Optional.empty(), Optional.empty()),
                            new NewTopic(outputTopic, Optional.empty(), Optional.empty())));

            // Randomizer is only used to produce sample data for this application; this is not typical usage
            try (Util.Randomizer rando = utility.startNewRandomizer(props, inputTopic)) {

                KafkaStreams kafkaStreams = new KafkaStreams(
                        buildTopology(inputTopic, outputTopic),
                        props);

                Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

                logger.info("Kafka Streams 101 App Started");
                runKafkaStreams(kafkaStreams);

            }
        }
    }
}
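
The runKafkaStreams method keeps the main thread alive by waiting on a CountDownLatch that a state listener releases once the application leaves the RUNNING state. The sketch below isolates that pattern using only JDK classes. The Worker class is a hypothetical stand-in for KafkaStreams, its string states are simplified, and the timeout is an addition so that the demo always terminates (the tutorial code waits indefinitely).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class LatchShutdownSketch {

    // Hypothetical stand-in for KafkaStreams: it only reports state changes to a listener.
    static final class Worker {
        private volatile Consumer<String> listener = state -> { };
        private volatile boolean running;

        void setStateListener(Consumer<String> listener) {
            this.listener = listener;
        }

        boolean isRunning() {
            return running;
        }

        void start() {
            running = true;
            listener.accept("RUNNING");
        }

        void close() {
            running = false;
            listener.accept("NOT_RUNNING");
        }
    }

    // Starts the worker and blocks until it reports NOT_RUNNING or the timeout elapses.
    // Returns true if the worker stopped before the timeout.
    static boolean runUntilStopped(Worker worker, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        worker.setStateListener(state -> {
            if ("NOT_RUNNING".equals(state)) {
                latch.countDown();
            }
        });
        worker.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        // Plays the role of Ctrl-C: closes the worker from another thread once it is running.
        Thread closer = new Thread(() -> {
            while (!worker.isRunning()) {
                Thread.yield();
            }
            worker.close();
        });
        closer.start();
        System.out.println("Stopped before timeout: " + runUntilStopped(worker, 5000));
    }
}
```

Registering the listener before calling start matters here: if the order were reversed, a very early state change could be missed and the latch would never be released.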

9
Compile and run the Kafka Streams program

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar configuration/dev.properties

If the Kafka Streams application has started properly, you should see the debugging log output from the peek functions.

Something similar to:

[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris does not get compiler errors, the language changes itself to accommodate Chuck Norris.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS DOES NOT GET COMPILER ERRORS, THE LANGUAGE CHANGES ITSELF TO ACCOMMODATE CHUCK NORRIS.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris can write infinite recursion functions... and have them return.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS CAN WRITE INFINITE RECURSION FUNCTIONS... AND HAVE THEM RETURN.

10
Stream events using a console consumer

Now that the Kafka Streams application is running, use the Confluent CLI to run a command line consumer and view the events. Your CLI context should be set to the proper environment, cluster, and API key (see Step 4 above and the Confluent CLI Reference for additional details).

Then, in a new terminal window, run the following console consumer to view the events that the Randomizer class in your Kafka Streams application produces to the random-strings topic. These are the events that are streamed into the topology by .stream(inputTopic, Consumed.with(stringSerde, stringSerde)).

confluent kafka topic consume random-strings

You should see output that looks like this (notice the mixed case of the string):

Starting Kafka Consumer. Use Ctrl-C to exit.
Chuck Norris can write multi-threaded applications with a single thread.
No statement can catch the ChuckNorrisException.
Chuck Norris can divide by zero.
Chuck Norris can binary search unsorted data.

Next, look at the transformed events in the tall-random-strings topic. These are the events that have been transformed by .mapValues and written to the output topic by .to(outputTopic, Produced.with(stringSerde, stringSerde)).

confluent kafka topic consume tall-random-strings

You should see output events that are entirely upper case:

Starting Kafka Consumer. Use Ctrl-C to exit.
CHUCK NORRIS CAN WRITE MULTI-THREADED APPLICATIONS WITH A SINGLE THREAD.
NO STATEMENT CAN CATCH THE CHUCKNORRISEXCEPTION.
CHUCK NORRIS CAN DIVIDE BY ZERO.
CHUCK NORRIS CAN BINARY SEARCH UNSORTED DATA.

Once you are done observing the behavior of the application, stop the consumers and the Kafka Streams application with Ctrl-C in the appropriate terminal windows.

11
Teardown Confluent Cloud resources

You may try another Kafka tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud Console or CLI to destroy all the resources you created. Verify they are destroyed to avoid unexpected charges.

Test it

1
Create a test configuration file

First, create a test configuration file at configuration/test.properties:

application.id=kafka-streams-101
bootstrap.servers=localhost:29092

input.topic.name=random-strings
output.topic.name=tall-random-strings

2
Write a test

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Now create the following file at src/test/java/io/confluent/developer/KafkaStreamsApplicationTest.java. Testing a Kafka Streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant than it would otherwise be.

package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class KafkaStreamsApplicationTest {

  private final static String TEST_CONFIG_FILE = "configuration/test.properties";

  @Test
  public void topologyShouldUpperCaseInputs() throws IOException {

    final Properties props = new Properties();
    try (InputStream inputStream = new FileInputStream(TEST_CONFIG_FILE)) {
        props.load(inputStream);
    }

    final String inputTopicName = props.getProperty("input.topic.name");
    final String outputTopicName = props.getProperty("output.topic.name");

    final Topology topology = KafkaStreamsApplication.buildTopology(inputTopicName, outputTopicName);

    try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
      Serde<String> stringSerde = Serdes.String();

      final TestInputTopic<String, String> inputTopic = testDriver
              .createInputTopic(inputTopicName, stringSerde.serializer(), stringSerde.serializer());
      final TestOutputTopic<String, String> outputTopic = testDriver
              .createOutputTopic(outputTopicName, stringSerde.deserializer(), stringSerde.deserializer());

      List<String> inputs = Arrays.asList(
        "Chuck Norris can write multi-threaded applications with a single thread.",
        "No statement can catch the ChuckNorrisException.",
        "Chuck Norris can divide by zero.",
        "Chuck Norris can binary search unsorted data."
      );
      List<String> expectedOutputs = inputs.stream()
        .map(String::toUpperCase).collect(Collectors.toList());

      inputs.forEach(inputTopic::pipeInput);
      final List<String> actualOutputs = outputTopic.readValuesToList();

      // Hamcrest expects the actual value first and the matcher on the expected value second
      assertThat(actualOutputs, equalTo(expectedOutputs));

    }

  }
}

3
Invoke the tests

Now run the test, which is as simple as:

./gradlew test

Take it to production

1
Create a production configuration file

First, create a new configuration file at configuration/prod.properties with the following content. Be sure to fill in the addresses of your production hosts, security values, and change any other parameters that make sense for your setup.

application.id=kafka-streams-101
bootstrap.servers=<<FILL ME IN>>

security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<<FILL ME IN>>' password='<<FILL ME IN>>';
sasl.mechanism=PLAIN

# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# May be used by standard Producers & Consumers
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

input.topic.name=<<FILL ME IN>>
output.topic.name=<<FILL ME IN>>
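
Because a forgotten placeholder only surfaces as a connection or authentication failure at runtime, it can help to check the file before deploying. The sketch below is one possible check, not part of the tutorial code: the PlaceholderCheck class and its unfilledKeys method are hypothetical, and it simply looks for the literal <<FILL ME IN>> marker used in the template above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical pre-deployment check; not part of the tutorial code.
final class PlaceholderCheck {

    // The marker used in the prod.properties template.
    static final String PLACEHOLDER = "<<FILL ME IN>>";

    // Returns, in alphabetical order, the keys whose values still contain the placeholder.
    static List<String> unfilledKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.stringPropertyNames().stream()
                .filter(key -> props.getProperty(key).contains(PLACEHOLDER))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String text = "application.id=kafka-streams-101\n"
                + "bootstrap.servers=<<FILL ME IN>>\n"
                + "input.topic.name=random-strings\n"
                + "output.topic.name=<<FILL ME IN>>\n";
        System.out.println("Still to fill in: " + unfilledKeys(text));
    }
}
```

In the application itself you would load the file from disk, as main in KafkaStreamsApplication does, and refuse to start when the returned list is not empty.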

2
Build a Docker image

In your terminal, execute the following to invoke the Jib plugin to build an image:

gradle jibDockerBuild --image=io.confluent.developer/creating-first-apache-kafka-streams-application-join:0.0.1

3
Launch the container

Finally, launch the container using your preferred container orchestration service. If you want to run it locally, you can execute the following:

docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/creating-first-apache-kafka-streams-application-join:0.0.1 config.properties