How to count messages in a Kafka topic

Question:

How can I count the number of messages in a Kafka topic?


Example use case:

Messages are stored in a Kafka topic until Kafka removes them according to the topic's retention (or compaction) settings. It can be useful to know how many messages a topic currently holds. In this example we'll take a topic of pageview data and see how we can count all of the messages in it.
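Conceptually, the number of messages in a topic is the sum over its partitions of (latest offset minus earliest offset), assuming nothing has been removed by compaction. As a toy illustration of that arithmetic (the offsets below are made up, not read from a real cluster):

```shell
# Toy illustration only: a topic's message count is the sum over its
# partitions of (latest offset - earliest offset). These offsets are
# invented for the example; a real cluster would supply them per partition.
earliest="0 0 0"
latest="10 20 12"
total=0
i=1
for e in $earliest; do
  l=$(echo "$latest" | cut -d' ' -f$i)
  total=$((total + l - e))
  i=$((i + 1))
done
echo "$total"   # 42
```

The rest of this tutorial takes a different route: rather than doing offset arithmetic ourselves, we let ksqlDB consume the topic and maintain the count for us.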

Code example:

Try it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir count-messages && cd count-messages

Then make the following directories to set up its structure:

mkdir src test

2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.1
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.11.0
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
    user: root
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
      # Kafka Connect config below
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
      KSQL_CONNECT_REST_PORT: 8083
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'

    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # I miss the confluent-hub client :-/
        # mkdir -p /usr/share/confluent-hub-components/
        # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.3.3
        # ------ hack to workaround absence of confluent-hub client
        curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-datagen/versions/0.3.3/confluentinc-kafka-connect-datagen-0.3.3.zip -o /tmp/kafka-connect-datagen.zip
        yum install -y unzip
        unzip /tmp/kafka-connect-datagen.zip -d /usr/share/java/kafka-connect-datagen
        # ----------------------------------------------------------
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &

        echo "Waiting for Kafka Connect to start listening on localhost ⏳"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done

        echo -e "\n--\n+> Creating Data Generators"
        curl -i -X PUT http://localhost:8083/connectors/datagen_01/config \
             -H "Content-Type: application/json" \
             -d '{
                    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                    "value.converter.schemas.enable":false,
                    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                    "kafka.topic": "pageviews",
                    "quickstart": "pageviews",
                    "iterations": 42,
                    "tasks.max": "1"
                }'

        curl -i -X PUT http://localhost:8083/connectors/datagen_02/config \
             -H "Content-Type: application/json" \
              -d '{
                    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                    "value.converter.schemas.enable":false,
                    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                    "kafka.topic": "trades",
                    "quickstart": "Stock_Trades",
                    "max.interval": 1000,
                    "iterations": 4242424242,
                    "tasks.max": "1"
                }'

        sleep infinity
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done

And bring up the stack of components by running:

docker-compose up -d

Run this snippet of code, which will block until ksqlDB is available:

docker exec -it ksqldb bash -c 'echo -e "\n\n  Waiting for ksqlDB to be available before launching CLI\n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done'

3
Launch the ksqlDB CLI and declare a stream

To begin developing interactively, open up the ksqlDB CLI:

docker exec -it ksqldb ksql http://ksqldb:8088

First, we'll declare a ksqlDB stream on the Kafka topic that we want to work with. A ksqlDB stream is essentially a Kafka topic (which in this case already exists) plus a declared schema.

CREATE STREAM pageviews (msg VARCHAR)
  WITH (KAFKA_TOPIC ='pageviews',
        VALUE_FORMAT='JSON');

Note that at this stage we're just interested in counting the messages in their entirety, so we define the loosest schema possible (msg VARCHAR) for speed. The pageview payloads don't actually contain a msg field; the column will simply be NULL, and COUNT(*) counts each record regardless of its contents.

4
Count all the messages in a topic

Since we want to count all of the messages in the topic (and not just those that arrive after the query has started) we need to tell ksqlDB to query from the beginning of the topic:

SET 'auto.offset.reset' = 'earliest';

You should get a message confirming the change:

Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

Now let’s count all of the messages in the topic:

SELECT 'X' AS X,
       COUNT(*) AS MSG_CT
  FROM PAGEVIEWS
  GROUP BY 'X'
  EMIT CHANGES LIMIT 1;

We’re specifying a LIMIT 1 clause here so that the query exits after the first row has returned. Without this, ksqlDB will continue to write any changes in the count to the screen as new messages arrive.

You should now see a count of all the messages in the topic:

+------------------------+------------------------+
|X                       |MSG_CT                  |
+------------------------+------------------------+
|X                       |42                      |
Limit Reached
Query terminated

You may be wondering about the purpose of X in the query. It's a dummy field to persuade ksqlDB to do an aggregation across all messages. For more information see ksqlDB GitHub issue #430.

As we saw, the query above scans through all of the messages in the topic and then outputs the total; without the LIMIT clause it would continue to emit updated counts as new messages arrive. What if we would like to maintain a count of messages that we can query with low latency whenever we want?

This is where ksqlDB tables come in. They are stateful aggregations held by ksqlDB, backed by both a Kafka topic and an internal state store.

Run this SQL to create a table holding the results of the same COUNT(*) that we ran above.

CREATE TABLE MSG_COUNT AS
    SELECT 'X' AS X,
        COUNT(*) AS MSG_CT
    FROM PAGEVIEWS
    GROUP BY 'X'
    EMIT CHANGES;

The output from this is a confirmation that the table has been created.

 Created query with ID CTAS_MSG_COUNT_0

Often we will want to query the current number of messages in a topic from the materialized view that we built in the ksqlDB table, and then exit. Compare this to the EMIT CHANGES query above, which continues to run until we cancel it (or add a LIMIT clause).

SELECT * FROM MSG_COUNT WHERE X='X';

As before we get the total number of messages, but instead of querying the stream directly—and performing the aggregation whilst we wait—we are querying the stateful aggregation that ksqlDB has built.

+------------------------+------------------------+
|X                       |MSG_CT                  |
+------------------------+------------------------+
|X                       |42                      |
Query terminated

This type of query is known as a pull query. Pull queries can be run against ksqlDB tables with materialized state, subject to certain conditions on the predicate used (in this version of ksqlDB, a direct lookup on the table's key, as in the WHERE X='X' clause above).
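As a cross-check outside of ksqlDB, the kafkacat container included in the Docker Compose stack can consume the entire topic and count the messages. This is a sketch rather than part of the tutorial proper; it requires the stack to be running, and note that -C consumes, -e exits once the end of the topic is reached, and -q suppresses informational output:

```shell
# Consume every message in the pageviews topic and count the lines.
# Uses the internal listener (broker:29092) since kafkacat runs
# inside the Docker network.
docker exec kafkacat \
    kafkacat -b broker:29092 -t pageviews -C -e -q | wc -l
```

If the data generator has finished its 42 iterations, this should agree with the count that ksqlDB reports.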

5
Query the number of messages using the REST API

The neat thing is that you can use ksqlDB's REST API (or Java client) to run pull queries from your own application and look up a value directly. Instead of needing an external datastore in which to hold state for your application to query, you can do this directly from Kafka and ksqlDB.

Exit the ksqlDB command prompt and run this from your shell directly:

docker exec ksqldb \
    curl --silent --show-error \
         --http2 'http://localhost:8088/query-stream' \
         --data-raw '{"sql":"SELECT MSG_CT FROM MSG_COUNT WHERE X='\''X'\'';"}'

Now you get the count of messages in the Kafka topic, queried directly from the materialized view that’s built and populated in ksqlDB.

{"queryId":null,"columnNames":["MSG_CT"],"columnTypes":["BIGINT"]}
[42]
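If you're scripting against this endpoint, note that the response is a header line (column names and types) followed by one line per row. A minimal sketch of extracting the bare count from that shape, simulating the response shown above rather than calling the live server:

```shell
# Simulate the two-line /query-stream response shown above.
response='{"queryId":null,"columnNames":["MSG_CT"],"columnTypes":["BIGINT"]}
[42]'
# The last line is a JSON array of column values; strip the brackets to
# get the bare count. (jq would also do the job, and is installed in the
# kafkacat container.)
count=$(printf '%s\n' "$response" | tail -n 1 | tr -d '[]')
echo "$count"   # 42
```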

6
Write your statements to a file

Now that you have a series of statements that does the right thing, the last step is to put them into a file so that they can be used outside the CLI session. Create a file at src/statements.sql with the following content:

CREATE STREAM pageviews (msg VARCHAR)
  WITH (KAFKA_TOPIC ='pageviews',
        VALUE_FORMAT='JSON');

CREATE TABLE MSG_COUNT AS
    SELECT 'X' AS X,
        COUNT(*) AS MSG_CT
    FROM PAGEVIEWS
    GROUP BY 'X'
    EMIT CHANGES;

Test it

1
Create the test data

Create a file at test/input.json with the inputs for testing:

{
  "inputs": [
    {
      "topic": "pageviews",
      "timestamp": 1600341475094,
      "key": "1",
      "value": {
        "viewtime": 1,
        "userid": "User_7",
        "pageid": "Page_47"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341475206,
      "key": "11",
      "value": {
        "viewtime": 11,
        "userid": "User_8",
        "pageid": "Page_47"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341475478,
      "key": "21",
      "value": {
        "viewtime": 21,
        "userid": "User_7",
        "pageid": "Page_64"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341475955,
      "key": "31",
      "value": {
        "viewtime": 31,
        "userid": "User_7",
        "pageid": "Page_29"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476073,
      "key": "41",
      "value": {
        "viewtime": 41,
        "userid": "User_4",
        "pageid": "Page_41"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476547,
      "key": "51",
      "value": {
        "viewtime": 51,
        "userid": "User_6",
        "pageid": "Page_68"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476659,
      "key": "61",
      "value": {
        "viewtime": 61,
        "userid": "User_5",
        "pageid": "Page_17"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476677,
      "key": "71",
      "value": {
        "viewtime": 71,
        "userid": "User_1",
        "pageid": "Page_60"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476727,
      "key": "81",
      "value": {
        "viewtime": 81,
        "userid": "User_4",
        "pageid": "Page_80"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341476886,
      "key": "91",
      "value": {
        "viewtime": 91,
        "userid": "User_8",
        "pageid": "Page_90"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341477316,
      "key": "101",
      "value": {
        "viewtime": 101,
        "userid": "User_9",
        "pageid": "Page_33"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341477503,
      "key": "111",
      "value": {
        "viewtime": 111,
        "userid": "User_7",
        "pageid": "Page_50"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341477758,
      "key": "121",
      "value": {
        "viewtime": 121,
        "userid": "User_6",
        "pageid": "Page_10"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341478251,
      "key": "131",
      "value": {
        "viewtime": 131,
        "userid": "User_4",
        "pageid": "Page_75"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341478450,
      "key": "141",
      "value": {
        "viewtime": 141,
        "userid": "User_3",
        "pageid": "Page_13"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341478512,
      "key": "151",
      "value": {
        "viewtime": 151,
        "userid": "User_6",
        "pageid": "Page_28"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341478868,
      "key": "161",
      "value": {
        "viewtime": 161,
        "userid": "User_2",
        "pageid": "Page_47"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341479027,
      "key": "171",
      "value": {
        "viewtime": 171,
        "userid": "User_3",
        "pageid": "Page_44"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341479064,
      "key": "181",
      "value": {
        "viewtime": 181,
        "userid": "User_1",
        "pageid": "Page_42"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341479462,
      "key": "191",
      "value": {
        "viewtime": 191,
        "userid": "User_1",
        "pageid": "Page_59"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341479532,
      "key": "201",
      "value": {
        "viewtime": 201,
        "userid": "User_1",
        "pageid": "Page_30"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341480014,
      "key": "211",
      "value": {
        "viewtime": 211,
        "userid": "User_1",
        "pageid": "Page_47"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341480252,
      "key": "221",
      "value": {
        "viewtime": 221,
        "userid": "User_2",
        "pageid": "Page_23"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341480652,
      "key": "231",
      "value": {
        "viewtime": 231,
        "userid": "User_9",
        "pageid": "Page_97"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341481044,
      "key": "241",
      "value": {
        "viewtime": 241,
        "userid": "User_7",
        "pageid": "Page_18"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341481304,
      "key": "251",
      "value": {
        "viewtime": 251,
        "userid": "User_1",
        "pageid": "Page_22"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341481749,
      "key": "261",
      "value": {
        "viewtime": 261,
        "userid": "User_5",
        "pageid": "Page_35"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341482080,
      "key": "271",
      "value": {
        "viewtime": 271,
        "userid": "User_5",
        "pageid": "Page_13"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341482470,
      "key": "281",
      "value": {
        "viewtime": 281,
        "userid": "User_3",
        "pageid": "Page_33"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341482957,
      "key": "291",
      "value": {
        "viewtime": 291,
        "userid": "User_2",
        "pageid": "Page_41"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341483102,
      "key": "301",
      "value": {
        "viewtime": 301,
        "userid": "User_3",
        "pageid": "Page_40"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341483115,
      "key": "311",
      "value": {
        "viewtime": 311,
        "userid": "User_3",
        "pageid": "Page_24"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341483594,
      "key": "321",
      "value": {
        "viewtime": 321,
        "userid": "User_5",
        "pageid": "Page_88"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341483978,
      "key": "331",
      "value": {
        "viewtime": 331,
        "userid": "User_1",
        "pageid": "Page_18"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341484149,
      "key": "341",
      "value": {
        "viewtime": 341,
        "userid": "User_5",
        "pageid": "Page_35"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341484582,
      "key": "351",
      "value": {
        "viewtime": 351,
        "userid": "User_1",
        "pageid": "Page_85"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341484880,
      "key": "361",
      "value": {
        "viewtime": 361,
        "userid": "User_1",
        "pageid": "Page_29"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485041,
      "key": "371",
      "value": {
        "viewtime": 371,
        "userid": "User_1",
        "pageid": "Page_40"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485271,
      "key": "381",
      "value": {
        "viewtime": 381,
        "userid": "User_3",
        "pageid": "Page_29"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485570,
      "key": "391",
      "value": {
        "viewtime": 391,
        "userid": "User_4",
        "pageid": "Page_15"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485759,
      "key": "401",
      "value": {
        "viewtime": 401,
        "userid": "User_2",
        "pageid": "Page_27"
      }
    },
    {
      "topic": "pageviews",
      "timestamp": 1600341485951,
      "key": "411",
      "value": {
        "viewtime": 411,
        "userid": "User_9",
        "pageid": "Page_22"
      }
    }
  ]
}

Similarly, create a file at test/output.json with the expected outputs:

{ "outputs": [
        { "topic": "MSG_COUNT", "timestamp":1600341475094, "key": "X", "value": {"MSG_CT": 1}},
        { "topic": "MSG_COUNT", "timestamp":1600341475206, "key": "X", "value": {"MSG_CT": 2}},
        { "topic": "MSG_COUNT", "timestamp":1600341475478, "key": "X", "value": {"MSG_CT": 3}},
        { "topic": "MSG_COUNT", "timestamp":1600341475955, "key": "X", "value": {"MSG_CT": 4}},
        { "topic": "MSG_COUNT", "timestamp":1600341476073, "key": "X", "value": {"MSG_CT": 5}},
        { "topic": "MSG_COUNT", "timestamp":1600341476547, "key": "X", "value": {"MSG_CT": 6}},
        { "topic": "MSG_COUNT", "timestamp":1600341476659, "key": "X", "value": {"MSG_CT": 7}},
        { "topic": "MSG_COUNT", "timestamp":1600341476677, "key": "X", "value": {"MSG_CT": 8}},
        { "topic": "MSG_COUNT", "timestamp":1600341476727, "key": "X", "value": {"MSG_CT": 9}},
        { "topic": "MSG_COUNT", "timestamp":1600341476886, "key": "X", "value": {"MSG_CT": 10}},
        { "topic": "MSG_COUNT", "timestamp":1600341477316, "key": "X", "value": {"MSG_CT": 11}},
        { "topic": "MSG_COUNT", "timestamp":1600341477503, "key": "X", "value": {"MSG_CT": 12}},
        { "topic": "MSG_COUNT", "timestamp":1600341477758, "key": "X", "value": {"MSG_CT": 13}},
        { "topic": "MSG_COUNT", "timestamp":1600341478251, "key": "X", "value": {"MSG_CT": 14}},
        { "topic": "MSG_COUNT", "timestamp":1600341478450, "key": "X", "value": {"MSG_CT": 15}},
        { "topic": "MSG_COUNT", "timestamp":1600341478512, "key": "X", "value": {"MSG_CT": 16}},
        { "topic": "MSG_COUNT", "timestamp":1600341478868, "key": "X", "value": {"MSG_CT": 17}},
        { "topic": "MSG_COUNT", "timestamp":1600341479027, "key": "X", "value": {"MSG_CT": 18}},
        { "topic": "MSG_COUNT", "timestamp":1600341479064, "key": "X", "value": {"MSG_CT": 19}},
        { "topic": "MSG_COUNT", "timestamp":1600341479462, "key": "X", "value": {"MSG_CT": 20}},
        { "topic": "MSG_COUNT", "timestamp":1600341479532, "key": "X", "value": {"MSG_CT": 21}},
        { "topic": "MSG_COUNT", "timestamp":1600341480014, "key": "X", "value": {"MSG_CT": 22}},
        { "topic": "MSG_COUNT", "timestamp":1600341480252, "key": "X", "value": {"MSG_CT": 23}},
        { "topic": "MSG_COUNT", "timestamp":1600341480652, "key": "X", "value": {"MSG_CT": 24}},
        { "topic": "MSG_COUNT", "timestamp":1600341481044, "key": "X", "value": {"MSG_CT": 25}},
        { "topic": "MSG_COUNT", "timestamp":1600341481304, "key": "X", "value": {"MSG_CT": 26}},
        { "topic": "MSG_COUNT", "timestamp":1600341481749, "key": "X", "value": {"MSG_CT": 27}},
        { "topic": "MSG_COUNT", "timestamp":1600341482080, "key": "X", "value": {"MSG_CT": 28}},
        { "topic": "MSG_COUNT", "timestamp":1600341482470, "key": "X", "value": {"MSG_CT": 29}},
        { "topic": "MSG_COUNT", "timestamp":1600341482957, "key": "X", "value": {"MSG_CT": 30}},
        { "topic": "MSG_COUNT", "timestamp":1600341483102, "key": "X", "value": {"MSG_CT": 31}},
        { "topic": "MSG_COUNT", "timestamp":1600341483115, "key": "X", "value": {"MSG_CT": 32}},
        { "topic": "MSG_COUNT", "timestamp":1600341483594, "key": "X", "value": {"MSG_CT": 33}},
        { "topic": "MSG_COUNT", "timestamp":1600341483978, "key": "X", "value": {"MSG_CT": 34}},
        { "topic": "MSG_COUNT", "timestamp":1600341484149, "key": "X", "value": {"MSG_CT": 35}},
        { "topic": "MSG_COUNT", "timestamp":1600341484582, "key": "X", "value": {"MSG_CT": 36}},
        { "topic": "MSG_COUNT", "timestamp":1600341484880, "key": "X", "value": {"MSG_CT": 37}},
        { "topic": "MSG_COUNT", "timestamp":1600341485041, "key": "X", "value": {"MSG_CT": 38}},
        { "topic": "MSG_COUNT", "timestamp":1600341485271, "key": "X", "value": {"MSG_CT": 39}},
        { "topic": "MSG_COUNT", "timestamp":1600341485570, "key": "X", "value": {"MSG_CT": 40}},
        { "topic": "MSG_COUNT", "timestamp":1600341485759, "key": "X", "value": {"MSG_CT": 41}},
        { "topic": "MSG_COUNT", "timestamp":1600341485951, "key": "X", "value": {"MSG_CT": 42}}
  ] }

Note that, in contrast to the output we saw from the CLI above, in the test execution there is no buffering of the input records, so an aggregate value is emitted for every input record processed.

2
Invoke the tests

Lastly, invoke the tests using the test runner and the statements file that you created earlier:

docker exec ksqldb ksql-test-runner -i /opt/app/test/input.json -s /opt/app/src/statements.sql -o /opt/app/test/output.json 2>&1

Which should pass:

	 >>> Test passed!

3
Clean up

Once the test has completed successfully you can tear down the Docker Compose stack. This will delete all data that you’ve stored in Kafka and ksqlDB.

docker-compose down