Community contribution ✨

Emit a final result from a time window

Question:

How can I count the number of messages in a Kafka topic, per key over a time window, getting a final result that takes into account late arrivals?


Example use case:

Consider a topic with events that represent sensor warnings (pressure on robotic arms). One warning per time slot is fine, but you don't want too many warnings at the same time. In this tutorial, we'll write a program that counts the messages from the same sensor and sends a result at the end of the window.

Code example:

Try it

1
Get Confluent Platform

To get started, make a new directory anywhere you’d like for this project:

mkdir window-final-result && cd window-final-result

Next, create the following docker-compose.yml file to obtain Confluent Platform:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN

And launch it by running:

docker-compose up -d

2
Initialize the project

Create the following Gradle build file, named build.gradle for the project:

plugins {
    id "java"
    id "application"
    id "com.google.cloud.tools.jib" version "2.6.0"
    id "com.github.johnrengelman.shadow" version "6.1.0"
    id "com.commercehub.gradle.plugin.avro" version "0.17.0"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1-SNAPSHOT"


mainClassName = "io.confluent.developer.WindowFinalResult"

repositories {
    jcenter()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

dependencies {
    implementation "org.apache.avro:avro:1.10.0"

    implementation group: 'com.typesafe', name: 'config', version: '1.4.1'
    implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'

    implementation group: 'org.apache.kafka', name: 'kafka-streams', version: '2.5.0'
    implementation group: 'io.confluent', name: 'kafka-streams-avro-serde', version: '5.5.1'

    testImplementation "junit:junit:4.13.1"
    testImplementation "org.apache.kafka:kafka-streams-test-utils:2.5.0"
    testImplementation "com.github.grantwest.eventually:hamcrest-eventually-matchers:0.0.3"

    // helpers
    implementation group: 'com.jason-goodwin', name: 'better-monads', version: '0.4.1'
    implementation group: 'com.typesafe.akka', name: 'akka-stream-kafka_2.13', version: '1.0.5'
}

shadowJar {
    archiveBaseName = "kstreams-${rootProject.name}"
    archiveClassifier = ''
}


task createTopics(type: JavaExec) {
    main = 'io.confluent.developer.helper.TopicCreation'
    classpath = sourceSets.main.runtimeClasspath
}

task publishSchemas(type: JavaExec) {
    main = 'io.confluent.developer.helper.SchemaPublication'
    classpath = sourceSets.main.runtimeClasspath
}

task consumeResult(type: JavaExec) {
    main = 'io.confluent.developer.helper.ResultConsumer'
    classpath = sourceSets.main.runtimeClasspath
}

run.dependsOn {
    [createTopics, publishSchemas]
}

jib {
    container.mainClass = mainClassName
}

test {
    testLogging {
        events "passed", "skipped", "failed"
        exceptionFormat "full"
    }
}

Note: In addition to our main class, this tutorial includes two extra Java executions responsible for creating the topics and publishing the schemas. In a real-life application, these steps may live outside your project.

Create the following Gradle settings file, named settings.gradle for the project:

pluginManagement {
    repositories {
        gradlePluginPortal()
        jcenter()
        maven {
            name "JCenter Gradle Plugins"
            url  "https://dl.bintray.com/gradle/gradle-plugins"
        }
    }
}

rootProject.name = 'window-final-result'

Run the following command to obtain the Gradle wrapper:

gradle wrapper

Create a directory for the project resources:

mkdir -p src/main/resources

Add the config file src/main/resources/application.conf to setup your application:

application.id: "final-results-tutorial"
application.id: ${?APP_ID}

bootstrap.servers: "localhost:29092"
bootstrap.servers: ${?BOOTSTRAP_SERVERS}

schema.registry.url: "http://localhost:8081"
schema.registry.url: ${?SCHEMA_REGISTRY_URL}

window {

  size: 10 seconds
  size: ${?WINDOW_SIZE}

  grace.period: 20 seconds
  grace.period: ${?GRACE_PERIOD}
}

# you may play with the pattern, but ALWAYS include the Zone Offset (Z)!
# It is used to create a java.time.ZonedDateTime by parsing the datetime in the message value
sensor.datetime.pattern: "yyyy-MM-dd'T'HH:mm:ss.Z"

# adapt this block to YOUR preferred language or location; it is used to display the result
local.date {
  lang: "fr"
  pattern: "EEE d MMM yyyy" # date only
}

input.topic {

  name: "input-topic"
  name: ${?INPUT_TOPIC}
  partitions: 1
  partitions: ${?INPUT_TOPIC_PARTITIONS}
  replication.factor: 1
  replication.factor: ${?INPUT_TOPIC_REPLICATION}
}

output.topic {

  name: "output-topic"
  name: ${?OUTPUT_TOPIC}
  partitions: 1
  partitions: ${?OUTPUT_TOPIC_PARTITIONS}
  replication.factor: 1
  replication.factor: ${?OUTPUT_TOPIC_REPLICATION}
}

You may want to adapt the local.date.lang and local.date.pattern settings.

This file contains the default configuration. In production, these values can be overridden by environment variables.
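If you are not familiar with the Typesafe Config substitution syntax, here is a minimal sketch (not part of the tutorial code, the class name ConfigDemo is just for illustration) of how these defaults and overrides are resolved at runtime:

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ConfigDemo {
    public static void main(String[] args) {
        // Loads src/main/resources/application.conf from the classpath
        Config config = ConfigFactory.load();
        // Prints "final-results-tutorial" unless the APP_ID environment variable is set,
        // because the second application.id line only applies when ${?APP_ID} resolves
        System.out.println(config.getString("application.id"));
        // Durations written as "10 seconds" can be read directly as java.time.Duration
        System.out.println(config.getDuration("window.size"));
    }
}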

Add the logging configuration in the file: src/main/resources/logback.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="KSTREAMS" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                %yellow(%d{yyyy-MM-dd HH:mm:ss}) %cyan(${HOSTNAME}) %highlight([%p]) %green((%file:%line\)) - %msg%n
            </pattern>
        </encoder>
    </appender>

    <appender name="CONSUMER" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                %yellow(%d{yyyy-MM-dd HH:mm:ss}) %highlight([%p]) %magenta((%file:%line\)) - %msg%n
            </pattern>
        </encoder>
    </appender>

    <logger name="io.confluent.developer.helper.ResultConsumer" level="DEBUG" additivity="false">
        <appender-ref ref="CONSUMER" />
    </logger>

    <logger name="io.confluent.developer" level="DEBUG" additivity="false">
        <appender-ref ref="KSTREAMS" />
    </logger>

    <root level="WARN">
        <appender-ref ref="KSTREAMS" />
    </root>

</configuration>

3
Create a schema for the events

Create a directory for the pressure event schemas:

mkdir -p src/main/avro

Then create the following Avro schema file at src/main/avro/pressure-alert.avsc for the publication events:

{
  "type": "record",
  "name": "PressureAlert",
  "namespace": "io.confluent.developer.avro",
  "doc": "Object used for the recipe: Last Window Result",
  "fields": [
    {
      "name": "id",
      "type": "string",
      "doc": "Id of a robotic arm sensor"
    },
    {
      "name": "datetime",
      "type": "string",
      "doc": "Event time of a pressure alert"
    },
    {
      "name": "pressure",
      "type": "int",
      "doc": "Actual pressure level in Pascal (Pa), yeah metric system rules!"
    }
  ]
}

Because this Avro schema is used in the Java code, it needs to be compiled. Run the following:

./gradlew build
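The build generates a PressureAlert class under io.confluent.developer.avro. As a quick sanity check, here is a minimal sketch (not part of the tutorial code) of how the generated class can be used; the builder API shown is the standard one produced by Avro code generation:

import io.confluent.developer.avro.PressureAlert;

public class PressureAlertDemo {
    public static void main(String[] args) {
        // Build an event using the builder generated from pressure-alert.avsc
        PressureAlert alert = PressureAlert.newBuilder()
                .setId("101")
                .setDatetime("2019-09-21T05:45:02.+0200")
                .setPressure(30)
                .build();
        System.out.println(alert.getId() + " -> " + alert.getPressure() + " Pa");
    }
}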

4
Add the helper gradle tasks

Topic creation and Avro schema registration are often part of an external process. For the sake of clarity in this tutorial, we won't include these steps in the main application, but isolate them in a dedicated helper package.

Create a directory for the package helper:

mkdir -p src/main/java/io/confluent/developer/helper

Add the following class at src/main/java/io/confluent/developer/helper/TopicCreation.java:

package io.confluent.developer.helper;

import com.jasongoodwin.monads.Try;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicCreation {

    private static final Logger logger = LoggerFactory.getLogger(TopicCreation.class);

    public static void main(String[] args) {

        Config config = ConfigFactory.load();

        Properties properties = new Properties();

        properties.put("bootstrap.servers", config.getString("bootstrap.servers"));

        AdminClient client = AdminClient.create(properties);

        HashMap<String, NewTopic> topics = new HashMap<>();

        topics.put(
                config.getString("input.topic.name"),
                new NewTopic(
                        config.getString("input.topic.name"),
                        config.getNumber("input.topic.partitions").intValue(),
                        config.getNumber("input.topic.replication.factor").shortValue())
        );

        topics.put(
                config.getString("output.topic.name"),
                new NewTopic(
                        config.getString("output.topic.name"),
                        config.getNumber("output.topic.partitions").intValue(),
                        config.getNumber("output.topic.replication.factor").shortValue())
        );

        try {
            logger.info("Starting the topics creation");

            CreateTopicsResult result = client.createTopics(topics.values());

            result.values().forEach((topicName, future) -> {
                NewTopic topic = topics.get(topicName);
                future.whenComplete((aVoid, maybeError) ->
                        Optional
                                .ofNullable(maybeError)
                                .map(Try::<Void>failure)
                                .orElse(Try.successful(null))

                                .onFailure(throwable -> logger.error("Topic creation didn't complete:", throwable))
                                .onSuccess((anOtherVoid) -> logger.info(
                                        String.format(
                                                "Topic %s, has been successfully created " +
                                                        "with %s partitions and replicated %s times",
                                                topic.name(),
                                                topic.numPartitions(),
                                                topic.replicationFactor() - 1
                                        )
                                )));
            });

            result.all().get();
        } catch (InterruptedException | ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) e.printStackTrace();
        } finally {
            client.close();
        }
    }
}

Add the following class at src/main/java/io/confluent/developer/helper/SchemaPublication.java:

package io.confluent.developer.helper;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

public class SchemaPublication {

    private static final Logger logger = LoggerFactory.getLogger(SchemaPublication.class);

    public static void main(String[] args) {

        Config config = ConfigFactory.load();

        String registryUrl = config.getString("schema.registry.url");

        CachedSchemaRegistryClient schemaRegistryClient  = new CachedSchemaRegistryClient(registryUrl, 10);

        try {
            logger.info(String.format("Schemas publication at: %s", registryUrl));

            schemaRegistryClient.register(
                String.format("%s-value", config.getString("input.topic.name")),
                new AvroSchema(PressureAlert.SCHEMA$)
            );
        } catch (IOException | RestClientException e) {
            e.printStackTrace();
        }
    }
}

Now the topics can be created separately with the following command.

./gradlew createTopics

Same thing for the schemas.

./gradlew publishSchemas

Check the build.gradle again: you will find these tasks declared as JavaExec, each with a main class corresponding to one of the two classes above.
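If you want to double-check that the topics exist, here is a minimal sketch (not part of the tutorial code) using the same AdminClient API; the bootstrap address assumes the local Docker setup from step 1:

import org.apache.kafka.clients.admin.AdminClient;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicCheck {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:29092");
        try (AdminClient client = AdminClient.create(properties)) {
            // input-topic and output-topic should appear in the list
            client.listTopics().names().get().forEach(System.out::println);
        }
    }
}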

5
Create the timestamp extractor

There are multiple timing perspectives to consider, and each event may arrive from a different time zone.

  1. Event time: the time at the sensor, which differs depending on whether it comes from Paris (UTC+02:00) or Tokyo (UTC+09:00)

  2. Processing time: the time of the Kafka Streams instances. Here the zone depends on your deployment (e.g., your fancy managed Kubernetes cluster deployed in us-west-b :p)

  3. Ingestion time: less relevant here, this is the time at which the Kafka message was published

Since our operations will be time based, we need to ensure the right time is considered. In this example, the data producer is not aware of the message timestamp and places the time of the alert in the message value, so we need to extract it from there. This can be done by implementing a TimestampExtractor. Add the following class at src/main/java/io/confluent/developer/PressureDatetimeExtractor.java:

package io.confluent.developer;

import com.jasongoodwin.monads.Try;
import com.typesafe.config.Config;
import io.confluent.developer.avro.PressureAlert;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;


public class PressureDatetimeExtractor implements TimestampExtractor {

    private final DateTimeFormatter formatter;
    private static final Logger logger = LoggerFactory.getLogger(TimestampExtractor.class);

    public PressureDatetimeExtractor(Config config) {
        this.formatter = DateTimeFormatter.ofPattern(config.getString("sensor.datetime.pattern"));
    }

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return Try

                .ofFailable(() -> ((PressureAlert) record.value()).getDatetime())

                .onFailure((ex) -> logger.error("fail to cast the PressureAlert: ", ex))

                .map((stringDatetimeString) ->  ZonedDateTime.parse(stringDatetimeString, this.formatter))

                .onFailure((ex) -> logger.error("fail to parse the event datetime due to: ", ex))

                .map((zonedDatetime) -> zonedDatetime.toInstant().toEpochMilli())

                .onFailure((ex) -> logger.error("fail to convert the datetime to instant due to: ", ex))

                .orElse(-1L);
    }
}

Okay, let's translate this extract method from Java to English. First of all, we try to perform the following operations, each of which may raise an exception:

  1. we cast the value Object as a PressureAlert and call its .getDatetime method

  2. then we parse the datetime string based on the configured pattern

  3. then we convert it to an Instant, in case the Kafka message suffers from jet lag

  4. and finally we get the epoch in milliseconds

If any of these steps fails, we log the error and set the timestamp to a negative number, so the record will be silently ignored.
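To make the pattern concrete, here is a standalone sketch (not part of the tutorial code) of the same parsing steps applied to one of the sample events used later in this tutorial:

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class DatetimeParsingDemo {
    public static void main(String[] args) {
        // Same pattern as sensor.datetime.pattern in application.conf
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.Z");
        // The zone offset (+0200) is part of the value, so events from different
        // time zones all land on the same epoch timeline
        ZonedDateTime zdt = ZonedDateTime.parse("2019-09-21T05:45:02.+0200", formatter);
        long timestamp = zdt.toInstant().toEpochMilli();
        System.out.println(timestamp); // 1569037502000
    }
}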

6
Create the Kafka Streams topology

In the main function we create time-based windows with a given size and an advance of the same size. This results in non-overlapping windows, called tumbling windows. We also add a grace period, so that messages arriving late may still join the window their datetime corresponds to. Finally, we pass this window definition to a function that also takes a StreamsBuilder and returns a Topology. Add the following class at src/main/java/io/confluent/developer/WindowFinalResult.java:

package io.confluent.developer;


import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;

public class WindowFinalResult {

    private static final Logger logger = LoggerFactory.getLogger(WindowFinalResult.class);

    public static Properties buildProperties(Config config) {
        Properties properties = new Properties();

        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("bootstrap.servers"));
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, config.getString("application.id"));
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

        return properties;
    }

    public static Topology buildTopology(Config config,
                                         TimeWindows windows,
                                         SpecificAvroSerde<PressureAlert> pressureSerde) {

        StreamsBuilder builder = new StreamsBuilder();

        String inputTopic = config.getString("input.topic.name");
        String outputTopic = config.getString("output.topic.name");

        Produced<Windowed<String>, Long> producedCount = Produced
                .with(timeWindowedSerdeFrom(String.class), Serdes.Long());

        Consumed<String, PressureAlert> consumedPressure = Consumed
                .with(Serdes.String(), pressureSerde)
                .withTimestampExtractor(new PressureDatetimeExtractor(config));

        Grouped<String, PressureAlert> groupedPressure = Grouped.with(Serdes.String(), pressureSerde);

        builder

                .stream(inputTopic, consumedPressure)

                .selectKey((key, value) -> value.getId())

                .groupByKey(groupedPressure)

                .windowedBy(windows)

                .count()

                .suppress(Suppressed.untilWindowCloses(unbounded()))

                .toStream()

                .to(outputTopic, producedCount);

        return builder.build();
    }

    public static void main(String[] args) {

        final Config config = ConfigFactory.load();

        final Properties properties = buildProperties(config);

        Map<String, Object> serdeConfig =
                singletonMap(SCHEMA_REGISTRY_URL_CONFIG, config.getString("schema.registry.url"));

        SpecificAvroSerde<PressureAlert> pressureSerde = new SpecificAvroSerde<>();

        pressureSerde.configure(serdeConfig, false);

        TimeWindows windows = TimeWindows

                .of(config.getDuration("window.size"))

                .advanceBy(config.getDuration("window.size"))

                .grace(config.getDuration("window.grace.period"));

        Topology topology = buildTopology(config, windows, pressureSerde);

        logger.debug(topology.describe().toString());

        final KafkaStreams streams = new KafkaStreams(topology, properties);

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

        streams.cleanUp();
        streams.start();
    }
}

Here are several notes about the WindowFinalResult#buildTopology function:

  • To consume the events, we create a SpecificAvroSerde based on the source code generated in part 3.

  • The serde used to produce the aggregated result is a windowed serde: it stores the key but also the window start time.

  • Our custom timestamp extractor is plugged in thanks to the Consumed#withTimestampExtractor method.

Then we stream, selectKey, groupByKey, and finally apply the suppress operator.

The suppress operator buffers every intermediate change and, once the grace period is over, discards them and emits only the final result.

Note: even with the suppress operator applied, you will need a subsequent event to advance the stream time before the final result is emitted.
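If your sensors can stay quiet for long stretches, one option (not part of the tutorial; the "HEARTBEAT" id and all settings below are illustrative assumptions) is a small heartbeat producer that periodically publishes a throw-away alert so that stream time keeps advancing and suppressed windows eventually close:

import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

public class HeartbeatProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:29092");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", KafkaAvroSerializer.class);
        props.put("schema.registry.url", "http://localhost:8081");

        // Same datetime pattern as the rest of the tutorial
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.Z");

        try (KafkaProducer<String, PressureAlert> producer = new KafkaProducer<>(props)) {
            while (true) {
                // "HEARTBEAT" is a dummy sensor id; its own counts can be ignored downstream
                String now = ZonedDateTime.now().format(formatter);
                producer.send(new ProducerRecord<>("input-topic", new PressureAlert("HEARTBEAT", now, 0)));
                Thread.sleep(10_000L); // one heartbeat per window size
            }
        }
    }
}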

7
Compile and run the Kafka Streams program

In a new terminal, use the run Gradle task to start the main class WindowFinalResult.

./gradlew run

Note: this will apply the topic creation step and schema publication step before running the app.

Alternatively, you may also build a jar archive and run it with a java command. If you do, don’t forget to create the topics first.

./gradlew shadowJar
java -cp build/libs/kstreams-window-final-result*.jar io.confluent.developer.helper.TopicCreation
java -cp build/libs/kstreams-window-final-result*.jar io.confluent.developer.helper.SchemaPublication
java -jar build/libs/kstreams-window-final-result*.jar #-Dconfig.file=./any-other-conf-file.properties
# OR
# APP_ID=LOCAL_DEV_APP_ID java -jar build/libs/kstreams-window-final-result*.jar

8
Produce events to the input topic

Now we want a more convenient way to send sensor events, so we can focus on the aggregation result.

In a new terminal, define a produce function that pipes a record to the Avro console producer:

set +m
function produce () { echo $1 | docker exec -i schema-registry /usr/bin/kafka-avro-console-producer --broker-list broker:9092 --topic input-topic --property value.schema="$(< src/main/avro/pressure-alert.avsc)" & }

Then, we call the function by passing the correct JSON payload.

produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
# {"id":"101","datetime":"2019-09-17T01:22:15.+0200","pressure":30}

Send multiple events

produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"102","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"101","datetime":"'$(date -v-10S +%FT%T.%z)'","pressure":30}' # late of 10 sec
produce '{"id":"101","datetime":"'$(date -v-15S +%FT%T.%z)'","pressure":30}' # late of 15 sec
produce '{"id":"101","datetime":"'$(date -v-60S +%FT%T.%z)'","pressure":30}' # late of 01 min
produce '{"id":"102","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"102","datetime":"'$(date -v-60S +%FT%T.%z)'","pressure":30}' # out of the grace period
export TZ=Asia/Tokyo
produce '{"id":"301","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"301","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"XXX","datetime":"'$(date +%FT%T.%z)'","pressure":30}'

You may also consume the input topic to see the sensor datetimes:

docker exec -it schema-registry /usr/bin/kafka-avro-console-consumer --topic input-topic --bootstrap-server broker:9092 --from-beginning

For example:

# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"102","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:07.+0200","pressure":30} # late
# {"id":"101","datetime":"2019-09-21T05:44:13.+0200","pressure":30} # out of time
# {"id":"102","datetime":"2019-09-21T05:45:13.+0200","pressure":30} # new window
# {"id":"102","datetime":"2019-09-21T05:43:23.+0200","pressure":30} # out of time
# {"id":"301","datetime":"2019-09-21T12:45:23.+0900","pressure":30} # different time zone
# {"id":"301","datetime":"2019-09-21T12:45:24.+0900","pressure":30} # different time zone
# {"id":"XXX","datetime":"2019-09-21T06:00:00.+0200","pressure":30}

9
Consume events from the output topic

Consuming the output directly is a bit tricky because the record keys are serialized windowed keys, so the tutorial comes with a consumer that you can use as a black box to explore the output of the streaming application. In the helper package, add the class ResultConsumer:

package io.confluent.developer.helper;


import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.scaladsl.Sink;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Windowed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;

import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.UUID;

import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;

public class ResultConsumer {

    private static final Logger logger = LoggerFactory.getLogger(ResultConsumer.class);

    public static void main(String[] args) {

        final Config config = ConfigFactory.load();
        final String outputTopic = config.getString("output.topic.name");

        final ActorSystem system = ActorSystem.create();
        final Materializer materializer = ActorMaterializer.create(system);

        final ConsumerSettings<Windowed<String>, Long> consumerSettings =
                ConsumerSettings
                        .create(
                                system,
                                timeWindowedSerdeFrom(
                                        String.class,
                                        config.getDuration("window.size").toMillis()
                                ).deserializer(),
                                Serdes.Long().deserializer()
                        )
                        .withGroupId(UUID.randomUUID().toString())
                        .withBootstrapServers(config.getString("bootstrap.servers"))
                        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        Consumer.plainSource(
                consumerSettings,
                Subscriptions.topics(outputTopic))
                .to(Sink.foreach((record) -> {
                            logger.info(printWindowedKey(config, record));
                            return BoxedUnit.UNIT;
                        })
                ).run(materializer);

    }

    private static String printWindowedKey(Config config, ConsumerRecord<Windowed<String>, Long> windowedKeyValue) {

        return String.format("Count = %s for Key = %s, at window [%s-%s] %s (%s)",
                windowedKeyValue.value().toString(),
                windowedKeyValue.key().key(),
                DateTimeFormatter
                        .ofPattern("HH:mm:ss")
                        .withLocale(Locale.getDefault())
                        .withZone(ZoneId.systemDefault())
                        .format(windowedKeyValue.key().window().startTime()),
                DateTimeFormatter
                        .ofPattern("HH:mm:ss")
                        .withLocale(Locale.getDefault())
                        .withZone(ZoneId.systemDefault())
                        .format(windowedKeyValue.key().window().endTime()),
                DateTimeFormatter
                        .ofPattern(config.getString("local.date.pattern"))
                        .withLocale(Locale.forLanguageTag(config.getString("local.date.lang")))
                        .withZone(ZoneId.systemDefault())
                        .format(windowedKeyValue.key().window().startTime()),
                ZoneId.systemDefault().getId()
        );
    }
}

This consumer only formats and logs the messages it receives. It also has its own Gradle task:

./gradlew consumeResult

At the end, you should be able to see the output counts for the keys 101, 102 and 301.

2019-09-21 05:46:03 [...] Count = 5 for Key = 101, at window [05:45:00-05:45:10] Sat 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 1 for Key = 102, at window [05:45:00-05:45:10] Sam. 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 1 for Key = 102, at window [05:45:10-05:45:20] Sam. 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 2 for Key = 301, at window [05:45:20-05:45:30] Sam. 21 Sep 2019 (Europe/Paris)

Here the logging time matches the time of the latest result: 05:46:03. That last event, from sensor XXX, advances the stream time, and a final result gets produced for every window whose grace period has ended. Hours are printed in the default system time zone, so for me it was between 05:45:20 and 05:45:30 when sensor 301 experienced 2 pressure alerts. To start investigating what happened, I would need the time zone of that sensor.


Test it

1
Create a test configuration file

First, create a directory for the test configuration:

mkdir -p src/test/resources

Then, create a test file configuration named test.properties at src/test/resources:

application.id=final-results-tutorial-test
bootstrap.servers=notused:9092
schema.registry.url=mock://final-results-tutorial-test:8081

window.size=10 seconds
window.grace.period=20 seconds

sensor.datetime.pattern=yyyy-MM-dd'T'HH:mm:ss.Z

local.date.lang=fr
local.date.pattern=EEE d MMM yyyy # date only

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

2
Write a test

Then, create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Create the following test file at src/test/java/io/confluent/developer/WindowFinalResultTest.java:

package io.confluent.developer;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class WindowFinalResultTest {

  private TopologyTestDriver testDriver;
  private TestOutputTopic<Windowed<String>, Long> testOutputTopic;
  private SpecificAvroSerde<PressureAlert> pressureSerde;

  private final Config config = ConfigFactory.load("test.properties");

  private final String inputTopic = this.config.getString("input.topic.name");
  private final String outputTopic = this.config.getString("output.topic.name");

  private final Duration testWindowSize = config.getDuration("window.size");
  private final Duration testGracePeriodSize = config.getDuration("window.grace.period");
  private final Serde<Windowed<String>> keyResultSerde = timeWindowedSerdeFrom(String.class, testWindowSize.toMillis());

  private TimeWindows makeFixedTimeWindow() {
    return TimeWindows.of(testWindowSize).advanceBy(testWindowSize).grace(testGracePeriodSize);
  }

  private SpecificAvroSerde<PressureAlert> makePressureAlertSerde() {

    Map<String, String> schemaRegistryConfigMap = Collections.singletonMap(
        SCHEMA_REGISTRY_URL_CONFIG,
        config.getString(SCHEMA_REGISTRY_URL_CONFIG)
    );

    SpecificAvroSerde<PressureAlert> serde = new SpecificAvroSerde<>();
    serde.configure(schemaRegistryConfigMap, false);

    return serde;
  }

  private List<TestRecord<Windowed<String>, Long>> readAtLeastNOutputs(int size) {
    final List<TestRecord<Windowed<String>, Long>> testRecords = testOutputTopic.readRecordsToList();
    assertThat(testRecords.size(), equalTo(size));

    return testRecords;
  }

  @Before
  public void setUp() {
    this.pressureSerde = makePressureAlertSerde();
    Topology topology = WindowFinalResult.buildTopology(config, makeFixedTimeWindow(), this.pressureSerde);
    this.testDriver = new TopologyTestDriver(topology, WindowFinalResult.buildProperties(config));
    this.testOutputTopic =
        testDriver.createOutputTopic(outputTopic, this.keyResultSerde.deserializer(), Serdes.Long().deserializer());
  }

  @After
  public void tearDown() {
    testDriver.close();
  }

  @Test
  public void topologyShouldGroupOverDatetimeWindows() {
    final TestInputTopic<Bytes, PressureAlert>
        testDriverInputTopic =
        testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());

    List<PressureAlert> inputs = Arrays.asList(
        new PressureAlert("101", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:45:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:45:03.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:55:10.+0200", Integer.MAX_VALUE),
        // ONE LAST EVENT TO TRIGGER TO MOVE THE STREAMING TIME
        new PressureAlert("XXX", "2019-09-21T05:55:40.+0200", Integer.MAX_VALUE)
    );

    inputs.forEach(pressureAlert ->
                       testDriverInputTopic.pipeInput(null, pressureAlert)
    );

    List<TestRecord<Windowed<String>, Long>> result = readAtLeastNOutputs(3);

    Optional<TestRecord<Windowed<String>, Long>> resultOne = result
        .stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569036600000L).findAny();
    Optional<TestRecord<Windowed<String>, Long>> resultTwo = result
        .stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569037500000L).findAny();
    Optional<TestRecord<Windowed<String>, Long>> resultThree = result
        .stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569038110000L).findAny();

    assertTrue(resultOne.isPresent());
    assertTrue(resultTwo.isPresent());
    assertTrue(resultThree.isPresent());

    assertEquals(3L, resultOne.get().value().longValue());
    assertEquals(2L, resultTwo.get().value().longValue());
    assertEquals(1L, resultThree.get().value().longValue());

    result.forEach((element) ->
                       assertEquals(
                           makeFixedTimeWindow().size(),
                           element.key().window().end() - element.key().window().start()
                       )
    );
  }

  @Test
  public void topologyShouldGroupById() {

    final TestInputTopic<Bytes, PressureAlert>
        testDriverInputTopic =
        testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());

    List<PressureAlert> inputs = Arrays.asList(
        new PressureAlert("101", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
        new PressureAlert("101", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
        new PressureAlert("102", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("102", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
        new PressureAlert("102", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
        new PressureAlert("103", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
        new PressureAlert("103", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
        new PressureAlert("103", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
        // ONE LAST EVENT TO TRIGGER TO MOVE THE STREAMING TIME
        new PressureAlert("XXX", "2019-09-21T05:55:41.+0200", Integer.MAX_VALUE)
    );

    inputs.forEach(pressureAlert ->
                       testDriverInputTopic.pipeInput(null, pressureAlert)
    );

    List<TestRecord<Windowed<String>, Long>> result = readAtLeastNOutputs(3);

    Optional<TestRecord<Windowed<String>, Long>> resultOne =
        result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("101")).findAny();
    Optional<TestRecord<Windowed<String>, Long>> resultTwo =
        result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("102")).findAny();
    Optional<TestRecord<Windowed<String>, Long>> resultThree =
        result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("103")).findAny();

    assertTrue(resultOne.isPresent());
    assertTrue(resultTwo.isPresent());
    assertTrue(resultThree.isPresent());

    assertEquals(3L, resultOne.get().value().longValue());
    assertEquals(3L, resultTwo.get().value().longValue());
    assertEquals(3L, resultThree.get().value().longValue());

    //Assert.assertNull(readNext());
  }
}

This class tests the following things:

  1. The topology groups element over the datetime property

  2. The topology outputs a message for each window

  3. The topology outputs the correct count

  4. Duration between the window start and end corresponds to the window passed in argument

  5. The topology also uses the id property of the sensors to group events

  6. The topology only outputs one element per window

Additionally, a separate test for the timestamp extractor can be created at src/test/java/io/confluent/developer/PressureDatetimeExtractorTest.java:

package io.confluent.developer;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static java.util.Collections.singletonMap;

public class PressureDatetimeExtractorTest {

    private TopologyTestDriver testDriver;
    private SpecificAvroSerde<PressureAlert> pressureSerde;

    private final Config config = ConfigFactory.load("test.properties");

    private final String inputTopic = this.config.getString("input.topic.name");
    private final String outputTopic = this.config.getString("output.topic.name");

    private final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
            .appendPattern(this.config.getString("sensor.datetime.pattern"))
            .toFormatter();

    private final PressureDatetimeExtractor timestampExtractor = new PressureDatetimeExtractor(config);
    private TestOutputTopic<String, PressureAlert> testDriverOutputTopic;

    private SpecificAvroSerde<PressureAlert> makePressureAlertSerde() {

        Map<String, String> schemaRegistryConfigMap = singletonMap(
            SCHEMA_REGISTRY_URL_CONFIG,
            config.getString(SCHEMA_REGISTRY_URL_CONFIG)
        );

        SpecificAvroSerde<PressureAlert> serde = new SpecificAvroSerde<>();
        serde.configure(schemaRegistryConfigMap, false);
        return serde;
    }

    private List<TestRecord<String, PressureAlert>> readNOutputs(int size) {
        return testDriverOutputTopic.readRecordsToList();
    }

    @Before
    public void setUp() {
        this.pressureSerde = makePressureAlertSerde();

        Consumed<String, PressureAlert> consumedPressure =
            Consumed.with(Serdes.String(), pressureSerde)
                .withTimestampExtractor(timestampExtractor);

        Produced<String, PressureAlert> producedPressure =
            Produced.with(Serdes.String(), pressureSerde);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(this.inputTopic, consumedPressure).to(this.outputTopic, producedPressure);

        this.testDriver = new TopologyTestDriver(builder.build(), WindowFinalResult.buildProperties(config));
        this.testDriverOutputTopic =
            testDriver
                .createOutputTopic(this.outputTopic, Serdes.String().deserializer(), this.pressureSerde.deserializer());
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void extract() {

        final TestInputTopic<Bytes, PressureAlert>
            testDriverInputTopic =
            testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());
        List<PressureAlert> inputs = Arrays.asList(
                new PressureAlert("101", "2019-09-21T05:25:01.+0200", Integer.MAX_VALUE),
                new PressureAlert("102", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
                new PressureAlert("103", "2019-09-21T05:45:03.+0200", Integer.MAX_VALUE),
                new PressureAlert("104", "DEFINITELY-NOT-PARSABLE!!", Integer.MAX_VALUE),
                new PressureAlert("105", "1500-06-24T09:11:03.+0200", Integer.MAX_VALUE)
        );

        inputs.forEach(pressureAlert ->
                           testDriverInputTopic.pipeInput(null, pressureAlert)
        );

        List<TestRecord<String, PressureAlert>> result = readNOutputs(5);

        Optional<TestRecord<String, PressureAlert>> resultOne =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("101")).findFirst();
        Optional<TestRecord<String, PressureAlert>> resultTwo =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("102")).findFirst();
        Optional<TestRecord<String, PressureAlert>> resultThree =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("103")).findFirst();
        Optional<TestRecord<String, PressureAlert>> resultFour =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("104")).findFirst();
        Optional<TestRecord<String, PressureAlert>> resultFive =
                result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("105")).findFirst();

        Assert.assertTrue(resultOne.isPresent());
        Assert.assertTrue(resultTwo.isPresent());
        Assert.assertTrue(resultThree.isPresent());
        Assert.assertFalse(resultFour.isPresent());
        Assert.assertFalse(resultFive.isPresent());

        Assert.assertEquals(
                formatter.parse("2019-09-21T05:25:01.+0200", Instant::from).toEpochMilli(),
                resultOne.get().timestamp().longValue()
        );

        Assert.assertEquals(
                formatter.parse("2019-09-21T05:30:02.+0200", Instant::from).toEpochMilli(),
                resultTwo.get().timestamp().longValue()
        );

        Assert.assertEquals(
                formatter.parse("2019-09-21T05:45:03.+0200", Instant::from).toEpochMilli(),
                resultThree.get().timestamp().longValue()
        );
    }
}

3
Invoke the tests

Now run the test, which is as simple as:

./gradlew test

Take it to production

1
Create a production configuration file

First, create a directory for the configuration:

mkdir configuration

Then, create a new configuration file at configuration/prod.properties with the following content. Be sure to fill in the addresses of your production hosts and change any other parameters that make sense for your setup.

application.id=final-results-tutorial-prod
bootstrap.servers=broker:9092
schema.registry.url=http://schema-registry:8081

window.size=10 seconds
window.grace.period=20 seconds

sensor.datetime.pattern=yyyy-MM-dd'T'HH:mm:ss.Z

local.date.lang=fr
local.date.pattern=EEE d MMM yyyy # date only

input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1

output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1

But remember that the application also has a default configuration file, and you could instead override parts of the configuration by defining the corresponding environment variables. Here is the full list of variables:

Variable                   Description

APP_ID                     application id, used as the Kafka Streams application.id
BOOTSTRAP_SERVERS          host:port of your Kafka broker
SCHEMA_REGISTRY_URL        host:port of your Schema Registry
INPUT_TOPIC                name of the input topic
INPUT_TOPIC_PARTITIONS     number of partitions of the input topic
INPUT_TOPIC_REPLICATION    replication factor of the input topic
OUTPUT_TOPIC               name of the output topic
OUTPUT_TOPIC_PARTITIONS    number of partitions of the output topic
OUTPUT_TOPIC_REPLICATION   replication factor of the output topic
WINDOW_SIZE                size of the aggregation window
GRACE_PERIOD               how long to wait for late data points

2
Build a Docker image

In your terminal, execute the following command to invoke the Jib plugin to build a docker image:

gradle jibDockerBuild --image=io.confluent.developer/kstreams-window-final-result:0.0.1-SNAPSHOT

3
Launch the container

Finally, launch the container using your preferred container orchestration service.

If you want to run it locally and pass your custom configuration file, you can execute the following:

docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/kstreams-window-final-result:0.0.1-SNAPSHOT -Dconfig.file=/config.properties

Or, just run the container and set the environment variables you’d like to change:

docker run\
 -e "BOOTSTRAP_SERVERS=broker:9092"\
 -e "SCHEMA_REGISTRY_URL=http://schema-registry:8081"\
 io.confluent.developer/kstreams-window-final-result:0.0.1-SNAPSHOT

Deploy on Confluent Cloud

1
Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.

First, create your Kafka cluster in Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 free usage (details).

Next, from the Confluent Cloud UI, click on Tools & client config to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.

Now you’re all set to run your application locally while your Kafka topics and stream processing are backed by your Confluent Cloud instance.