How to generate mock data to a Kafka topic in Confluent Cloud using the fully-managed Kafka Connect Datagen

Question:

How can I produce mock data to Kafka topics in Confluent Cloud to test my Kafka applications?

Example use case:

You will run a fully-managed instance of the Kafka Connect Datagen connector to produce mock data to a Kafka topic in Confluent Cloud. This helps you test your applications in the cloud.

Code example:

Try it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir kafka-connect-datagen-ccloud && cd kafka-connect-datagen-ccloud

2
Get Confluent Cloud and the CLI

If you don’t have an account yet, sign up for Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

Install the Confluent Cloud CLI and log in with the command ccloud login --save, using your Confluent Cloud username and password. The --save argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local netrc file.
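For example:

ccloud login --save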

3
Provision a new ccloud-stack on Confluent Cloud

We recommend you run this tutorial in a new Confluent Cloud environment so it doesn’t interfere with your other work, and the easiest way to do this is with the ccloud-stack utility. It provisions a new environment, a new service account, a new Kafka cluster and associated credentials, Schema Registry and associated credentials, wildcard ACLs for the service account, and a local configuration file containing all of the above connection information. For more information on ccloud-stack, see the documentation.

Get the open source library ccloud_library.sh, which has functions for interacting with Confluent Cloud, including ccloud-stack.

wget -O ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
source ./ccloud_library.sh

Create your stack of Confluent Cloud resources by running the following command. Set CLUSTER_CLOUD and CLUSTER_REGION as needed (defaults are shown below).

To avoid unexpected charges, carefully evaluate the cost of resources before launching the script, and ensure all resources are destroyed after you are done running the tutorial.

CLUSTER_CLOUD=aws
CLUSTER_REGION=us-west-2
ccloud::create_ccloud_stack

If you need to re-run the ccloud::create_ccloud_stack command, open a new terminal window and source the ccloud_library.sh script again.
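For example, in the new terminal window:

source ./ccloud_library.sh
ccloud::create_ccloud_stack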

4
View cluster connection info

View the local configuration file that was created after you provisioned a new ccloud-stack, where * in this case is the new service account id:

cat stack-configs/java-service-account-*.config

Your output should resemble:

# ENVIRONMENT ID: <ENVIRONMENT ID>
# SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
# SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
# ------------------------------
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API KEY>" password="<API SECRET>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
schema.registry.url=https://<SR ENDPOINT>

Notice the service account id <SERVICE ACCOUNT ID> and its credentials for the Kafka cluster <API KEY> and <API SECRET>.
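If you want to pull an individual value out of this file at the shell, a quick grep works; for example, to show just the broker endpoint (a convenience step, not required for the tutorial):

grep "^bootstrap.servers" stack-configs/java-service-account-*.config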

5
Create the Kafka topic

Create a Kafka topic called mytopic in Confluent Cloud.

ccloud kafka topic create mytopic

This should yield the following output:

Created topic "mytopic".
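To confirm the topic was created, you can list the topics in your cluster (the exact output will vary):

ccloud kafka topic list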

6
Create the connector configuration file

You can provision the Kafka Connect Datagen connector through the Confluent Cloud UI, but this tutorial shows you how to use either the Confluent Cloud CLI or the Confluent Cloud REST API.

First, create a file called datagen-source-config.json with the below connector configuration for the Kafka Connect Datagen source connector for Confluent Cloud. Substitute <API KEY> and <API SECRET> with the credentials created by ccloud-stack.

In this sample configuration, the connector uses the PAGEVIEWS quickstart to produce JSON records simulating website pageviews. The records follow a schema specification called PAGEVIEWS and are produced to a Kafka topic called mytopic. For a full explanation of all connector configuration parameters, see the documentation.

{
    "name" : "datagen_ccloud_01",
    "connector.class": "DatagenSource",
    "kafka.api.key": "<API KEY>",
    "kafka.api.secret" : "<API SECRET>",
    "kafka.topic" : "mytopic",
    "output.data.format" : "JSON",
    "quickstart" : "PAGEVIEWS",
    "tasks.max" : "1"
}
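Before submitting the configuration, you can optionally check that the file is well-formed JSON using the jq tool:

jq . datagen-source-config.json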

7
Provision the connector in Confluent Cloud

You can choose one of two options for provisioning a fully managed Kafka Connect Datagen source connector in Confluent Cloud. Either option will use the connector configuration file datagen-source-config.json that you created in the previous step.

Option 1. You can use the Confluent Cloud CLI, which provides the ccloud connector create command, allowing you to pass in the configuration file from the previous step. The ccloud CLI is already logged in, and the ccloud_library.sh script configured it to use the newly created environment and Kafka cluster.

ccloud connector create --config datagen-source-config.json

Option 2. The Confluent Cloud REST API can provision the connector using the configuration file you created in the previous step. This API requires a Confluent Cloud resource ID for both the environment and the Kafka cluster to which we wish to deploy the connector. These values can be obtained from the Confluent Cloud UI or with the ccloud kafka cluster describe command. When using ccloud_library.sh, as we did above, the script sets two environment variables (ENVIRONMENT and CLUSTER) with these values.
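You can quickly verify that both variables are set in your current shell before calling the API:

echo "ENVIRONMENT=$ENVIRONMENT CLUSTER=$CLUSTER"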

Additionally, we must provide an API key and secret that authorize us to control our cloud account. This API key is independent of the one you use to connect to Kafka or the Schema Registry, so we need to generate it before the HTTP command will succeed.

ccloud api-key create --resource cloud -o json > cloud-api-key.json

The cloud-api-key.json file now contains an API key and secret authorized to control your cloud account. Protect this file as you would any secret value.
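As a precaution, you can restrict the file’s permissions so that only your user can read it:

chmod 600 cloud-api-key.json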

The following curl command reads the API key and secret from the cloud-api-key.json file (using the jq command-line tool) and PUTs the new connector configuration to the REST API in the appropriate environment and Kafka cluster.

curl -XPUT -H 'Content-Type: application/json' --data "@datagen-source-config.json" --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/config

8
Verify the status of the connector in Confluent Cloud

To check the status of the connector from the command line, you have the same two options as for provisioning.

Option 1. Using the ccloud CLI.

ccloud connector list

Rerun this command to get the updated status; it will change from PROVISIONING to RUNNING when the connector is ready.

     ID     |       Name        | Status  |  Type  | Trace
+-----------+-------------------+---------+--------+-------+
  lcc-6g1p6 | datagen_ccloud_01 | RUNNING | source |

Option 2. The Confluent Cloud REST API provides a connector_name/status endpoint you can use to verify the status of a provisioned connector. Note the connector.state field in the returned JSON.

As described in Step 7 above, an API Key is required for all REST API calls to succeed.

curl -s -XGET -H 'Content-Type: application/json' --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq

Your output should resemble:
{
  "name": "datagen_ccloud_01",
  "connector": {
    "state": "RUNNING",
    "worker_id": "datagen_ccloud_01",
    "trace": ""
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "datagen_ccloud_01",
      "msg": ""
    }
  ],
  "type": "source"
}
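If you are scripting this check, you can filter the response down to just the connector state:

curl -s -XGET -H 'Content-Type: application/json' --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq -r '.connector.state'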

9
Consume events from the Kafka topic

Now that the Kafka Connect Datagen connector is running in Confluent Cloud, it is producing messages to your Kafka topic. Next, view the messages being produced to the topic.

There are many ways to do this, including the Confluent Cloud UI, but for this tutorial we will show you how to do it with the Confluent Cloud CLI.

ccloud kafka topic consume mytopic --print-key

After the consumer starts, you should see the following output in a few seconds:

2871	{"viewtime":2871,"userid":"User_6","pageid":"Page_34"}
2881	{"viewtime":2881,"userid":"User_3","pageid":"Page_16"}
2901	{"viewtime":2901,"userid":"User_2","pageid":"Page_44"}
2961	{"viewtime":2961,"userid":"User_7","pageid":"Page_97"}
2971	{"viewtime":2971,"userid":"User_1","pageid":"Page_54"}
3151	{"viewtime":3151,"userid":"User_3","pageid":"Page_21"}
3171	{"viewtime":3171,"userid":"User_5","pageid":"Page_65"}
3271	{"viewtime":3271,"userid":"User_3","pageid":"Page_85"}
3361	{"viewtime":3361,"userid":"User_9","pageid":"Page_41"}
3421	{"viewtime":3421,"userid":"User_3","pageid":"Page_60"}
3431	{"viewtime":3431,"userid":"User_7","pageid":"Page_57"}
3501	{"viewtime":3501,"userid":"User_3","pageid":"Page_52"}

When you are done, press Ctrl+C.
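If you start the consumer long after the connector began producing and want to see earlier records as well, the CLI can read from the beginning of the topic; the flag below is per the ccloud CLI help, so confirm it with ccloud kafka topic consume --help on your CLI version:

ccloud kafka topic consume mytopic --print-key -b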

10
Clean up your environment

Because your Confluent Cloud cluster uses real cloud resources and is billable, delete the connector and clean up your Confluent Cloud environment when you complete this tutorial. You can use the Confluent Cloud CLI or the Confluent Cloud UI, but for this tutorial we will use the ccloud_library.sh library again. Pass in the SERVICE_ACCOUNT_ID that was generated when the ccloud-stack was created.

ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID
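To double-check that the stack was removed, list your Confluent Cloud environments and confirm the tutorial environment is gone:

ccloud environment list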

See the Confluent Cloud CLI and REST API documentation for full details on operating managed connectors.