How to generate mock data to a Kafka topic in Confluent Cloud using the fully-managed Kafka Connect Datagen

Question:

How can I produce mock data to Kafka topics in Confluent Cloud to test my Kafka applications?

Example use case:

You will run a fully-managed instance of the Kafka Connect Datagen connector to produce mock data to a Kafka topic in Confluent Cloud. This helps you test your applications in the cloud.

Code example:

1. Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir kafka-connect-datagen-ccloud && cd kafka-connect-datagen-ccloud

2. Get Confluent Cloud and the CLI

If you don’t have an account yet, sign up for Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details).

Install the Confluent Cloud CLI and log in with the command ccloud login --save, using your Confluent Cloud username and password. The --save flag saves your Confluent Cloud login credentials, or refresh token in the case of SSO, to the local netrc file.

3. Provision a new ccloud-stack on Confluent Cloud

We recommend you run this tutorial in a new Confluent Cloud environment so it doesn’t interfere with your other work, and the easiest way to do this is to use the ccloud-stack utility. The utility provisions a new environment, a new service account, and a new Kafka cluster with associated credentials; enables Schema Registry with associated credentials; creates wildcard ACLs for the service account; and writes a local configuration file containing all of the above connection information. For more information on ccloud-stack, read the documentation.

Get the open source library ccloud_library.sh which has functions to interact with Confluent Cloud, including ccloud-stack.

wget -O ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
source ./ccloud_library.sh

Create your stack of Confluent Cloud resources by running the following command. Set CLUSTER_CLOUD and CLUSTER_REGION as needed (defaults are shown below).

Note: To avoid unexpected charges, carefully evaluate the cost of resources before launching the script, and ensure all resources are destroyed after you finish the tutorial.

CLUSTER_CLOUD=aws
CLUSTER_REGION=us-west-2
ccloud::create_ccloud_stack

4. View cluster connection info

View the local configuration file that was created when you provisioned the new ccloud-stack, where the * in the filename is the new service account id:

cat stack-configs/java-service-account-*.config

Your output should resemble:

# ENVIRONMENT ID: <ENVIRONMENT ID>
# SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
# SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
# ------------------------------
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API KEY>" password="<API SECRET>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
schema.registry.url=https://<SR ENDPOINT>

Notice the service account id <SERVICE ACCOUNT ID> and its credentials for the Kafka cluster <API KEY> and <API SECRET>.
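If you want to pull these credentials out of the file programmatically rather than copying them by hand, a small helper along the following lines works. The function name is illustrative and is not part of ccloud_library.sh.

```shell
# Hypothetical helper (not part of ccloud_library.sh): extract the Kafka
# API key and secret from a ccloud-stack config file. The credentials are
# embedded in the sasl.jaas.config line as username/password.
extract_cloud_creds() {
  local config_file=$1
  API_KEY=$(sed -n 's/.*username="\([^"]*\)".*/\1/p' "$config_file")
  API_SECRET=$(sed -n 's/.*password="\([^"]*\)".*/\1/p' "$config_file")
}

# Example usage:
# extract_cloud_creds stack-configs/java-service-account-*.config
# echo "$API_KEY"
```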

5. Create the Kafka topic

Create a Kafka topic called mytopic in Confluent Cloud.

ccloud kafka topic create mytopic

This should yield the following output:

Created topic "mytopic".

6. Create the connector configuration file

You can provision the Kafka Connect Datagen connector through the Confluent Cloud UI, but this tutorial uses the Confluent Cloud CLI.

First, create a file called datagen-source-config.json with the following connector configuration for the fully-managed Kafka Connect Datagen source connector. Substitute <API KEY> and <API SECRET> with the credentials created by ccloud-stack.

In this sample configuration, the connector uses the PAGEVIEWS quickstart to produce JSON-formatted records simulating website pageviews to the Kafka topic called mytopic. For a full explanation of all connector configuration parameters, see the documentation.

{
    "name" : "datagen_ccloud_01",
    "connector.class": "DatagenSource",
    "kafka.api.key": "<API KEY>",
    "kafka.api.secret" : "<API SECRET>",
    "kafka.topic" : "mytopic",
    "output.data.format" : "JSON",
    "quickstart" : "PAGEVIEWS",
    "tasks.max" : "1"
}
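Rather than editing the file by hand, you can also generate it from the shell. This is just a sketch: the two variables below are placeholders standing in for the credentials from your ccloud-stack config.

```shell
# Sketch: write datagen-source-config.json with the credentials substituted.
# API_KEY and API_SECRET are placeholders; set them to the values created
# by ccloud-stack before running this.
API_KEY="<API KEY>"
API_SECRET="<API SECRET>"
cat > datagen-source-config.json <<EOF
{
    "name" : "datagen_ccloud_01",
    "connector.class": "DatagenSource",
    "kafka.api.key": "${API_KEY}",
    "kafka.api.secret" : "${API_SECRET}",
    "kafka.topic" : "mytopic",
    "output.data.format" : "JSON",
    "quickstart" : "PAGEVIEWS",
    "tasks.max" : "1"
}
EOF
```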

7. Provision the connector in Confluent Cloud

Provision the Kafka Connect Datagen source connector with the following command. Notice that it references the connector configuration file datagen-source-config.json that you created in the previous step.

ccloud connector create --config datagen-source-config.json

Check the status of the connector:

ccloud connector list

After a minute or two, rerun this command to check the updated status; it will change from PROVISIONING to RUNNING once the connector is ready.

     ID     |       Name        | Status  |  Type  | Trace
+-----------+-------------------+---------+--------+-------+
  lcc-6g1p6 | datagen_ccloud_01 | RUNNING | source |
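If you script this step, you can poll until the connector reaches RUNNING instead of rerunning the command by hand. A minimal sketch, assuming the ccloud CLI is already authenticated; the function name is made up for this example.

```shell
# Sketch: block until the named connector shows RUNNING in the status list.
wait_for_connector() {
  while ! ccloud connector list | grep "$1" | grep -q RUNNING; do
    echo "waiting for connector $1 ..."
    sleep 10
  done
}

# Example usage:
# wait_for_connector datagen_ccloud_01
```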

8. Consume events from the Kafka topic

Now that the Kafka Connect Datagen connector is running in Confluent Cloud, it is producing messages to your Kafka topic. View the messages being produced to the topic.

There are many ways to do this, including the Confluent Cloud UI, but for this tutorial we will show you how to do it with the Confluent Cloud CLI.

ccloud kafka topic consume mytopic --print-key

After the consumer starts, you should see the following output in a few seconds:

2871	{"viewtime":2871,"userid":"User_6","pageid":"Page_34"}
2881	{"viewtime":2881,"userid":"User_3","pageid":"Page_16"}
2901	{"viewtime":2901,"userid":"User_2","pageid":"Page_44"}
2961	{"viewtime":2961,"userid":"User_7","pageid":"Page_97"}
2971	{"viewtime":2971,"userid":"User_1","pageid":"Page_54"}
3151	{"viewtime":3151,"userid":"User_3","pageid":"Page_21"}
3171	{"viewtime":3171,"userid":"User_5","pageid":"Page_65"}
3271	{"viewtime":3271,"userid":"User_3","pageid":"Page_85"}
3361	{"viewtime":3361,"userid":"User_9","pageid":"Page_41"}
3421	{"viewtime":3421,"userid":"User_3","pageid":"Page_60"}
3431	{"viewtime":3431,"userid":"User_7","pageid":"Page_57"}
3501	{"viewtime":3501,"userid":"User_3","pageid":"Page_52"}

When you are done, press Ctrl-C to stop the consumer.
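As a quick sanity check on the data, you can pipe the consumer output through a small filter, for example to tally pageviews per user. This helper is illustrative, not part of the CLI.

```shell
# Sketch: count pageviews per userid from the key<TAB>value consumer output.
# Feed it the consumer stream, e.g.:
#   ccloud kafka topic consume mytopic --print-key | count_views
count_views() {
  sed -n 's/.*"userid":"\([^"]*\)".*/\1/p' | sort | uniq -c | sort -rn
}
```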

9. Clean up your environment

Because your Confluent Cloud cluster uses real cloud resources and is billable, delete the connector and clean up your Confluent Cloud environment when you complete this tutorial. You can use the Confluent Cloud CLI or the Confluent Cloud UI, but for this tutorial you can use the ccloud_library.sh library again. Pass in the SERVICE_ACCOUNT_ID that was generated when the ccloud-stack was created.

ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID