How can I produce mock data to Kafka topics in Confluent Cloud to test my Kafka applications?
You will run a fully-managed instance of the Kafka Connect Datagen connector to produce mock data to a Kafka topic in Confluent Cloud. This helps you test your applications in the cloud.
To get started, make a new directory anywhere you’d like for this project:
mkdir kafka-connect-datagen-ccloud && cd kafka-connect-datagen-ccloud
If you don’t have an account yet, sign up for Confluent Cloud.
Use the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details).
Install the Confluent Cloud CLI and log in with the command ccloud login --save, using your Confluent Cloud username and password. The --save argument saves your Confluent Cloud user login credentials, or refresh token (in the case of SSO), to the local netrc file.
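As a quick check that the login worked and the credentials were saved, you can run any authenticated command, for example:
ccloud environment list
# succeeds without prompting for a password once credentials are saved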
We recommend you run this tutorial in a new Confluent Cloud environment so it doesn't interfere with your other work, and the easiest way to do this is to use the ccloud-stack utility. This provisions a new Confluent Cloud stack with a new environment, a new service account, a new Kafka cluster with associated credentials, Schema Registry enabled with associated credentials, wildcard ACLs for the service account, and a local configuration file containing all of the above connection information. For more information on ccloud-stack, see the documentation.
Get the open source library ccloud_library.sh, which has functions to interact with Confluent Cloud, including ccloud-stack.
wget -O ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
source ./ccloud_library.sh
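If you want to confirm the library's functions are now available in your shell, you can inspect one with the bash type builtin:
type ccloud::create_ccloud_stack
# prints the function definition if ccloud_library.sh was sourced successfully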
Create your stack of Confluent Cloud resources by running the following command, setting CLUSTER_CLOUD and CLUSTER_REGION as needed (defaults are shown below). To avoid unexpected charges, carefully evaluate the cost of resources before launching the script, and ensure all resources are destroyed after you are done running the tutorial.
CLUSTER_CLOUD=aws
CLUSTER_REGION=us-west-2
ccloud::create_ccloud_stack
If you need to re-run the ccloud::create_ccloud_stack command, open a new terminal window and source the ccloud_library.sh script again.
View the local configuration file that was created after you provisioned a new ccloud-stack, where * in this case is the new service account ID:
cat stack-configs/java-service-account-*.config
Your output should resemble:
# ENVIRONMENT ID: <ENVIRONMENT ID>
# SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
# SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
# ------------------------------
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API KEY>" password="<API SECRET>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
schema.registry.url=https://<SR ENDPOINT>
Notice the service account ID <SERVICE ACCOUNT ID> and its credentials for the Kafka cluster, <API KEY> and <API SECRET>.
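Since the cleanup step at the end of this tutorial takes the service account ID as an argument, it can be convenient to capture it in a shell variable now. A minimal sketch, assuming exactly one stack-configs/java-service-account-*.config file exists (the sed pattern here is a hypothetical helper, not part of ccloud_library.sh):
# extract the service account ID embedded in the config file name
CONFIG_FILE=$(ls stack-configs/java-service-account-*.config)
SERVICE_ACCOUNT_ID=$(echo "$CONFIG_FILE" | sed 's/.*java-service-account-\(.*\)\.config/\1/')
echo $SERVICE_ACCOUNT_ID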
Create a Kafka topic called mytopic in Confluent Cloud.
ccloud kafka topic create mytopic
This should yield the following output:
Created topic "mytopic".
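If you'd like to verify the topic's settings, the CLI also offers a describe command:
ccloud kafka topic describe mytopic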
You could provision the Kafka Connect Datagen connector through the Confluent Cloud UI, but in this tutorial you will use either the Confluent Cloud CLI or the Confluent Cloud REST API.
First, create a file called datagen-source-config.json with the connector configuration below for the Kafka Connect Datagen source connector for Confluent Cloud. Substitute <API KEY> and <API SECRET> with the credentials created by ccloud-stack.
In this sample configuration, the connector uses the PAGEVIEWS quickstart to produce JSON records simulating website pageviews. The records follow a schema specification called PAGEVIEWS and are written to the Kafka topic mytopic. For a full explanation of all connector configuration parameters, see the documentation.
{
"name" : "datagen_ccloud_01",
"connector.class": "DatagenSource",
"kafka.api.key": "<API KEY>",
"kafka.api.secret" : "<API SECRET>",
"kafka.topic" : "mytopic",
"output.data.format" : "JSON",
"quickstart" : "PAGEVIEWS",
"tasks.max" : "1"
}
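Before provisioning the connector, it can be worth checking that the file is well-formed JSON, for example with the jq tool (which this tutorial also uses later):
jq . datagen-source-config.json
# pretty-prints the config on success, or reports an error if the JSON is malformed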
You can choose one of two options for provisioning a fully managed Kafka Connect Datagen source connector in Confluent Cloud. Either option uses the connector configuration file datagen-source-config.json that you created in the previous step.
Option 1. You can use the Confluent Cloud CLI, which provides the ccloud connector create command, allowing you to pass in the configuration file from the previous step. The ccloud CLI is already logged in, and ccloud_library configured it to use the newly created environment and Kafka cluster.
ccloud connector create --config datagen-source-config.json
Option 2. The Confluent Cloud REST API can provision the connector using the configuration file you created in the previous step. This API requires a Confluent Cloud resource ID for both the environment and the Kafka cluster to which we wish to deploy the connector. These values can be obtained from the Confluent Cloud UI or with the ccloud kafka cluster describe command. When using ccloud_library.sh, as we did above, the script sets two environment variables (ENVIRONMENT and CLUSTER) with these values.
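If you are not using ccloud_library.sh, one way to set these variables yourself is to look up the IDs with the CLI and export them (substitute your own IDs for the placeholders):
ccloud environment list     # note the environment ID
ccloud kafka cluster list   # note the Kafka cluster ID
export ENVIRONMENT=<ENVIRONMENT ID>
export CLUSTER=<KAFKA CLUSTER ID>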
Additionally, we must provide an API key and secret that authorize us to control our cloud account. This API key is independent of the one you use to connect to Kafka or the Schema Registry, so we need to generate it before the HTTP command will succeed.
ccloud api-key create --resource cloud -o json > cloud-api-key.json
The cloud-api-key.json file now contains an API key and secret authorized to control your cloud account. Protect this file as you would any secret value.
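On Linux or macOS, for example, you can restrict the file so that only your user can read it:
chmod 600 cloud-api-key.json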
The following curl command reads the API key and secret from the cloud-api-key.json file (using the jq tool) and PUTs the new connector configuration to the REST API in the appropriate environment and Kafka cluster.
curl -XPUT -H 'Content-Type: application/json' --data "@datagen-source-config.json" --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/config
To check the status of the connector from the command line, you have the same two options as for provisioning.
Option 1. Using the ccloud CLI:
ccloud connector list
Rerun this command to get the updated status; it will change from PROVISIONING to RUNNING when the connector is ready.
ID | Name | Status | Type | Trace
+-----------+-------------------+---------+--------+-------+
lcc-6g1p6 | datagen_ccloud_01 | RUNNING | source |
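Rather than rerunning the command manually, you could poll it with the standard watch utility, if it's installed on your system:
watch -n 5 ccloud connector list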
Option 2. The Confluent Cloud REST API provides a connector_name/status endpoint you can use to verify the status of a provisioned connector. Note the connector.state field in the returned JSON. As described in Step 7 above, an API key is required for all REST API calls to succeed.
curl -s -XGET -H 'Content-Type: application/json' --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq
{
"name": "datagen_ccloud_01",
"connector": {
"state": "RUNNING",
"worker_id": "datagen_ccloud_01",
"trace": ""
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "datagen_ccloud_01",
"msg": ""
}
],
"type": "source"
}
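If you only need the top-level state, you can have jq extract that single field from the same endpoint:
curl -s -XGET -H 'Content-Type: application/json' --user $(cat cloud-api-key.json | jq -r '.key'):$(cat cloud-api-key.json | jq -r '.secret') https://api.confluent.cloud/connect/v1/environments/$ENVIRONMENT/clusters/$CLUSTER/connectors/datagen_ccloud_01/status | jq -r '.connector.state'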
Now that the Kafka Connect Datagen connector is running in Confluent Cloud, it is producing messages to your Kafka topic. View the messages being produced to the Kafka topic in Confluent Cloud.
There are many ways to do this, including the Confluent Cloud UI, but for this tutorial we will show you how to do it with the Confluent Cloud CLI.
ccloud kafka topic consume mytopic --print-key
After the consumer starts, you should see the following output in a few seconds:
2871 {"viewtime":2871,"userid":"User_6","pageid":"Page_34"}
2881 {"viewtime":2881,"userid":"User_3","pageid":"Page_16"}
2901 {"viewtime":2901,"userid":"User_2","pageid":"Page_44"}
2961 {"viewtime":2961,"userid":"User_7","pageid":"Page_97"}
2971 {"viewtime":2971,"userid":"User_1","pageid":"Page_54"}
3151 {"viewtime":3151,"userid":"User_3","pageid":"Page_21"}
3171 {"viewtime":3171,"userid":"User_5","pageid":"Page_65"}
3271 {"viewtime":3271,"userid":"User_3","pageid":"Page_85"}
3361 {"viewtime":3361,"userid":"User_9","pageid":"Page_41"}
3421 {"viewtime":3421,"userid":"User_3","pageid":"Page_60"}
3431 {"viewtime":3431,"userid":"User_7","pageid":"Page_57"}
3501 {"viewtime":3501,"userid":"User_3","pageid":"Page_52"}
When you are done, type Ctrl+C.
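By default, the consumer reads only new messages. If you want to replay everything the connector has produced so far, the consume command also accepts a from-beginning flag (-b in recent versions of the ccloud CLI):
ccloud kafka topic consume mytopic --print-key -b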
Because your Confluent Cloud cluster is using real cloud resources and is billable, delete the connector and clean up your Confluent Cloud environment when you complete this tutorial.
You can use the Confluent Cloud CLI or the Confluent Cloud UI, but for this tutorial you can use the ccloud_library.sh library again. Pass in the SERVICE_ACCOUNT_ID that was generated when the ccloud-stack was created.
ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID
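To double-check that nothing billable was left behind, you can list your environments and confirm that the one created by ccloud-stack is gone:
ccloud environment list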