Generate complex streams of test data

Question:

How can I generate realistic test data in Kafka?

Example use case:

Perhaps you are building an application, or constructing a pipeline, and you would like some mock data to test it with. Using the Voluble source connector, it's possible to generate realistic test data that can also be made referentially consistent across topics.

Code example:

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir generate-test-data-streams && cd generate-test-data-streams

2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:5.4.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

  schema-registry:
    image: confluentinc/cp-schema-registry:5.4.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/cp-kafka-connect-base:5.4.1
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        mkdir -p /usr/share/confluent-hub-components/
        confluent-hub install --no-prompt --component-dir /usr/share/confluent-hub-components/ mdrogalis/voluble:0.2.1
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  kafkacat:
    image: edenhill/kafkacat:1.5.0
    container_name: kafkacat
    links:
      - broker
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done

Now launch Confluent Platform by running:

docker-compose up -d
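
You can optionally confirm that all of the containers were created (a quick sanity check, nothing specific to this tutorial):

docker-compose ps

All five containers (zookeeper, broker, schema-registry, connect, and kafkacat) should be listed with a state of Up.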

Before continuing, you need to wait for all the containers to fully start up. Run this script, which waits for the final dependencies in the chain to be ready:

#!/bin/bash

# Wait for Schema Registry to become available
while :
  do curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8081)
  echo -e $(date) " Component: Schema Registry\t\tHTTP state: " $curl_status "\t(waiting for 200)"
  if [ $curl_status -eq 200 ]
    then
      echo "✅✅ Schema Registry is ready"
      break
  fi
  sleep 5
done

# Wait for Kafka Connect to become available
while :
  do curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/)
  echo -e $(date) " Component: Kafka Connect \t\tHTTP state: " $curl_status "\t(waiting for 200)"
  if [ $curl_status -eq 200 ]
    then
      echo "✅✅ Kafka Connect is ready"
      break
  fi
  sleep 5
done

Once everything is ready, you should see output like this and get control back at your terminal:

Wed 1 Apr 2020 11:58:49 BST  Component: Schema Registry         HTTP state:  200        (waiting for 200)
✅ Schema Registry is ready
Wed 1 Apr 2020 11:58:49 BST  Component: Kafka Connect           HTTP state:  200        (waiting for 200)
✅ Kafka Connect is ready

Once the stack has started, run the following command to confirm that the connector plugin we're going to use has been loaded.

docker exec -i connect curl -s localhost:8083/connector-plugins | jq '.[].class' | grep Voluble

You should see this output:

"io.mdrogalis.voluble.VolubleSourceConnector"

3
Create a standalone stream of test data

We’re going to use Kafka Connect to run the data generator, which is available as a source connector.

To start with, let's create a connector that populates a single topic with a stream of random click events:

curl -X PUT http://localhost:8083/connectors/clicks/config \
     -i -H "Content-Type: application/json" -d '{
    "connector.class"             : "io.mdrogalis.voluble.VolubleSourceConnector",
    "key.converter"               : "org.apache.kafka.connect.storage.StringConverter",

    "genkp.clicks.with"           : "#{Number.randomDigit}",
    "attrkp.clicks.null.rate"     : 1,
    "genv.clicks.source_ip.with"  : "#{Internet.ipV4Address}",
    "genv.clicks.host.with"       : "#{Internet.url}",
    "genv.clicks.path.with"       : "#{File.fileName}",
    "genv.clicks.user_agent.with" : "#{Internet.userAgentAny}",
    "topic.clicks.throttle.ms"    : 1000
}'

Check that the connector is running:

curl -s http://localhost:8083/connectors/clicks/status

You should see that the state is RUNNING:

{"name":"clicks","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

4
Consume events from the test topic

With the connector running, let's inspect the data being generated to the Kafka topic. Here we'll use kafkacat, but you could use any Kafka consumer.

docker exec -i kafkacat kafkacat -b broker:9092 -t clicks \
            -s value=avro -r http://schema-registry:8081        \
            -u -f 'Key     (%K bytes):\t%k\nPayload (%S bytes):\t%s\n--\n' \
            -C -c5 -o end

You should see five messages, and then the command will exit. Observe that one message is generated per second, as specified by the 'topic.clicks.throttle.ms' = 1000 setting.

Key     (-1 bytes):
Payload (161 bytes):	{"user_agent": {"string": "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)"}, "source_ip": {"string": "104.26.97.183"}, "path": {"string": "omnis_est/odit.avi"}, "host": {"string": "www.edmond-weissnat.co"}}
--
Key     (-1 bytes):
Payload (189 bytes):	{"user_agent": {"string": "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.113 Safari/537.36"}, "source_ip": {"string": "35.161.154.175"}, "path": {"string": "dolore_excepturi/error.mp3"}, "host": {"string": "www.annmarie-moore.co"}}
--
Key     (-1 bytes):
Payload (136 bytes):	{"user_agent": {"string": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:54.0) Gecko/20100101 Firefox/54.0"}, "source_ip": {"string": "220.72.31.248"}, "path": {"string": "neque_ut/in.html"}, "host": {"string": "www.mervin-torphy.com"}}
--
Key     (-1 bytes):
Payload (154 bytes):	{"user_agent": {"string": "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.0; Windows NT 5.1; .NET CLR 1.1.4322)"}, "source_ip": {"string": "175.177.231.111"}, "path": {"string": "et_exercitationem/nihil.css"}, "host": {"string": "www.sarah-cassin.biz"}}
--
Key     (-1 bytes):
Payload (143 bytes):	{"user_agent": {"string": "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.11) Gecko GranParadiso/3.0.11"}, "source_ip": {"string": "48.240.212.93"}, "path": {"string": "quos_nam/vel.json"}, "host": {"string": "www.joe-swaniawski.io"}}
--

You can also run this without the -c5 argument to see a continuous stream of messages. If you do this, press Ctrl-C to return to the command prompt.
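
If you want to verify the one-message-per-second rate, a variation on the command above adds kafkacat's %T format token, which prints each message's timestamp in milliseconds; consecutive timestamps should be roughly 1000 ms apart:

docker exec -i kafkacat kafkacat -b broker:9092 -t clicks \
            -s value=avro -r http://schema-registry:8081 \
            -u -f 'Timestamp: %T\t%s\n' \
            -C -c5 -o end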

5
Create related streams of test data

Now let's see how you can generate streams of data that are related and consistent with each other.

Consider an example where we want to generate some dummy data about network devices. The stream of events would have fields such as:

→ The MAC address of the device

→ The number of bytes sent

Now let’s imagine that instead of generating lots of random MAC addresses, we want a limited set so that the data is more realistic, tying it back to a fixed set of imaginary devices. Each device will have some characteristics:

→ MAC address (primary key / unique identifier)

→ Device name

→ Location

Run the following to create the connector:

curl -X PUT http://localhost:8083/connectors/network_traffic/config \
     -i -H "Content-Type: application/json" -d '{
    "connector.class"                     : "io.mdrogalis.voluble.VolubleSourceConnector",
    "key.converter"                       : "org.apache.kafka.connect.storage.StringConverter",

    "genkp.devices.with"                  : "#{Internet.macAddress}",
    "genv.devices.name.with"              : "#{GameOfThrones.dragon}",
    "genv.devices.location->city.with"    : "#{Address.city}",
    "genv.devices.location->country.with" : "#{Address.country}",
    "topic.devices.records.exactly"       : 10,

    "genkp.traffic.with"                  : "#{Number.randomDigit}",
    "attrkp.traffic.null.rate"            : 1,
    "genv.traffic.mac.matching"           : "devices.key",
    "genv.traffic.bytes_sent.with"        : "#{Number.numberBetween \u002764\u0027,\u00274096\u0027}",
    "topic.traffic.throttle.ms"           : 500
}'

The connector settings can be understood as follows:

  • Devices

→ genkp sets the macAddress as the primitive message key

→ The city and country are set as nested values in the location entity

→ records.exactly limits the number of records produced to 10

  • Traffic

→ Not all messages in Kafka are keyed, and this shows an example of generating null keys by setting the null rate to 100% (null.rate has a value between 0 and 1).

'genkp.traffic.with'       = '#{Number.randomDigit}',
'attrkp.traffic.null.rate' = 1,

→ The MAC address of the device is set to match one of the existing keys on the devices topic ('genv.traffic.mac.matching' = 'devices.key')

→ The rate at which messages are produced is limited to two per second ('throttle.ms' = 500)

Check that the connector is running:

curl -s http://localhost:8083/connectors/network_traffic/status

You should see that the state is RUNNING:

{"name":"network_traffic","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}

6
Consume events from the test topics

We now have two Kafka topics being written to. The first (devices) is keyed on the MAC address, as can be seen from the data:

docker exec -i kafkacat kafkacat -b broker:9092 -t devices \
            -s key=s -s value=avro -r http://schema-registry:8081        \
            -f 'Key     (%K bytes):\t%k\nPayload (%S bytes):\t%s\n--\n' \
            -C -c3 -o beginning 2>/dev/null
Key     (17 bytes):	9b:64:90:5f:66:d6
Payload (44 bytes):	{"name": {"string": "Tyraxes"}, "location": {"Gen1": {"city": {"string": "Parisianmouth"}, "country": {"string": "South Africa"}}}}
--
Key     (17 bytes):	4c:3c:12:83:f0:47
Payload (50 bytes):	{"name": {"string": "Balerion"}, "location": {"Gen1": {"city": {"string": "Lueilwitzborough"}, "country": {"string": "Cayman Islands"}}}}
--
Key     (17 bytes):	0e:1c:5f:e9:9a:33
Payload (48 bytes):	{"name": {"string": "Vermithrax"}, "location": {"Gen1": {"city": {"string": "Fisherberg"}, "country": {"string": "Pitcairn Islands"}}}}
--

The second (traffic) is a stream of information about network traffic for the devices created in the topic above.

docker exec -i kafkacat kafkacat -b broker:9092 -t traffic \
            -s key=s -s value=avro -r http://schema-registry:8081        \
            -f 'Key     (%K bytes):\t%k\nPayload (%S bytes):\t%s\n--\n' \
            -C -c5 -o end -u 2>/dev/null
Key     (-1 bytes):
Payload (30 bytes):	{"mac": {"string": "0f:68:80:67:1b:70"}, "bytes_sent": {"string": "3181"}}
--
Key     (-1 bytes):
Payload (30 bytes):	{"mac": {"string": "41:4f:20:bf:22:c7"}, "bytes_sent": {"string": "3279"}}
--
Key     (-1 bytes):
Payload (30 bytes):	{"mac": {"string": "35:36:f7:71:92:10"}, "bytes_sent": {"string": "2950"}}
--
Key     (-1 bytes):
Payload (28 bytes):	{"mac": {"string": "35:36:f7:71:92:10"}, "bytes_sent": {"string": "65"}}
--
Key     (-1 bytes):
Payload (30 bytes):	{"mac": {"string": "0e:1c:5f:e9:9a:33"}, "bytes_sent": {"string": "2552"}}
--
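
Because of the matching setting, every MAC address in traffic should also exist as a key in devices. You can spot-check this with a short script (a sketch, reusing the jq installed in the kafkacat container; it samples 20 traffic messages and prints any MAC address that has no corresponding device key):

docker exec -i kafkacat sh -c '
  # Collect the device keys (the topic holds exactly 10 records, so -e exits at the end)
  kafkacat -b broker:9092 -t devices -s key=s -s value=avro \
           -r http://schema-registry:8081 -C -e -o beginning -f "%k\n" 2>/dev/null \
           | sort -u > /tmp/device_keys
  # Collect the MAC addresses from a sample of traffic messages
  kafkacat -b broker:9092 -t traffic -s value=avro \
           -r http://schema-registry:8081 -C -c20 -o beginning 2>/dev/null \
           | jq -r ".mac.string" | sort -u > /tmp/traffic_macs
  # Print any MAC address seen in traffic but absent from devices
  comm -13 /tmp/device_keys /tmp/traffic_macs
'

If the command prints nothing, every sampled MAC address in traffic matched a device key.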

7
Clean up

Shut down the stack by running:

docker-compose down