How to count messages in a Kafka topic

Question:

How can you count the number of messages in a Kafka topic?

Example use case:

It can be useful to know how many messages are currently in a topic, but you cannot calculate this directly from the offsets: the difference between the latest and earliest offsets in each partition is only an upper bound, because the topic's retention policy, log compaction, and potential duplicate messages all mean that not every offset corresponds to a message still in the topic. In this example, we'll take a topic of pageview data and see how we can count all of the messages in it. Note that the time complexity for this tutorial is O(n) (linear): processing time grows with the number of messages in the topic, so large data sets will require long running times.
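
For contrast, the offset-based approximation is sketched below. It assumes the broker container defined later in this tutorial and Kafka's GetOffsetShell tool, and it sums the latest minus the earliest offset for each partition, which gives an upper bound rather than an exact count once retention or compaction has removed records.

# --time -2 prints the earliest offset per partition and --time -1 the latest,
# one "topic:partition:offset" line each; awk sums the per-partition deltas
paste <(docker exec broker kafka-run-class kafka.tools.GetOffsetShell \
            --bootstrap-server broker:29092 --topic pageviews --time -2 | sort) \
      <(docker exec broker kafka-run-class kafka.tools.GetOffsetShell \
            --bootstrap-server broker:29092 --topic pageviews --time -1 | sort) | \
    awk -F'[:\t]' '{ sum += $6 - $3 } END { print sum }'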

Hands-on code example:

Short Answer

Consume the entire Kafka topic using kcat, and count how many messages are read.

docker exec kcat \
    kcat -b broker:29092 -C -t pageviews -e -q | \
    wc -l

Run it

1. Prerequisites

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop, since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line, as shown below
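
For reference, those two verification commands are simply:

docker info
docker compose version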

2. Initialize the project

Make a local directory anywhere you’d like for this project:

mkdir count-messages && cd count-messages

3. Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    container_name: broker
    ports:
    - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  ksqldb:
    image: confluentinc/ksqldb-server:0.28.2
    container_name: ksqldb
    depends_on:
    - broker
    ports:
    - 8088:8088
    user: root
    environment:
      KSQL_CONFIG_DIR: /etc/ksqldb
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: ^_.*
      KSQL_KSQL_CONNECT_WORKER_CONFIG: /etc/ksqldb/connect.properties
      KSQL_CONNECT_BOOTSTRAP_SERVERS: broker:29092
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: ksqldb
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: 'false'
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: '[%d] %p %X{connector.context}%m
        (%c:%L)%n'
      KSQL_CONNECT_PLUGIN_PATH: /usr/share/java
    command:
    - bash
    - -c
    - |
      echo "Installing connector plugins"
      # I miss the confluent-hub client :-/
      # mkdir -p /usr/share/confluent-hub-components/
      # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.3.3
      # ------ hack to workaround absence of confluent-hub client
      curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-datagen/versions/0.4.0/confluentinc-kafka-connect-datagen-0.4.0.zip -o /tmp/kafka-connect-datagen.zip
      yum install -y unzip
      unzip /tmp/kafka-connect-datagen.zip -d /usr/share/java/kafka-connect-datagen
      # ----------------------------------------------------------
      #
      echo "Launching ksqlDB"
      /usr/bin/docker/run &

       echo "Waiting for Kafka Connect to start listening on localhost ⏳"
       while : ; do
        curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
        echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
        if [ $$curl_status -eq 200 ] ; then
          break
        fi
        sleep 5
      done

      echo -e "\n--\n+> Creating Data Generators"
      curl -i -X PUT http://localhost:8083/connectors/datagen_01/config \
           -H "Content-Type: application/json" \
           -d '{
                  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                  "value.converter.schemas.enable":false,
                  "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                  "kafka.topic": "pageviews",
                  "quickstart": "pageviews",
                  "iterations": 42,
                  "tasks.max": "1"
              }'

      curl -i -X PUT http://localhost:8083/connectors/datagen_02/config \
           -H "Content-Type: application/json" \
            -d '{
                  "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                  "value.converter.schemas.enable":false,
                  "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                  "kafka.topic": "trades",
                  "quickstart": "Stock_Trades",
                  "max.interval": 1000,
                  "iterations": 4242424242,
                  "tasks.max": "1"
              }'

      sleep infinity
    volumes:
    - ./src:/opt/app/src
    - ./test:/opt/app/test
  kcat:
    image: edenhill/kcat:1.7.1
    container_name: kcat
    links:
    - broker
    entrypoint:
    - /bin/sh
    - -c
    - "apk add jq; \nwhile [ 1 -eq 1 ];do sleep 60;done\n"

And bring up the stack of components by running:

docker compose up -d

Run this snippet of code, which will block until the necessary components have started:

docker exec -it ksqldb bash -c 'echo -e "\n\n  Waiting for startup… \n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done'
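
At this point you can optionally confirm that kcat can reach the broker by listing the cluster metadata with kcat’s -L flag:

docker exec kcat \
    kcat -b broker:29092 -L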

4. Run kcat to count the messages

You can count the number of messages in a Kafka topic simply by consuming the entire topic and counting how many messages are read.

To do this from the command line, you can use the kcat tool, which can act as a consumer (and producer) and is built around the Unix philosophy of pipelines. This means that you can pipe the output (messages) from kcat into another tool, such as wc, to count the number of messages.

docker exec kcat \
    kcat -b broker:29092 -C -t pageviews -e -q | \
    wc -l

Let’s take a closer look at the command-line soup we’ve used here to count the messages.

  • docker exec kcat runs the following command, with its arguments, in the Docker container called kcat

  • \ is a line-continuation character

  • kcat runs kcat itself, passing in arguments as follows:

    • -b the location of the cluster broker(s)

    • -C act as a consumer

    • -t read data from the pageviews topic

    • -e exit once at the end of the topic

    • -q run quietly

  • | pipes the messages from kcat to the next command

  • wc reads the piped messages and writes the count to the screen

    • -l specifies to count the number of lines in total (one message per line). Contrast this with -c, which would return the number of bytes.

Finally, the output of the command is the message count.

      42
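
Note that wc -l counts newline characters, so if a message payload itself contains embedded newlines it will be counted more than once. A more robust variant is sketched below: it uses kcat’s -f flag to print a fixed one-character string per message, then counts bytes instead of lines.

docker exec kcat \
    kcat -b broker:29092 -C -t pageviews -e -q -f '.' | \
    wc -c

The same -f flag can also print just the partition number (%p) for each message, which sort | uniq -c can then tally into a per-partition breakdown; again, a sketch against the containers from this tutorial:

docker exec kcat \
    kcat -b broker:29092 -C -t pageviews -e -q -f '%p\n' | \
    sort | uniq -c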

5. Clean up

Once you’ve finished, you can tear down the Docker Compose stack. Note that this will delete all of the data that you’ve stored in Kafka.

docker compose down

6. Use kcat with Confluent Cloud

In the example above, we provisioned an entire stack, including kcat connecting to a broker in a Docker container. You can also use kcat from a Docker container to connect to Kafka clusters elsewhere, such as Confluent Cloud.

If you don’t have an account yet, sign up for Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 of free usage.

Using your Confluent Cloud broker address and API key, set the following environment variables:

export CCLOUD_BROKER_HOST=my.cluster.gcp.confluent.cloud
export CCLOUD_API_KEY=XXXX
export CCLOUD_API_SECRET=YYYY

Now we can run kcat (passing in the necessary Confluent Cloud details from the environment variables) to count the number of messages in a topic, again by consuming the entire topic and counting how many messages are read.

docker run --rm edenhill/kcat:1.7.1 \
    -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
    -X ssl.ca.location=/etc/ssl/cert.pem -X api.version.request=true \
    -X sasl.username="${CCLOUD_API_KEY}" \
    -X sasl.password="${CCLOUD_API_SECRET}" \
    -b ${CCLOUD_BROKER_HOST}:9092 \
    -t my_topic \
    -C -e -q | \
    wc -l
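
If you run this often, note that kcat (version 1.7.0 and later) can also read its connection settings from a properties file passed with -F, instead of repeating the -X flags. A sketch, assuming a local file named kcat.conf mounted into the container:

# Write the Confluent Cloud connection details to a local config file
# (the heredoc expands the environment variables as the file is written)
cat > kcat.conf <<EOF
bootstrap.servers=${CCLOUD_BROKER_HOST}:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=${CCLOUD_API_KEY}
sasl.password=${CCLOUD_API_SECRET}
ssl.ca.location=/etc/ssl/cert.pem
EOF

# Mount the file into the container and point kcat at it with -F
docker run --rm -v "${PWD}/kcat.conf:/kcat.conf" edenhill/kcat:1.7.1 \
    -F /kcat.conf -t my_topic -C -e -q | \
    wc -l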