Add key to data ingested through Kafka Connect

Problem:

You have data in a source system (such as a database) that you want to stream into Kafka using Kafka Connect. You want to add a key to the data as part of the ingest.

Example use case:

Kafka Connect is the integration API for Apache Kafka. It enables you to stream data from source systems (such as databases, message queues, SaaS platforms, and flat files) into Kafka, and from Kafka to target systems. When you stream data into Kafka you often need to set the key correctly for partitioning and application logic reasons. In this example there is data about cities in a database, and we want to key the resulting Kafka messages by the city_id field. There are different ways to set the key correctly, and these tutorials show you how. They also cover how to declare the schema and how to use Kafka Streams to process the data using SpecificAvro.

Code example:

Try it

1. Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir connect-add-key-to-source && cd connect-add-key-to-source

2. Prepare the source data

Create a file cities.sql with commands to pre-populate the database table with city information:

DROP TABLE IF EXISTS cities;
CREATE TABLE cities (city_id INTEGER PRIMARY KEY NOT NULL, name VARCHAR(255), state VARCHAR(255));
INSERT INTO cities (city_id, name, state) VALUES (1, 'Raleigh', 'NC');
INSERT INTO cities (city_id, name, state) VALUES (2, 'Mountain View', 'CA');
INSERT INTO cities (city_id, name, state) VALUES (3, 'Knoxville', 'TN');
INSERT INTO cities (city_id, name, state) VALUES (4, 'Houston', 'TX');
INSERT INTO cities (city_id, name, state) VALUES (5, 'Olympia', 'WA');
INSERT INTO cities (city_id, name, state) VALUES (6, 'Bismarck', 'ND');

3. Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform. Make sure that you create this file in the same place as the cities.sql file that you created above.

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.4.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/cp-kafka-connect:5.4.1
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"

  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done

  postgres:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
    # *-----------------------------*
    image: postgres:11
    container_name: postgres
    environment:
     - POSTGRES_USER=postgres
     - POSTGRES_PASSWORD=postgres
    volumes:
     - ./cities.sql:/docker-entrypoint-initdb.d/cities.sql

Now launch Confluent Platform by running:

docker-compose up -d
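
Kafka Connect can take a minute or more to come up. If you want to wait until its REST API is reachable before moving on (and avoid the Empty reply from server error mentioned in step 5), a small polling loop like this sketch works; the timings are just illustrative:

# Poll the Kafka Connect REST API until it responds successfully.
# -s silences progress output, -f makes curl fail on HTTP errors.
until curl -s -f -o /dev/null http://localhost:8083/connectors; do
  echo "Waiting for Kafka Connect to start..."
  sleep 5
done
echo "Kafka Connect is up"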

4. Check the source data

Check the data in the source database. Observe the city_id primary key:

echo 'SELECT * FROM cities;' | docker exec -i postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
 city_id |     name      | state
---------+---------------+-------
       1 | Raleigh       | NC
       2 | Mountain View | CA
       3 | Knoxville     | TN
       4 | Houston       | TX
       5 | Olympia       | WA
       6 | Bismarck      | ND
(6 rows)

5. Create the connector

Create the JDBC source connector. Note the transforms stanza, which sets the message key to the value of the city_id field: the ValueToKey transform copies city_id from the message value into the key (as a single-field struct), and ExtractField$Key then extracts that field so the key is just the integer itself.

curl -i -X PUT http://localhost:8083/connectors/jdbc_source_postgres_01/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:postgresql://postgres:5432/postgres",
            "connection.user": "postgres",
            "connection.password": "postgres",
            "mode":"incrementing",
            "incrementing.column.name":"city_id",
            "topic.prefix":"postgres_",
            "transforms":"copyFieldToKey,extractValuefromStruct",
            "transforms.copyFieldToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
            "transforms.copyFieldToKey.fields":"city_id",
            "transforms.extractValuefromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
            "transforms.extractValuefromStruct.field":"city_id"
        }'

If you run this before Kafka Connect has finished starting up, you'll get the error curl: (52) Empty reply from server; in that case, rerun the above command.
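
For reference, the same transforms translate directly into a properties file if you prefer to configure the connector that way (for example with a standalone Connect worker). This is just a sketch; the tutorial itself uses the REST API as shown above, and the property names are identical either way:

# ValueToKey copies the city_id field from the message value into the key
# (as a single-field struct); ExtractField$Key then unwraps that struct so
# the key becomes the bare integer.
transforms=copyFieldToKey,extractValuefromStruct
transforms.copyFieldToKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.copyFieldToKey.fields=city_id
transforms.extractValuefromStruct.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractValuefromStruct.field=city_id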

Check that the connector is running:

curl -s http://localhost:8083/connectors/jdbc_source_postgres_01/status

You should see that the state is RUNNING for both the connector and tasks elements:

{"name":"jdbc_source_postgres_01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

If you get the message {"error_code":404,"message":"No status found for connector jdbc_source_postgres_01"} then check that the step above in which you created the connector actually succeeded.
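
If you'd rather not eyeball the raw JSON, you can pull out just the state fields with jq. A quick sketch, reusing the jq that the compose file installs inside the kafkacat container (if you have jq locally, drop the docker exec part):

curl -s http://localhost:8083/connectors/jdbc_source_postgres_01/status | \
     docker exec -i kafkacat jq '.connector.state, .tasks[].state'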

6. Consume events from the output topic

With the connector running, let's now inspect the data on the Kafka topic. Here we'll use kafkacat because of its rich capabilities for inspecting and displaying details of Kafka messages:

docker exec -i kafkacat kafkacat -b broker:9092 -t postgres_cities \
            -C -s avro -r http://schema-registry:8081 -e \
            -f 'Key     (%K bytes):\t%k\nPayload (%S bytes):\t%s\n--\n'
Key     (6 bytes):      1
Payload (19 bytes):     {"city_id": 1, "name": {"string": "Raleigh"}, "state": {"string": "NC"}}
--
Key     (6 bytes):      2
Payload (25 bytes):     {"city_id": 2, "name": {"string": "Mountain View"}, "state": {"string": "CA"}}
--
Key     (6 bytes):      3
Payload (21 bytes):     {"city_id": 3, "name": {"string": "Knoxville"}, "state": {"string": "TN"}}
--
Key     (6 bytes):      4
Payload (19 bytes):     {"city_id": 4, "name": {"string": "Houston"}, "state": {"string": "TX"}}
--
Key     (6 bytes):      5
Payload (19 bytes):     {"city_id": 5, "name": {"string": "Olympia"}, "state": {"string": "WA"}}
--
Key     (6 bytes):      6
Payload (20 bytes):     {"city_id": 6, "name": {"string": "Bismarck"}, "state": {"string": "ND"}}
--
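
Because the connector runs in incrementing mode on city_id, any new row with a higher city_id should also be streamed and keyed the same way. As an optional check, you can insert another city (the values below are just example data) and, after the connector's next poll (a few seconds by default), rerun the kafkacat command above to see a new message keyed by 7:

echo "INSERT INTO cities (city_id, name, state) VALUES (7, 'Dayton', 'OH');" | \
     docker exec -i postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'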

7. Clean up

Shut down the stack by running:

docker-compose down
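
If you also want to remove the anonymous volume that the postgres image creates for its data directory, you can add the -v flag (an optional variation; the plain command above is all the tutorial requires):

docker-compose down -v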