How to build your first Apache Kafka Streams application

Question:

How do I get started in building my first Kafka Streams application?

Edit this page

Example use case:

You'd like to get started with Kafka Streams, but you're not sure where to start. In this tutorial you'll build a small stream processing application and produce some sample data to test it. You can use the code in this tutorial as an example of how to use Kafka Streams. After you complete this tutorial, you can go more in-depth in the Kafka Streams 101 Course.

Hands-on code example:





Short Answer

Using the Apache Kafka Streams DSL, create a stream processing topology to define your business logic. The example below reads events from the input topic using the stream function, processes events using the mapValues transformation, allows for debugging with peek, and writes the transformed events to an output topic using to.

static Topology buildTopology(String inputTopic, String outputTopic) {
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k,v) -> logger.info("Observed event: {}", v))
        .mapValues(s -> s.toUpperCase())
        .peek((k,v) -> logger.info("Transformed event: {}", v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}

Run it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir creating-first-apache-kafka-streams-application && cd creating-first-apache-kafka-streams-application

2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR

And launch it by running:

docker-compose up -d

3
Configure the project

Create the following Gradle build file, named build.gradle for the project:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "3.1.1"
    id "idea"
    id "eclipse"
    id "application"
}

version = "0.0.1"
sourceCompatibility = "1.8"
targetCompatibility = "1.8"
application {
  mainClassName = "io.confluent.developer.KafkaStreamsApplication"
}

repositories {
    mavenCentral()


    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:1.7.30"
    implementation "org.apache.kafka:kafka-streams:2.8.1"
    implementation 'com.github.javafaker:javafaker:1.0.2'
    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.8.1"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaStreamsApplication"
    )
  }
}

shadowJar {
    archivesBaseName = "creating-first-apache-kafka-streams-application-standalone"
    archiveClassifier = ''
}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a configuration file for development at configuration/dev.properties:

application.id=kafka-streams-101
bootstrap.servers=localhost:29092

input.topic.name=random-strings
output.topic.name=tall-random-strings

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

4
Create a Utility class

First, create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Now create a utility class that provides functions to support our tutorial. You may decide not to include these types of functions in the production version of your application, however, they are useful for getting started quickly. This utility class includes functions to create our Kafka topics and generate sample event data we can use to exercise our Kafka Streams topology.

Create the following file at src/main/java/io/confluent/developer/Util.java.

package io.confluent.developer;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class Util implements AutoCloseable {

    private final Logger logger = LoggerFactory.getLogger(Util.class);
    private ExecutorService executorService = Executors.newFixedThreadPool(1);

    public class Randomizer implements AutoCloseable, Runnable {
        private Properties props;
        private String topic;
        private Producer<String, String> producer;
        private boolean closed;

        public Randomizer(Properties producerProps, String topic) {
            this.closed = false;
            this.topic = topic;
            this.props = producerProps;
            this.props.setProperty("client.id", "faker");
        }

        public void run() {
            try (KafkaProducer producer = new KafkaProducer<String, String>(props)) {
                Faker faker = new Faker();
                while (!closed) {
                    try {
                        Object result = producer.send(new ProducerRecord<>(
                                this.topic,
                                faker.chuckNorris().fact())).get();
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                    }
                }
            } catch (Exception ex) {
                logger.error(ex.toString());
            }
        }
        public void close()  {
            closed = true;
        }
    }

    public Randomizer startNewRandomizer(Properties producerProps, String topic) {
        Randomizer rv = new Randomizer(producerProps, topic);
        executorService.submit(rv);
        return rv;
    }

    public void createTopics(final Properties allProps, List<NewTopic> topics)
            throws InterruptedException, ExecutionException, TimeoutException {
        try (final AdminClient client = AdminClient.create(allProps)) {
            logger.info("Creating topics");

            client.createTopics(topics).values().forEach( (topic, future) -> {
                try {
                    future.get();
                } catch (Exception ex) {
                    logger.info(ex.toString());
                }
            });

            Collection<String> topicNames = topics
                .stream()
                .map(t -> t.name())
                .collect(Collectors.toCollection(LinkedList::new));

            logger.info("Asking cluster for topic descriptions");
            client
                .describeTopics(topicNames)
                .all()
                .get(10, TimeUnit.SECONDS)
                .forEach((name, description) -> logger.info("Topic Description: {}", description.toString()));
        }
    }

    public void close() {
        if (executorService != null) {
            executorService.shutdownNow();
            executorService = null;
        }
    }
}

5
Create the Kafka Streams topology

Kafka Streams applications define their logic in a processor topology, which is a graph of stream processors (nodes) and streams (edges). There are two methods for defining these components in your Kafka Streams application, the Streams DSL and the Processor API. The Streams DSL provides built-in abstractions for common event stream processing concepts like streams, tables, and transformations, while the Processor API can be used for advanced cases not supported by the DSL.

