Console Producer and Consumer with (de)serializers

Question:

What is the simplest way to write messages to and read messages from Kafka, using (de)serializers and Schema Registry?

Example use case:

You'd like to produce and consume some basic messages using (de)serializers and Schema Registry. In this tutorial, we'll show you how to produce and consume messages from the command line without writing any code. Unlike the CLI Basics tutorial, this tutorial showcases Avro and Schema Registry.

Hands-on code example:

Short Answer

Console producer:

kafka-avro-console-producer \
  --topic orders-avro \
  --bootstrap-server broker:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema="$(< /etc/tutorial/orders-avro-schema.json)" \
  --property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
  --property parse.key=true \
  --property key.separator=":"

Console consumer:

kafka-avro-console-consumer \
  --topic orders-avro \
  --property schema.registry.url=http://localhost:8081 \
  --bootstrap-server broker:9092 \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property print.key=true \
  --property key.separator="-" \
  --from-beginning

Run it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir console-consumer-producer-avro && cd console-consumer-producer-avro

2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform.

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:6.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    volumes:
      - ${PWD}/:/etc/tutorial/
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN

Now launch Confluent Platform by running:

docker-compose up -d
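
The containers can take a little while to become ready after docker-compose returns. As a rough sketch, you could poll Schema Registry from your host before moving on. The helper name wait_for_schema_registry and the SLEEP_SECONDS variable are made up for this illustration; the URL assumes the 8081 port mapping from the docker-compose.yml above, and /subjects is a Schema Registry REST endpoint.

```shell
# Hypothetical helper (not part of the tutorial): poll Schema Registry's
# REST API until it answers, or give up after a number of attempts.
wait_for_schema_registry() {
  url="${1:-http://localhost:8081}"   # host port published in docker-compose.yml
  attempts="${2:-30}"
  i=1
  while [ "$i" -le "$attempts" ]; do
    # -s: silent, -f: treat HTTP error responses as failures
    if curl -sf "${url}/subjects" > /dev/null; then
      echo "Schema Registry is up at ${url}"
      return 0
    fi
    sleep "${SLEEP_SECONDS:-2}"
    i=$((i + 1))
  done
  echo "Schema Registry did not respond at ${url}" >&2
  return 1
}
```

After defining the function, calling wait_for_schema_registry with no arguments should return once the service answers. Running docker-compose ps is another quick way to confirm that the zookeeper, broker, and schema-registry containers are up.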

3
Create the Kafka topic

Next, create a topic to produce to and consume from. Use the following command to create the topic:

docker-compose exec broker kafka-topics --create --topic orders-avro --bootstrap-server broker:9092

4
Create a schema for your records

We are going to use Schema Registry to control our record format. The first step is creating a schema definition which we will use when producing new records.

Create the following orders-avro-schema.json file:

{
"type": "record",
"namespace": "io.confluent.tutorial",
"name": "OrderDetail",
"fields": [
    {"name": "number", "type": "long", "doc": "The order number."},
    {"name": "date", "type": "long", "logicalType": "date", "doc": "The date the order was submitted."},
    {"name": "shipping_address", "type": "string", "doc": "The shipping address."},
    {"name": "subtotal", "type": "double", "doc": "The amount without shipping cost and tax."},
    {"name": "shipping_cost", "type": "double", "doc": "The shipping cost."},
    {"name": "tax", "type": "double", "doc": "The applicable tax."},
    {"name": "grand_total", "type": "double", "doc": "The order grand total."}
    ]
}

5
Start a console consumer

Next, let’s open a console consumer to read records sent to the topic you created in the previous step.

From the same terminal you used to create the topic above, run the following command to open a shell on the schema-registry container:

docker-compose exec schema-registry bash

From within the shell on the schema-registry container, run this command to start a console consumer:

kafka-avro-console-consumer \
  --topic orders-avro \
  --bootstrap-server broker:9092 \
  --property schema.registry.url=http://localhost:8081

The consumer will start up and block waiting for records. You won’t see any output until after the next step.

6
Produce your first records

To produce your first record into Kafka, open another terminal window and run the following command to open a second shell on the schema-registry container:

docker-compose exec schema-registry bash

From inside the second shell on the schema-registry container, run the following command to start a console producer:

kafka-avro-console-producer \
  --topic orders-avro \
  --bootstrap-server broker:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema="$(< /etc/tutorial/orders-avro-schema.json)"

The producer will start and wait for you to enter input. Each line represents one record, and pressing Enter sends it. The entire line is treated as a single record, so each record’s JSON must fit on one line.

Try entering one line at a time, pressing Enter after each, and then check the console consumer window for the output. Alternatively, you can copy all of the records and paste them in at once.

{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.00,"tax":10.00,"grand_total":120.00,"shipping_cost":0.00}
{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.00,"tax":0.53,"grand_total":6.53,"shipping_cost":1.00}
{"number":3,"date":18502,"shipping_address":"5014  Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"tax":9.34,"grand_total":102.79,"shipping_cost":0.00}
{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.00,"tax":1.00,"grand_total":51.00,"shipping_cost":0.00}
{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.00,"tax":3.33,"grand_total":38.33,"shipping_cost":2.00}

Once you’ve sent all the records, you should see the same records in your console consumer window. After you’ve confirmed receiving all of them, go ahead and close the consumer by pressing CTRL+C.
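
Producing with value.schema should also have registered the schema in Schema Registry. Under the default subject naming strategy (TopicNameStrategy), the subject for a topic's record values is named <topic>-value. The sketch below only builds the REST URL for the latest registered version. The helper names subject_for and latest_schema_url, and the SCHEMA_REGISTRY_URL variable, are made up for this illustration.

```shell
# Subject name under the default TopicNameStrategy: "<topic>-key" or "<topic>-value".
subject_for() {
  printf '%s-%s\n' "$1" "${2:-value}"
}

# Print the REST URL of the latest registered value schema for a topic.
# SCHEMA_REGISTRY_URL is an assumed variable; it defaults to the tutorial's address.
latest_schema_url() {
  printf '%s/subjects/%s/versions/latest\n' \
    "${SCHEMA_REGISTRY_URL:-http://localhost:8081}" \
    "$(subject_for "$1" value)"
}

latest_schema_url orders-avro
```

Passing the printed URL to curl -s should return a JSON document that includes the registered schema. Since only values have been serialized with Avro so far, you would not expect an orders-avro-key subject to exist.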

7
Produce records with full key-value pairs

Kafka works with key-value pairs, but so far you’ve sent records with values only. To be fair, you have sent key-value pairs, but the keys were null. Sometimes you’ll need to send a valid key in addition to the value from the command line.

To enable sending full key-value pairs from the command line, add two properties to your console producer: parse.key and key.separator. Because we want the key to be a plain String rather than Avro, also set key.serializer. By default, kafka-avro-console-producer expects the key to be serialized as Avro as well, in which case you would have to pass in key.schema.

Let’s try to send some full key-value records now. If your previous console producer is still running, close it with CTRL+C and run the following command to start a new console producer:

kafka-avro-console-producer \
  --topic orders-avro \
  --bootstrap-server broker:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema="$(< /etc/tutorial/orders-avro-schema.json)" \
  --property key.serializer=org.apache.kafka.common.serialization.StringSerializer \
  --property parse.key=true \
  --property key.separator=":"

Then enter these records, either one at a time or by copying and pasting all of them into the terminal and pressing Enter:

6:{"number":6,"date":18505,"shipping_address":"9182 Shipyard Drive, Raleigh, NC. 27609","subtotal":72.00,"tax":3.00,"grand_total":75.00,"shipping_cost":0.00}
7:{"number":7,"date":18506,"shipping_address":"644 Lagon Street, Chicago, IL. 07712","subtotal":11.00,"tax":1.00,"grand_total":14.00,"shipping_cost":2.00}
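
Note that the JSON values themselves contain ":" characters, yet the key still comes out as 6 or 7. That is consistent with the line being split at the first occurrence of the separator. The snippet below is only an illustration of that splitting rule in plain shell, not the producer's actual code; split_record is a made-up name, and the record is a shortened version of the first one above.

```shell
# Illustration only: split an input line at the FIRST separator,
# so later ":" characters inside the JSON value are left untouched.
split_record() {
  line=$1
  sep=${2:-:}
  key=${line%%"$sep"*}     # drop everything from the first separator onward
  value=${line#*"$sep"}    # drop everything up to and including the first separator
  printf 'key=%s\nvalue=%s\n' "$key" "$value"
}

split_record '6:{"number":6,"date":18505}'
# key=6
# value={"number":6,"date":18505}
```

If your keys could contain ":" themselves, choosing a separator that appears in neither the keys nor at the start of the values would avoid ambiguity.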

8
Start a consumer to show full key-value pairs

Next, let’s run the consumer to read records from the topic. Because the key was serialized as a plain String rather than Avro, also set key.deserializer (by default, kafka-avro-console-consumer expects the key to be deserialized as Avro as well).

kafka-avro-console-consumer \
  --topic orders-avro \
  --property schema.registry.url=http://localhost:8081 \
  --bootstrap-server broker:9092 \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property print.key=true \
  --property key.separator="-" \
  --from-beginning

After the consumer starts you should see the following output in a few seconds:

null-{"number":1,"date":18500,"shipping_address":"ABC Sesame Street,Wichita, KS. 12345","subtotal":110.0,"shipping_cost":0.0,"tax":10.0,"grand_total":120.0}
null-{"number":2,"date":18501,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":5.0,"shipping_cost":1.0,"tax":0.53,"grand_total":6.53}
null-{"number":3,"date":18502,"shipping_address":"5014  Pinnickinick Street, Portland, WA. 97205","subtotal":93.45,"shipping_cost":0.0,"tax":9.34,"grand_total":102.79}
null-{"number":4,"date":18503,"shipping_address":"4082 Elmwood Avenue, Tempe, AX. 85281","subtotal":50.0,"shipping_cost":0.0,"tax":1.0,"grand_total":51.0}
null-{"number":5,"date":18504,"shipping_address":"123 Cross Street,Irving, CA. 12345","subtotal":33.0,"shipping_cost":2.0,"tax":3.33,"grand_total":38.33}
6-{"number":6,"date":18505,"shipping_address":"9182 Shipyard Drive, Raleigh, NC. 27609","subtotal":72.0,"shipping_cost":0.0,"tax":3.0,"grand_total":75.0}
7-{"number":7,"date":18506,"shipping_address":"644 Lagon Street, Chicago, IL. 07712","subtotal":11.0,"shipping_cost":2.0,"tax":1.0,"grand_total":14.0}

Because the command includes the --from-beginning option, you’ll see all the records sent to the topic. Notice that the records you sent before adding keys are printed as null-<value>.
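
If you save the consumer output to a file, a small filter can tell you how many records were sent without a key. This is a sketch under the assumption that the key separator is "-" as in the consumer command above; count_null_keys is a made-up helper name, and the sample lines are shortened versions of the output shown.

```shell
# Illustration only: count output lines whose key printed as "null".
# The optional argument is the key separator (assumed "-" by default).
count_null_keys() {
  sep="${1:--}"
  awk -v prefix="null${sep}" 'index($0, prefix) == 1 { n++ } END { print n + 0 }'
}

printf '%s\n' \
  'null-{"number":5}' \
  '6-{"number":6}' \
  '7-{"number":7}' | count_null_keys
# prints 1
```

Applied to the seven lines of output shown above, the same filter should report 5. It cannot distinguish a missing key from a String key whose text is literally "null".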

9
Clean Up

You’re all done now!

Go back to your open windows and stop any console producers and consumers with CTRL+C, then close the container shells with CTRL+D.

Then you can shut down the stack by running:

docker-compose down

Deploy on Confluent Cloud

1
Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.

  1. Sign up for Confluent Cloud.

  2. After you log in to Confluent Cloud Console, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud.

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.

Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
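
As a rough sketch of what those parameters might look like for the console tools, the helper below writes a client properties file using the SASL_SSL and PLAIN settings commonly used with Confluent Cloud. The function name write_ccloud_config and all argument values are placeholders for this illustration; copy the real bootstrap servers and API key and secret from the Clients page.

```shell
# Hypothetical helper: write a Kafka client properties file for Confluent Cloud.
# Arguments: output file, bootstrap servers, API key, API secret (all placeholders).
write_ccloud_config() {
  file=$1
  bootstrap=$2
  api_key=$3
  api_secret=$4
  cat > "$file" <<EOF
bootstrap.servers=${bootstrap}
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${api_key}" password="${api_secret}";
EOF
  # The file contains credentials, so restrict it to the current user.
  chmod 600 "$file"
}
```

You would then typically pass the file to kafka-avro-console-producer with --producer.config (or to kafka-avro-console-consumer with --consumer.config), point schema.registry.url at your Confluent Cloud Schema Registry endpoint, and supply its credentials with the basic.auth.credentials.source=USER_INFO and basic.auth.user.info properties. Treat this as an outline and check the documentation linked below for the exact invocation.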

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.

2
See other documentation

For more information on how to use the Apache Kafka® command line tools with Confluent Cloud, see https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/kafka-commands.html