How to produce and consume records from Confluent Cloud using ccloud CLI

Question:

How do I use the ccloud CLI to produce and consume records from a Confluent Cloud topic?


Example use case:

In this tutorial you learn how to use the ccloud CLI to produce and consume records from a Kafka topic in Confluent Cloud. This can help you build and debug your Confluent Cloud-based event streaming applications.

Code example:


1. Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir ccloud-produce-consume && cd ccloud-produce-consume

2. Get Confluent Cloud and the CLI

If you don’t have an account yet, sign up for Confluent Cloud. Use the promo code CC100KTS to receive an additional $100 of free usage on Confluent Cloud (details).

Install the Confluent Cloud CLI, then log in with your Confluent Cloud username and password. The --save argument saves your Confluent Cloud user login credentials, or refresh token in the case of SSO, to the local netrc file:
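
ccloud login --save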

3. Provision a new ccloud-stack on Confluent Cloud

We recommend you run this tutorial in a new Confluent Cloud environment so it doesn’t interfere with your other work, and the easiest way to do this is with the ccloud-stack utility. The ccloud-stack utility provisions a new Confluent Cloud stack with a new environment, a new service account, a new Kafka cluster, and associated credentials. It also enables Schema Registry and creates Schema Registry credentials, grants wildcard ACLs to the service account, and generates a local configuration file with all of the above connection information. For more information on ccloud-stack, read the documentation.

Get the open source library ccloud_library.sh, which has functions to interact with Confluent Cloud, including ccloud-stack.

wget -O ccloud_library.sh https://raw.githubusercontent.com/confluentinc/examples/latest/utils/ccloud_library.sh
source ./ccloud_library.sh

Create your stack of Confluent Cloud resources by running the following commands. Set CLUSTER_CLOUD and CLUSTER_REGION as needed (defaults are shown below).

To avoid unexpected charges, carefully evaluate the cost of resources before launching the script, and ensure all resources are destroyed after you are done running the tutorial.

CLUSTER_CLOUD=aws
CLUSTER_REGION=us-west-2
ccloud::create_ccloud_stack false

The false passed to the command indicates that we do not want to create a ksqlDB application, which this tutorial does not need.
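
If you were following a tutorial that does use ksqlDB, you would pass true instead:

ccloud::create_ccloud_stack true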

After running the ccloud::create_ccloud_stack function, you should see output similar to the following:

Creating Confluent Cloud stack for service account demo-app-1234, ID: 12345.
Set Kafka cluster "lkc-xyz123" as the active cluster for environment "env-abc123".

Waiting up to 720 seconds for Confluent Cloud cluster to be ready and for credentials to propagate
.
Sleeping an additional 80 seconds to ensure propagation of all metadata
Set API Key "ABC123ABC123" as the active API key for "lkc-xyz123".

Client configuration file saved to: stack-configs/java-service-account-12345.config

4. View cluster connection info

View the local configuration file that was created when you provisioned the new ccloud-stack, where the * in the filename is the new service account ID:

cat stack-configs/java-service-account-*.config

Your output should resemble:

# ENVIRONMENT ID: <ENVIRONMENT ID>
# SERVICE ACCOUNT ID: <SERVICE ACCOUNT ID>
# KAFKA CLUSTER ID: <KAFKA CLUSTER ID>
# SCHEMA REGISTRY CLUSTER ID: <SCHEMA REGISTRY CLUSTER ID>
# ------------------------------
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
bootstrap.servers=<BROKER ENDPOINT>
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API KEY>" password="<API SECRET>";
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR API KEY>:<SR API SECRET>
schema.registry.url=https://<SR ENDPOINT>

Note the credentials for the Schema Registry (<SR API KEY> and <SR API SECRET>); you will need them shortly.
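
If you want to pull just the Schema Registry settings out of the file, a quick grep works (a minimal sketch; the wildcard assumes a single stack-configs file):

grep "schema.registry" stack-configs/java-service-account-*.config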

5. Create the Kafka topic

Verify that your ccloud CLI is configured to use your new ccloud-stack Kafka cluster.

ccloud kafka cluster list

This should produce output similar to the following, with your specific cluster ID and name:

      Id      |           Name            | Type  | Provider |  Region   | Availability | Status
+-------------+---------------------------+-------+----------+-----------+--------------+--------+
  * lkc-mpm07 | demo-kafka-cluster-114344 | BASIC | aws      | us-west-2 | single-zone  | UP
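
The * in the Id column marks the active cluster. If your new cluster is not the active one, you can select it by ID (shown here with the example ID from the output above):

ccloud kafka cluster use lkc-mpm07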

Create a Kafka topic called order-detail in Confluent Cloud.

ccloud kafka topic create order-detail

This should yield the following output:

Created topic "order-detail".
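
If you’d like to confirm the topic was created, you can list the topics in the active cluster:

ccloud kafka topic list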

6. Create a schema for your records

We are going to use the Confluent Cloud managed Schema Registry to control our record format. The first step is to create a schema definition, which we will use when producing new records.

Create the following order-detail-schema.json file:

{
    "type": "record",
    "namespace": "io.confluent.tutorial",
    "name": "OrderDetail",
    "fields": [
        {"name": "number", "type": "long", "doc": "The order number."},
        {"name": "date", "type": {"type": "int", "logicalType": "date"}, "doc": "The date the order was submitted, as days since the Unix epoch."},
        {"name": "shipping_address", "type": "string", "doc": "The shipping address."},
        {"name": "subtotal", "type": "double", "doc": "The amount without shipping cost and tax."},
        {"name": "shipping_cost", "type": "double", "doc": "The shipping cost."},
        {"name": "tax", "type": "double", "doc": "The applicable tax."},
        {"name": "grand_total", "type": "double", "doc": "The order grand total."}
    ]
}
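
Before producing, you can sanity-check that the file parses as valid JSON (this assumes you have jq installed; any JSON validator works):

jq . order-detail-schema.json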

7. Start a console consumer

Next, let’s open up a consumer to read records from the new topic.

From the same terminal you used to create the topic above, run the following command to start a console consumer with the ccloud CLI:

ccloud kafka topic consume order-detail --value-format avro

The ccloud CLI will prompt you for your Confluent Cloud Schema Registry API key and secret. Enter the values you noted from the newly provisioned ccloud-stack above.

Once the Schema Registry values are entered, the consumer will start up and block, waiting for records; you won’t see any output until after the next step.
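
One thing to keep in mind: by default the consumer only reads records produced after it starts. If you ever restart it and want to replay records that already exist, the CLI’s --from-beginning flag reads the topic from the start:

ccloud kafka topic consume order-detail --value-format avro --from-beginning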

8. Produce events to the Kafka topic

Now we are going to produce records to our new topic using the schema created a few steps back. Open a second terminal window and start the producer:

ccloud kafka topic produce order-detail --value-format avro --schema order-detail-schema.json

The producer will print some startup information and then wait for your input.

Successfully registered schema with ID 100001
Starting Kafka Producer. ^C or ^D to exit

Below are example records in JSON format, with each line representing a single record. Although we are producing records in Avro format, they are first passed to the producer as JSON, and the producer converts them to Avro based on the order-detail-schema.json schema before sending them to Kafka. (The date field holds the number of days since the Unix epoch, so 18500 corresponds to August 26, 2020.)

Copy each line and paste it into the producer terminal, pressing enter after each one to produce the new record.

{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}
{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.00,"tax":0.53,"grand_total":6.53,"shipping_cost":1.00}
{"number":3,"date":18502,"shipping_address":"5014  Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"tax":9.34,"grand_total":102.79,"shipping_cost":0.00}
{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.00,"tax":1.00,"grand_total":51.00,"shipping_cost":0.00}
{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.00,"tax":3.33,"grand_total":38.33,"shipping_cost":2.00}

As you produce records you can observe them in the consumer terminal.
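
If you’d rather not paste records one at a time, the producer reads newline-delimited records from standard input, so you can also save the lines above to a file and redirect it in (orders.json is a hypothetical filename; this assumes the CLI has already stored your Schema Registry credentials, so it doesn’t prompt and consume the first lines of the file):

ccloud kafka topic produce order-detail --value-format avro --schema order-detail-schema.json < orders.json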

9. Clean up your environment

When you are done producing and consuming records, press Ctrl-C in the producer and consumer terminals to stop them.

Because your Confluent Cloud cluster uses real cloud resources and is billable, clean up your Confluent Cloud environment when you complete this tutorial. You can use the Confluent Cloud CLI or the Confluent Cloud UI, but for this tutorial you can use the ccloud_library.sh library again. Pass in the SERVICE_ACCOUNT_ID that was generated when the ccloud-stack was created.

ccloud::destroy_ccloud_stack $SERVICE_ACCOUNT_ID
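
If you didn’t note the service account ID earlier, it is embedded in the name of the generated configuration file. A small shell sketch to recover it (this assumes a single stack-configs file and that the path contains no other digits):

# Strip everything but the digits from the config filename to recover the service account ID
SERVICE_ACCOUNT_ID=$(ls stack-configs/java-service-account-*.config | sed 's/[^0-9]//g')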