How can I count the number of messages in a Kafka topic, per key over a time window, getting a final result that takes into account late arrivals?
Consider a topic with events that represent sensor warnings (pressure on robotic arms). One warning per time slot is fine, but you don’t want too many warnings at the same time. In this tutorial, we’ll write a program that counts the messages from a given sensor and emits a single result at the end of each window.
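At its heart, the answer is a windowed count with a grace period for late events, followed by the suppress operator so that only the final count per window is emitted. Here is a sketch of that core part of the topology; it assumes the imports, builder, serdes, topic names and durations that are set up step by step in the rest of this tutorial:
// Sketch only: builder, pressureSerde and the topic names are defined later in this tutorial.
builder
    .stream("input-topic", Consumed.with(Serdes.String(), pressureSerde))
    .selectKey((key, value) -> value.getId())                            // key by sensor id
    .groupByKey(Grouped.with(Serdes.String(), pressureSerde))
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10))                   // 10 s tumbling windows...
        .grace(Duration.ofSeconds(20)))                                  // ...accepting events up to 20 s late
    .count()                                                             // count alerts per sensor and window
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) // emit only the final result
    .toStream()
    .to("output-topic", Produced.with(
        WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));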
To get started, make a new directory anywhere you’d like for this project:
mkdir window-final-result && cd window-final-result
Next, create the following docker-compose.yml
file to obtain Confluent Platform:
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:6.0.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:6.0.0
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
schema-registry:
image: confluentinc/cp-schema-registry:6.0.0
hostname: schema-registry
container_name: schema-registry
depends_on:
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN
And launch it by running:
docker-compose up -d
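You can optionally check that the broker and Schema Registry containers are up before moving on:
docker-compose ps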
Create the following Gradle build file, named build.gradle
for the project:
plugins {
id "java"
id "application"
id "com.google.cloud.tools.jib" version "2.8.0"
id "com.github.johnrengelman.shadow" version "6.1.0"
id "com.commercehub.gradle.plugin.avro" version "0.99.99"
}
sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1-SNAPSHOT"
mainClassName = "io.confluent.developer.WindowFinalResult"
repositories {
jcenter()
maven {
url "https://packages.confluent.io/maven"
}
}
dependencies {
implementation "org.apache.avro:avro:1.10.1"
implementation group: 'com.typesafe', name: 'config', version: '1.4.1'
implementation group: 'ch.qos.logback', name: 'logback-classic', version: '1.2.3'
implementation group: 'org.apache.kafka', name: 'kafka-streams', version: '2.5.0'
implementation group: 'io.confluent', name: 'kafka-streams-avro-serde', version: '5.5.1'
testImplementation "junit:junit:4.13.2"
testImplementation "org.apache.kafka:kafka-streams-test-utils:2.5.0"
testImplementation "com.github.grantwest.eventually:hamcrest-eventually-matchers:0.0.3"
// helpers
implementation group: 'com.jason-goodwin', name: 'better-monads', version: '0.4.1'
implementation group: 'com.typesafe.akka', name: 'akka-stream-kafka_2.13', version: '1.0.5'
}
shadowJar {
archiveBaseName = "kstreams-${rootProject.name}"
archiveClassifier = ''
}
task createTopics(type: JavaExec) {
main = 'io.confluent.developer.helper.TopicCreation'
classpath = sourceSets.main.runtimeClasspath
}
task publishSchemas(type: JavaExec) {
main = 'io.confluent.developer.helper.SchemaPublication'
classpath = sourceSets.main.runtimeClasspath
}
task consumeResult(type: JavaExec) {
main = 'io.confluent.developer.helper.ResultConsumer'
classpath = sourceSets.main.runtimeClasspath
}
run.dependsOn {
[createTopics, publishSchemas]
}
jib {
container.mainClass = mainClassName
}
test {
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}
}
Note: In addition to our main class, this tutorial ships two helper Java executables responsible for creating the topics and registering the schemas. In a real-life application, these steps may live outside your project.
Create the following Gradle settings file, named settings.gradle
for the project:
pluginManagement {
repositories {
gradlePluginPortal()
jcenter()
maven {
name "JCenter Gradle Plugins"
url "https://dl.bintray.com/gradle/gradle-plugins"
}
}
}
rootProject.name = 'window-final-result'
Run the following command to obtain the Gradle wrapper:
gradle wrapper
Create a directory for the project resources:
mkdir -p src/main/resources
Add the config file src/main/resources/application.conf
to set up your application:
application.id: "final-results-tutorial"
application.id: ${?APP_ID}
bootstrap.servers: "localhost:29092"
bootstrap.servers: ${?BOOTSTRAP_SERVERS}
schema.registry.url: "http://localhost:8081"
schema.registry.url: ${?SCHEMA_REGISTRY_URL}
window {
size: 10 seconds
size: ${?WINDOW_SIZE}
grace.period: 20 seconds
grace.period: ${?GRACE_PERIOD}
}
# you may play with the pattern, but ALWAYS include the Zone Offset (Z)!
# It is used to create a java.time.ZonedDateTime by parsing the datetime field in the message value
sensor.datetime.pattern: "yyyy-MM-dd'T'HH:mm:ss.Z"
# adapt this part to YOUR preferred lang or location; it is used to display the result
local.date {
lang: "fr"
pattern: "EEE d MMM yyyy" # date only
}
input.topic {
name: "input-topic"
name: ${?INPUT_TOPIC}
partitions: 1
partitions: ${?INPUT_TOPIC_PARTITIONS}
replication.factor: 1
replication.factor: ${?INPUT_TOPIC_REPLICATION}
}
output.topic {
name: "output-topic"
name: ${?OUTPUT_TOPIC}
partitions: 1
partitions: ${?OUTPUT_TOPIC_PARTITIONS}
replication.factor: 1
replication.factor: ${?OUTPUT_TOPIC_REPLICATION}
}
You may want to adapt the local.date.lang and local.date.pattern blocks.
This file contains the default configuration. In production, these values can be overridden by environment variables.
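For example, because the file uses HOCON ${?VAR} substitutions, you could later launch the application with larger windows without touching the file (the values below are purely illustrative):
WINDOW_SIZE="30 seconds" GRACE_PERIOD="1 minute" ./gradlew run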
Add the logging configuration in the file: src/main/resources/logback.xml
:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="KSTREAMS" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
%yellow(%d{yyyy-MM-dd HH:mm:ss}) %cyan(${HOSTNAME}) %highlight([%p]) %green((%file:%line\)) - %msg%n
</pattern>
</encoder>
</appender>
<appender name="CONSUMER" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<pattern>
%yellow(%d{yyyy-MM-dd HH:mm:ss}) %highlight([%p]) %magenta((%file:%line\)) - %msg%n
</pattern>
</encoder>
</appender>
<logger name="io.confluent.developer.helper.ResultConsumer" level="DEBUG" additivity="false">
<appender-ref ref="CONSUMER" />
</logger>
<logger name="io.confluent.developer" level="DEBUG" additivity="false">
<appender-ref ref="KSTREAMS" />
</logger>
<root level="WARN">
<appender-ref ref="KSTREAMS" />
</root>
</configuration>
Create a directory for the pressure event schemas:
mkdir -p src/main/avro
Then create the following Avro schema file at src/main/avro/pressure-alert.avsc
for the publication events:
{
"type": "record",
"name": "PressureAlert",
"namespace": "io.confluent.developer.avro",
"doc": "Object used for the recipe: Last Window Result",
"fields": [
{
"name": "id",
"type": "string",
"doc": "Id of a robotic arm sensor"
},
{
"name": "datetime",
"type": "string",
"doc": "Event time of a pressure alert"
},
{
"name": "pressure",
"type": "int",
"doc": "Actual pressure level in Pascal (Pa), yeah metric system rules!"
}
]
}
Because this Avro schema is used in the Java code, it needs to be compiled. Run the following:
./gradlew build
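The build also triggers the Avro plugin, which generates a PressureAlert class in the io.confluent.developer.avro package. As a purely illustrative sanity check (not part of the application), the generated specific record can be created like this:
import io.confluent.developer.avro.PressureAlert;
public class PressureAlertSample {
    public static void main(String[] args) {
        // Avro generates an all-args constructor: (id, datetime, pressure)
        PressureAlert alert = new PressureAlert("101", "2019-09-21T05:45:02.+0200", 30);
        System.out.println(alert); // prints the record as JSON-like text
    }
}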
Topic creation and Avro schema registration are often part of an external process. For the sake of clarity in this tutorial, we won’t include these steps in the main application, but isolate them in a dedicated package.
Create a directory for the package helper:
mkdir -p src/main/java/io/confluent/developer/helper
Add the following class at src/main/java/io/confluent/developer/helper/TopicCreation.java:
package io.confluent.developer.helper;
import com.jasongoodwin.monads.Try;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class TopicCreation {
private static final Logger logger = LoggerFactory.getLogger(TopicCreation.class);
public static void main(String[] args) {
Config config = ConfigFactory.load();
Properties properties = new Properties();
properties.put("bootstrap.servers", config.getString("bootstrap.servers"));
AdminClient client = AdminClient.create(properties);
HashMap<String, NewTopic> topics = new HashMap<>();
topics.put(
config.getString("input.topic.name"),
new NewTopic(
config.getString("input.topic.name"),
config.getNumber("input.topic.partitions").intValue(),
config.getNumber("input.topic.replication.factor").shortValue())
);
topics.put(
config.getString("output.topic.name"),
new NewTopic(
config.getString("output.topic.name"),
config.getNumber("output.topic.partitions").intValue(),
config.getNumber("output.topic.replication.factor").shortValue())
);
try {
logger.info("Starting the topics creation");
CreateTopicsResult result = client.createTopics(topics.values());
result.values().forEach((topicName, future) -> {
NewTopic topic = topics.get(topicName);
future.whenComplete((aVoid, maybeError) ->
Optional
.ofNullable(maybeError)
.map(Try::<Void>failure)
.orElse(Try.successful(null))
.onFailure(throwable -> logger.error("Topic creation didn't complete:", throwable))
.onSuccess((anOtherVoid) -> logger.info(
String.format(
"Topic %s, has been successfully created " +
"with %s partitions and replicated %s times",
topic.name(),
topic.numPartitions(),
topic.replicationFactor() - 1
)
)));
});
result.all().get();
} catch (InterruptedException | ExecutionException e) {
if (!(e.getCause() instanceof TopicExistsException)) e.printStackTrace();
} finally {
client.close();
}
}
}
Add the following class at src/main/java/io/confluent/developer/helper/SchemaPublication.java:
package io.confluent.developer.helper;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
public class SchemaPublication {
private static final Logger logger = LoggerFactory.getLogger(SchemaPublication.class);
public static void main(String[] args) {
Config config = ConfigFactory.load();
String registryUrl = config.getString("schema.registry.url");
CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(registryUrl, 10);
try {
logger.info(String.format("Schemas publication at: %s", registryUrl));
schemaRegistryClient.register(
String.format("%s-value", config.getString("input.topic.name")),
new AvroSchema(PressureAlert.SCHEMA$)
);
} catch (IOException | RestClientException e) {
e.printStackTrace();
}
}
}
Now the topics can be created separately with the following command:
./gradlew createTopics
Do the same for the schemas:
./gradlew publishSchemas
Check the build.gradle
again: you will find these tasks declared as JavaExec
, each with a main class corresponding to one of the last two files.
There are multiple timing perspectives to consider, and each event may arrive from a different time zone.
Event time: the time at the sensor, which differs depending on whether it comes from Paris (UTC+02:00) or Tokyo (UTC+09:00)
Processing time: the time at the Kafka Streams instances. Here the zone depends on your deployment (e.g., your fancy managed Kubernetes cluster deployed in us-west-b :p)
Ingestion time: less relevant here, this is the time at which the Kafka message was published
Since our operations will be time-based, you need to ensure the right notion of time is used. In this example, our data
producer does not set the message timestamp; it places the time of the alert in the message value instead. We need to extract
it from there. This can be done by implementing a
TimestampExtractor
.
Add the next class at src/main/java/io/confluent/developer/PressureDatetimeExtractor.java:
package io.confluent.developer;
import com.jasongoodwin.monads.Try;
import com.typesafe.config.Config;
import io.confluent.developer.avro.PressureAlert;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
public class PressureDatetimeExtractor implements TimestampExtractor {
private final DateTimeFormatter formatter;
private static final Logger logger = LoggerFactory.getLogger(TimestampExtractor.class);
public PressureDatetimeExtractor(Config config) {
this.formatter = DateTimeFormatter.ofPattern(config.getString("sensor.datetime.pattern"));
}
@Override
public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
return Try
.ofFailable(() -> ((PressureAlert) record.value()).getDatetime())
.onFailure((ex) -> logger.error("fail to cast the PressureAlert: ", ex))
.map((stringDatetimeString) -> ZonedDateTime.parse(stringDatetimeString, this.formatter))
.onFailure((ex) -> logger.error("fail to parse the event datetime due to: ", ex))
.map((zonedDatetime) -> zonedDatetime.toInstant().toEpochMilli())
.onFailure((ex) -> logger.error("fail to convert the datetime to instant due to: ", ex))
.orElse(-1L);
}
}
OK, let's translate this extract
method from Java to English. First of all, we try to perform the following operations,
any of which may raise an exception:
we cast the value Object to PressureAlert
and call its .getDatetime
method
then we parse the datetime string based on the configured pattern
then we convert it to an Instant
, in case the Kafka message suffers from jet lag
and finally we get the epoch in milliseconds
If any of these steps fails, we log the error and set the timestamp to a negative number, so the record will be silently ignored.
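To make the happy path concrete, here is a tiny standalone check of the configured pattern against one of the sample timestamps used later in this tutorial (illustrative only, not part of the project):
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
public class DatetimePatternCheck {
    public static void main(String[] args) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.Z");
        // Parses because the string carries an explicit zone offset (+0200)
        long epochMilli = ZonedDateTime
                .parse("2019-09-21T05:45:02.+0200", formatter)
                .toInstant()
                .toEpochMilli();
        System.out.println(epochMilli); // 1569037502000
    }
}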
In the main function we create time-based windows
with a given size and the same advance (step) size. This results in non-overlapping windows called
Tumbling Windows.
We also add an extra grace period: even if messages arrive late, they may still join a window as long as their datetime falls within it.
Finally we pass this window to a function that also takes a StreamsBuilder
and returns a Topology
.
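As a side note, a tumbling window is just the special case where the advance interval equals the window size; with a smaller advance you would get overlapping hopping windows instead. A small illustration (the durations here are arbitrary):
import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;
public class WindowExamples {
    // Tumbling: 10-second windows advancing by 10 seconds, so they never overlap;
    // late events are still accepted during a 20-second grace period.
    static final TimeWindows TUMBLING = TimeWindows
            .of(Duration.ofSeconds(10))
            .advanceBy(Duration.ofSeconds(10))
            .grace(Duration.ofSeconds(20));
    // Hopping: same size but advancing by only 5 seconds, so each event lands
    // in two overlapping windows, which is not what this tutorial wants.
    static final TimeWindows HOPPING = TimeWindows
            .of(Duration.ofSeconds(10))
            .advanceBy(Duration.ofSeconds(5))
            .grace(Duration.ofSeconds(20));
}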
Add the next class at src/main/java/io/confluent/developer/WindowFinalResult.java:
package io.confluent.developer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Properties;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;
public class WindowFinalResult {
private static final Logger logger = LoggerFactory.getLogger(WindowFinalResult.class);
public static Properties buildProperties(Config config) {
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString("bootstrap.servers"));
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, config.getString("application.id"));
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
return properties;
}
public static Topology buildTopology(Config config,
TimeWindows windows,
SpecificAvroSerde<PressureAlert> pressureSerde) {
StreamsBuilder builder = new StreamsBuilder();
String inputTopic = config.getString("input.topic.name");
String outputTopic = config.getString("output.topic.name");
Produced<Windowed<String>, Long> producedCount = Produced
.with(timeWindowedSerdeFrom(String.class), Serdes.Long());
Consumed<String, PressureAlert> consumedPressure = Consumed
.with(Serdes.String(), pressureSerde)
.withTimestampExtractor(new PressureDatetimeExtractor(config));
Grouped<String, PressureAlert> groupedPressure = Grouped.with(Serdes.String(), pressureSerde);
builder
.stream(inputTopic, consumedPressure)
.selectKey((key, value) -> value.getId())
.groupByKey(groupedPressure)
.windowedBy(windows)
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.to(outputTopic, producedCount);
return builder.build();
}
public static void main(String[] args) {
final Config config = ConfigFactory.load();
final Properties properties = buildProperties(config);
Map<String, Object> serdeConfig =
singletonMap(SCHEMA_REGISTRY_URL_CONFIG, config.getString("schema.registry.url"));
SpecificAvroSerde<PressureAlert> pressureSerde = new SpecificAvroSerde<>();
pressureSerde.configure(serdeConfig, false);
TimeWindows windows = TimeWindows
.of(config.getDuration("window.size"))
.advanceBy(config.getDuration("window.size"))
.grace(config.getDuration("window.grace.period"));
Topology topology = buildTopology(config, windows, pressureSerde);
logger.debug(topology.describe().toString());
final KafkaStreams streams = new KafkaStreams(topology, properties);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
streams.cleanUp();
streams.start();
}
}
Here are several notes about the WindowFinalResult#buildTopology
function:
To consume events, we create a SpecificAvroSerde
based on the Avro source code generated in the earlier step.
The serde used to produce the aggregated results is a windowed serde: it stores the key but also the window start time.
Our custom timestamp extractor is plugged in thanks to the Consumed#withTimestampExtractor
method.
Then we stream, selectKey, and groupByKey, and finally apply the suppress operator.
The suppress operator discards every intermediate change and, once the grace period is over, emits only the final result.
Note: even with the suppress operator applied, you will need a subsequent event to advance the stream time and get your result.
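If waiting for a window to close is too strict for your use case, the Suppressed API also offers a time-limit variant. The following sketch (durations and buffer sizes are arbitrary) contrasts the two options:
import java.time.Duration;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.Windowed;
public class SuppressionOptions {
    public static void main(String[] args) {
        // Exactly one final result per window, emitted once the grace period ends
        // (the option used in this tutorial).
        Suppressed<Windowed> finalResultsOnly =
                Suppressed.untilWindowCloses(BufferConfig.unbounded());
        // Alternative: rate-limit updates instead. Changes are held back for at most
        // 10 seconds (or until 1000 records are buffered), then the latest value is
        // emitted; downstream may still see the same window updated again later.
        Suppressed<Object> rateLimited = Suppressed.untilTimeLimit(
                Duration.ofSeconds(10),
                BufferConfig.maxRecords(1000).emitEarlyWhenFull());
        System.out.println(finalResultsOnly + " / " + rateLimited);
    }
}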
In a new terminal, use the run
Gradle task to start the main class WindowFinalResult
.
./gradlew run
Note: this will apply the topic creation step and schema publication step before running the app.
Alternatively, you may also build a jar archive and run it with a java command. If you do, don’t forget to create the topics first.
./gradlew shadowJar
java -cp build/libs/kstreams-window-final-result*.jar io.confluent.developer.helper.TopicCreation
java -cp build/libs/kstreams-window-final-result*.jar io.confluent.developer.helper.SchemaPublication
java -jar build/libs/kstreams-window-final-result*.jar #-Dconfig.file=./any-other-conf-file.properties
# OR
# APP_ID=LOCAL_DEV_APP_ID java -jar build/libs/kstreams-window-final-result*.jar
Now we want to send sensor events in a convenient way so we can focus on the aggregation results.
In a new terminal, define a produce function that pipes a JSON event to the Avro console producer (the set +m below simply silences job-control messages from the backgrounded producer calls).
set +m
function produce () { echo $1 | docker exec -i schema-registry /usr/bin/kafka-avro-console-producer --bootstrap-server broker:9092 --topic input-topic --property value.schema="$(< src/main/avro/pressure-alert.avsc)" & }
Then, we call the function, passing a JSON payload that matches the Avro schema.
produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
# {"id":"101","datetime":"2019-09-17T01:22:15.+0200","pressure":30}
Send multiple events
produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"101","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"102","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"101","datetime":"'$(date -v-10S +%FT%T.%z)'","pressure":30}' # late of 10 sec
produce '{"id":"101","datetime":"'$(date -v-15S +%FT%T.%z)'","pressure":30}' # late of 15 sec
produce '{"id":"101","datetime":"'$(date -v-60S +%FT%T.%z)'","pressure":30}' # late of 01 min
produce '{"id":"102","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"102","datetime":"'$(date -v-60S +%FT%T.%z)'","pressure":30}' # out of the grace period
export TZ=Asia/Tokyo
produce '{"id":"301","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
produce '{"id":"301","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
sleep 10
produce '{"id":"XXX","datetime":"'$(date +%FT%T.%z)'","pressure":30}'
You may also consume the input topic to see the sensor datetimes:
docker exec -it schema-registry /usr/bin/kafka-avro-console-consumer --topic input-topic --bootstrap-server broker:9092 --from-beginning
For example:
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"102","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:02.+0200","pressure":30}
# {"id":"101","datetime":"2019-09-21T05:45:07.+0200","pressure":30} # late
# {"id":"101","datetime":"2019-09-21T05:44:13.+0200","pressure":30} # out of time
# {"id":"102","datetime":"2019-09-21T05:45:13.+0200","pressure":30} # new window
# {"id":"102","datetime":"2019-09-21T05:43:23.+0200","pressure":30} # out of time
# {"id":"301","datetime":"2019-09-21T12:45:23.+0900","pressure":30} # different time zone
# {"id":"301","datetime":"2019-09-21T12:45:24.+0900","pressure":30} # different time zone
# {"id":"XXX","datetime":"2019-09-21T06:00:00.+0200","pressure":30}
Consuming the serialized windowed keys directly is a bit difficult, so the tutorial comes with a consumer that you can
use as a black box to explore the output of the streaming application. In the helper package, add the class
ResultConsumer
:
package io.confluent.developer.helper;
import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.scaladsl.Sink;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Windowed;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.BoxedUnit;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.UUID;
import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;
public class ResultConsumer {
private static final Logger logger = LoggerFactory.getLogger(ResultConsumer.class);
public static void main(String[] args) {
final Config config = ConfigFactory.load();
final String outputTopic = config.getString("output.topic.name");
final ActorSystem system = ActorSystem.create();
final Materializer materializer = ActorMaterializer.create(system);
final ConsumerSettings<Windowed<String>, Long> consumerSettings =
ConsumerSettings
.create(
system,
timeWindowedSerdeFrom(
String.class,
config.getDuration("window.size").toMillis()
).deserializer(),
Serdes.Long().deserializer()
)
.withGroupId(UUID.randomUUID().toString())
.withBootstrapServers(config.getString("bootstrap.servers"))
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer.plainSource(
consumerSettings,
Subscriptions.topics(outputTopic))
.to(Sink.foreach((record) -> {
logger.info(printWindowedKey(config, record));
return BoxedUnit.UNIT;
})
).run(materializer);
}
private static String printWindowedKey(Config config, ConsumerRecord<Windowed<String>, Long> windowedKeyValue) {
return String.format("Count = %s for Key = %s, at window [%s-%s] %s (%s)",
windowedKeyValue.value().toString(),
windowedKeyValue.key().key(),
DateTimeFormatter
.ofPattern("HH:mm:ss")
.withLocale(Locale.getDefault())
.withZone(ZoneId.systemDefault())
.format(windowedKeyValue.key().window().startTime()),
DateTimeFormatter
.ofPattern("HH:mm:ss")
.withLocale(Locale.getDefault())
.withZone(ZoneId.systemDefault())
.format(windowedKeyValue.key().window().endTime()),
DateTimeFormatter
.ofPattern(config.getString("local.date.pattern"))
.withLocale(Locale.forLanguageTag(config.getString("local.date.lang")))
.withZone(ZoneId.systemDefault())
.format(windowedKeyValue.key().window().startTime()),
ZoneId.systemDefault().getId()
);
}
}
This consumer only formats and logs the messages it receives. It also has its own Gradle task:
./gradlew consumeResult
At the end, you should be able to see the output count for the key 101 and the key 102.
2019-09-21 05:46:03 [...] Count = 5 for Key = 101, at window [05:45:00-05:45:10] Sat 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 1 for Key = 102, at window [05:45:00-05:45:10] Sam. 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 1 for Key = 102, at window [05:45:10-05:45:20] Sam. 21 Sep 2019 (Europe/Paris)
2019-09-21 05:46:03 [...] Count = 2 for Key = 301, at window [05:45:20-05:45:30] Sam. 21 Sep 2019 (Europe/Paris)
Here the logging time matches the time of the latest result: 05:46:03
. This latest result, produced by the sensor XXX, advances the
stream time, and a final result gets produced for every window whose grace period has ended. Hours are printed in
the default system time zone. So it was between 05:45:20
and 05:45:30
for me when the sensor 301 experienced 2
pressure alerts. To start investigating what happened, I would need the time zone of that sensor.
First, create a directory for the test configuration:
mkdir -p src/test/resources
Then, create a test file configuration named test.properties
at src/test/resources
:
application.id=final-results-tutorial-test
bootstrap.servers=notused:9092
schema.registry.url=mock://final-results-tutorial-test:8081
window.size=10 seconds
window.grace.period=20 seconds
sensor.datetime.pattern=yyyy-MM-dd'T'HH:mm:ss.Z
local.date.lang=fr
local.date.pattern=EEE d MMM yyyy # date only
input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1
output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1
Then, create a directory for the tests to live in:
mkdir -p src/test/java/io/confluent/developer
Create the following test file at src/test/java/io/confluent/developer/WindowFinalResultTest.java
:
package io.confluent.developer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class WindowFinalResultTest {
private TopologyTestDriver testDriver;
private TestOutputTopic<Windowed<String>, Long> testOutputTopic;
private SpecificAvroSerde<PressureAlert> pressureSerde;
private final Config config = ConfigFactory.load("test.properties");
private final String inputTopic = this.config.getString("input.topic.name");
private final String outputTopic = this.config.getString("output.topic.name");
private final Duration testWindowSize = config.getDuration("window.size");
private final Duration testGracePeriodSize = config.getDuration("window.grace.period");
private final Serde<Windowed<String>> keyResultSerde = timeWindowedSerdeFrom(String.class, testWindowSize.toMillis());
private TimeWindows makeFixedTimeWindow() {
return TimeWindows.of(testWindowSize).advanceBy(testWindowSize).grace(testGracePeriodSize);
}
private SpecificAvroSerde<PressureAlert> makePressureAlertSerde() {
Map<String, String> schemaRegistryConfigMap = Collections.singletonMap(
SCHEMA_REGISTRY_URL_CONFIG,
config.getString(SCHEMA_REGISTRY_URL_CONFIG)
);
SpecificAvroSerde<PressureAlert> serde = new SpecificAvroSerde<>();
serde.configure(schemaRegistryConfigMap, false);
return serde;
}
private List<TestRecord<Windowed<String>, Long>> readAtLeastNOutputs(int size) {
final List<TestRecord<Windowed<String>, Long>> testRecords = testOutputTopic.readRecordsToList();
assertThat(testRecords.size(), equalTo(size));
return testRecords;
}
@Before
public void setUp() {
this.pressureSerde = makePressureAlertSerde();
Topology topology = WindowFinalResult.buildTopology(config, makeFixedTimeWindow(), this.pressureSerde);
this.testDriver = new TopologyTestDriver(topology, WindowFinalResult.buildProperties(config));
this.testOutputTopic =
testDriver.createOutputTopic(outputTopic, this.keyResultSerde.deserializer(), Serdes.Long().deserializer());
}
@After
public void tearDown() {
testDriver.close();
}
@Test
public void topologyShouldGroupOverDatetimeWindows() {
final TestInputTopic<Bytes, PressureAlert>
testDriverInputTopic =
testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());
List<PressureAlert> inputs = Arrays.asList(
new PressureAlert("101", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
new PressureAlert("101", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
new PressureAlert("101", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
new PressureAlert("101", "2019-09-21T05:45:01.+0200", Integer.MAX_VALUE),
new PressureAlert("101", "2019-09-21T05:45:03.+0200", Integer.MAX_VALUE),
new PressureAlert("101", "2019-09-21T05:55:10.+0200", Integer.MAX_VALUE),
// ONE LAST EVENT TO ADVANCE THE STREAM TIME
new PressureAlert("XXX", "2019-09-21T05:55:40.+0200", Integer.MAX_VALUE)
);
inputs.forEach(pressureAlert ->
testDriverInputTopic.pipeInput(null, pressureAlert)
);
List<TestRecord<Windowed<String>, Long>> result = readAtLeastNOutputs(3);
Optional<TestRecord<Windowed<String>, Long>> resultOne = result
.stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569036600000L).findAny();
Optional<TestRecord<Windowed<String>, Long>> resultTwo = result
.stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569037500000L).findAny();
Optional<TestRecord<Windowed<String>, Long>> resultThree = result
.stream().filter(Objects::nonNull).filter(r -> r.key().window().start() == 1569038110000L).findAny();
assertTrue(resultOne.isPresent());
assertTrue(resultTwo.isPresent());
assertTrue(resultThree.isPresent());
assertEquals(3L, resultOne.get().value().longValue());
assertEquals(2L, resultTwo.get().value().longValue());
assertEquals(1L, resultThree.get().value().longValue());
result.forEach((element) ->
assertEquals(
makeFixedTimeWindow().size(),
element.key().window().end() - element.key().window().start()
)
);
}
@Test
public void topologyShouldGroupById() {
final TestInputTopic<Bytes, PressureAlert>
testDriverInputTopic =
testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());
List<PressureAlert> inputs = Arrays.asList(
new PressureAlert("101", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
new PressureAlert("101", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
new PressureAlert("101", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
new PressureAlert("102", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
new PressureAlert("102", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
new PressureAlert("102", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
new PressureAlert("103", "2019-09-21T05:30:01.+0200", Integer.MAX_VALUE),
new PressureAlert("103", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
new PressureAlert("103", "2019-09-21T05:30:03.+0200", Integer.MAX_VALUE),
// ONE LAST EVENT TO ADVANCE THE STREAM TIME
new PressureAlert("XXX", "2019-09-21T05:55:41.+0200", Integer.MAX_VALUE)
);
inputs.forEach(pressureAlert ->
testDriverInputTopic.pipeInput(null, pressureAlert)
);
List<TestRecord<Windowed<String>, Long>> result = readAtLeastNOutputs(3);
Optional<TestRecord<Windowed<String>, Long>> resultOne =
result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("101")).findAny();
Optional<TestRecord<Windowed<String>, Long>> resultTwo =
result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("102")).findAny();
Optional<TestRecord<Windowed<String>, Long>> resultThree =
result.stream().filter(Objects::nonNull).filter(r -> r.key().key().equals("103")).findAny();
assertTrue(resultOne.isPresent());
assertTrue(resultTwo.isPresent());
assertTrue(resultThree.isPresent());
assertEquals(3L, resultOne.get().value().longValue());
assertEquals(3L, resultTwo.get().value().longValue());
assertEquals(3L, resultThree.get().value().longValue());
//Assert.assertNull(readNext());
}
}
This class tests the following things:
The topology groups elements by the datetime
property
The topology outputs a message for each window
The topology outputs the correct count
The duration between the window start and end corresponds to the window size passed as an argument
The topology also uses the id
property of the sensors to group events
The topology only outputs one element per window
Additionally, a separate test for the timestamp extractor can be created at src/test/java/io/confluent/developer/PressureDatetimeExtractorTest.java
:
package io.confluent.developer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import io.confluent.developer.avro.PressureAlert;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import static io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG;
import static java.util.Collections.singletonMap;
public class PressureDatetimeExtractorTest {
private TopologyTestDriver testDriver;
private SpecificAvroSerde<PressureAlert> pressureSerde;
private final Config config = ConfigFactory.load("test.properties");
private final String inputTopic = this.config.getString("input.topic.name");
private final String outputTopic = this.config.getString("output.topic.name");
private final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
.appendPattern(this.config.getString("sensor.datetime.pattern"))
.toFormatter();
private final PressureDatetimeExtractor timestampExtractor = new PressureDatetimeExtractor(config);
private TestOutputTopic<String, PressureAlert> testDriverOutputTopic;
private SpecificAvroSerde<PressureAlert> makePressureAlertSerde() {
Map<String, String> schemaRegistryConfigMap = singletonMap(
SCHEMA_REGISTRY_URL_CONFIG,
config.getString(SCHEMA_REGISTRY_URL_CONFIG)
);
SpecificAvroSerde<PressureAlert> serde = new SpecificAvroSerde<>();
serde.configure(schemaRegistryConfigMap, false);
return serde;
}
private List<TestRecord<String, PressureAlert>> readNOutputs(int size) {
return testDriverOutputTopic.readRecordsToList();
}
@Before
public void setUp() {
this.pressureSerde = makePressureAlertSerde();
Consumed<String, PressureAlert> consumedPressure =
Consumed.with(Serdes.String(), pressureSerde)
.withTimestampExtractor(timestampExtractor);
Produced<String, PressureAlert> producedPressure =
Produced.with(Serdes.String(), pressureSerde);
StreamsBuilder builder = new StreamsBuilder();
builder.stream(this.inputTopic, consumedPressure).to(this.outputTopic, producedPressure);
this.testDriver = new TopologyTestDriver(builder.build(), WindowFinalResult.buildProperties(config));
this.testDriverOutputTopic =
testDriver
.createOutputTopic(this.outputTopic, Serdes.String().deserializer(), this.pressureSerde.deserializer());
}
@After
public void tearDown() {
testDriver.close();
}
@Test
public void extract() {
final TestInputTopic<Bytes, PressureAlert>
testDriverInputTopic =
testDriver.createInputTopic(this.inputTopic, Serdes.Bytes().serializer(), this.pressureSerde.serializer());
List<PressureAlert> inputs = Arrays.asList(
new PressureAlert("101", "2019-09-21T05:25:01.+0200", Integer.MAX_VALUE),
new PressureAlert("102", "2019-09-21T05:30:02.+0200", Integer.MAX_VALUE),
new PressureAlert("103", "2019-09-21T05:45:03.+0200", Integer.MAX_VALUE),
new PressureAlert("104", "DEFINITELY-NOT-PARSABLE!!", Integer.MAX_VALUE),
new PressureAlert("105", "1500-06-24T09:11:03.+0200", Integer.MAX_VALUE)
);
inputs.forEach(pressureAlert ->
testDriverInputTopic.pipeInput(null, pressureAlert)
);
List<TestRecord<String, PressureAlert>> result = readNOutputs(5);
Optional<TestRecord<String, PressureAlert>> resultOne =
result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("101")).findFirst();
Optional<TestRecord<String, PressureAlert>> resultTwo =
result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("102")).findFirst();
Optional<TestRecord<String, PressureAlert>> resultThree =
result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("103")).findFirst();
Optional<TestRecord<String, PressureAlert>> resultFour =
result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("104")).findFirst();
Optional<TestRecord<String, PressureAlert>> resultFive =
result.stream().filter(Objects::nonNull).filter(r -> r.value().getId().equals("105")).findFirst();
Assert.assertTrue(resultOne.isPresent());
Assert.assertTrue(resultTwo.isPresent());
Assert.assertTrue(resultThree.isPresent());
Assert.assertFalse(resultFour.isPresent());
Assert.assertFalse(resultFive.isPresent());
Assert.assertEquals(
formatter.parse("2019-09-21T05:25:01.+0200", Instant::from).toEpochMilli(),
resultOne.get().timestamp().longValue()
);
Assert.assertEquals(
formatter.parse("2019-09-21T05:30:02.+0200", Instant::from).toEpochMilli(),
resultTwo.get().timestamp().longValue()
);
Assert.assertEquals(
formatter.parse("2019-09-21T05:45:03.+0200", Instant::from).toEpochMilli(),
resultThree.get().timestamp().longValue()
);
}
}
Now run the test, which is as simple as:
./gradlew test
First, create a directory for the configuration:
mkdir configuration
Then, create a new configuration file at configuration/prod.properties
with the following content. Be sure to fill in the addresses of your production hosts and change any other parameters that make sense for your setup.
application.id=final-results-tutorial-prod
bootstrap.servers=broker:9092
schema.registry.url=http://schema-registry:8081
window.size=10 seconds
window.grace.period=20 seconds
sensor.datetime.pattern=yyyy-MM-dd'T'HH:mm:ss.Z
local.date.lang=fr
local.date.pattern=EEE d MMM yyyy # date only
input.topic.name=input-topic
input.topic.partitions=1
input.topic.replication.factor=1
output.topic.name=output-topic
output.topic.partitions=1
output.topic.replication.factor=1
But remember that the application also has a default configuration file, and you can simply override parts of it by defining the corresponding environment variables. Here is the full list of variables:
Variable | Description |
---|---|
APP_ID | id of the app, used as the Kafka Streams application id |
BOOTSTRAP_SERVERS | host:port of your Kafka broker |
SCHEMA_REGISTRY_URL | URL of your Schema Registry |
INPUT_TOPIC | name of the input topic |
INPUT_TOPIC_PARTITIONS | number of partitions of the input topic |
INPUT_TOPIC_REPLICATION | replication factor of the input topic |
OUTPUT_TOPIC | name of the output topic |
OUTPUT_TOPIC_PARTITIONS | number of partitions of the output topic |
OUTPUT_TOPIC_REPLICATION | replication factor of the output topic |
WINDOW_SIZE | size of the aggregation window |
GRACE_PERIOD | how long to wait for late data points |
In your terminal, execute the following command to invoke the Jib plugin to build a docker image:
gradle jibDockerBuild --image=io.confluent.developer/kstreams-window-final-result:0.0.1-SNAPSHOT
Finally, launch the container using your preferred container orchestration service.
If you want to run it locally and pass your custom configuration file, you can execute the following:
docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/kstreams-window-final-result:0.0.1-SNAPSHOT -Dconfig.file=/config.properties
Or, just run the container and set the environment variables you’d like to change:
docker run\
-e "BOOTSTRAP_SERVERS=broker:9092"\
-e "SCHEMA_REGISTRY_URL=http://schema-registry:8081"\
io.confluent.developer/kstreams-window-final-result:0.0.1-SNAPSHOT
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.
First, create your Kafka cluster in Confluent Cloud.
Use the promo code CC100KTS
to receive an additional $100 free usage (details).
Next, from the Confluent Cloud UI, click on Tools & client config
to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.