How to generate mock data to a Kafka topic using the Kafka Connect Datagen connector

Question:

How can I produce mock data to Kafka topics to test my Kafka applications?

Example use case:

You will run an instance of the Kafka Connect Datagen connector to produce mock data to a Kafka cluster. This facilitates learning about Kafka and testing your applications.

Code example:

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir kafka-connect-datagen && cd kafka-connect-datagen

Next, create a directory for configuration data:

mkdir configuration

2
Provision your fully managed Kafka cluster in Confluent Cloud

  1. Sign up for Confluent Cloud, a fully-managed Apache Kafka service.

  2. After you log in to Confluent Cloud, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

Confluent Cloud

3
Write the cluster information into a local file

From the Confluent Cloud Console, navigate to your Kafka cluster. From the Clients view, get the connection information customized to your cluster (select Java).

Create new credentials for your Kafka cluster and Schema Registry, and then Confluent Cloud will show a configuration similar to the one below, with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='{{ CLUSTER_API_KEY }}'   password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.
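Before moving on, you may want to sanity-check that the file you pasted contains the required connection settings. A minimal sketch of such a check, using an inline sample with the same placeholder values as the template above (the parse_properties helper is illustrative, not part of the Confluent tooling):

```python
# Parse a ccloud.properties-style file into a dict and verify that the
# required connection settings are present. The inline sample stands in
# for the real configuration/ccloud.properties file.
sample = """\
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
schema.registry.url={{ SR_URL }}
"""

def parse_properties(text):
    """Parse simple key=value Java properties, skipping blanks and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # partition on the first '=' so values containing '=' survive intact
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

props = parse_properties(sample)
required = {"bootstrap.servers", "security.protocol", "sasl.mechanism"}
missing = required - props.keys()
print("missing:", sorted(missing))  # -> missing: []
```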

4
Download and set up the Confluent Cloud CLI

This tutorial has some steps for Kafka topic management and for reading from or writing to Kafka topics. You can use the Confluent Cloud Console for these, or install the Confluent Cloud CLI. Instructions for installing the Confluent Cloud CLI and configuring it for your Confluent Cloud environment are available from within the Confluent Cloud Console: navigate to your Kafka cluster, click on the CLI and tools section, and run through the steps in the CCloud CLI tab.

5
Create the Kafka topic

Create a Kafka topic called mytopic in Confluent Cloud.

ccloud kafka topic create mytopic

This should yield the following output:

Created topic "mytopic".

6
Create the connector configuration file

You can provision the Kafka Connect Datagen connector through the Confluent Cloud Console, but this tutorial shows how to do it with either the Confluent Cloud CLI or the Confluent Cloud REST API.

First, create a file called datagen-source-config.json with the below connector configuration for the Kafka Connect Datagen source connector for Confluent Cloud. Substitute <CLUSTER_API_KEY> and <CLUSTER_API_SECRET> with the credentials from the configuration/ccloud.properties file.

In this sample configuration, the connector uses the PAGEVIEWS quickstart to produce JSON-formatted records simulating website pageviews to the Kafka topic mytopic. For a full explanation of all connector configuration parameters, see the documentation.

{
    "name" : "datagen_ccloud_01",
    "connector.class": "DatagenSource",
    "kafka.api.key": "<CLUSTER_API_KEY>",
    "kafka.api.secret" : "<CLUSTER_API_SECRET>",
    "kafka.topic" : "mytopic",
    "output.data.format" : "JSON",
    "quickstart" : "PAGEVIEWS",
    "tasks.max" : "1"
}
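If you prefer not to paste the credentials by hand, you could extract them from the sasl.jaas.config line in configuration/ccloud.properties with a short script. A sketch, using an inline stand-in for the real file (the values ABC123 and XYZ789 are placeholders, not real credentials):

```python
import json
import re

# Pull the cluster API key and secret out of a sasl.jaas.config line and
# substitute them into the connector configuration. The jaas_line below is
# an inline stand-in for the line in configuration/ccloud.properties.
jaas_line = ("org.apache.kafka.common.security.plain.PlainLoginModule required "
             "username='ABC123' password='XYZ789';")

match = re.search(r"username='([^']*)'\s+password='([^']*)'", jaas_line)
api_key, api_secret = match.group(1), match.group(2)

config = {
    "name": "datagen_ccloud_01",
    "connector.class": "DatagenSource",
    "kafka.api.key": api_key,
    "kafka.api.secret": api_secret,
    "kafka.topic": "mytopic",
    "output.data.format": "JSON",
    "quickstart": "PAGEVIEWS",
    "tasks.max": "1",
}
# Write the result where step 6 expects it, e.g.:
# open("datagen-source-config.json", "w").write(json.dumps(config, indent=2))
print(json.dumps(config, indent=2))
```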

7
Provision the connector in Confluent Cloud

You can choose one of two options for provisioning a fully managed Kafka Connect Datagen source connector in Confluent Cloud. Either option will use the connector configuration file datagen-source-config.json that you created in the previous step.

Option 1. Use the Confluent Cloud CLI, which provides the ccloud connector create command for passing in the configuration file from the previous step.

ccloud connector create --config datagen-source-config.json

Option 2. The Confluent Cloud REST API can provision the connector using the configuration file you created in the previous step. This API requires that you provide a Confluent Cloud resource ID for both the environment and the Kafka cluster to which you want to deploy the connector. You can obtain these values from the Confluent Cloud Console or with the ccloud kafka cluster describe command.

Additionally, you must provide an API key and secret that authorize you to control your cloud account. This API key is independent of the one you use to connect to Kafka or the Schema Registry, so you need to generate it before the HTTP command will succeed.

ccloud api-key create --resource cloud -o json > cloud-api-key.json

The cloud-api-key.json file now contains an API key and secret authorized to control your cloud account. Protect this file as you would any secret value.

You will also need to set:

  • The Confluent Cloud environment id in the shell variable ENVIRONMENT (use the command ccloud environment list to view the active environment).

  • The Kafka cluster id in the shell variable CLUSTER (use the command ccloud kafka cluster list to view the active cluster).

Run the following curl command to provision the connector. The command reads the API key and secret from the cloud-api-key.json file (using the jq command-line JSON processor) and PUTs the new connector configuration to the REST API in the appropriate environment and Kafka cluster.

curl -XPUT -H 'Content-Type: application/json' --data "@datagen-source-config.json" --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/config
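To see how the pieces of that curl command fit together, here is a sketch that assembles the same request locally without sending it. The key/secret and resource ids are inline stand-ins for the contents of cloud-api-key.json and your own environment and cluster ids:

```python
import base64
import json

# Assemble the REST call made by the curl command above, without sending it.
# All values below are placeholders, not real credentials or resource ids.
cloud_key = {"key": "CLOUDKEY", "secret": "CLOUDSECRET"}
environment, cluster = "env-abc123", "lkc-abc123"

# The connector name in the URL must match "name" in the config file.
url = (f"https://api.confluent.cloud/connect/v1/environments/{environment}"
       f"/clusters/{cluster}/connectors/datagen_ccloud_01/config")

# curl's --user key:secret becomes an HTTP Basic Authorization header.
token = base64.b64encode(
    f"{cloud_key['key']}:{cloud_key['secret']}".encode()).decode()
headers = {"Content-Type": "application/json",
           "Authorization": f"Basic {token}"}

# Stand-in for the body that curl reads from @datagen-source-config.json.
body = json.dumps({"kafka.topic": "mytopic"})

print("PUT", url)
```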

8
Verify the status of the connector in Confluent Cloud

To check the status of the connector from the command line, you have the same two options as for provisioning.

Option 1. Using the ccloud CLI.

ccloud connector list

Rerun this command to get the updated status; it will change from PROVISIONING to RUNNING when the connector is ready.

     ID     |       Name        | Status  |  Type  | Trace
+-----------+-------------------+---------+--------+-------+
  lcc-6g1p6 | datagen_ccloud_01 | RUNNING | source |

Option 2. The Confluent Cloud REST API provides a connector_name/status endpoint you can use to verify the status of a provisioned connector. Note the connector.state field in the returned JSON.

As described in Step 7 above, an API Key is required for all REST API calls to succeed.

curl -s -XGET -H 'Content-Type: application/json' --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq
{
  "name": "datagen_ccloud_01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "datagen_ccloud_01",
    "trace": ""
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "datagen_ccloud_01",
      "msg": ""
    }
  ],
  "type": "source"
}
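If you are polling this endpoint in a script, you would check the connector state and every task state rather than eyeballing the output. A sketch, using the status JSON above inline as the stand-in response:

```python
import json

# Check the connector and task states from the status JSON returned by the
# REST endpoint above. The inline string stands in for the HTTP response.
status_json = """{
  "name": "datagen_ccloud_01",
  "connector": {"state": "RUNNING", "worker_id": "datagen_ccloud_01", "trace": ""},
  "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "datagen_ccloud_01", "msg": ""}],
  "type": "source"
}"""

status = json.loads(status_json)
# The connector is only ready when the connector itself AND all tasks run.
all_running = (status["connector"]["state"] == "RUNNING"
               and all(t["state"] == "RUNNING" for t in status["tasks"]))
print("all RUNNING:", all_running)  # -> all RUNNING: True
```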

9
Consume events from the Kafka topic

Now that the Kafka Connect Datagen connector is running in Confluent Cloud, it is producing messages to your Kafka topic. View the messages being produced to the Kafka topic in Confluent Cloud.

There are many ways to do this, including the Confluent Cloud Console, but for this tutorial we will show you how to do it with the Confluent Cloud CLI.

ccloud kafka topic consume mytopic --print-key

After the consumer starts, you should see the following output in a few seconds:

2871	{"viewtime":2871,"userid":"User_6","pageid":"Page_34"}
2881	{"viewtime":2881,"userid":"User_3","pageid":"Page_16"}
2901	{"viewtime":2901,"userid":"User_2","pageid":"Page_44"}
2961	{"viewtime":2961,"userid":"User_7","pageid":"Page_97"}
2971	{"viewtime":2971,"userid":"User_1","pageid":"Page_54"}
3151	{"viewtime":3151,"userid":"User_3","pageid":"Page_21"}
3171	{"viewtime":3171,"userid":"User_5","pageid":"Page_65"}
3271	{"viewtime":3271,"userid":"User_3","pageid":"Page_85"}
3361	{"viewtime":3361,"userid":"User_9","pageid":"Page_41"}
3421	{"viewtime":3421,"userid":"User_3","pageid":"Page_60"}
3431	{"viewtime":3431,"userid":"User_7","pageid":"Page_57"}
3501	{"viewtime":3501,"userid":"User_3","pageid":"Page_52"}

When you are done, type Ctrl+C.
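Each line of the consumer output is the record key, a tab, and the JSON value, so it is easy to post-process. A sketch that parses a few of the lines above and counts pageviews per user (purely illustrative, using a sample of the output shown):

```python
import json
from collections import Counter

# Parse consumer output lines (key <TAB> JSON value) and count pageviews
# per user. The list below is a sample of the output shown above.
lines = [
    '2871\t{"viewtime":2871,"userid":"User_6","pageid":"Page_34"}',
    '2881\t{"viewtime":2881,"userid":"User_3","pageid":"Page_16"}',
    '2901\t{"viewtime":2901,"userid":"User_2","pageid":"Page_44"}',
    '3151\t{"viewtime":3151,"userid":"User_3","pageid":"Page_21"}',
]

views = Counter()
for line in lines:
    key, _, value = line.partition("\t")  # split off the record key
    record = json.loads(value)
    views[record["userid"]] += 1

print(views.most_common(1))  # -> [('User_3', 2)]
```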

10
Delete the connector

Because fully-managed connectors in Confluent Cloud may be billed hourly, it’s a good idea to delete the connector when you are done with this tutorial.

Run the following Confluent Cloud CLI command to list the provisioned connectors. Find the datagen connector’s id.

ccloud connector list

Delete the connector by its id, then verify the deletion in the Confluent Cloud Console.

ccloud connector delete <connector id>