How to produce mock data to a local Kafka topic using the Kafka Connect Datagen connector

Question:

How can I produce mock data to Kafka topics to test my Kafka applications?

Example use case:

You will run a local instance of the Kafka Connect Datagen connector to produce mock data to a local Kafka cluster. This facilitates learning about Kafka and testing your applications.

Code example:

Try it

1. Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir kafka-connect-datagen-local && cd kafka-connect-datagen-local

2. Get Confluent Platform

Create a Dockerfile that builds a custom container for Kafka Connect bundled with the free and open source Kafka Connect Datagen connector, installed from Confluent Hub.

FROM confluentinc/cp-kafka-connect-base:5.5.0

# Ensure the Confluent Hub components directory is on the worker's plugin path
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

# Install the Kafka Connect Datagen connector from Confluent Hub
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:0.3.2
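
If you want to sanity-check the image on its own before bringing up the full stack, you can build it directly. The tag below matches the image name used in the docker-compose.yml that follows:

docker build -t localimage/kafka-connect-datagen:latest .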

Next, create the following docker-compose.yml file to obtain Confluent Platform. Note that it is also configured to build the Kafka Connect image locally from the Dockerfile above.

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: localimage/kafka-connect-datagen:latest
    build:
      context: .
      dockerfile: Dockerfile
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"

Now launch Confluent Platform by running the following command. Note the --build argument, which automatically builds the Docker image for Kafka Connect and the bundled kafka-connect-datagen connector.

docker-compose up -d --build
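
Before moving on, you can confirm that all of the containers came up:

docker-compose ps

Each of the services (zookeeper, broker, schema-registry, and connect) should show a State of Up.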

3. Create the connector

Create the Kafka Connect Datagen source connector. It automatically creates the Kafka topic pageviews and produces data to it using the schema specification from https://github.com/confluentinc/kafka-connect-datagen/blob/master/src/main/resources/pageviews_schema.avro

curl -i -X PUT http://localhost:8083/connectors/datagen_local_01/config \
     -H "Content-Type: application/json" \
     -d '{
            "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "kafka.topic": "pageviews",
            "quickstart": "pageviews",
            "max.interval": 1000,
            "iterations": 10000000,
            "tasks.max": "1"
        }'

If you run this before Kafka Connect has finished starting up, you’ll get the error curl: (52) Empty reply from server; in that case, rerun the above command.
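
Rather than retrying by hand, one option is a small shell loop that polls the Connect REST endpoint until it responds (a minimal sketch; adjust the sleep interval as you see fit):

while ! curl -s http://localhost:8083/connectors > /dev/null ; do
    echo "Waiting for Kafka Connect to start ..."
    sleep 5
done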

Check that the connector is running:

curl -s http://localhost:8083/connectors/datagen_local_01/status

You should see that the state is RUNNING for both the connector and tasks elements:

{"name":"datagen_local_01","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

If you get the message {"error_code":404,"message":"No status found for connector datagen_local_01"}, then check that the step above in which you created the connector actually succeeded.
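
If you have jq installed, you can also pull out just the state fields from the status response (using the connector name datagen_local_01 from above):

curl -s http://localhost:8083/connectors/datagen_local_01/status | jq -c '[.connector.state, .tasks[].state]'

This should print ["RUNNING","RUNNING"] once the connector and its single task are both up.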

4. Consume events from the Kafka topic

Now that the kafka-connect-datagen connector is running, run the Kafka Avro console consumer to see the data streaming into the Kafka topic.

Note the added properties of print.key and key.separator.

docker-compose exec connect kafka-avro-console-consumer \
 --bootstrap-server broker:9092 \
 --property schema.registry.url=http://schema-registry:8081 \
 --topic pageviews \
 --property print.key=true \
 --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
 --property key.separator=" : " \
 --max-messages 10

After the consumer starts, you should see the following output within a few seconds:

461 : {"viewtime":461,"userid":"User_5","pageid":"Page_43"}
471 : {"viewtime":471,"userid":"User_6","pageid":"Page_54"}
481 : {"viewtime":481,"userid":"User_3","pageid":"Page_59"}
491 : {"viewtime":491,"userid":"User_9","pageid":"Page_65"}
501 : {"viewtime":501,"userid":"User_9","pageid":"Page_80"}
511 : {"viewtime":511,"userid":"User_5","pageid":"Page_26"}
521 : {"viewtime":521,"userid":"User_2","pageid":"Page_96"}
531 : {"viewtime":531,"userid":"User_6","pageid":"Page_94"}
541 : {"viewtime":541,"userid":"User_3","pageid":"Page_67"}
551 : {"viewtime":551,"userid":"User_7","pageid":"Page_99"}
Processed a total of 10 messages
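
Because the connector produces Avro records, the value schema for the pageviews topic has been registered in Schema Registry. You can inspect it with the Schema Registry REST API (the subject name pageviews-value follows the default topic-name strategy):

curl -s http://localhost:8081/subjects/pageviews-value/versions/latest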

Take it to production

1. Clean up Docker containers

This step cleans up the Docker resources. Shut down the stack by running:

docker-compose down
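
If you would rather stop the mock data generator while leaving the rest of the stack running, you can instead delete just the connector via the Connect REST API:

curl -X DELETE http://localhost:8083/connectors/datagen_local_01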

Deploy on Confluent Cloud

1. Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.

First, create your Kafka cluster in Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 of free usage.

Next, from the Confluent Cloud UI, click on Tools & client config to get the cluster-specific configurations, such as the Kafka cluster bootstrap servers and credentials and the Confluent Cloud Schema Registry endpoint and credentials, and set the appropriate parameters in your client application.

Now you’re all set to run your application locally while your Kafka topics and stream processing are backed by your Confluent Cloud instance.
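
As a rough sketch, the client configuration assembled from those values typically looks like the following, with placeholders standing in for your cluster-specific endpoints and credentials:

# Kafka cluster connection (values come from the Confluent Cloud UI; placeholders shown)
bootstrap.servers=<BOOTSTRAP_SERVERS>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API_KEY>' password='<API_SECRET>';

# Confluent Cloud Schema Registry (placeholders shown)
schema.registry.url=<SCHEMA_REGISTRY_URL>
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>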