How to read from a specific offset and partition with the Kafka Console Consumer

Question:

How do I read from a specific offset and partition of a Kafka topic?

Example use case:

You are confirming record arrivals, and you'd like to read from a specific offset in a topic partition. In this tutorial you'll learn how to use the Kafka console consumer to quickly debug issues by reading from a specific offset, as well as how to control the number of records you read.

Short Answer

Use the kafka-console-consumer command with the --partition and --offset flags to read from a specific partition and offset.

docker run -v $PWD/configuration/ccloud.properties:/tmp/ccloud.properties confluentinc/cp-kafka:6.1.1 \
  bash -c 'kafka-console-consumer \
    --topic example-topic \
    --bootstrap-server `grep "^\s*bootstrap.server" /tmp/ccloud.properties | tail -1` \
    --consumer.config /tmp/ccloud.properties \
    --property print.key=true \
    --property key.separator="-" \
    --partition 1 \
    --offset 3'

Try it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir console-consumer-read-specific-offsets-partition && cd console-consumer-read-specific-offsets-partition

Next, create a directory for configuration data:

mkdir configuration

2
Provision your fully managed Kafka cluster in Confluent Cloud

  1. Sign up for Confluent Cloud, a fully-managed Apache Kafka service.

  2. After you log in to Confluent Cloud, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

3
Write the cluster information into a local file

From the Confluent Cloud Console, navigate to your Kafka cluster. From the Clients view, get the connection information customized to your cluster (select Java).

Create new credentials for your Kafka cluster and Schema Registry, and then Confluent Cloud will show a configuration similar to the one below with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='{{ CLUSTER_API_KEY }}'   password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.

4
Download and set up the Confluent Cloud CLI

This tutorial has some steps for Kafka topic management and for producing and consuming records, for which you can use the Confluent Cloud Console or the Confluent Cloud CLI. Instructions for installing the Confluent Cloud CLI and configuring it for your Confluent Cloud environment are available from within the Confluent Cloud Console: navigate to your Kafka cluster, click on the CLI and tools section, and run through the steps in the CCloud CLI tab.
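
Once the CLI is installed, configuring it boils down to logging in and pointing it at the environment and cluster you created. The exact steps are shown in the Console; as a rough sketch (the environment ID, cluster ID, and API key below are placeholders you'd replace with your own values), it looks like this:

ccloud login
ccloud environment use <environment-id>        # the learn-kafka environment from step 2
ccloud kafka cluster use <cluster-id>          # the cluster you provisioned in step 2
ccloud api-key create --resource <cluster-id>  # or reuse the key you created in step 3
ccloud api-key use <api-key> --resource <cluster-id>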

5
Create a topic with multiple partitions

Your first step is to create a topic to produce to and consume from. This time you’ll add more than one partition so you can see how the keys end up on different partitions.

Use the Confluent Cloud CLI to create the topic:

ccloud kafka topic create example-topic --partitions 2
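
If you'd like to confirm that the topic was created with two partitions, you can describe it (this assumes the CLI is configured against your cluster as in the previous step):

ccloud kafka topic describe example-topic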

6
Produce records with keys and values

To get started, let's produce some records to your new topic.

Since you've created a topic with more than one partition, you'll send full key-value pairs so you can see how different keys end up on different partitions. To send full key-value pairs with the Confluent Cloud CLI producer, you'll specify the --parse-key and --delimiter flags (the equivalent kafka-console-producer properties are parse.key and key.separator; see the sketch at the end of this step).

Let’s run the following command in the broker container to start a new console producer:

ccloud kafka topic produce example-topic --parse-key --delimiter ":"

Then enter these records either one at a time or by copying and pasting all of them into the terminal and pressing enter:

key1:the lazy
key2:fox jumped
key3:over the
key4:brown cow
key1:All
key2:streams
key3:lead
key4:to
key1:Kafka
key2:Go to
key3:Kafka
key4:summit

After you’ve sent the records, you can close the producer with a CTRL+C command.
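
For reference, if you'd rather produce with the Apache Kafka console producer from the same Docker image used for the consumers later in this tutorial, a roughly equivalent invocation looks like this sketch (it reuses the configuration/ccloud.properties file from step 3; the -it flag keeps stdin open so you can type records):

docker run -it -v $PWD/configuration/ccloud.properties:/tmp/ccloud.properties confluentinc/cp-kafka:6.1.1 \
  bash -c 'kafka-console-producer \
    --topic example-topic \
    --bootstrap-server `grep "^\s*bootstrap.server" /tmp/ccloud.properties | tail -1` \
    --producer.config /tmp/ccloud.properties \
    --property parse.key=true \
    --property key.separator=":"'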

7
Start a console consumer to read from the first partition

Next, let's open up a console consumer to read the records sent to the topic in the previous step, but you'll only read from the first partition. Kafka partitions are zero-based, so your two partitions are numbered 0 and 1, respectively.

The Confluent Cloud CLI currently does not offer a way to read from a particular partition, so for the next few steps you'll use the console consumer built into the confluentinc/cp-kafka Docker image. Notice that you are passing in the path to the configuration/ccloud.properties file that you created earlier.

Let's start a console consumer to read only records from the first partition, 0:

docker run -v $PWD/configuration/ccloud.properties:/tmp/ccloud.properties confluentinc/cp-kafka:6.1.1 \
  bash -c 'kafka-console-consumer \
    --topic example-topic \
    --bootstrap-server `grep "^\s*bootstrap.server" /tmp/ccloud.properties | tail -1` \
    --consumer.config /tmp/ccloud.properties \
    --from-beginning \
    --property print.key=true \
    --property key.separator="-" \
    --partition 0'

After a few seconds you should see something like this (your output will vary depending on the hashing algorithm):

key2-fox jumped
key4-brown cow
key2-streams
key4-to
key2-Go to
key4-summit

You'll notice you sent 12 records, but only 6 went to the first partition. The reason is the way Kafka assigns a partition to each record: by default, when a record has a key, the partition is the hash of the key (murmur2) modulo the number of partitions. So, even though you have 2 partitions, depending on what each key hashes to, you aren't guaranteed an even distribution of records across partitions.
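
To make the hash-modulo idea concrete, here is a small illustrative shell loop. It uses cksum as a stand-in hash (Kafka's default partitioner actually hashes the serialized key bytes with murmur2, so this will not reproduce Kafka's real assignments), but the shape of the logic is the same:

for key in key1 key2 key3 key4; do
  # stand-in hash of the key; Kafka itself uses murmur2, not cksum
  hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
  echo "$key -> partition $((hash % 2))"
done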

Go ahead and shut down the current consumer with CTRL+C.

8
Start a console consumer to read from the second partition

In the previous step, you consumed records from the first partition of your topic. In this step you'll consume the rest of the records from the second partition (partition 1).

If you haven’t done so already, close the previous console consumer with a CTRL+C.

Then start a new console consumer to read only records from the second partition:

docker run -v $PWD/configuration/ccloud.properties:/tmp/ccloud.properties confluentinc/cp-kafka:6.1.1 \
  bash -c 'kafka-console-consumer \
    --topic example-topic \
    --bootstrap-server `grep "^\s*bootstrap.server" /tmp/ccloud.properties | tail -1` \
    --consumer.config /tmp/ccloud.properties \
    --from-beginning \
    --property print.key=true \
    --property key.separator="-" \
    --partition 1'

After a few seconds you should see something like this:

key1-the lazy
key3-over the
key1-All
key3-lead
key1-Kafka
key3-Kafka

As you’d expect, the remaining 6 records are on the second partition.

Go ahead and shut down the current consumer with CTRL+C.

9
Read records starting from a specific offset

So far you've learned how to consume records from a specific partition. When you specify the partition, you can optionally specify the offset to start consuming from. Starting at a specific offset can be helpful when debugging an issue, since you can skip over records that you already know aren't part of the problem.

If you haven’t done so already, close the previous console consumer with a CTRL+C.

From the previous step you know there are 6 records in the second partition. In this step you'll only consume records starting from offset 3, so you should see only the last 3 records on the screen. The changes in this command are removing the --from-beginning flag and adding an --offset flag.

Here's the command to read records from the second partition starting at offset 3:

docker run -v $PWD/configuration/ccloud.properties:/tmp/ccloud.properties confluentinc/cp-kafka:6.1.1 \
  bash -c 'kafka-console-consumer \
    --topic example-topic \
    --bootstrap-server `grep "^\s*bootstrap.server" /tmp/ccloud.properties | tail -1` \
    --consumer.config /tmp/ccloud.properties \
    --property print.key=true \
    --property key.separator="-" \
    --partition 1 \
    --offset 3'

After a few seconds you should see something like this:

key3-lead
key1-Kafka
key3-Kafka

As you can see, you’ve consumed records starting from offset 3 to the end of the log.

Go ahead and shut down the current consumer with CTRL+C.
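
The example use case also mentions controlling the number of records you read. The console consumer supports a --max-messages flag for exactly that; combined with --partition and --offset, the following sketch (using the same properties file and topic as above) reads exactly two records from partition 1 starting at offset 3 and then exits on its own, so no CTRL+C is needed:

docker run -v $PWD/configuration/ccloud.properties:/tmp/ccloud.properties confluentinc/cp-kafka:6.1.1 \
  bash -c 'kafka-console-consumer \
    --topic example-topic \
    --bootstrap-server `grep "^\s*bootstrap.server" /tmp/ccloud.properties | tail -1` \
    --consumer.config /tmp/ccloud.properties \
    --property print.key=true \
    --property key.separator="-" \
    --partition 1 \
    --offset 3 \
    --max-messages 2'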