How can I stream data from a source system (such as a database) into Kafka using Kafka Connect, and add a key to the data as part of the ingest?
Kafka Connect is the integration API for Apache Kafka. It enables you to stream data from source systems (such as databases, message queues, SaaS platforms, and flat files) into Kafka, and from Kafka to target systems. When you stream data into Kafka, you often need to set the key correctly for partitioning and application logic reasons. In this example there is data about cities in a database, and we want to key the resulting Kafka message by the city_id field. There are different ways to set the key correctly, and this tutorial will show you how. It will also cover how to declare the schema and use Kafka Streams to process the data using SpecificAvro.
To get started, make a new directory anywhere you’d like for this project:
mkdir connect-add-key-to-source && cd connect-add-key-to-source
Create a file cities.sql with commands to pre-populate the database table with city information:
DROP TABLE IF EXISTS cities;
CREATE TABLE cities (city_id INTEGER PRIMARY KEY NOT NULL, name VARCHAR(255), state VARCHAR(255));
INSERT INTO cities (city_id, name, state) VALUES (1, 'Raleigh', 'NC');
INSERT INTO cities (city_id, name, state) VALUES (2, 'Mountain View', 'CA');
INSERT INTO cities (city_id, name, state) VALUES (3, 'Knoxville', 'TN');
INSERT INTO cities (city_id, name, state) VALUES (4, 'Houston', 'TX');
INSERT INTO cities (city_id, name, state) VALUES (5, 'Olympia', 'WA');
INSERT INTO cities (city_id, name, state) VALUES (6, 'Bismarck', 'ND');
Next, create the following docker-compose.yml file to obtain Confluent Platform. Make sure that you create this file in the same place as the cities.sql file that you created above.
---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:6.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'

  connect:
    image: confluentinc/cp-kafka-connect:5.5.1
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"

  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done

  postgres:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
    # *-----------------------------*
    image: postgres:11
    container_name: postgres
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      - ./cities.sql:/docker-entrypoint-initdb.d/cities.sql
Now launch Confluent Platform by running:
docker-compose up -d
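It can take a minute or so for all of the services to start. As an optional sanity check, you can list the containers and confirm that they are all reported as up before continuing:

docker-compose ps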
Check the data in the source database. Observe the city_id primary key:
echo 'SELECT * FROM cities;' | docker exec -i postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
 city_id |     name      | state
---------+---------------+-------
       1 | Raleigh       | NC
       2 | Mountain View | CA
       3 | Knoxville     | TN
       4 | Houston       | TX
       5 | Olympia       | WA
       6 | Bismarck      | ND
(6 rows)
Create the JDBC source connector. Note the transforms stanza, which is responsible for setting the key to the value of the city_id field.
curl -i -X PUT http://localhost:8083/connectors/jdbc_source_postgres_01/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "postgres",
"mode":"incrementing",
"incrementing.column.name":"city_id",
"topic.prefix":"postgres_",
"transforms":"copyFieldToKey,extractKeyFromStruct",
"transforms.copyFieldToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.copyFieldToKey.fields":"city_id",
"transforms.extractKeyFromStruct.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKeyFromStruct.field":"city_id"
}'
If you run this before Kafka Connect has finished starting up, you'll get the error curl: (52) Empty reply from server, in which case you should rerun the above command.
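If you would rather not retry by hand, one option is to poll the Kafka Connect REST API until it responds before creating the connector. This is just a convenience sketch and assumes that bash and curl are available on your host:

# Poll the Connect worker's REST endpoint until it responds
while ! curl -s http://localhost:8083/ > /dev/null; do
  echo "Waiting for Kafka Connect to start..."
  sleep 5
done
echo "Kafka Connect is up"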
Check that the connector is running:
curl -s http://localhost:8083/connectors/jdbc_source_postgres_01/status
You should see that the state is RUNNING for both the connector and tasks elements:
{"name":"jdbc_source_postgres_01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}
If you get the message {"error_code":404,"message":"No status found for connector jdbc_source_postgres_01"}, then check that the step above in which you created the connector actually succeeded.
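If the connector could not be created at all, one thing worth checking is that the JDBC source connector plugin is actually installed on the Connect worker. A simple way to do that (the output will vary by image and version) is to list the installed plugins and look for io.confluent.connect.jdbc.JdbcSourceConnector:

curl -s http://localhost:8083/connector-plugins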
With the connector running, let's now inspect the data on the Kafka topic. Here we'll use kafkacat because of its rich capabilities for inspecting and displaying details of Kafka messages:
docker exec -i kafkacat kafkacat -b broker:9092 -t postgres_cities \
-C -s avro -r http://schema-registry:8081 -e \
-f 'Key (%K bytes):\t%k\nPayload (%S bytes):\t%s\n--\n'
Key (6 bytes): 1
Payload (19 bytes): {"city_id": 1, "name": {"string": "Raleigh"}, "state": {"string": "NC"}}
--
Key (6 bytes): 2
Payload (25 bytes): {"city_id": 2, "name": {"string": "Mountain View"}, "state": {"string": "CA"}}
--
Key (6 bytes): 3
Payload (21 bytes): {"city_id": 3, "name": {"string": "Knoxville"}, "state": {"string": "TN"}}
--
Key (6 bytes): 4
Payload (19 bytes): {"city_id": 4, "name": {"string": "Houston"}, "state": {"string": "TX"}}
--
Key (6 bytes): 5
Payload (19 bytes): {"city_id": 5, "name": {"string": "Olympia"}, "state": {"string": "WA"}}
--
Key (6 bytes): 6
Payload (20 bytes): {"city_id": 6, "name": {"string": "Bismarck"}, "state": {"string": "ND"}}
--
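Because the connector runs in incrementing mode on the city_id column, any row added later with a higher city_id is also ingested and keyed in the same way. As an optional check, you could insert an extra row (the city used here is purely illustrative and not part of the original data set) and then re-run the kafkacat command above; a new message should appear with its key set to 7:

echo "INSERT INTO cities (city_id, name, state) VALUES (7, 'Madison', 'WI');" | docker exec -i postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'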
Shut down the stack by running:
docker-compose down
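If you also want to remove the containers' volumes (for example, the Postgres data directory) rather than just stopping and removing the containers, you can add the -v flag:

docker-compose down -v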
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.
First, create your Kafka cluster in Confluent Cloud.
Use the promo code CC100KTS to receive an additional $100 free usage (details).
Next, from the Confluent Cloud UI, click on Tools & client config to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.
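As a rough sketch, the client configuration you copy from Confluent Cloud will contain properties along these lines; the placeholder values below are illustrative and must be replaced with the values shown on the Tools & client config page for your cluster:

# Kafka cluster connection (values come from your Confluent Cloud cluster)
bootstrap.servers=<CCLOUD_BOOTSTRAP_SERVERS>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<CCLOUD_API_KEY>' password='<CCLOUD_API_SECRET>';

# Confluent Cloud Schema Registry
schema.registry.url=<CCLOUD_SCHEMA_REGISTRY_URL>
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>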