The Streams DSL is recommended for most use cases and this tutorial will use it to define a basic text processing application. To get started, let’s focus on the important bits of Kafka Streams application code, highlighting the DSL usage.

static Topology buildTopology(String inputTopic, String outputTopic) {
    Serde<String> stringSerde = Serdes.String();
    StreamsBuilder builder = new StreamsBuilder();
    builder
        .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
        .peek((k,v) -> logger.info("Observed event: {}", v))
        .mapValues(s -> s.toUpperCase())
        .peek((k,v) -> logger.info("Transformed event: {}", v))
        .to(outputTopic, Produced.with(stringSerde, stringSerde));
    return builder.build();
}

In the code above, the StreamsBuilder class is used to construct the design of the topology. The DSL API allows you to construct your application by chaining together the desired behaviors using a fluent API.

A typical topology follows a common pattern:

  • Consume one or more input streams using the stream function which accepts the names of the Kafka topics to consume from along with the deserializers required to decode the data.

  • Transform events by chaining together one or more transformations. In our example, we use mapValues to convert incoming String events to their upper case value.

  • Transformed events are streamed as the output of the topology using the to function specifying a destination topic as well as the serializers required to encode the data.

The peek function allows you to observe and act on events and they flow through the topology stages. In our example it is used to debug the topology by printing events as they flow through the topology.

Once the topology is defined within the builder, the buildTopology function returns an instance of the Topology created from builder.build. Separating the building of the Topology in a function is useful for testing purposes, which we will see in the Test It section of the tutorial.

Now go ahead and create the full Java source file at src/main/java/io/confluent/developer/KafkaStreamsApplication.java.

package io.confluent.developer;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class KafkaStreamsApplication {

    private static final Logger logger = LoggerFactory.getLogger(KafkaStreamsApplication.class);

    static void runKafkaStreams(final KafkaStreams streams) {
        final CountDownLatch latch = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            if (oldState == KafkaStreams.State.RUNNING && newState != KafkaStreams.State.RUNNING) {
                latch.countDown();
            }
        });

        streams.start();

        try {
            latch.await();
        } catch (final InterruptedException e) {
            throw new RuntimeException(e);
        }

        logger.info("Streams Closed");
    }
    static Topology buildTopology(String inputTopic, String outputTopic) {
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();

        builder
            .stream(inputTopic, Consumed.with(stringSerde, stringSerde))
            .peek((k,v) -> logger.info("Observed event: {}", v))
            .mapValues(s -> s.toUpperCase())
            .peek((k,v) -> logger.info("Transformed event: {}", v))
            .to(outputTopic, Produced.with(stringSerde, stringSerde));

        return builder.build();
    }
    public static void main(String[] args) throws Exception {

        if (args.length < 1) {
            throw new IllegalArgumentException("This program takes one argument: the path to a configuration file.");
        }

        Properties props = new Properties();
        try (InputStream inputStream = new FileInputStream(args[0])) {
            props.load(inputStream);
        }

        final String inputTopic = props.getProperty("input.topic.name");
        final String outputTopic = props.getProperty("output.topic.name");

        try (Util utility = new Util()) {

            utility.createTopics(
                    props,
                    Arrays.asList(
                            new NewTopic(inputTopic, Optional.empty(), Optional.empty()),
                            new NewTopic(outputTopic, Optional.empty(), Optional.empty())));

            // Ramdomizer only used to produce sample data for this application, not typical usage
            try (Util.Randomizer rando = utility.startNewRandomizer(props, inputTopic)) {

                KafkaStreams kafkaStreams = new KafkaStreams(
                        buildTopology(inputTopic, outputTopic),
                        props);

                Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

                logger.info("Kafka Streams 101 App Started");
                runKafkaStreams(kafkaStreams);

            }
        }
    }
}

6
Compile and run the Kafka Streams program

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar configuration/dev.properties

If the Kafka Streams application has started properly, you should see the debugging log output from the peek functions.

Something similar to:

