How to count messages in a Kafka topic

Question:

How can I count the number of messages in a Kafka topic?

Example use case:

It can be useful to know how many messages are currently in a topic. However, you cannot calculate this directly from the offsets, because you need to take into account the topic's retention policy, log compaction, and potential duplicate messages. In this example we'll create a small test topic, produce a few records to it, and see how to count all of the messages in the topic. Note that the approach used in this tutorial is O(n) (linear): processing time depends on the number of messages in the topic, so large data sets will take a long time to count.
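To make the offsets caveat concrete, here is a small arithmetic sketch. The offsets and watermarks are invented for illustration and are not read from a real cluster. It assumes a single partition where retention and compaction have already removed some records.

```shell
# Illustrative numbers only (not taken from a real cluster).
# Suppose offsets 0 through 9 were written to one partition, and retention
# plus compaction have since removed all but these records:
surviving_offsets="2 5 7 8 9"

low_watermark=2     # earliest offset still available
high_watermark=10   # offset the next record will receive

# Estimate from offsets alone: this also counts the gaps left by removed records.
offset_estimate=$((high_watermark - low_watermark))

# What a consumer reading the partition from start to end would see.
actual_count=$(echo $surviving_offsets | wc -w | tr -d ' ')

echo "estimate from offsets: $offset_estimate"
echo "messages actually readable: $actual_count"
```

With these made-up numbers the offset arithmetic gives 8 while only 5 records can be read, which is why the tutorial counts by consuming the topic.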

Code example:





Short Answer

Consume the entire Kafka topic using kafkacat, and count how many messages are read.

docker run -it --network=host \
    -v ${PWD}/configuration/ccloud.properties:/tmp/configuration/ccloud.properties \
    edenhill/kcat:1.7.0 kafkacat \
         -F /tmp/configuration/ccloud.properties \
         -C -t test-topic \
         -e -q \
         | grep -v "Reading configuration from file" | wc -l

Try it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir count-messages && cd count-messages

2
Provision your fully managed Kafka cluster in Confluent Cloud

  1. Sign up for Confluent Cloud, a fully-managed Apache Kafka service.

  2. After you log in to Confluent Cloud, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

3
Write the cluster information into a local file

From the Confluent Cloud UI, navigate to your Kafka cluster. From the Clients view, get the connection information customized to your cluster (select C/C++).

Create new credentials for your Kafka cluster, and then Confluent Cloud will show a configuration similar to below with your new credentials automatically populated (make sure show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Kafka
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username={{ CLUSTER_API_KEY }}
sasl.password={{ CLUSTER_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the UI so that it includes your Confluent Cloud information and credentials.
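
Before moving on, it can help to check that the file really contains your own values rather than the template placeholders. The following is a minimal sketch; `check_ccloud_config` is a hypothetical helper introduced here, not part of kafkacat or any Confluent tool, and it only checks for the five settings shown in the sample above.

```shell
# Hypothetical helper (not part of the tutorial's tooling):
# sanity-check a properties file copied from the Confluent Cloud UI.
check_ccloud_config() {
    config_file="$1"

    # The file must exist and be readable.
    if [ ! -r "$config_file" ]; then
        echo "cannot read: $config_file" >&2
        return 1
    fi

    # Template placeholders such as {{ CLUSTER_API_KEY }} must have been replaced.
    if grep -q '{{' "$config_file"; then
        echo "unreplaced placeholder in $config_file" >&2
        return 1
    fi

    # Each setting from the sample configuration must be present.
    for key in bootstrap.servers security.protocol sasl.mechanisms \
               sasl.username sasl.password; do
        if ! grep -q "^${key}=" "$config_file"; then
            echo "missing setting: $key" >&2
            return 1
        fi
    done
    return 0
}
```

For example, `check_ccloud_config configuration/ccloud.properties && echo "config looks complete"` prints the message only when all checks pass. It does not verify that the credentials are valid.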

4
Download and set up the Confluent Cloud CLI

This tutorial has some steps for Kafka topic management and for reading from or writing to Kafka topics. For these you can use either the Confluent Cloud Console or the Confluent Cloud CLI. Instructions for installing the Confluent Cloud CLI and configuring it for your Confluent Cloud environment are available from within the Confluent Cloud Console: navigate to your Kafka cluster, click on the CLI and tools section, and run through the steps in the CCloud CLI tab.

5
Create the Kafka topic

In this step we’re going to create a topic for use during this tutorial. Use the following command to create the topic:

ccloud kafka topic create test-topic

6
Produce messages to the topic

Produce some messages to the Kafka topic.

ccloud kafka topic produce test-topic

Enter a few records and press <ctrl-c> when finished.

Apache
Kafka
Is
The
Best
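
If you would rather not type the records by hand, the sketch below writes them to a file first. The file name `records.txt` is a hypothetical choice, and feeding the file to the producer relies on the assumption that the CLI reads records from standard input, one per line. The producer step is skipped when the `ccloud` command is not installed.

```shell
# Write the five sample records to a local file, one per line.
records_file="records.txt"   # hypothetical file name
printf '%s\n' Apache Kafka Is The Best > "$records_file"

# Local check: this line count is the message count we expect to see later.
expected=$(wc -l < "$records_file" | tr -d ' ')
echo "expected message count: $expected"

# Assumption: the CLI reads records from standard input, one per line.
# Only attempted when the ccloud CLI is installed.
if command -v ccloud >/dev/null 2>&1; then
    ccloud kafka topic produce test-topic < "$records_file"
fi
```

Keeping the records in a file also gives you a known count to compare against in the next step.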

7
Count the messages

You can count the number of messages in a Kafka topic simply by consuming the entire topic and counting how many messages are read.

To do this from the command line, you can use the kafkacat tool, which is built around the Unix philosophy of pipelines. This means that you can pipe the output (messages) from kafkacat into another tool, like wc, to count the number of messages.

As input, pass in the configuration/ccloud.properties file that you created in an earlier step.

docker run -it --network=host \
    -v ${PWD}/configuration/ccloud.properties:/tmp/configuration/ccloud.properties \
    edenhill/kcat:1.7.0 kafkacat \
         -F /tmp/configuration/ccloud.properties \
         -C -t test-topic \
         -e -q \
         | grep -v "Reading configuration from file" | wc -l

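Let’s take a close look at the command-line soup we’ve used here to count the messages.

  • docker run starts a new container from the edenhill/kcat:1.7.0 image and runs the following command with its arguments inside it; -v mounts your local configuration/ccloud.properties file into the container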

  • \ is a line continuation character

    • kafkacat runs kafkacat itself, passing in arguments as follows:

      • -F read the Kafka cluster connection information from the given configuration file

      • -C act as a consumer

      • -t read data from the test-topic topic

      • -e exit once at the end of the topic

      • -q run quietly

    • | pipes the messages from kafkacat to the next command

    • grep -v "Reading configuration from file" filters out the log message that kafkacat prints when it reads the configuration file

    • wc -l reads the piped messages and writes the number of lines in total (one message per line) to screen

Finally, the output of the command is the message count.

      5
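
The filter-and-count stage of the pipeline can be tried locally without a cluster. In this sketch, `count_messages` is a hypothetical helper name, and the input is simulated text standing in for kafkacat's output, with the log line written to resemble the one the grep pattern targets.

```shell
# Hypothetical helper: the filter-and-count stage of the pipeline above.
count_messages() {
    # Drop the configuration log line, count the remaining lines,
    # and strip the padding that some wc implementations add.
    grep -v "Reading configuration from file" | wc -l | tr -d ' '
}

# Simulated kafkacat output: one log line followed by the five records.
printf '%s\n' \
    "% Reading configuration from file /tmp/configuration/ccloud.properties" \
    Apache Kafka Is The Best \
    | count_messages
```

Because wc -l counts lines rather than records, this approach assumes each message occupies exactly one line. A message whose value contains a newline would be counted more than once.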

8
Teardown Confluent Cloud resources

You may try another Kafka tutorial, but if you don’t plan on doing other tutorials, use the Confluent Cloud Console or CLI to destroy all the resources you created. Verify they are destroyed to avoid unexpected charges.