What is the simplest way to write and read messages encoded in Avro to and from Kafka?
To get started, make a new directory anywhere you’d like for this project:
mkdir console-consumer-producer-avro && cd console-consumer-producer-avro
Next, create the following docker-compose.yml file to obtain Confluent Platform:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
    volumes:
      - ./schema:/opt/app/schema/
Now launch Confluent Platform by running:
docker-compose up -d
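If you'd like to verify that all three containers are up before moving on, you can check their status with a standard Docker Compose command (an optional sanity check, not part of the tutorial steps):
docker-compose ps
The zookeeper, broker, and schema-registry services should all show a state of Up before you continue.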
Your first step is to create a topic to produce to and consume from. Use the following command to create the topic:
docker-compose exec broker kafka-topics --create --topic example-topic-avro --bootstrap-server broker:9092 --replication-factor 1 --partitions 1
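To double-check the topic's configuration, you can optionally describe it with the same kafka-topics tool; this is just a sanity check and not required for the steps that follow:
docker-compose exec broker kafka-topics --describe --topic example-topic-avro --bootstrap-server broker:9092
You should see the topic listed with one partition and a replication factor of 1.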
Before you create a schema, let’s make a directory to put it in:
mkdir schema
Next, let's create the schema to use when formatting the messages produced to the topic you created in the previous step.
From the same terminal you used to create the topic, create the following order_detail.avsc file inside the folder you just created:
{
  "type": "record",
  "namespace": "io.confluent.tutorial.pojo.avro",
  "name": "OrderDetail",
  "fields": [
    {
      "name": "number",
      "type": "long",
      "doc": "The order number."
    },
    {
      "name": "date",
      "type": "long",
      "logicalType": "date",
      "doc": "The date the order was submitted."
    },
    {
      "name": "shipping_address",
      "type": "string",
      "doc": "The shipping address."
    },
    {
      "name": "subtotal",
      "type": "double",
      "doc": "The amount without shipping cost and tax."
    },
    {
      "name": "shipping_cost",
      "type": "double",
      "doc": "The shipping cost."
    },
    {
      "name": "tax",
      "type": "double",
      "doc": "The applicable tax."
    },
    {
      "name": "grand_total",
      "type": "double",
      "doc": "The order grand total."
    }
  ]
}
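The docker-compose.yml file above mounts the local ./schema directory into the schema-registry container at /opt/app/schema/, so you can optionally confirm the schema file is visible inside the container before using it in the next steps:
docker-compose exec schema-registry cat /opt/app/schema/order_detail.avsc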
Next, let's open up an Avro console consumer to read records sent to the topic you created in the previous step.
From the same terminal you used to create the topic and schema above, run the following command to open a shell on the schema-registry container:
docker-compose exec schema-registry bash
From within the terminal on the schema-registry container, run this command to start an Avro console consumer:
kafka-avro-console-consumer --topic example-topic-avro --bootstrap-server broker:9092
The consumer will start up and block waiting for records; you won't see any output until after the next step.
To produce your first Avro record into Kafka, open another terminal window and run the following command to open a second shell on the schema-registry container:
docker-compose exec schema-registry bash
From inside the second terminal on the schema-registry container, run the following command to start a console producer with the schema you created previously:
kafka-avro-console-producer --topic example-topic-avro --bootstrap-server broker:9092 --property value.schema="$(< /opt/app/schema/order_detail.avsc)"
The producer will start and wait for you to enter input. Each line represents one Avro record; to send it, hit the enter key.
Try copying and pasting one line at a time, hit enter, and then go back to the Avro console consumer window and look for the output.
{"number": 2343434, "date": 1596490462, "shipping_address": "456 Everett St, Palo Alto, 94301 CA, USA", "subtotal": 99.0, "shipping_cost": 0.0, "tax": 8.91, "grand_total": 107.91}
{"number": 2343435, "date": 1596491687, "shipping_address": "518 Castro St, Mountain View, 94305 CA, USA", "subtotal": 25.0, "shipping_cost": 0.0, "tax": 2.91, "grand_total": 27.91}
Once you've sent all the records, you should see the same output in your Avro console consumer window. After you've confirmed it received all the records, go ahead and close the consumer by entering CTRL+C.
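As a side note, the first time the producer sends a record it registers the value schema with Schema Registry. If you're curious, you can inspect the registered schema over Schema Registry's REST API from your host machine; assuming the default subject naming strategy, the value schema is stored under the example-topic-avro-value subject:
curl -s http://localhost:8081/subjects
curl -s http://localhost:8081/subjects/example-topic-avro-value/versions/latest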
In the first consumer example, you observed all incoming records because the consumer was already running and waiting for new records.
But what if the records were produced before you started your consumer? By default, the console consumer only reads records that arrive after it starts up, so you wouldn't see them.
To read previously sent records, add one property, --from-beginning, to the start command for the console consumer.
To demonstrate reading from the beginning, close the current console consumer in the consumer terminal by entering CTRL+C.
Now go back to your producer console and send the following records:
{"number": 2343436, "date": 1596491687, "shipping_address": "89 Addison St, Palo Alto, 94302 CA, USA", "subtotal": 10.0, "shipping_cost": 0.0, "tax": 1, "grand_total": 11.0}
{"number": 2343437, "date": 1596490492, "shipping_address": "456 Charles St, Beverly Hills, 90209 CA, USA", "subtotal": 450.0, "shipping_cost": 10.0, "tax": 28.91, "grand_total": 488.91}
{"number": 2343438, "date": 1596490692, "shipping_address": "456 Preston St, Brooklyn, 11212 NY, USA", "subtotal": 34.0, "shipping_cost": 2.0, "tax": 3, "grand_total": 39.00}
Next, let's open up a console consumer again, but this time you will consume everything, including what the producer sent in the previous step.
Run this command in the container shell you created for your first consumer, and note the additional --from-beginning property:
kafka-avro-console-consumer --topic example-topic-avro --bootstrap-server broker:9092 --from-beginning
After the consumer starts you should see the following output in a few seconds:
{"number":2343434,"date":1596490462,"shipping_address":"456 Everett St, Palo Alto, 94301 CA, USA","subtotal":99.0,"shipping_cost":0.0,"tax":8.91,"grand_total":107.91}
{"number":2343435,"date":1596491687,"shipping_address":"518 Castro St, Mountain View, 94305 CA, USA","subtotal":25.0,"shipping_cost":0.0,"tax":2.91,"grand_total":27.91}
{"number":2343436,"date":1596491687,"shipping_address":"89 Addison St, Palo Alto, 94302 CA, USA","subtotal":10.0,"shipping_cost":0.0,"tax":1.0,"grand_total":11.0}
{"number":2343437,"date":1596490492,"shipping_address":"456 Charles St, Beverly Hills, 90209 CA, USA","subtotal":450.0,"shipping_cost":10.0,"tax":28.91,"grand_total":488.91}
{"number":2343438,"date":1596490692,"shipping_address":"456 Preston St, Brooklyn, 11212 NY, USA","subtotal":34.0,"shipping_cost":2.0,"tax":3.0,"grand_total":39.0}
{"number":2343439,"date":1596501510,"shipping_address":"1600 Pennsylvania Avenue NW, Washington, DC 20500, USA","subtotal":1000.0,"shipping_cost":20.0,"tax":0.0,"grand_total":1020.0}
{"number":2343440,"date":1596501510,"shipping_address":"55 Music Concourse Dr, San Francisco, CA 94118, USA","subtotal":345.0,"shipping_cost":10.0,"tax":10.0,"grand_total":365.0}
One word of caution when using the --from-beginning flag: as the name implies, this setting forces the consumer to retrieve every record currently on the topic. So it's best used when testing and learning, not on a production topic.
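If you want a bounded read while experimenting, the Avro console consumer should also accept kafka-console-consumer's --max-messages flag, which makes the consumer exit on its own after reading the given number of records, for example:
kafka-avro-console-consumer --topic example-topic-avro --bootstrap-server broker:9092 --from-beginning --max-messages 5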
Again, once you've received all the records, close this console consumer by entering CTRL+C.
Kafka works with key-value pairs, but so far you've only sent records with values. Well, to be fair, you have been sending key-value pairs, but the keys are null.
Sometimes you'll need to send a valid key in addition to the value from the command line.
To enable sending full key-value pairs from the command line, you add two properties to your console producer: parse.key and key.separator.
Let's try sending some full key-value records now. If your previous console producer is still running, close it with CTRL+C, and run the following command to start a new console producer:
kafka-avro-console-producer --topic example-topic-avro --bootstrap-server broker:9092 \
--property key.schema='{"type":"string"}' \
--property value.schema="$(< /opt/app/schema/order_detail.avsc)" \
--property parse.key=true \
--property key.separator=":"
Then enter these records, either one at a time or by copy-pasting all of them into the terminal, and hit enter:
"122345":{"number": 2343439, "date": 1596501510, "shipping_address": "1600 Pennsylvania Avenue NW, Washington, DC 20500, USA", "subtotal": 1000.0, "shipping_cost": 20.0, "tax": 0.00, "grand_total": 1020.00}
"256743":{"number": 2343440, "date": 1596501510, "shipping_address": "55 Music Concourse Dr, San Francisco, CA 94118, USA", "subtotal": 345.00, "shipping_cost": 10.00, "tax": 10.00, "grand_total": 365.00}
Now that you've produced full key-value pairs from the command line, you'll want to consume full key-value pairs from the command line as well.
If your console consumer from the previous step is still open, shut it down with CTRL+C. Then run the following command to re-open the console consumer, but this time it will print the full key-value pair. Note the added print.key and key.separator properties.
Also note that a different key separator is used here; you don't have to use the same one between console producers and consumers.
kafka-avro-console-consumer --topic example-topic-avro --bootstrap-server broker:9092 \
--from-beginning \
--property print.key=true \
--property key.separator="-"
After the consumer starts you should see the following output in a few seconds:
null-{"number":2343434,"date":1596490462,"shipping_address":"456 Everett St, Palo Alto, 94301 CA, USA","subtotal":99.0,"shipping_cost":0.0,"tax":8.91,"grand_total":107.91}
null-{"number":2343435,"date":1596491687,"shipping_address":"518 Castro St, Mountain View, 94305 CA, USA","subtotal":25.0,"shipping_cost":0.0,"tax":2.91,"grand_total":27.91}
null-{"number":2343436,"date":1596491687,"shipping_address":"89 Addison St, Palo Alto, 94302 CA, USA","subtotal":10.0,"shipping_cost":0.0,"tax":1.0,"grand_total":11.0}
null-{"number":2343437,"date":1596490492,"shipping_address":"456 Charles St, Beverly Hills, 90209 CA, USA","subtotal":450.0,"shipping_cost":10.0,"tax":28.91,"grand_total":488.91}
null-{"number":2343438,"date":1596490692,"shipping_address":"456 Preston St, Brooklyn, 11212 NY, USA","subtotal":34.0,"shipping_cost":2.0,"tax":3.0,"grand_total":39.0}
"122345"-{"number":2343439,"date":1596501510,"shipping_address":"1600 Pennsylvania Avenue NW, Washington, DC 20500, USA","subtotal":1000.0,"shipping_cost":20.0,"tax":0.0,"grand_total":1020.0}
"256743"-{"number":2343440,"date":1596501510,"shipping_address":"55 Music Concourse Dr, San Francisco, CA 94118, USA","subtotal":345.0,"shipping_cost":10.0,"tax":10.0,"grand_total":365.0}
Since we kept the --from-beginning property, you'll see all the records sent to the topic. Notice that the records produced before you started sending keys appear as null-<value>.
You're all done now!
Go back to your open windows and stop any console producers and consumers with CTRL+C, then close the container shells with CTRL+D.
Then you can shut down the stack by running:
docker-compose down -v
Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully-managed Apache Kafka service.
First, create your Kafka cluster in Confluent Cloud.
Use the promo code CC100KTS to receive an additional $100 of free usage (details).
Next, from the Confluent Cloud UI, click on Tools & client config to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application.
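For reference, a client configuration for Confluent Cloud typically looks like the sketch below. Every value here is a placeholder; use the actual endpoints and credentials from your own cluster's Tools & client config page:
# Kafka cluster connection (placeholder values)
bootstrap.servers=<CLOUD_BOOTSTRAP_SERVER>:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API_KEY>' password='<API_SECRET>';
# Confluent Cloud Schema Registry (placeholder values)
schema.registry.url=https://<SR_ENDPOINT>
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>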
Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.