[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris does not get compiler errors, the language changes itself to accommodate Chuck Norris.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS DOES NOT GET COMPILER ERRORS, THE LANGUAGE CHANGES ITSELF TO ACCOMMODATE CHUCK NORRIS.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Observed event: Chuck Norris can write infinite recursion functions... and have them return.
[faker-StreamThread-1] INFO io.confluent.developer.KafkaStreamsApplication - Transformed event: CHUCK NORRIS CAN WRITE INFINITE RECURSION FUNCTIONS... AND HAVE THEM RETURN.

7
Stream events using a console consumer

Now that the Kafka Streams application is running, run a command line consumer using the kafka-console-consumer CLI to view the events.

In a new terminal window, run the following console consumer to view the events being generated by the data generator and produced to the random-strings topic from the Randomizer class in your Kafka Streams application. These are the events that have been streamed into the topology (.stream(inputTopic, Consumed.with(stringSerde, stringSerde)).

docker exec -it broker /usr/bin/kafka-console-consumer --topic random-strings --bootstrap-server broker:9092

You should see output that looks like this (notice the mixed case of the string):

Chuck Norris has root access to your system.
Chuck Norris can't test for equality because he has no equal.
Chuck Norris went out of an infinite loop.
Anonymous methods and anonymous types are really all called Chuck Norris. They just don't like to boast.

Next, look at the transformed events in the tall-random-strings topic. These are the events that have been transformed (.mapValues) and written to the output topic .to(outputTopic, Produced.with(stringSerde, stringSerde)).

docker exec -it broker /usr/bin/kafka-console-consumer --topic tall-random-strings --bootstrap-server broker:9092

You should see output events that are entirely upper case:

CHUCK NORRIS HAS ROOT ACCESS TO YOUR SYSTEM.
CHUCK NORRIS CAN'T TEST FOR EQUALITY BECAUSE HE HAS NO EQUAL.
CHUCK NORRIS WENT OUT OF AN INFINITE LOOP.
ANONYMOUS METHODS AND ANONYMOUS TYPES ARE REALLY ALL CALLED CHUCK NORRIS. THEY JUST DON'T LIKE TO BOAST.

Once you are done with observing the behavior of the application, stop the consumers and the Kafka Streams application with ctrl-c in the appropriate terminal windows.

Finally, shutdown Confluent Platform by invoking docker-compose down -v.

Test it

1
Create a test configuration file

First, create a test file at configuration/test.properties:

application.id=kafka-streams-101
bootstrap.servers=localhost:29092

input.topic.name=random-strings
output.topic.name=tall-random-strings

2
Write a test

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Now create the following file at src/test/java/io/confluent/developer/KafkaStreamsApplicationTest.java. Testing a Kafka streams application requires a bit of test harness code, but happily the org.apache.kafka.streams.TopologyTestDriver class makes this much more pleasant that it would otherwise be.

package io.confluent.developer;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;

public class KafkaStreamsApplicationTest {

  private final static String TEST_CONFIG_FILE = "configuration/test.properties";

  @Test
  public void topologyShouldUpperCaseInputs() throws IOException {

    final Properties props = new Properties();
    try (InputStream inputStream = new FileInputStream(TEST_CONFIG_FILE)) {
        props.load(inputStream);
    }

    final String inputTopicName = props.getProperty("input.topic.name");
    final String outputTopicName = props.getProperty("output.topic.name");

    final Topology topology = KafkaStreamsApplication.buildTopology(inputTopicName, outputTopicName);

    try (final TopologyTestDriver testDriver = new TopologyTestDriver(topology, props)) {
      Serde<String> stringSerde = Serdes.String();

      final TestInputTopic<String, String> inputTopic = testDriver
              .createInputTopic(inputTopicName, stringSerde.serializer(), stringSerde.serializer());
      final TestOutputTopic<String, String> outputTopic = testDriver
              .createOutputTopic(outputTopicName, stringSerde.deserializer(), stringSerde.deserializer());

      List<String> inputs = Arrays.asList(
        "Chuck Norris can write multi-threaded applications with a single thread.",
        "No statement can catch the ChuckNorrisException.",
        "Chuck Norris can divide by zero.",
        "Chuck Norris can binary search unsorted data."
      );
      List<String> expectedOutputs = inputs.stream()
        .map(String::toUpperCase).collect(Collectors.toList());

      inputs.forEach(inputTopic::pipeInput);
      final List<String> actualOutputs = outputTopic.readValuesToList();

      assertThat(expectedOutputs, equalTo(actualOutputs));

    }

  }
}

3
Invoke the tests

Now run the test, which is as simple as:

./gradlew test

Take it to production

1
Create a production configuration file

First, create a new configuration file at configuration/prod.properties with the following content. Be sure to fill in the addresses of your production hosts and change any other parameters that make sense for your setup.

application.id=kafka-streams-101
bootstrap.servers=<<FILL ME IN>>

input.topic.name=<<FILL ME IN>>
input.topic.partitions=<<FILL ME IN>>
input.topic.replication.factor=<<FILL ME IN>>

output.topic.name=<<FILL ME IN>>
output.topic.partitions=<<FILL ME IN>>
output.topic.replication.factor=<<FILL ME IN>>

2
Build a Docker image

In your terminal, execute the following to invoke the Jib plugin to build an image:

gradle jibDockerBuild --image=io.confluent.developer/creating-first-apache-kafka-streams-application-join:0.0.1

3
Launch the container

Finally, launch the container using your preferred container orchestration service. If you want to run it locally, you can execute the following:

docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/creating-first-apache-kafka-streams-application-join:0.0.1 config.properties

Deploy on Confluent Cloud

1
Run your app to Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully-managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

Confluent Cloud

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.