How to count messages in a Kafka topic

Question:

How can I count the number of messages in a Kafka topic?

Example use case:

Messages are stored in Kafka topics until they are removed by Kafka, for example when a topic's retention limits are reached. It can be useful to know how many messages a topic currently holds. In this example we'll take a topic of pageview data and see how we can count all of the messages in it.

Code example:

1. Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir count-messages && cd count-messages

2. Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.1
    container_name: broker
    depends_on:
      - zookeeper
    ports:
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    # An important note about accessing Kafka from clients on other machines:
    # -----------------------------------------------------------------------
    #
    # The config used here exposes port 9092 for _external_ connections to the broker
    # i.e. those from _outside_ the docker network. This could be from the host machine
    # running docker, or maybe further afield if you've got a more complicated setup.
    # If the latter is true, you will need to change the value 'localhost' in
    # KAFKA_ADVERTISED_LISTENERS to one that is resolvable to the docker host from those
    # remote clients
    #
    # For connections _internal_ to the docker network, such as from other services
    # and components, use kafka:29092.
    #
    # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
    # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-
    #
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  ksqldb:
    # *-----------------------------*
    # To connect to ksqlDB CLI
    #   docker exec --interactive --tty ksqldb ksql http://localhost:8088
    # *-----------------------------*
    image: confluentinc/ksqldb-server:0.11.0
    container_name: ksqldb
    depends_on:
      - broker
    ports:
      - "8088:8088"
    user: root
    environment:
      KSQL_CONFIG_DIR: "/etc/ksqldb"
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:29092
      KSQL_KSQL_SERVICE_ID: confluent_rmoff_01
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*'
      # Setting KSQL_KSQL_CONNECT_WORKER_CONFIG enables embedded Kafka Connect
      KSQL_KSQL_CONNECT_WORKER_CONFIG: "/etc/ksqldb/connect.properties"
      # Kafka Connect config below
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_CONNECT_REST_ADVERTISED_HOST_NAME: 'ksqldb'
      KSQL_CONNECT_REST_PORT: 8083
      KSQL_CONNECT_GROUP_ID: ksqldb-kafka-connect-group-01
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-configs
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-offsets
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: _ksqldb-kafka-connect-group-01-status
      KSQL_CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      KSQL_CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      KSQL_CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      KSQL_CONNECT_PLUGIN_PATH: '/usr/share/java'

    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        # I miss the confluent-hub client :-/
        # mkdir -p /usr/share/confluent-hub-components/
        # confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ confluentinc/kafka-connect-datagen:0.3.3
        # ------ hack to workaround absence of confluent-hub client
        curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-datagen/versions/0.3.3/confluentinc-kafka-connect-datagen-0.3.3.zip -o /tmp/kafka-connect-datagen.zip
        yum install -y unzip
        unzip /tmp/kafka-connect-datagen.zip -d /usr/share/java/kafka-connect-datagen
        # ----------------------------------------------------------
        #
        echo "Launching ksqlDB"
        /usr/bin/docker/run &

         echo "Waiting for Kafka Connect to start listening on localhost ⏳"
         while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done

        echo -e "\n--\n+> Creating Data Generators"
        curl -i -X PUT http://localhost:8083/connectors/datagen_01/config \
             -H "Content-Type: application/json" \
             -d '{
                    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                    "value.converter.schemas.enable":false,
                    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                    "kafka.topic": "pageviews",
                    "quickstart": "pageviews",
                    "iterations": 42,
                    "tasks.max": "1"
                }'

        curl -i -X PUT http://localhost:8083/connectors/datagen_02/config \
             -H "Content-Type: application/json" \
              -d '{
                    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
                    "value.converter.schemas.enable":false,
                    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
                    "kafka.topic": "trades",
                    "quickstart": "Stock_Trades",
                    "max.interval": 1000,
                    "iterations": 4242424242,
                    "tasks.max": "1"
                }'

        sleep infinity
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test

  kafkacat:
    image: edenhill/kafkacat:1.6.0
    container_name: kafkacat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done

And bring up the stack of components by running:

docker-compose up -d
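
To check that all of the containers have been created and are running, you can use:

docker-compose ps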

Run this snippet of code, which will block until the necessary components have started:

docker exec -it ksqldb bash -c 'echo -e "\n\n  Waiting for startup… \n"; while : ; do curl_status=$(curl -s -o /dev/null -w %{http_code} http://ksqldb:8088/info) ; echo -e $(date) " ksqlDB server listener HTTP state: " $curl_status " (waiting for 200)" ; if [ $curl_status -eq 200 ] ; then  break ; fi ; sleep 5 ; done'
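
Optionally, you can confirm that the two datagen connectors created by the startup script are running by querying the embedded Kafka Connect REST API from inside the ksqldb container (the connector names datagen_01 and datagen_02 come from the Docker Compose file above):

docker exec ksqldb curl -s http://localhost:8083/connectors/datagen_01/status
docker exec ksqldb curl -s http://localhost:8083/connectors/datagen_02/status

Each call returns a JSON document whose connector state should be RUNNING.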

3. Run kafkacat to count the messages

You can count the number of messages in a Kafka topic simply by consuming the entire topic and counting how many messages are read.

To do this from the command line you can use the kafkacat tool, which can act as a consumer (and producer) and is built around the Unix philosophy of pipelines. This means that you can pipe the output (messages) from kafkacat into another tool, such as wc, to count the number of messages.

docker exec kafkacat \
    kafkacat -b broker:29092 -C -t pageviews -e -q | \
    wc -l
      42

Let’s take a closer look at the command-line soup we’ve used here to count the messages.

  • docker exec kafkacat runs the following command with its arguments in the Docker container called kafkacat

  • \ is a line continuation character

  • kafkacat runs kafkacat itself, passing in arguments as follows:

    • -b the location of the cluster broker(s)

    • -C act as a consumer

    • -t read data from the pageviews topic

    • -e exit once at the end of the topic

    • -q run quietly

  • | pipes the messages from kafkacat to the next command

  • wc reads the piped messages and writes the count to the screen

    • -l specifies to count the number of lines in total (one message per line). Contrast this with -c, which would return the number of bytes.
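
If you'd rather not consume the entire topic just to count it, an alternative is to ask the broker for the partition end offsets and sum them. Here's a sketch using the GetOffsetShell tool that ships in the broker container; note that it reports offsets rather than a literal message count, so the result can differ on compacted topics or where messages have been deleted:

docker exec broker kafka-run-class kafka.tools.GetOffsetShell \
    --broker-list broker:29092 --topic pageviews --time -1 | \
    awk -F ':' '{sum += $3} END {print sum}'

Each line of GetOffsetShell's output has the form topic:partition:offset, so the awk expression sums the third field across all partitions. (Pass --time -2 instead to get the earliest offsets, if you need to subtract the start of a truncated log.)

Since the kafkacat container also has jq installed (note the apk add jq in the Docker Compose file), another variation is to have kafkacat emit each message as a JSON envelope with -J and count the messages per partition. This is a sketch that assumes the topic is small enough to slurp into memory with jq -s:

docker exec kafkacat sh -c \
    "kafkacat -b broker:29092 -C -t pageviews -e -q -J | \
     jq -s 'group_by(.partition) | map({partition: .[0].partition, count: length})'"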

4. Clean up

Once you’ve finished, you can tear down the Docker Compose stack. This will delete all data that you’ve stored in Kafka.

docker-compose down

5. Use kafkacat with Confluent Cloud

In the example above we provisioned an entire stack, including a kafkacat instance that connects to a broker running in a Docker container. You can also use kafkacat from a Docker container to connect to Kafka clusters elsewhere, such as Confluent Cloud.

If you don’t have an account yet, sign up for Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud.

Using your Confluent Cloud broker address and API key, set the following environment variables:

export CCLOUD_BROKER_HOST=my.cluster.gcp.confluent.cloud
export CCLOUD_API_KEY=XXXX
export CCLOUD_API_SECRET=YYYY

Now we can run kafkacat, passing in the necessary Confluent Cloud details from the environment variables, to count the number of messages in a topic the same way as before: by consuming the entire topic and counting how many messages are read.

docker run --rm edenhill/kafkacat:1.6.0 \
    -X security.protocol=SASL_SSL -X sasl.mechanisms=PLAIN \
    -X ssl.ca.location=/etc/ssl/cert.pem -X api.version.request=true \
    -X sasl.username="${CCLOUD_API_KEY}" \
    -X sasl.password="${CCLOUD_API_SECRET}" \
    -b ${CCLOUD_BROKER_HOST}:9092 \
    -t my_topic \
    -C -e -q | \
    wc -l
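
If you run this often, you may prefer to keep the connection details in a properties file instead of passing each one with -X. This is a sketch assuming kafkacat 1.6's -F option, which reads librdkafka properties from a file; note that it writes your API secret to disk in plain text:

# Write the Confluent Cloud connection details to a properties file
cat > ccloud.properties <<EOF
bootstrap.servers=${CCLOUD_BROKER_HOST}:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=${CCLOUD_API_KEY}
sasl.password=${CCLOUD_API_SECRET}
ssl.ca.location=/etc/ssl/cert.pem
EOF

# Mount the file into the container and point kafkacat at it
docker run --rm -v "${PWD}/ccloud.properties:/tmp/ccloud.properties" \
    edenhill/kafkacat:1.6.0 \
    -F /tmp/ccloud.properties \
    -t my_topic \
    -C -e -q | \
    wc -l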