How to count messages in a Kafka topic

Question:

How can you count the number of messages in a Kafka topic?

Example use case:

It can be useful to know how many messages are currently in a topic, but you cannot calculate this directly from the offsets alone, because the topic's retention policy and log compaction may have removed messages (and producer retries may have introduced duplicates), so the offset range is not a reliable count. In this example, we'll take a topic of pageview data and count all of the messages in it. Note that this approach is O(n) (linear): processing time depends on the number of messages in the topic, so large topics will take a long time to count.
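
As an aside, you can see why offsets alone are not enough by inspecting them directly once the Docker stack later in this tutorial is running. This is only a sketch, and it assumes the kafka-get-offsets tool is shipped in the broker container defined below; summing the per-partition offset ranges gives an upper bound on the message count, not the count itself:

# Latest offset per partition (illustrative sketch; assumes the broker container from this tutorial)
docker exec broker kafka-get-offsets --bootstrap-server localhost:9092 --topic pageviews --time -1

# Earliest offset per partition
docker exec broker kafka-get-offsets --bootstrap-server localhost:9092 --topic pageviews --time -2

# (latest - earliest) summed across partitions over-counts whenever retention or compaction has removed messages.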

Hands-on code example:

Short Answer

Create a ksqlDB stream over the Kafka topic, and then use the COUNT function to count the number of messages.

SELECT 'X' AS X,
       COUNT(*) AS MSG_CT
  FROM PAGEVIEWS
  GROUP BY 'X'
  EMIT CHANGES LIMIT 1;

Run it

Prerequisites

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line, as shown below
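
Both commands should complete without reporting errors:

docker info
docker compose version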

Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir count-messages && cd count-messages

Then make the following directories to set up its structure:

mkdir src test

Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    container_name: broker
    ports:
    - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  ksqldb:
    image: confluentinc/ksqldb-server:0.28.2
    container_name: ksqldb
    depends_on:
    - broker
    ports:
    - 8088:8088
    user: root
    environment:
      KSQL_CONFIG_DIR: /etc/ksqldb
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: ^_.*
      KSQL_KSQL_CONNECT_WORKER_CONFIG: /etc/ksqldb/connect.properties
      KSQL_CONNECT_BOOTSTRAP_SERVERS: broker:29092
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: ksqldb
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m
        (%c:%L)%n'
      KSQL_CONNECT_PLUGIN_PATH: /usr/share/java
    command:
    - bash
    - -c
    - |
      echo "Installing connector plugins"
      # I miss the confluent-hub client :-/
      # mkdir -p /usr/share/confluent-hub-components/
      # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.3.3
      # ------ hack to workaround absence of confluent-hub client
      curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-datagen/versions/0.4.0/confluentinc-kafka-connect-datagen-0.4.0.zip -o /tmp/kafka-connect-datagen.zip
      yum install -y unzip
      unzip /tmp/kafka-connect-datagen.zip -d /usr/share/java/kafka-connect-datagen
      # ----------------------------------------------------------
      #
      echo "Launching ksqlDB"
      /usr/bin/docker/run &

       echo "Waiting for Kafka Connect to start listening on localhost ⏳"
       while : ; do
        curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
        echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
        if [ $$curl_status -eq 200 ] ; then
          break
        fi
        sleep 5
      done

      echo -e "\n--\n+> Creating Data Generators"
      curl -i -X PUT http://localhost:8083/connectors/datagen_01/config \
           -H "Content-Type: application/json" \
           -d '{
                  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                  "value.converter.schemas.enable":false,
                  "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                  "kafka.topic": "pageviews",
                  "quickstart": "pageviews",
                  "iterations": 42,
                  "tasks.max": "1"
              }'

      curl -i -X PUT http://localhost:8083/connectors/datagen_02/config \
           -H "Content-Type: application/json" \
            -d '{
                  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                  "value.converter.schemas.enable":false,
                  "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                  "kafka.topic": "trades",
                  "quickstart": "Stock_Trades",
                  "max.interval": 1000,
                  "iterations": 4242424242,
                  "tasks.max": "1"
              }'

      sleep infinity
    volumes:
    - ./src:/opt/app/src
    - ./test:/opt/app/test
  kcat:
    image: edenhill/kcat:1.7.1
    container_name: kcat
    links:
    - broker
    entrypoint:
    - /bin/sh
    - -c
    - "apk add jq; \nwhile [ 1 -eq 1 ];do sleep 60;done\n"

And bring up the stack of components by running:

docker compose up -d

Run this snippet of code, which will block until ksqlDB is available:

docker exec -it ksqldb bash -c 'echo -e "\n\n  Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done'

Launch the ksqlDB CLI and declare a stream

To begin developing interactively, open up the ksqlDB CLI:

docker exec -it ksqldb ksql http://ksqldb:8088

First, we’ll declare a ksqlDB stream on the Kafka topic that we want to work with. A ksqlDB stream is a Kafka topic (which, in this case, already exists) with a declared schema.

CREATE STREAM pageviews (msg VARCHAR)
  WITH (KAFKA_TOPIC ='pageviews',
        VALUE_FORMAT='JSON');

Note that at this stage we’re just interested in counting the messages in their entirety, so we define the loosest schema possible (msg VARCHAR) for speed.
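
For comparison, if you also wanted to work with individual fields rather than just count messages, you could declare the full schema instead. This is only a sketch: the stream name pageviews_typed is arbitrary, and the field names match the pageviews datagen data used in this tutorial. ksqlDB allows more than one stream to be declared over the same topic.

CREATE STREAM pageviews_typed (viewtime BIGINT,
                               userid VARCHAR,
                               pageid VARCHAR)
  WITH (KAFKA_TOPIC ='pageviews',
        VALUE_FORMAT='JSON');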

Count all the messages in a topic

Since we want to count all of the messages in the topic (and not just those that arrive after the query has started) we need to tell ksqlDB to query from the beginning of the topic:

SET 'auto.offset.reset' = 'earliest';

You should get a message confirming the change:

Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

Now let’s count all of the messages in the topic:

SELECT 'X' AS X,
       COUNT(*) AS MSG_CT
  FROM PAGEVIEWS
  GROUP BY 'X'
  EMIT CHANGES LIMIT 1;

We’re specifying a LIMIT 1 clause here so that the query exits after the first row has returned. Without this, ksqlDB will continue to write any changes in the count to the screen as new messages arrive.

You should now see a count of all the messages in the topic:

+------------------------+------------------------+
|X                       |MSG_CT                  |
+------------------------+------------------------+
|X                       |42                      |
Limit Reached
Query terminated

You may be wondering about the purpose of X in the query. It’s a dummy field to persuade ksqlDB to do an aggregation across all messages. For more information see ksqlDB GitHub issue #430.

As we saw, the query above scans through all of the messages in the topic and outputs the total, continuing to emit updated counts as new messages arrive. What if we would like to maintain a count of messages that we can query with low latency whenever we want?

This is where ksqlDB tables come in. They are stateful aggregations held by ksqlDB, backed by both a Kafka topic and an internal state store.

Run this SQL to create a table holding the results of the same COUNT(*) that we ran above.

CREATE TABLE MSG_COUNT AS
    SELECT 'X' AS X,
        COUNT(*) AS MSG_CT
    FROM PAGEVIEWS
    GROUP BY 'X'
    EMIT CHANGES;

The output from this is a confirmation that the table has been created.

 Created query with ID CTAS_MSG_COUNT_0

Often we will simply want to query the current number of messages in the topic from the materialized view that we built in the ksqlDB table, and then exit. Compare this to the query above with EMIT CHANGES, which continues to run until we cancel it (or add a LIMIT clause).

SELECT * FROM MSG_COUNT WHERE X='X';

As before, we get the total number of messages, but instead of querying the stream directly (and performing the aggregation while we wait), we are querying the stateful aggregation that ksqlDB has already built.

+------------------------+------------------------+
|X                       |MSG_CT                  |
+------------------------+------------------------+
|X                       |42                      |
Query terminated

This type of query is known as a pull query. Pull queries can be run against ksqlDB tables with materialized state, subject to certain conditions on the predicate used.
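
For contrast, the push-query form of the same lookup keeps running and emits an updated count every time the table changes (a sketch; cancel it with Ctrl-C or add a LIMIT clause):

SELECT * FROM MSG_COUNT EMIT CHANGES;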

Query the number of messages using the REST API

The neat thing about pull queries is that you can use ksqlDB’s REST API (or the Java client) to run them from your own application and look up a value directly. Instead of needing an external datastore in which to hold state for your application to query, you can serve it straight from Kafka and ksqlDB.

Exit the ksqlDB command prompt and run this from your shell directly:

docker exec ksqldb \
    curl --silent --show-error \
         --http2 'http://localhost:8088/query-stream' \
         --data-raw '{"sql":"SELECT MSG_CT FROM MSG_COUNT WHERE X='\''X'\'';"}'

Now you get the count of messages in the Kafka topic, queried directly from the materialized view that’s built and populated in ksqlDB.

{"queryId":null,"columnNames":["MSG_CT"],"columnTypes":["BIGINT"]}
[42]
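
If you want just the number on its own, you could pipe the response through jq on your host. This is a sketch and assumes jq is installed locally; the first JSON value in the response is the column header and the second is the row, so slurping both and picking the first field of the second value yields the count:

docker exec ksqldb \
    curl --silent --show-error \
         --http2 'http://localhost:8088/query-stream' \
         --data-raw '{"sql":"SELECT MSG_CT FROM MSG_COUNT WHERE X='\''X'\'';"}' | jq -s '.[1][0]'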

Write your statements to a file

Now that you have a series of statements that do the right thing, the last step is to put them into a file so that they can be used outside of the CLI session. Create a file at src/statements.sql with the following content:

CREATE STREAM pageviews (msg VARCHAR)
  WITH (KAFKA_TOPIC ='pageviews',
        VALUE_FORMAT='JSON');

CREATE TABLE MSG_COUNT AS
    SELECT 'X' AS X,
        COUNT(*) AS MSG_CT
    FROM PAGEVIEWS
    GROUP BY 'X'
    EMIT CHANGES;

Test it

Create the test data

Create a file at test/input.json with the inputs for testing:

{
  "inputs": [
    {
      "topic": "pageviews",
      "timestamp": 1600341475094,
      "key": "1",
      "value": {
        "viewtime": 1,
        "userid": "User_7",
        "pageid": "Page_47"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341475206,
      "key": "11",
      "value": {
        "viewtime": 11,
        "userid": "User_8",
        "pageid": "Page_47"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341475478,
      "key": "21",
      "value": {
        "viewtime": 21,
        "userid": "User_7",
        "pageid": "Page_64"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341475955,
      "key": "31",
      "value": {
        "viewtime": 31,
        "userid": "User_7",
        "pageid": "Page_29"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476073,
      "key": "41",
      "value": {
        "viewtime": 41,
        "userid": "User_4",
        "pageid": "Page_41"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476547,
      "key": "51",
      "value": {
        "viewtime": 51,
        "userid": "User_6",
        "pageid": "Page_68"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476659,
      "key": "61",
      "value": {
        "viewtime": 61,
        "userid": "User_5",
        "pageid": "Page_17"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476677,
      "key": "71",
      "value": {
        "viewtime": 71,
        "userid": "User_1",
        "pageid": "Page_60"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476727,
      "key": "81",
      "value": {
        "viewtime": 81,
        "userid": "User_4",
        "pageid": "Page_80"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476886,
      "key": "91",
      "value": {
        "viewtime": 91,
        "userid": "User_8",
        "pageid": "Page_90"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341477316,
      "key": "101",
      "value": {
        "viewtime": 101,
        "userid": "User_9",
        "pageid": "Page_33"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341477503,
      "key": "111",
      "value": {
        "viewtime": 111,
        "userid": "User_7",
        "pageid": "Page_50"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341477758,
      "key": "121",
      "value": {
        "viewtime": 121,
        "userid": "User_6",
        "pageid": "Page_10"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341478251,
      "key": "131",
      "value": {
        "viewtime": 131,
        "userid": "User_4",
        "pageid": "Page_75"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341478450,
      "key": "141",
      "value": {
        "viewtime": 141,
        "userid": "User_3",
        "pageid": "Page_13"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341478512,
      "key": "151",
      "value": {
        "viewtime": 151,
        "userid": "User_6",
        "pageid": "Page_28"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341478868,
      "key": "161",
      "value": {
        "viewtime": 161,
        "userid": "User_2",
        "pageid": "Page_47"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341479027,
      "key": "171",
      "value": {
        "viewtime": 171,
        "userid": "User_3",
        "pageid": "Page_44"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341479064,
      "key": "181",
      "value": {
        "viewtime": 181,
        "userid": "User_1",
        "pageid": "Page_42"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341479462,
      "key": "191",
      "value": {
        "viewtime": 191,
        "userid": "User_1",
        "pageid": "Page_59"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341479532,
      "key": "201",
      "value": {
        "viewtime": 201,
        "userid": "User_1",
        "pageid": "Page_30"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341480014,
      "key": "211",
      "value": {
        "viewtime": 211,
        "userid": "User_1",
        "pageid": "Page_47"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341480252,
      "key": "221",
      "value": {
        "viewtime": 221,
        "userid": "User_2",
        "pageid": "Page_23"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341480652,
      "key": "231",
      "value": {
        "viewtime": 231,
        "userid": "User_9",
        "pageid": "Page_97"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341481044,
      "key": "241",
      "value": {
        "viewtime": 241,
        "userid": "User_7",
        "pageid": "Page_18"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341481304,
      "key": "251",
      "value": {
        "viewtime": 251,
        "userid": "User_1",
        "pageid": "Page_22"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341481749,
      "key": "261",
      "value": {
        "viewtime": 261,
        "userid": "User_5",
        "pageid": "Page_35"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341482080,
      "key": "271",
      "value": {
        "viewtime": 271,
        "userid": "User_5",
        "pageid": "Page_13"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341482470,
      "key": "281",
      "value": {
        "viewtime": 281,
        "userid": "User_3",
        "pageid": "Page_33"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341482957,
      "key": "291",
      "value": {
        "viewtime": 291,
        "userid": "User_2",
        "pageid": "Page_41"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341483102,
      "key": "301",
      "value": {
        "viewtime": 301,
        "userid": "User_3",
        "pageid": "Page_40"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341483115,
      "key": "311",
      "value": {
        "viewtime": 311,
        "userid": "User_3",
        "pageid": "Page_24"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341483594,
      "key": "321",
      "value": {
        "viewtime": 321,
        "userid": "User_5",
        "pageid": "Page_88"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341483978,
      "key": "331",
      "value": {
        "viewtime": 331,
        "userid": "User_1",
        "pageid": "Page_18"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341484149,
      "key": "341",
      "value": {
        "viewtime": 341,
        "userid": "User_5",
        "pageid": "Page_35"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341484582,
      "key": "351",
      "value": {
        "viewtime": 351,
        "userid": "User_1",
        "pageid": "Page_85"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341484880,
      "key": "361",
      "value": {
        "viewtime": 361,
        "userid": "User_1",
        "pageid": "Page_29"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485041,
      "key": "371",
      "value": {
        "viewtime": 371,
        "userid": "User_1",
        "pageid": "Page_40"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485271,
      "key": "381",
      "value": {
        "viewtime": 381,
        "userid": "User_3",
        "pageid": "Page_29"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485570,
      "key": "391",
      "value": {
        "viewtime": 391,
        "userid": "User_4",
        "pageid": "Page_15"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485759,
      "key": "401",
      "value": {
        "viewtime": 401,
        "userid": "User_2",
        "pageid": "Page_27"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485951,
      "key": "411",
      "value": {
        "viewtime": 411,
        "userid": "User_9",
        "pageid": "Page_22"
      }
    }
  ]
}

Similarly, create a file at test/output.json with the expected outputs:

{ "outputs": [
        { "topic": "MSG_COUNT", "timestamp":1600341475094, "key": "X", "value": {"MSG_CT": 1}},
        { "topic": "MSG_COUNT", "timestamp":1600341475206, "key": "X", "value": {"MSG_CT": 2}},
        { "topic": "MSG_COUNT", "timestamp":1600341475478, "key": "X", "value": {"MSG_CT": 3}},
        { "topic": "MSG_COUNT", "timestamp":1600341475955, "key": "X", "value": {"MSG_CT": 4}},
        { "topic": "MSG_COUNT", "timestamp":1600341476073, "key": "X", "value": {"MSG_CT": 5}},
        { "topic": "MSG_COUNT", "timestamp":1600341476547, "key": "X", "value": {"MSG_CT": 6}},
        { "topic": "MSG_COUNT", "timestamp":1600341476659, "key": "X", "value": {"MSG_CT": 7}},
        { "topic": "MSG_COUNT", "timestamp":1600341476677, "key": "X", "value": {"MSG_CT": 8}},
        { "topic": "MSG_COUNT", "timestamp":1600341476727, "key": "X", "value": {"MSG_CT": 9}},
        { "topic": "MSG_COUNT", "timestamp":1600341476886, "key": "X", "value": {"MSG_CT": 10}},
        { "topic": "MSG_COUNT", "timestamp":1600341477316, "key": "X", "value": {"MSG_CT": 11}},
        { "topic": "MSG_COUNT", "timestamp":1600341477503, "key": "X", "value": {"MSG_CT": 12}},
        { "topic": "MSG_COUNT", "timestamp":1600341477758, "key": "X", "value": {"MSG_CT": 13}},
        { "topic": "MSG_COUNT", "timestamp":1600341478251, "key": "X", "value": {"MSG_CT": 14}},
        { "topic": "MSG_COUNT", "timestamp":1600341478450, "key": "X", "value": {"MSG_CT": 15}},
        { "topic": "MSG_COUNT", "timestamp":1600341478512, "key": "X", "value": {"MSG_CT": 16}},
        { "topic": "MSG_COUNT", "timestamp":1600341478868, "key": "X", "value": {"MSG_CT": 17}},
        { "topic": "MSG_COUNT", "timestamp":1600341479027, "key": "X", "value": {"MSG_CT": 18}},
        { "topic": "MSG_COUNT", "timestamp":1600341479064, "key": "X", "value": {"MSG_CT": 19}},
        { "topic": "MSG_COUNT", "timestamp":1600341479462, "key": "X", "value": {"MSG_CT": 20}},
        { "topic": "MSG_COUNT", "timestamp":1600341479532, "key": "X", "value": {"MSG_CT": 21}},
        { "topic": "MSG_COUNT", "timestamp":1600341480014, "key": "X", "value": {"MSG_CT": 22}},
        { "topic": "MSG_COUNT", "timestamp":1600341480252, "key": "X", "value": {"MSG_CT": 23}},
        { "topic": "MSG_COUNT", "timestamp":1600341480652, "key": "X", "value": {"MSG_CT": 24}},
        { "topic": "MSG_COUNT", "timestamp":1600341481044, "key": "X", "value": {"MSG_CT": 25}},
        { "topic": "MSG_COUNT", "timestamp":1600341481304, "key": "X", "value": {"MSG_CT": 26}},
        { "topic": "MSG_COUNT", "timestamp":1600341481749, "key": "X", "value": {"MSG_CT": 27}},
        { "topic": "MSG_COUNT", "timestamp":1600341482080, "key": "X", "value": {"MSG_CT": 28}},
        { "topic": "MSG_COUNT", "timestamp":1600341482470, "key": "X", "value": {"MSG_CT": 29}},
        { "topic": "MSG_COUNT", "timestamp":1600341482957, "key": "X", "value": {"MSG_CT": 30}},
        { "topic": "MSG_COUNT", "timestamp":1600341483102, "key": "X", "value": {"MSG_CT": 31}},
        { "topic": "MSG_COUNT", "timestamp":1600341483115, "key": "X", "value": {"MSG_CT": 32}},
        { "topic": "MSG_COUNT", "timestamp":1600341483594, "key": "X", "value": {"MSG_CT": 33}},
        { "topic": "MSG_COUNT", "timestamp":1600341483978, "key": "X", "value": {"MSG_CT": 34}},
        { "topic": "MSG_COUNT", "timestamp":1600341484149, "key": "X", "value": {"MSG_CT": 35}},
        { "topic": "MSG_COUNT", "timestamp":1600341484582, "key": "X", "value": {"MSG_CT": 36}},
        { "topic": "MSG_COUNT", "timestamp":1600341484880, "key": "X", "value": {"MSG_CT": 37}},
        { "topic": "MSG_COUNT", "timestamp":1600341485041, "key": "X", "value": {"MSG_CT": 38}},
        { "topic": "MSG_COUNT", "timestamp":1600341485271, "key": "X", "value": {"MSG_CT": 39}},
        { "topic": "MSG_COUNT", "timestamp":1600341485570, "key": "X", "value": {"MSG_CT": 40}},
        { "topic": "MSG_COUNT", "timestamp":1600341485759, "key": "X", "value": {"MSG_CT": 41}},
        { "topic": "MSG_COUNT", "timestamp":1600341485951, "key": "X", "value": {"MSG_CT": 42}}
  ] }

Note that, in contrast to the output we saw from the CLI above, the test execution does not buffer the input records, so an aggregate value is emitted for every input record processed.

Invoke the tests

Lastly, invoke the tests using the test runner and the statements file that you created earlier:

docker exec ksqldb ksql-test-runner -i /opt/app/test/input.json -s /opt/app/src/statements.sql -o /opt/app/test/output.json 2>&1

The tests should pass:

	 >>> Test passed!

Clean up

Once the test has completed successfully, you can tear down the Docker Compose stack. This will delete all the data that you’ve stored in Kafka and ksqlDB.

docker compose down