How to maintain message ordering and prevent message duplication

Question:

How can you maintain the order of messages and prevent message duplication in a Kafka topic partition?

Example use case:

If your application needs to maintain message ordering and prevent duplication, you can enable idempotency for your Apache Kafka producer. An idempotent producer has a unique producer ID and uses sequence IDs for each message, allowing the broker to ensure, on a per-partition basis, that it is committing ordered messages with no duplication.

Hands-on code example:

Short Answer

Set the ProducerConfig configuration parameters relevant to the idempotent producer:

enable.idempotence=true
acks=all
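
In the Java producer application built later in this tutorial, these two settings correspond to the following lines (shown here as a fragment; props is the java.util.Properties object the producer is created from):

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");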

Run it

Prerequisites

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line

Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir message-ordering && cd message-ordering

Make the following directories to set up its structure:

mkdir src test

Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud). This tutorial uses just a single Kafka broker running in KRaft mode, so no ZooKeeper container is needed:

version: '2'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk

And launch it by running:

docker compose up -d

Create the Kafka topic

Create the Kafka topic myTopic with 2 partitions:

docker exec -t broker kafka-topics --bootstrap-server broker:9092 --topic myTopic --create --replication-factor 1 --partitions 2
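
If you prefer to create the topic programmatically instead of using the CLI, a minimal sketch with the Kafka AdminClient could look like the following. This is only an illustration and not part of the tutorial's project; it assumes the broker from the docker-compose.yml above is reachable on localhost:29092:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicSketch {

    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        // The port published to the host in docker-compose.yml
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");

        try (AdminClient admin = AdminClient.create(props)) {
            // myTopic with 2 partitions and replication factor 1, matching the CLI command above
            final NewTopic topic = new NewTopic("myTopic", 2, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}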

Describe the topic

Describe the properties of the topic that you just created.

docker exec -t broker kafka-topics --bootstrap-server broker:9092 --topic myTopic --describe

The output should be the following. Notice that myTopic has two partitions, numbered 0 and 1.

Topic: myTopic	PartitionCount: 2	ReplicationFactor: 1	Configs:
	Topic: myTopic	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
	Topic: myTopic	Partition: 1	Leader: 1	Replicas: 1	Isr: 1

Configure the project application

Create the following Gradle build file for the project, named build.gradle:

buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath "gradle.plugin.com.github.jengelman.gradle.plugins:shadow:7.0.0"
    }
}

plugins {
    id "java"
    id "idea"
    id "eclipse"
}

sourceCompatibility = JavaVersion.VERSION_17
targetCompatibility = JavaVersion.VERSION_17
version = "0.0.1"

repositories {
    mavenCentral()

    maven {
        url "https://packages.confluent.io/maven"
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation "org.slf4j:slf4j-simple:2.0.7"
    implementation 'org.apache.kafka:kafka-streams:3.4.0'
    implementation ('org.apache.kafka:kafka-clients') {
       version {
           strictly '3.4.0'
        }
      }
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.KafkaProducerApplication"
    )
  }
}

shadowJar {
    archiveBaseName = "message-ordering-standalone"
    archiveClassifier = ''
}

Run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Set the application properties

Create a development properties file at configuration/dev.properties:

bootstrap.servers=localhost:29092

#Properties below this line are specific to code in this application
output.topic.name=myTopic

Create the Kafka Producer application

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Before you create your full application code, let’s highlight some of the most important ProducerConfig configuration parameters relevant to the idempotent producer:

enable.idempotence=true
acks=all
max.in.flight.requests.per.connection=5
retries=2147483647

The following parameter is the one that enables the idempotent producer:

  • enable.idempotence: when set to true, the producer ensures that exactly one copy of each message is written to the broker, and in order. In Kafka clients prior to version 3.0, the default is enable.idempotence=false, so you must set it to enable.idempotence=true yourself; in 3.0 and later it defaults to true, but setting it explicitly makes the intent clear.

The other parameters do not necessarily need to be set explicitly, but there are some noteworthy caveats (a combined configuration sketch follows this list):

  • acks: the KafkaProducer uses the acks configuration to tell the leader broker how many acknowledgments to wait for before it considers a produce request complete. This value must be acks=all for the idempotent producer to work; otherwise the producer cannot guarantee idempotence. In Kafka clients prior to version 3.0, the default is acks=1, so you have two choices: (a) do not set it explicitly and let the producer override it automatically, or (b) explicitly set acks=all. In 3.0 and later the default is already acks=all. The producer will fail to start if enable.idempotence=true and your application configures acks to anything other than all.

  • max.in.flight.requests.per.connection: the maximum number of unacknowledged requests the producer sends on a single connection before blocking. The idempotent producer still maintains message order even with pipelining (i.e., max.in.flight.requests.per.connection can be greater than 1), and the maximum value supported with idempotency is 5. The default value is already max.in.flight.requests.per.connection=5, so no change is required for the idempotent producer.

  • retries: setting a value greater than zero will cause the producer to resend any record whose send fails with a potentially transient error. The only requirement for idempotency is that this is greater than zero. The default value is already retries=2147483647, so no change is required for the idempotent producer.
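
Putting these four parameters together, here is a minimal sketch of an idempotent producer configuration in Java. The class name is illustrative, and the tutorial's application below sets only the first two parameters explicitly, relying on the defaults for the other two:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IdempotentProducerConfigSketch {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Required: enables the idempotent producer
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // Must be "all" when idempotence is enabled
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Default value shown for clarity; at most 5 is supported with idempotence
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        // Must be greater than zero; the default is already Integer.MAX_VALUE
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send records here, as in the application below
        }
    }
}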

This is only a small subset of the producer configuration parameters, focused on idempotent producer semantics. For further reading, please see KIP-98. This KIP also discusses other elements of exactly-once semantics (EOS), including transactional guarantees, which enable applications to produce to multiple partitions atomically, i.e., all writes across multiple partitions either succeed or fail as a unit.

Now let’s create the application source code at src/main/java/io/confluent/developer/KafkaProducerApplication.java.

package io.confluent.developer;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class KafkaProducerApplication {

    private final Producer<String, String> producer;
    final String outTopic;

    public KafkaProducerApplication(final Producer<String, String> producer,
                                    final String topic) {
        this.producer = producer;
        outTopic = topic;
    }

    public void produce(final String message) {
        final String[] parts = message.split("-");
        final String key, value;
        if (parts.length > 1) {
            key = parts[0];
            value = parts[1];
        } else {
            key = null;
            value = parts[0];
        }
        final ProducerRecord<String, String> producerRecord = new ProducerRecord<>(outTopic, key, value);
        producer.send(producerRecord,
                (recordMetadata, e) -> {
                    if(e != null) {
                       e.printStackTrace();
                    } else {
                      System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
                    }
                }
            );
    }

    public void shutdown() {
        producer.close();
    }

    public static Properties loadProperties(String fileName) throws IOException {
        final Properties envProps = new Properties();
        final FileInputStream input = new FileInputStream(fileName);
        envProps.load(input);
        input.close();

        return envProps;
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException(
                    "This program takes two arguments: the path to an environment configuration file and" +
                            "the path to the file with records to send");
        }

        final Properties props = KafkaProducerApplication.loadProperties(args[0]);

        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        final String topic = props.getProperty("output.topic.name");
        final Producer<String, String> producer = new KafkaProducer<>(props);
        final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);

        String filePath = args[1];
        try {
            List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
            linesToProduce.stream()
                          .filter(l -> !l.trim().isEmpty())
                          .forEach(producerApp::produce);
            System.out.println("Offsets and timestamps committed in batch from " + filePath);
        } catch (IOException e) {
            System.err.printf("Error reading file %s due to %s %n", filePath, e);
        } finally {
          producerApp.shutdown();
        }
    }
}

Create data to produce to Kafka

Create the following file input.txt in the base directory of the tutorial. The letter before the - will be the record key and the number after it will be the value (see the note after the data for how the keys map to partitions).

a-1
b-2
c-3
d-4
a-5
b-6
c-7
d-8
a-9
b-10
c-11
d-12
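
Because each record is produced with the letter as its key, the producer's default partitioner sends all records that share a key to the same partition, which is why ordering can be preserved per key. The following sketch mirrors that hashing logic (murmur2 over the serialized key, modulo the partition count) purely for illustration; Utils is an internal Kafka helper class, so do not rely on it in application code:

import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class KeyToPartitionSketch {

    public static void main(String[] args) {
        final int numPartitions = 2; // myTopic was created with 2 partitions
        for (final String key : List.of("a", "b", "c", "d")) {
            // Mirror of the default partitioner's behavior for keyed records:
            // murmur2 hash of the serialized key, made non-negative, modulo the partition count
            final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            final int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("key " + key + " -> partition " + partition);
        }
    }
}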

Compile and run the Kafka Producer application

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar for the KafkaProducerApplication, you can launch it locally.

java -jar build/libs/message-ordering-standalone-0.0.1.jar configuration/dev.properties input.txt

After you run the previous command, the application will process the file and you should see logs like this on the console:

[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=myApp] Instantiated an idempotent producer. (1)
....
[kafka-producer-network-thread | myApp] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=myApp] ProducerId set to 0 with epoch 0 (2)

1. The producer is configured for idempotency.
2. This app has been assigned ProducerId=0. (If you were to run the app again, the ProducerId would increase to 1.)

And then you should see the output from the producer application, which uses a Callback lambda expression to display the topic, partition, and offset that each record was written to:

Offsets and timestamps committed in batch from input.txt
key/value a/1   written to topic[partition] myTopic[0] at offset 0
key/value b/2   written to topic[partition] myTopic[0] at offset 1
key/value c/3   written to topic[partition] myTopic[0] at offset 2
key/value a/5   written to topic[partition] myTopic[0] at offset 3
key/value b/6   written to topic[partition] myTopic[0] at offset 4
key/value c/7   written to topic[partition] myTopic[0] at offset 5
key/value a/9   written to topic[partition] myTopic[0] at offset 6
key/value b/10  written to topic[partition] myTopic[0] at offset 7
key/value c/11  written to topic[partition] myTopic[0] at offset 8
key/value d/4   written to topic[partition] myTopic[1] at offset 0
key/value d/8   written to topic[partition] myTopic[1] at offset 1
key/value d/12  written to topic[partition] myTopic[1] at offset 2

Test it

View all records in the topic

Run a console consumer to read all the messages from myTopic to confirm the producer published the expected records.

docker exec -t broker kafka-console-consumer --topic myTopic \
 --bootstrap-server broker:9092 \
 --from-beginning \
 --property print.key=true \
 --property key.separator=" : "

The output from the consumer should look something like below. Notice that the messages are not in order; this is expected! It illustrates that message order is not maintained across topic partitions, only within each partition (as we will see in the next few steps).

a : 1
b : 2
c : 3
a : 5
b : 6
c : 7
a : 9
b : 10
c : 11
d : 4
d : 8
d : 12

Close the consumer with Ctrl-C.

Consume the data in partition 0

Consume data from the Kafka topic, specifying only to read from partition 0.

docker exec -t broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic myTopic \
  --property print.key=true \
  --property key.separator=, \
  --partition 0 \
  --from-beginning

You should see only the records that were written to this partition.

a,1
b,2
c,3
a,5
b,6
c,7
a,9
b,10
c,11
Processed a total of 9 messages

Close the consumer with Ctrl-C.
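
If you would rather do this verification from Java than from the console consumer, a minimal sketch that assigns a consumer directly to partition 0 might look like the following. The class name and the single five-second poll are illustrative choices and not part of the tutorial's project:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class PartitionZeroConsumerSketch {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Assign the consumer to partition 0 only; no subscription or consumer group is needed
            final TopicPartition partition0 = new TopicPartition("myTopic", 0);
            consumer.assign(Collections.singletonList(partition0));
            consumer.seekToBeginning(Collections.singletonList(partition0));

            // A single poll is enough for this small data set; a real application would poll in a loop
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (final ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + "," + record.value() + " at offset " + record.offset());
            }
        }
    }
}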

View the broker log segment file for partition 0

Now let’s look at the Kafka broker’s log segment files using the kafka-dump-log administrative tool. First, examine partition 0, indicated by the 0 in myTopic-0.

docker exec -t broker kafka-dump-log \
  --print-data-log \
  --files '/var/lib/kafka/data/myTopic-0/00000000000000000000.log' \
  --deep-iteration

You should see:

Dumping /var/lib/kafka/data/myTopic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 8 count: 9 baseSequence: 0 lastSequence: 8 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1602295941754 size: 144 magic: 2 compresscodec: NONE crc: 1009740801 isvalid: true
| offset: 0 CreateTime: 1602295941743 keysize: 1 valuesize: 1 sequence: 0 headerKeys: [] key: a payload: 1
| offset: 1 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 1 headerKeys: [] key: b payload: 2
| offset: 2 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 2 headerKeys: [] key: c payload: 3
| offset: 3 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 3 headerKeys: [] key: a payload: 5
| offset: 4 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 4 headerKeys: [] key: b payload: 6
| offset: 5 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 5 headerKeys: [] key: c payload: 7
| offset: 6 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 6 headerKeys: [] key: a payload: 9
| offset: 7 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 7 headerKeys: [] key: b payload: 10
| offset: 8 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 8 headerKeys: [] key: c payload: 11

Note the familiar producerId: 0, which corresponds to the earlier log output from the producer application run. (If the producer were not configured to be idempotent, this would show producerId: -1.)

Also observe that the messages carry unique sequence numbers, from sequence: 0 through sequence: 8, that are not duplicated and are in order. The broker checks the sequence number to ensure idempotency per partition, so that if the producer hits a retriable exception and resends a message, the committed log will not contain duplicated or out-of-order sequence numbers. (If the producer were not configured to be idempotent, the messages would show sequence: -1.)

Consume the data in partition 1

Consume data from the Kafka topic, specifying only to read from partition 1.

docker exec -t broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic myTopic \
  --property print.key=true \
  --property key.separator=, \
  --partition 1 \
  --from-beginning

You should see only the records that were written to this partition.

d,4
d,8
d,12
Processed a total of 3 messages

Close the consumer with Ctrl-C.

View the broker log segment file for partition 1

Use the kafka-dump-log administrative tool again to examine partition 1, indicated by the 1 in myTopic-1.

docker exec -t broker kafka-dump-log \
  --print-data-log \
  --files '/var/lib/kafka/data/myTopic-1/00000000000000000000.log' \
  --deep-iteration

You should see:

Dumping /var/lib/kafka/data/myTopic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 2 count: 3 baseSequence: 0 lastSequence: 2 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1602295941754 size: 89 magic: 2 compresscodec: NONE crc: 308733939 isvalid: true
| offset: 0 CreateTime: 1602295941753 keysize: 1 valuesize: 1 sequence: 0 headerKeys: [] key: d payload: 4
| offset: 1 CreateTime: 1602295941754 keysize: 1 valuesize: 1 sequence: 1 headerKeys: [] key: d payload: 8
| offset: 2 CreateTime: 1602295941754 keysize: 1 valuesize: 2 sequence: 2 headerKeys: [] key: d payload: 12

The producerId is the same as shown in the log output from the previous partition, because it is the same producer application with the same producer ID. The sequence numbers in this partition are unique and unrelated to the other partition, so these records have sequence: 0 through sequence: 2.

Deploy on Confluent Cloud

Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud.

  2. After you log in to Confluent Cloud Console, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g., Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
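
No application code changes are needed to use these settings: KafkaProducerApplication builds its producer from the properties file passed as the first argument, so once the values above are in that file (whether you add them to configuration/dev.properties or keep a separate file such as a hypothetical configuration/ccloud.properties), you run the same uberjar against Confluent Cloud, for example:

java -jar build/libs/message-ordering-standalone-0.0.1.jar configuration/ccloud.properties input.txt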

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.