How to generate mock data to a Kafka topic using the Datagen Source Connector

Question:

How can you produce mock data to Kafka topics to test your Kafka applications?


Example use case:

In this tutorial, you will learn about testing your Kafka applications. You'll run an instance of the Kafka Connect Datagen connector to produce mock data to a Kafka cluster.

Hands-on code example:

New to Confluent Cloud? Get started here.

Run it

1. Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir kafka-connect-datagen && cd kafka-connect-datagen

Next, create a directory for configuration data:

mkdir configuration

2. Provision your Kafka cluster

This tutorial requires access to an Apache Kafka cluster, and the quickest way to get started for free is on Confluent Cloud, which provides Kafka as a fully managed service.

  1. After you log in to Confluent Cloud, click Environments in the lefthand navigation, click on Add cloud environment, and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  2. From the Billing & payment section in the menu, apply the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details).

  3. Click on LEARN and follow the instructions to launch a Kafka cluster and enable Schema Registry.


3. Write the cluster information into a local file

From the Confluent Cloud Console, navigate to your Kafka cluster and then select CLI and Tools in the lefthand navigation. Click the CLI Tools header and get the connection information customized to your cluster.

Create new credentials for your Kafka cluster and Schema Registry, writing in appropriate descriptions so that the keys are easy to find and delete later. The Confluent Cloud Console will show a configuration similar to the one below with your new credentials automatically populated (make sure Show API keys is checked). Copy and paste it into a configuration/ccloud.properties file on your machine.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BOOTSTRAP_SERVERS }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url={{ SR_URL }}
basic.auth.credentials.source=USER_INFO
basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}
Note: do not directly copy and paste the above configuration. You must copy it from the Confluent Cloud Console so that it includes your Confluent Cloud information and credentials.
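
Because this file contains live credentials, you may also want to restrict its permissions as a precaution (optional; not required by the tutorial):

chmod 600 configuration/ccloud.properties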

4. Download and set up the Confluent CLI

This tutorial has some steps for Kafka topic management and for producing and consuming events, for which you can use the Confluent Cloud Console or the Confluent CLI. Follow the instructions here to install the Confluent CLI, and then follow these steps to connect the CLI to your Confluent Cloud cluster.
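
If you haven't connected the CLI to this cluster before, the sequence typically looks like the following sketch (the environment and cluster IDs are placeholders; substitute the values from your own account):

confluent login --save
# find and select the learn-kafka environment
confluent environment list
confluent environment use env-abc123
# find and select the Kafka cluster you created
confluent kafka cluster list
confluent kafka cluster use lkc-xyz789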

5. Create the Kafka topic

Create a Kafka topic called mytopic in Confluent Cloud.

confluent kafka topic create mytopic

This should yield the following output:

Created topic "mytopic".
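
If you want to double-check, the CLI can also list and describe topics:

# confirm that mytopic now exists in the cluster
confluent kafka topic list
confluent kafka topic describe mytopic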

6. Create the connector configuration file

You can provision the Kafka Connect Datagen connector through the Confluent Cloud Console, but in this tutorial you will use either the Confluent CLI or the Confluent Cloud REST API.

First, create a file called datagen-source-config.json with the connector configuration below for the Kafka Connect Datagen source connector for Confluent Cloud. Replace <CLUSTER_API_KEY> and <CLUSTER_API_SECRET> with the credentials from the configuration/ccloud.properties file.

In this sample configuration, the connector uses the PAGEVIEWS quickstart to produce JSON records simulating website pageviews to the Kafka topic called mytopic. For a full explanation of all connector configuration parameters, see the documentation.

{
    "name" : "datagen_ccloud_01",
    "connector.class": "DatagenSource",
    "kafka.api.key": "<CLUSTER_API_KEY>",
    "kafka.api.secret" : "<CLUSTER_API_SECRET>",
    "kafka.topic" : "mytopic",
    "output.data.format" : "JSON",
    "quickstart" : "PAGEVIEWS",
    "tasks.max" : "1"
}
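
If you need to look up the credentials again, you can pull them out of the properties file you saved earlier (a small sketch using grep; adjust the path if you stored the file elsewhere):

# the API key and secret appear in the sasl.jaas.config line
grep sasl.jaas.config configuration/ccloud.properties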

7. Provision the connector in Confluent Cloud

You can choose one of two options for provisioning a fully managed Kafka Connect Datagen source connector in Confluent Cloud. Either option will use the connector configuration file datagen-source-config.json that you created in the previous step.

Option 1. Use the Confluent CLI, which provides the confluent connector create command for passing in the configuration file from the previous step.

confluent connector create --config datagen-source-config.json

Option 2. The Confluent Cloud REST API can provision the connector using the configuration file you created in the previous step. This API requires a Confluent Cloud resource ID for both the environment and the Kafka cluster to which you wish to deploy the connector. You can obtain these values from the Confluent Cloud Console or with the confluent kafka cluster describe command.

Additionally, you must provide an API key and secret that authorize you to manage your cloud account. This API key is independent of the one you use to connect to Kafka or Schema Registry, so you need to generate it before the HTTP command will succeed.

confluent api-key create --resource cloud -o json > cloud-api-key.json

The cloud-api-key.json file now contains an API key and secret authorized to control your cloud account. Protect this file as you would any secret value.

You will also need to set the following values, as shown in the example after this list:

  • ENVIRONMENT: your Confluent Cloud environment ID (use the command confluent environment list to view the active environment).

  • CLUSTER: your Kafka cluster ID (use the command confluent kafka cluster list to view the active cluster).
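
For example, you might export both as shell variables so that the curl command below can expand them (the IDs shown are placeholders; substitute the values from your own account):

export ENVIRONMENT=env-abc123   # from 'confluent environment list'
export CLUSTER=lkc-xyz789       # from 'confluent kafka cluster list'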

Run the following curl command to provision the connector. The command reads the API key and secret from the cloud-api-key.json file (using the jq command-line tool) and PUTs the new connector configuration to the REST API in the appropriate environment and Kafka cluster.

curl -XPUT -H 'Content-Type: application/json' --data "@datagen-source-config.json" --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/config

8. Verify the status of the connector in Confluent Cloud

To check the status of the connector from the command line, you have the same two options as for provisioning.

Option 1. Use the Confluent CLI.

confluent connector list

Rerun this command to get the updated status; it will change from PROVISIONING to RUNNING when the connector is ready.

     ID     |       Name        | Status  |  Type  | Trace
+-----------+-------------------+---------+--------+-------+
  lcc-6g1p6 | datagen_ccloud_01 | RUNNING | source |

Option 2. The Confluent Cloud REST API provides a connector_name/status endpoint you can use to verify the status of a provisioned connector. Note the connector.state field in the returned JSON.

As described in Step 7 above, an API Key is required for all REST API calls to succeed.

curl -s -XGET -H 'Content-Type: application/json' --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq
{
  "name": "datagen_ccloud_01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "datagen_ccloud_01",
    "trace": ""
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "datagen_ccloud_01",
      "msg": ""
    }
  ],
  "type": "source"
}
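
If you only care about the overall state, you can filter the same response down to a single field (a sketch reusing the jq tool from above):

# print just the connector state, e.g. RUNNING
curl -s -XGET -H 'Content-Type: application/json' --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq -r '.connector.state'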

9. Consume events from the Kafka topic

Now that the Kafka Connect Datagen connector is running in Confluent Cloud, it is producing messages to your Kafka topic. Next, view the messages as they are produced.

There are many ways to do this, including the Confluent Cloud Console, but for this tutorial we will show you how to do it with the Confluent CLI.

confluent kafka topic consume mytopic --print-key

After the consumer starts, you should see output like the following within a few seconds:

2871	{"viewtime":2871,"userid":"User_6","pageid":"Page_34"}
2881	{"viewtime":2881,"userid":"User_3","pageid":"Page_16"}
2901	{"viewtime":2901,"userid":"User_2","pageid":"Page_44"}
2961	{"viewtime":2961,"userid":"User_7","pageid":"Page_97"}
2971	{"viewtime":2971,"userid":"User_1","pageid":"Page_54"}
3151	{"viewtime":3151,"userid":"User_3","pageid":"Page_21"}
3171	{"viewtime":3171,"userid":"User_5","pageid":"Page_65"}
3271	{"viewtime":3271,"userid":"User_3","pageid":"Page_85"}
3361	{"viewtime":3361,"userid":"User_9","pageid":"Page_41"}
3421	{"viewtime":3421,"userid":"User_3","pageid":"Page_60"}
3431	{"viewtime":3431,"userid":"User_7","pageid":"Page_57"}
3501	{"viewtime":3501,"userid":"User_3","pageid":"Page_52"}

When you are done, press Ctrl-C.
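
By default, the consumer starts at the end of the topic and shows only new records. To replay everything the connector has produced so far, the consume command also accepts a from-beginning flag:

confluent kafka topic consume mytopic --print-key --from-beginning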

10. Delete the connector

Because fully managed connectors in Confluent Cloud may be billed hourly, it’s a good idea to delete the connector when you are done with this tutorial.

Run the following Confluent CLI command to list the provisioned connectors, and find the Datagen connector's ID.

confluent connector list

Delete the connector, substituting the ID you found above. You can verify the deletion in the Confluent Cloud Console.

confluent connector delete <connector id>
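
If you are completely done with the tutorial, you may also want to clean up the other resources you created along the way (a brief sketch; the API key value is read from the cloud-api-key.json file generated in Step 7):

# remove the test topic
confluent kafka topic delete mytopic

# revoke the cloud API key and remove the local copy
confluent api-key delete $(cat cloud-api-key.json | jq -r '.key')
rm cloud-api-key.json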