How do I use the ccloud CLI to produce and consume records from a Confluent Cloud topic?
In this tutorial, you learn how to use the ccloud CLI to produce and consume records from a Kafka topic in Confluent Cloud. This can help you build and debug your Confluent Cloud-based event streaming applications.
To get started, make a new directory anywhere you’d like for this project:
mkdir ccloud-produce-consume && cd ccloud-produce-consume
If you don’t have an account yet, sign up for Confluent Cloud.
Use the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details).
Install the Confluent Cloud CLI and log in with the command ccloud login --save, using your Confluent Cloud username and password.
The --save argument saves your Confluent Cloud user login credentials or refresh token (in the case of SSO) to the local netrc file.
We recommend that you run this tutorial in a new Confluent Cloud environment so it doesn’t interfere with your other work. The easiest way to do this is to use the ccloud-stack utility.
The ccloud-stack utility provisions a new Confluent Cloud stack with a new environment, a new service account, a new Kafka cluster, and associated credentials. It also enables Schema Registry and creates its credentials, sets wildcard ACLs for the service account, and writes a local configuration file with all of the above connection information.
For more information on ccloud-stack, read the documentation.
Get the open source library ccloud_library.sh, which has functions to interact with Confluent Cloud, including ccloud-stack.
wget -O ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
source ./ccloud_library.sh
Create your stack of Confluent Cloud resources by running the following command.
Set CLUSTER_CLOUD and CLUSTER_REGION as needed (defaults are shown below).
To avoid unexpected charges, carefully evaluate the cost of resources before launching the script and ensure all resources are destroyed after you are done running the tutorial.
CLUSTER_CLOUD=aws
CLUSTER_REGION=us-west-2
ccloud::create_ccloud_stack
After running the ccloud::create_ccloud_stack function, you should see output similar to the following:
Creating Confluent Cloud stack for service account demo-app-1234, ID: 12345.
Set Kafka cluster "lkc-xyz123" as the active cluster for environment "env-abc123".
Waiting up to 720 seconds for Confluent Cloud cluster to be ready and for credentials to propagate
.
Sleeping an additional 80 seconds to ensure propagation of all metadata
Set API Key "ABC123ABC123" as the active API key for "lkc-123abc".
Client configuration file saved to: stack-configs/java-service-account-12345.config
View the local configuration file that was created after you provisioned a new ccloud-stack, where * in this case is the new service account ID:
cat stack-configs/java-service-account-*.config
Your output should resemble:
# ENVIRONMENT ID: <ENVIRONMENT ID>
# SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
# SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
# ------------------------------
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API KEY>" password="<API SECRET>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
schema.registry.url=https://<SR ENDPOINT>
Note the credentials for the Schema Registry (<SR API KEY> and <SR API SECRET>). You will need them shortly.
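Instead of copying the Schema Registry credentials by hand, you can pull them out of the configuration file with standard shell tools. The following is a minimal sketch, assuming the file keeps the schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET> layout shown above. The function name sr_credentials is hypothetical and is not part of ccloud_library.sh.

```shell
# Hypothetical helper: print the Schema Registry API key and secret found in a
# ccloud-stack configuration file, one per line (key first, then secret).
# Assumes the "schema.registry.basic.auth.user.info=<key>:<secret>" layout.
sr_credentials() {
  config_file="$1"
  # Keep only the text after the property name on the matching line.
  user_info=$(sed -n 's/^schema\.registry\.basic\.auth\.user\.info=//p' "$config_file" | head -n 1)
  # Fail if the property is missing or empty.
  [ -n "$user_info" ] || return 1
  # The key is the text before the first ":"; the secret is everything after it.
  printf '%s\n' "${user_info%%:*}" "${user_info#*:}"
}

# Example usage (path taken from the sample output above):
# sr_credentials stack-configs/java-service-account-12345.config
```

Splitting on the first colon only means a secret that itself contains a colon is still printed in full.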
Verify that your ccloud CLI is configured to use your new ccloud-stack Kafka cluster.
ccloud kafka cluster list
The output should be similar to the following, with your specific cluster Id and Name:
Id | Name | Type | Provider | Region | Availability | Status
+-------------+---------------------------+-------+----------+-----------+--------------+--------+
* lkc-mpm07 | demo-kafka-cluster-114344 | BASIC | aws | us-west-2 | single-zone | UP
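To check the active cluster from a script rather than by eye, you can filter the listing for the row marked with an asterisk. This is a rough sketch that assumes the table layout shown above, which may differ between CLI versions. The function name active_cluster_id is hypothetical.

```shell
# Hypothetical helper: read "ccloud kafka cluster list" output on stdin and
# print the Id of the active cluster, i.e. the row whose first field is "*".
# Assumes the table layout shown above; other CLI versions may format it differently.
active_cluster_id() {
  awk '$1 == "*" { print $2; exit }'
}

# Example usage:
# ccloud kafka cluster list | active_cluster_id
```

If no row is marked as active, the function prints nothing.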
Create a Kafka topic called order-detail in Confluent Cloud.
ccloud kafka topic create order-detail
This should yield the following output:
Created topic "order-detail".
We are going to use the Confluent Cloud managed Schema Registry to control our record format. The first step is creating a schema definition which we will use when producing new records.
Create the following order-detail-schema.json file:
{
"type": "record",
"namespace": "io.confluent.tutorial",
"name": "OrderDetail",
"fields": [
{"name": "number", "type": "long", "doc": "The order number."},
{"name": "date", "type": "long", "logicalType": "date", "doc": "The date the order was submitted."},
{"name": "shipping_address", "type": "string", "doc": "The shipping address."},
{"name": "subtotal", "type": "double", "doc": "The amount without shipping cost and tax."},
{"name": "shipping_cost", "type": "double", "doc": "The shipping cost."},
{"name": "tax", "type": "double", "doc": "The applicable tax."},
{"name": "grand_total", "type": "double", "doc": "The order grand total."}
]
}
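A stray comma or missing quote in the schema file is easy to miss, so it can be worth confirming that the file parses as JSON before you use it. The sketch below assumes python3 is available on your PATH. The function name is_valid_json is hypothetical. It checks JSON syntax only and does not verify that the content is a valid Avro schema.

```shell
# Hypothetical helper: succeed only if the given file parses as JSON.
# Assumes python3 is on the PATH; json.tool is part of the Python standard library.
# This checks JSON syntax only, not whether the content is a valid Avro schema.
is_valid_json() {
  python3 -m json.tool "$1" > /dev/null 2>&1
}

# Example usage:
# is_valid_json order-detail-schema.json && echo "schema file is well-formed JSON"
```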
Next, let’s open up a consumer to read records from the new topic.
From the same terminal you used to create the topic above, run the following command to start a console consumer with the ccloud CLI:
ccloud kafka topic consume order-detail --value-format avro
The ccloud CLI will prompt you for your Confluent Cloud Schema Registry API Key and Secret. Enter the values noted from the newly provisioned ccloud-stack above.
Once the Schema Registry values are entered, the consumer will start up and block while waiting for records. You won’t see any output until after the next step.
Now we are going to produce records to our new topic using the schema created a few steps back. Open a second terminal window and start the producer:
ccloud kafka topic produce order-detail --value-format avro --schema order-detail-schema.json
The producer will start with some information and then wait for you to enter input.
Successfully registered schema with ID 100001
Starting Kafka Producer. ^C or ^D to exit
Below are example records in JSON format, with each line representing a single record. In this case we are producing records in Avro format; however, they are first passed to the producer as JSON, and the producer converts them to Avro based on the order-detail-schema.json schema before sending them to Kafka.
Copy each line and paste it into the producer terminal, pressing enter after each one to produce the new record.
{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}
{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.00,"tax":0.53,"grand_total":6.53,"shipping_cost":1.00}
{"number":3,"date":18502,"shipping_address":"5014 Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"tax":9.34,"grand_total":102.79,"shipping_cost":0.00}
{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.00,"tax":1.00,"grand_total":51.00,"shipping_cost":0.00}
{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.00,"tax":3.33,"grand_total":38.33,"shipping_cost":2.00}
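The date values in these records are plain integers. Avro's date logical type counts days since 1970-01-01, so if the values are read that way (an assumption about the intent of the sample data), they can be converted to calendar dates for easier inspection. The helper below is a hypothetical sketch. It tries the GNU date syntax first and falls back to the BSD/macOS form.

```shell
# Hypothetical helper: convert a count of days since 1970-01-01 (UTC) into YYYY-MM-DD.
# Assumes the record's "date" field holds such a day count.
days_to_date() {
  # Convert the day count to seconds since the Unix epoch.
  seconds=$(( $1 * 86400 ))
  # GNU and BusyBox date accept "-d @SECONDS"; BSD/macOS date uses "-r SECONDS".
  date -u -d "@${seconds}" +%Y-%m-%d 2>/dev/null || date -u -r "${seconds}" +%Y-%m-%d
}

# Example usage:
# days_to_date 18500   # prints 2020-08-26
```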
As you produce records you can observe them in the consumer terminal.
When you are done producing and consuming records, press Ctrl-C in the producer and consumer terminals to stop them.
Because your Confluent Cloud cluster uses real cloud resources and is billable, destroy the resources you created and clean up your Confluent Cloud environment when you complete this tutorial.
You can use the Confluent Cloud CLI or the Confluent Cloud UI, but for this tutorial you can use the ccloud_library.sh library again.
Pass in the SERVICE_ACCOUNT_ID that was generated when the ccloud-stack was created.
ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID
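If you no longer have the service account ID at hand, you can recover it from the header of the configuration file that ccloud-stack wrote. This is a minimal sketch, assuming the file still starts with the "# SERVICE ACCOUNT ID: ..." comment line shown earlier. The function name service_account_id is hypothetical and is not part of ccloud_library.sh.

```shell
# Hypothetical helper: print the service account ID recorded in the header of
# a ccloud-stack configuration file.
# Assumes the "# SERVICE ACCOUNT ID: <id>" comment line shown earlier.
service_account_id() {
  sed -n 's/^# SERVICE ACCOUNT ID: *//p' "$1" | head -n 1
}

# Example usage (path taken from the sample output above):
# SERVICE_ACCOUNT_ID=$(service_account_id stack-configs/java-service-account-12345.config)
# ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID
```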