Handling multiple event types in a topic

Question:

How can you have multiple event types in a topic and maintain topic-name subject constraints?


Example use case:

You have distinct but related event types and you want to produce them to the same topic, but you also want to maintain topic-name subject constraints. Why produce different event types to the same topic? One reason is that you have low-traffic topics and you'd like to consolidate them to reduce overhead on the brokers. Another is that you need the exact order of different events: by producing them to the same topic, you are guaranteed correct ordering within each partition.
To produce multiple event types to a topic while keeping topic-name subject constraints, you'll need to use schema references. A schema reference lets one schema use a type that is defined in another schema registered separately in Schema Registry. In this tutorial you'll learn about using schema references with both Protobuf and Avro.
For more information on multiple event topics you can read Put several event types in a Kafka Topic by Martin Kleppmann and Putting Several Event Types in the Same Topic – Revisited by Robert Yokota.

Hands-on code example:


1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir multiple-event-types && cd multiple-event-types

Next, create a directory for configuration data:

mkdir configuration

2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform:

version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.0.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR

  schema-registry:
    image: confluentinc/cp-schema-registry:7.0.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:9092'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN

And launch it by running:

docker-compose up -d

3
Create topics

In this step you're going to create the topics needed for this tutorial.

Since you are going to produce records using Protobuf and Avro serialization, you’ll need two topics.

But first, you're going to open a shell on the broker Docker container.

Open a new terminal window and run this command:

docker-compose exec broker bash

Now use the following commands to create the topics:

kafka-topics --create --topic proto-events --bootstrap-server broker:9092 --replication-factor 1 --partitions 1
kafka-topics --create --topic avro-events --bootstrap-server broker:9092 --replication-factor 1 --partitions 1

Keep this terminal window open, as you'll need to run a console-producer in a few steps.

4
Configure the project

Create the following Gradle build file for the project, named build.gradle:

buildscript {
    repositories {
        mavenCentral()
        maven {
            url = uri("https://packages.confluent.io/maven/")
        }
        maven {
            url = uri("https://plugins.gradle.org/m2/")
        }
        maven {
            url = uri("https://jitpack.io")
        }
    }
    dependencies {
        classpath "com.github.jengelman.gradle.plugins:shadow:4.0.2"
    }
}

plugins {
    id "java"
    id "com.google.cloud.tools.jib" version "3.1.1"
    id "idea"
    id "eclipse"
    id "com.github.imflog.kafka-schema-registry-gradle-plugin" version "1.5.0"
    id "com.google.protobuf" version "0.8.17"
    id "com.github.davidmc24.gradle.plugin.avro" version "1.0.0"
}


sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
version = "0.0.1"

repositories {
    mavenCentral()
    maven {
        url = uri("https://packages.confluent.io/maven/")
    }

    maven {
        url = uri("https://jitpack.io")
    }
}

apply plugin: "com.github.johnrengelman.shadow"

dependencies {
    implementation 'com.google.protobuf:protobuf-java:3.18.1'
    implementation 'org.apache.avro:avro:1.10.2'
    implementation 'org.slf4j:slf4j-simple:1.7.32'
    implementation "org.apache.kafka:kafka-streams:3.0.0"
    testImplementation "junit:junit:4.13.2"
    testImplementation 'org.hamcrest:hamcrest:2.2'
    implementation "io.confluent:kafka-avro-serializer:7.0.0"
    implementation "io.confluent:kafka-protobuf-serializer:7.0.0"
    implementation "io.confluent:kafka-protobuf-provider:7.0.0"
}

protobuf {
    generatedFilesBaseDir = "${project.buildDir}/generated-main-proto-java"

    protoc {
        artifact = 'com.google.protobuf:protoc:3.15.3'
    }
}

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = "full"
    }
}

jar {
  manifest {
    attributes(
      "Class-Path": configurations.compileClasspath.collect { it.getName() }.join(" "),
      "Main-Class": "io.confluent.developer.MultiEventProtobufProduceConsumeApp"
    )
  }
}

shadowJar {
    archiveBaseName = "multiple-event-types-standalone"
    archiveClassifier = ''
}

schemaRegistry {
    def props = new Properties()
    def configs = file("configuration/ccloud.properties")
    if (configs.exists()) {
        configs.withInputStream { props.load(it) }
        def srUrl = props.getProperty("schema.registry.url")
        def auth = props.getProperty("basic.auth.user.info").split(":")
        println "Using Confluent properties Schema Registry endpoint:${srUrl}, username:${auth[0]},password:${auth[1]}"

        url = srUrl

        credentials {
            // username is the characters up to the ':' in the basic.auth.user.info property
            username = auth[0]
            // password is everything after ':' in the basic.auth.user.info property
            password = auth[1]
        }
    } else if (file("configuration/dev.properties").exists()) {
        configs = file("configuration/dev.properties")
        configs.withInputStream { props.load(it) }
        def srUrl = props.getProperty("schema.registry.url")
        println "Using local dev properties Schema Registry endpoint:${srUrl}"

        url = srUrl
    } else {
        println "No configs to parse yet"
    }


    // Possible types are ["JSON", "PROTOBUF", "AVRO"]
    register {
        subject('page-view', 'src/main/avro/page-view.avsc', 'AVRO')
        subject('purchase', 'src/main/avro/purchase.avsc', 'AVRO')
        subject('avro-events-value', 'src/main/avro/all-events.avsc', 'AVRO')
                .addReference("io.confluent.developer.avro.PageView", "page-view", 1)
                .addReference("io.confluent.developer.avro.Purchase", "purchase", 1)
    }

}

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Then create a development configuration file at configuration/dev.properties:

bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081
max.poll.interval.ms=300000
enable.auto.commit=true
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer

# Application specific properties
proto.topic.name=proto-events
proto.topic.partitions=1
proto.topic.replication.factor=1

avro.topic.name=avro-events
avro.topic.partitions=1
avro.topic.replication.factor=1

5
Create the Protobuf Schemas

In this tutorial, let's say you have a microservice for an e-commerce site that tracks both customer page-views and purchases. Since the page-views could be highly related to any purchase, you'd like to capture the exact order of both of these event types as they occur, so producing the events to the same topic makes sense.

Since you use the customer id as the key, all events for a given customer land in the same partition, so you are guaranteed to see them in the exact order they occurred, regardless of event type. Even though these two events are related, you represent them as distinct domain objects because that fits well with others in your organization who need the same data.
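To see why a shared key keeps both event types in order, here is a minimal, self-contained sketch of key-based partition selection. It assumes the producer's default behavior for records with a non-null key: hash the serialized key with murmur2, clear the sign bit, and take the result modulo the partition count. The hash is written to mirror the approach in Kafka's `Utils.murmur2`, but treat it as an illustration rather than a drop-in replacement; the class name `KeyPartitioning` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating how a keyed record is mapped to a partition.
// Assumes the default behavior for non-null keys:
// positive(murmur2(keyBytes)) % numPartitions.
class KeyPartitioning {

    // 32-bit murmur2 hash, written to mirror the approach in Kafka's Utils.murmur2.
    static int murmur2(final byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;

        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // The key is serialized as UTF-8 (as StringSerializer does by default);
    // masking the sign bit keeps the modulo result non-negative.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // A page-view and a purchase for the same customer share a key,
        // so they are routed to the same partition and keep their relative order.
        int pageViewPartition = partitionFor("wilecoyote", 6);
        int purchasePartition = partitionFor("wilecoyote", 6);
        System.out.println(pageViewPartition == purchasePartition);
    }
}
```

With the single-partition topics used in this tutorial, every record lands in partition 0 regardless of key; the key starts to matter once a topic has more than one partition.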

For the Protobuf portion of the tutorial, you'll need to create three Protobuf schemas. Two of the schemas represent the domain objects in the example scenario, and the third schema contains references to the other two.

To get started, create a directory to place the schemas:

mkdir -p src/main/proto

Then create this schema file for the purchase domain object at src/main/proto/purchase.proto:

syntax = "proto3";

package io.confluent.developer.proto;
option java_outer_classname = "PurchaseProto";

message Purchase {
  string item = 1;
  double amount = 2;
  string customer_id = 3;
}

Note that this tutorial won't go into the specifics of Protocol Buffers, but you can read the Proto 3 language guide and the Protobuf Java tutorial.

Next, create the schema for the page-view object at src/main/proto/page-view.proto:

syntax = "proto3";

package io.confluent.developer.proto;
option java_outer_classname = "PageViewProto";

message PageView {
  string url = 1;
  bool is_special = 2;
  string customer_id = 3;
}

Now that you have the schemas in place for your two domain objects, you’ll create the schema that references the other two.

Go ahead and create the file src/main/proto/customer-event.proto now and then we’ll go over some necessary details about it.

syntax = "proto3";

package io.confluent.developer.proto;

import "purchase.proto";
import "page-view.proto";  (1)

option java_outer_classname = "CustomerEventProto";

message CustomerEvent {  (2)

  oneof action {   (3)
    Purchase purchase = 1;
    PageView page_view = 2;
  }
  string id = 3;
}
1 Importing the existing proto schemas
2 The outer "container" event
3 A oneof field named action which will contain exactly one of the referenced types

This is where the rubber meets the road regarding schema references. Here you have the CustomerEvent object, which contains either a Purchase or a PageView object in the action field. But instead of nesting schemas for these two objects, you reference existing ones. In addition to providing an effective way to combine multiple event types in the same topic while keeping the TopicNameStrategy for subject names, using references means each domain object's schema is defined in exactly one place, so there is only one place to go when you need to make schema updates.

Note that with Protobuf, a oneof can't be a top-level field; it has to exist inside a "wrapper" class. This has implications for producing and consuming, which you'll see when we get to creating the KafkaProducer and KafkaConsumer for this tutorial.

6
Create the Avro Schemas

Now you’ll take a similar step and create the Avro schemas; this is done for comparison and is not strictly required. As with Protobuf, you’ll have two schemas for the domain objects and a third schema that will contain the references. Avro has a distinct difference regarding the reference schema, and you’ll see it as we go through this section.

To get started, create a directory for the Avro schemas:

mkdir -p src/main/avro

Then create this schema file for the purchase domain object at src/main/avro/purchase.avsc:

{
  "type":"record",
  "namespace": "io.confluent.developer.avro",
  "name":"Purchase",
  "fields": [
    {"name": "item", "type":"string"},
    {"name": "amount", "type": "double"},
    {"name": "customer_id", "type": "string"}
  ]
}

Right away you'll notice one difference: you write Avro schemas in JSON, while Protobuf schemas more closely resemble a programming language.

In this tutorial we won’t go into details about Avro. For more information you can read the Apache Avro documentation, Getting Started (Java) guide, and the Avro Specification.

Next, create the schema for the page-view object at src/main/avro/page-view.avsc:

{
  "type":"record",
  "namespace": "io.confluent.developer.avro",
  "name":"PageView",
  "fields": [
    {"name": "url", "type":"string"},
    {"name": "is_special", "type": "boolean"},
    {"name": "customer_id", "type":  "string"}
  ]
}

Now that you have the schemas in place for your two domain objects, you’ll create a third schema that references the other two.

Go ahead and create the file src/main/avro/all-events.avsc now:

[
  "io.confluent.developer.avro.Purchase",
  "io.confluent.developer.avro.PageView"
]

The all-events.avsc file contains an Avro Union. The union type in Avro is analogous to the Protobuf oneof field in that it indicates that a field might have more than one datatype.

But with Avro, a union can be a top-level element, so you don't have to create a wrapper or container class: the Avro schema itself is a union, and it can represent either of the types listed in it. To be clear, you could create an Avro schema for a wrapper class and provide a union field within it, but this tutorial doesn't cover that approach. The GitHub repo for the Multiple Events in Schema Registry Kafka Summit Europe 2021 presentation contains an example of using an outer Avro class containing a union field.
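To make the top-level union more concrete, the sketch below shows how a union picks a branch at the encoding level. Per the Avro specification, a union value is written as the zero-based position of its branch within the union, using Avro's zig-zag variable-length integer encoding, followed by the value encoded with that branch's schema. The class and method names are hypothetical, and the sketch only computes the branch-index prefix; it does not encode the record fields themselves.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of how a top-level Avro union selects a branch.
class UnionBranchSketch {

    // Branch order matches src/main/avro/all-events.avsc.
    static final List<String> UNION = Arrays.asList(
            "io.confluent.developer.avro.Purchase",
            "io.confluent.developer.avro.PageView");

    // Finds the zero-based branch index for a record's full schema name.
    static int branchIndex(String recordFullName) {
        int index = UNION.indexOf(recordFullName);
        if (index < 0) {
            throw new IllegalArgumentException("Not a member of the union: " + recordFullName);
        }
        return index;
    }

    // Zig-zag encode, then write 7 bits per byte, low-order group first,
    // setting the high bit on every byte except the last.
    static byte[] encodeInt(int n) {
        int zigZag = (n << 1) ^ (n >> 31);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((zigZag & ~0x7F) != 0) {
            out.write((zigZag & 0x7F) | 0x80);
            zigZag >>>= 7;
        }
        out.write(zigZag);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        int index = branchIndex("io.confluent.developer.avro.PageView");
        // Prefix written before the PageView fields in the binary payload
        System.out.println(index + " -> " + Arrays.toString(encodeInt(index)));
    }
}
```

Running `main` prints `1 -> [2]`: PageView is the second branch, and zig-zag encoding maps 1 to 2. When the Confluent serializer writes to Kafka, this Avro payload is additionally preceded by a magic byte and the 4-byte ID of the schema used for serialization.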

Now that you have created all the necessary schema files, you need to compile them into code so that you can work with them in the application. The build.gradle file contains plugins for both Avro and Protobuf, so all you need to do is run the following command to generate the Java code files:

./gradlew build

7
Register the schemas

Next you'll need to register some schemas. When you have an Avro schema whose top-level element is a union, you need to register the individual schemas in the union first. Then you register the parent schema itself, along with references to the schemas that make up the union.

Fortunately, the Gradle Schema Registry plugin makes this easy. Here's the configuration that you already have in the build.gradle file:

Avro schemas configuration for using references
register {
        subject('page-view', 'src/main/avro/page-view.avsc', 'AVRO')  (1)
        subject('purchase', 'src/main/avro/purchase.avsc', 'AVRO')
        subject('avro-events-value', 'src/main/avro/all-events.avsc', 'AVRO')   (2)
                .addReference("io.confluent.developer.avro.PageView", "page-view", 1) (3)
                .addReference("io.confluent.developer.avro.Purchase", "purchase", 1)
    }
1 Registering the schemas for the referenced objects
2 The parent schema containing the references
3 Adding the references which point to the schemas registered previously

Now to register these Avro schemas, run this in the command line:

./gradlew registerSchemasTask

This task runs quickly, and you should see some text followed by this result in the console:

BUILD SUCCESSFUL
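Under the hood, the plugin registers schemas through the Schema Registry REST API, and you can follow the same order by hand: register page-view.avsc and purchase.avsc under their own subjects first, then register the union under avro-events-value with a `references` array. The sketch below only assembles the JSON body for that last `POST /subjects/avro-events-value/versions` request; it makes no HTTP calls, the class and method names are hypothetical, and the JSON escaping is deliberately minimal.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that builds the JSON body for registering a schema with
// references via POST /subjects/{subject}/versions
// (Content-Type: application/vnd.schemaregistry.v1+json). No HTTP calls are made.
class ReferenceRegistrationSketch {

    // Minimal JSON string escaping: backslashes first, then quotes and newlines.
    static String jsonString(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n") + "\"";
    }

    // One reference: the type name used in the schema, plus the subject and
    // version under which that type's schema was registered earlier.
    static String reference(String name, String subject, int version) {
        return "{\"name\":" + jsonString(name)
                + ",\"subject\":" + jsonString(subject)
                + ",\"version\":" + version + "}";
    }

    static String registrationBody(String schemaType, String schema, List<String> references) {
        return "{\"schemaType\":" + jsonString(schemaType)
                + ",\"schema\":" + jsonString(schema)
                + ",\"references\":[" + String.join(",", references) + "]}";
    }

    public static void main(String[] args) {
        // Step 1 (not shown): register page-view.avsc under subject "page-view"
        // and purchase.avsc under subject "purchase".
        // Step 2: register the union under the topic's value subject.
        String unionSchema = "[\"io.confluent.developer.avro.Purchase\",\"io.confluent.developer.avro.PageView\"]";
        String body = registrationBody("AVRO", unionSchema, Arrays.asList(
                reference("io.confluent.developer.avro.PageView", "page-view", 1),
                reference("io.confluent.developer.avro.Purchase", "purchase", 1)));
        System.out.println(body);
    }
}
```

The references carry the same name, subject, and version values as the `addReference` calls in build.gradle, which is why the referenced subjects must exist before the union is registered.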

There is no corresponding register configuration for the Protobuf schemas. Instead, you're going to rely on the auto-registration feature, because the Protobuf serializer recursively registers any proto files imported by the main schema. Using the Confluent UI, you can view the uploaded schemas by clicking the Schema Registry tab and then clicking an individual schema to inspect it.

We'll cover more of the details in the next section.

8
Create the Protobuf Multi Event application

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

To complete this tutorial, you'll build applications that use KafkaProducer and KafkaConsumer instances to produce and consume both Avro and Protobuf records. The approach in this tutorial is not typical of applications you'd build in a production setting, but using separate clients lets you compare how to handle multiple event types with each serialization format.

To that end, the point of this sample application is this: you want to capture page-view and purchase events in the exact order that they occur, and you feel the best option is to produce these events to the same topic. Since the customer id will be the message key, you are guaranteed to get per-customer events in the order that they occur.

Let's go over some of the key parts of the MultiEventProtobufProduceConsumeApp, starting with the producer for the Protobuf events.

Since this is an advanced topic, the tutorial doesn't go into the basics of using a KafkaProducer. For more details, see the KafkaProducer tutorial.
KafkaProducer for Protobuf events
public void produceProtobufEvents(final Supplier<Producer<String, CustomerEventProto.CustomerEvent>> producerSupplier,
                                  final String topic,
                                  final List<CustomerEventProto.CustomerEvent> protoCustomerEvents) {

        try (Producer<String, CustomerEventProto.CustomerEvent> producer = producerSupplier.get()) { (1)
            protoCustomerEvents.stream()    (2)
                    .map((event -> new ProducerRecord<>(topic, event.getId(), event))) (3)
                    .forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception) -> {
                        // Callback details left out for clarity
                    })));
        }
}

// Relevant configurations

protoProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
1 Retrieving the producer instance from the Supplier
2 Using a java.util.stream to map each event into a ProducerRecord and then send it to the broker
3 Creating the ProducerRecord instance.

There are two points to emphasize here. The first is the type of the producer: it uses CustomerEventProto.CustomerEvent. Since you must use an outer class with Protobuf, the generic value type on the producer is a concrete type, so to set the key to the customer id you can call the CustomerEvent#getId method directly. Also note the use of a Supplier to provide the producer; this delays creating the producer until Supplier.get() is called. Using a Supplier also makes testing easier, because you can simply provide a different implementation.
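The Supplier-based design can be illustrated without a broker. The sketch below uses a hypothetical `EventProducer` interface as a stand-in for Kafka's `Producer`, so it runs on its own; in a real test you could instead supply Kafka's `MockProducer` from kafka-clients. It shows that the producer is only obtained when `get()` is called inside the method, that try-with-resources closes it afterwards, and that a test can swap in an in-memory implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.kafka.clients.producer.Producer, kept minimal.
interface EventProducer<K, V> extends AutoCloseable {
    void send(K key, V value);

    @Override
    void close(); // no checked exception, so try-with-resources needs no catch
}

// In-memory implementation a test might supply instead of a real producer.
class RecordingProducer implements EventProducer<String, String> {
    final List<String> sent = new ArrayList<>();
    boolean closed = false;

    @Override
    public void send(String key, String value) {
        sent.add(key + ":" + value);
    }

    @Override
    public void close() {
        closed = true;
    }
}

class SupplierSketch {
    // Same shape as produceProtobufEvents: obtain the producer lazily,
    // send every event with the given key, then close the producer.
    static void produceAll(Supplier<EventProducer<String, String>> producerSupplier,
                           String key,
                           List<String> events) {
        try (EventProducer<String, String> producer = producerSupplier.get()) {
            events.forEach(event -> producer.send(key, event));
        }
    }

    public static void main(String[] args) {
        RecordingProducer recorder = new RecordingProducer();
        produceAll(() -> recorder, "wilecoyote", Arrays.asList("page-view", "purchase"));
        System.out.println(recorder.sent + " closed=" + recorder.closed);
    }
}
```

Running `main` prints `[wilecoyote:page-view, wilecoyote:purchase] closed=true`. In production code the lambda would construct the real client instead, as `() -> new KafkaProducer<>(protoProduceConfigs(commonConfigs))` does in the application below.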

The second point is that you can use the auto-registration feature of Schema Registry with Protobuf, and the referenced schemas get registered recursively.

Next let’s move on to the KafkaConsumer for the Protobuf application.

KafkaConsumer for multiple events in Protobuf
consumerRecords.forEach(consumerRec -> {
    CustomerEventProto.CustomerEvent customerEvent = consumerRec.value();
    switch (customerEvent.getActionCase()) { (1)
        case PURCHASE:
            eventTracker.add(customerEvent.getPurchase().getItem()); (2)
            break;
        case PAGE_VIEW:
            eventTracker.add(customerEvent.getPageView().getUrl()); (3)
            break;
        default:
            // details left out for clarity
    }
});
1 Using a switch statement on the oneof case enum
2 Adding the purchased item to the event tracker
3 Adding the page-view link to the event tracker

With Protobuf, the generated code for a oneof field includes an enum with a constant for each field in the oneof (plus a not-set value, here ACTION_NOT_SET), so you can determine which type to work with by using a switch statement. To retrieve the current value, call the get<OneofName>Case method; in this case that's getActionCase, since the oneof field is named action.
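To make the dispatch concrete without generated code, here is a simplified, hand-written Java model of a oneof. It is not protoc output, only an approximation of the behavior described above: a case enum with one constant per oneof field plus a not-set value, where setting one member replaces the other. `CustomerEventModel` and its methods are hypothetical.

```java
// Simplified, hand-written model of a Protobuf oneof named "action".
// protoc-generated classes differ internally; this only mirrors the observable behavior.
class CustomerEventModel {

    enum ActionCase { PURCHASE, PAGE_VIEW, ACTION_NOT_SET }

    private ActionCase actionCase = ActionCase.ACTION_NOT_SET;
    private Object action; // holds whichever oneof member was set last

    CustomerEventModel setPurchase(String item) {
        action = item;
        actionCase = ActionCase.PURCHASE; // replaces any previously set page view
        return this;
    }

    CustomerEventModel setPageView(String url) {
        action = url;
        actionCase = ActionCase.PAGE_VIEW; // replaces any previously set purchase
        return this;
    }

    ActionCase getActionCase() {
        return actionCase;
    }

    Object getAction() {
        return action;
    }

    // Same switch-based dispatch as the consumer excerpt above.
    static String describe(CustomerEventModel event) {
        switch (event.getActionCase()) {
            case PURCHASE:
                return "Purchase event -> " + event.getAction();
            case PAGE_VIEW:
                return "Pageview event -> " + event.getAction();
            default:
                return "No action set";
        }
    }

    public static void main(String[] args) {
        CustomerEventModel event = new CustomerEventModel()
                .setPageView("http://acme/traps")
                .setPurchase("road-runner-bait"); // last call wins
        System.out.println(describe(event));
    }
}
```

Running `main` prints `Purchase event -> road-runner-bait`, which is also why the tutorial can reuse a single CustomerEvent builder for both event types.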

Before you go on to create the application, let's quickly cover the deserialization configurations you need to set:

Configurations needed by the Protobuf Consumer
 protoConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class); (1)
 protoConsumeConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, CustomerEventProto.CustomerEvent.class); (2)
1 Configurations for the Protobuf consumer to use the Protobuf deserializer
2 Setting the specific class type for the Protobuf deserializer

It should come as no surprise that you need to set the deserializer class to KafkaProtobufDeserializer for the Protobuf consumer. But when working with multiple types, you still need to configure a specific type. For Protobuf it's straightforward: you set the specific type to the outer class, and the deserializer knows how to handle the embedded types because they are described in the schema.

Now go ahead and create the src/main/java/io/confluent/developer/MultiEventProtobufProduceConsumeApp.java file:

package io.confluent.developer;

import io.confluent.developer.proto.CustomerEventProto;
import io.confluent.developer.proto.PageViewProto;
import io.confluent.developer.proto.PurchaseProto;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class MultiEventProtobufProduceConsumeApp implements AutoCloseable {

    public static final String CUSTOMER_ID = "wilecoyote";
    private static final Logger LOG = LoggerFactory.getLogger(MultiEventProtobufProduceConsumeApp.class);
    private volatile boolean keepConsumingProto = true;
    final ExecutorService executorService = Executors.newFixedThreadPool(2);

    public void produceProtobufEvents(final Supplier<Producer<String, CustomerEventProto.CustomerEvent>> producerSupplier,
                                      final String topic,
                                      final List<CustomerEventProto.CustomerEvent> protoCustomerEvents) {

        try (Producer<String, CustomerEventProto.CustomerEvent> producer = producerSupplier.get()) {
            protoCustomerEvents.stream()
                    .map((event -> new ProducerRecord<>(topic, event.getId(), event)))
                    .forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception) -> {
                        if (exception != null) {
                            LOG.error("Error Protobuf producing message", exception);
                        } else {
                            LOG.debug("Produced Protobuf record offset {} timestamp {}", metadata.offset(), metadata.timestamp());
                        }
                    })));
        }
    }

    public void consumeProtoEvents(final Supplier<Consumer<String, CustomerEventProto.CustomerEvent>> consumerSupplier,
                                   final String topic,
                                   final List<String> eventTracker) {

        try (Consumer<String, CustomerEventProto.CustomerEvent> eventConsumer = consumerSupplier.get()) {
            eventConsumer.subscribe(Collections.singletonList(topic));
            while (keepConsumingProto) {
                ConsumerRecords<String, CustomerEventProto.CustomerEvent> consumerRecords = eventConsumer.poll(Duration.ofSeconds(1));
                consumerRecords.forEach(consumerRec -> {
                    CustomerEventProto.CustomerEvent customerEvent = consumerRec.value();
                    switch (customerEvent.getActionCase()) {
                        case PURCHASE:
                            eventTracker.add(String.format("Protobuf Purchase event -> %s", customerEvent.getPurchase().getItem()));
                            break;
                        case PAGE_VIEW:
                            eventTracker.add(String.format("Protobuf Pageview event -> %s", customerEvent.getPageView().getUrl()));
                            break;
                        default:
                            LOG.error("Unexpected type - this shouldn't happen");
                    }
                });
            }
        }
    }

    public List<CustomerEventProto.CustomerEvent> protobufEvents() {
        CustomerEventProto.CustomerEvent.Builder customerEventBuilder = CustomerEventProto.CustomerEvent.newBuilder();
        PageViewProto.PageView.Builder pageViewBuilder = PageViewProto.PageView.newBuilder();
        PurchaseProto.Purchase.Builder purchaseBuilder = PurchaseProto.Purchase.newBuilder();
        List<CustomerEventProto.CustomerEvent> events = new ArrayList<>();

        PageViewProto.PageView pageView = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/traps").setIsSpecial(false).build();
        PageViewProto.PageView pageViewII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bombs").setIsSpecial(false).build();
        PageViewProto.PageView pageViewIII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bait").setIsSpecial(true).build();
        PurchaseProto.Purchase purchase = purchaseBuilder.setCustomerId(CUSTOMER_ID).setItem("road-runner-bait").setAmount(99.99).build();

        events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageView(pageView).build());
        events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageView(pageViewII).build());
        events.add(customerEventBuilder.setId(CUSTOMER_ID).setPageView(pageViewIII).build());
        events.add((customerEventBuilder.setId(CUSTOMER_ID).setPurchase(purchase)).build());

        return events;
    }

    @Override
    public void close() {
        keepConsumingProto = false;
        executorService.shutdown();
    }


    public void createTopics(final Properties allProps) {
        try (final AdminClient client = AdminClient.create(allProps)) {

            final List<NewTopic> topics = new ArrayList<>();

            topics.add(new NewTopic(
                    allProps.getProperty("proto.topic.name"),
                    Integer.parseInt(allProps.getProperty("proto.topic.partitions")),
                    Short.parseShort(allProps.getProperty("proto.topic.replication.factor"))));

            client.createTopics(topics);
        }
    }


    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            LOG.error("Must provide the path to the properties");
            return;
        }
        Properties properties = new Properties();
        try (FileInputStream fis = new FileInputStream(args[0])) {
            properties.load(fis);
        }

        Map<String, Object> commonConfigs = new HashMap<>();
        properties.forEach((key, value) -> commonConfigs.put((String) key, value));


        try (MultiEventProtobufProduceConsumeApp multiEventApp = new MultiEventProtobufProduceConsumeApp()) {
            multiEventApp.createTopics(properties);
            String protobufTopic = (String) commonConfigs.get("proto.topic.name");

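            LOG.info("Producing Protobuf events now");
            List<CustomerEventProto.CustomerEvent> eventsToProduce = multiEventApp.protobufEvents();
            multiEventApp.produceProtobufEvents(() -> new KafkaProducer<>(protoProduceConfigs(commonConfigs)), protobufTopic, eventsToProduce);

            // Synchronized because the consumer thread adds to this list while the main thread checks its size
            List<String> protoEvents = Collections.synchronizedList(new ArrayList<>());
            multiEventApp.executorService.submit(() -> multiEventApp.consumeProtoEvents(() -> new KafkaConsumer<>(protoConsumeConfigs(commonConfigs)), protobufTopic, protoEvents));
            // Wait until every produced event has been consumed
            while (protoEvents.size() < eventsToProduce.size()) {
                Thread.sleep(100);
            }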
            LOG.info("Consumed Proto Events {}", protoEvents);
        }
    }

    @NotNull
    static Map<String, Object> protoConsumeConfigs(Map<String, Object> commonConfigs) {
        Map<String, Object> protoConsumeConfigs = new HashMap<>(commonConfigs);
        protoConsumeConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer-group");
        protoConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        protoConsumeConfigs.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE, CustomerEventProto.CustomerEvent.class);
        return protoConsumeConfigs;
    }

    @NotNull
    static Map<String, Object> protoProduceConfigs(Map<String, Object> commonConfigs) {
        Map<String, Object> protoProduceConfigs = new HashMap<>(commonConfigs);
        protoProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        return protoProduceConfigs;
    }
}

9
Create the Avro Multi Event application

Next, you'll create the application for the Avro multiple events.

KafkaProducer for Avro events
public void produceAvroEvents(final Supplier<Producer<String, SpecificRecordBase>> producerSupplier,
                              final String topic,
                              final List<SpecificRecordBase> avroEvents) {

        try (Producer<String, SpecificRecordBase> producer = producerSupplier.get()) {  (1)
           avroEvents.stream()  (2)
                    .map((event -> new ProducerRecord<>(topic, (String) event.get("customer_id"), event))) (3)
                    .forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception) -> {
                        // Callback details left out for clarity
                    })));
        }
}

//Relevant configurations

 avroProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
 avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); (4)
 avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true); (5)
1 Getting the producer from the supplier
2 Streaming over the provided collection of Avro events to send
3 Creating the ProducerRecord instance, note the use of map-like access to get the required field for the key
4 Specifying to disable automatic schema registration
5 Configuring the serializer to use the latest registered schema version

You have a very similar approach with the Avro producer as with the Protobuf version, but take a look at the type at callout 1: it's SpecificRecordBase, an abstract class that every Avro-generated class inherits from. Since the schema for the Avro multi-event topic uses a union at the top level, you don't know the concrete type. Because you want to use the customer id as the key, you need to access the field in a map-like fashion, using the field name as it exists in the schema. This is possible because SpecificRecordBase implements the GenericRecord interface, which provides the get method for retrieving a field value by name.
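A small plain-Java model shows why typing the producer to a common base class works. Each record exposes its fields by name, the way GenericRecord's `get(String)` does, so the key can be read without knowing the concrete type. The classes below are hypothetical stand-ins, not the Avro-generated ones, and the approach assumes every union member has a `customer_id` field.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SpecificRecordBase: fields are reachable by name.
abstract class NamedFieldRecord {
    protected final Map<String, Object> fields = new LinkedHashMap<>();

    Object get(String fieldName) {
        return fields.get(fieldName);
    }
}

class PurchaseRecord extends NamedFieldRecord {
    PurchaseRecord(String item, double amount, String customerId) {
        fields.put("item", item);
        fields.put("amount", amount);
        fields.put("customer_id", customerId);
    }
}

class PageViewRecord extends NamedFieldRecord {
    PageViewRecord(String url, boolean isSpecial, String customerId) {
        fields.put("url", url);
        fields.put("is_special", isSpecial);
        fields.put("customer_id", customerId);
    }
}

class KeyExtractionSketch {
    // Works for any member of the union because both schemas share a "customer_id" field.
    static String keyFor(NamedFieldRecord record) {
        return (String) record.get("customer_id");
    }

    public static void main(String[] args) {
        List<NamedFieldRecord> events = Arrays.asList(
                new PageViewRecord("http://acme/traps", false, "wilecoyote"),
                new PurchaseRecord("road-runner-bait", 99.99, "wilecoyote"));
        events.forEach(e -> System.out.println(keyFor(e) + " <- " + e.getClass().getSimpleName()));
    }
}
```

The trade-off is that the field name is a string checked only at runtime; renaming `customer_id` in one schema would break key extraction without a compile error.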

But the biggest difference is the configuration you provide to the Avro serializer, namely disabling automatic schema registration; otherwise the serializer would try to register the concrete record schema (Purchase or PageView) under the topic's subject and override the union schema as the latest version. Additionally, since you've set use.latest.version to true, the serializer looks up the latest version, the union schema, and uses it for serialization. This blog post by Robert Yokota explains this mechanism in detail.
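For reference, here are the same producer settings expressed with their string property names, which can be handy when configuring the producer from a properties file rather than through the AbstractKafkaSchemaSerDeConfig constants. This is a sketch: it only assembles the configuration map, the class name is hypothetical, and the example URLs assume the local setup from configuration/dev.properties.

```java
import java.util.HashMap;
import java.util.Map;

// Builds Avro producer settings for a topic whose value subject is a top-level union.
class AvroUnionProducerConfig {

    static Map<String, Object> build(String bootstrapServers, String schemaRegistryUrl) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("schema.registry.url", schemaRegistryUrl);
        configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Don't register the concrete record schema under avro-events-value
        configs.put("auto.register.schemas", false);
        // Serialize with the latest registered version, i.e. the union schema
        configs.put("use.latest.version", true);
        return configs;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:29092", "http://localhost:8081"));
    }
}
```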

Next we’ll move on to creating the Avro consumer.

KafkaConsumer for multiple events in Avro
consumerRecords.forEach(consumerRec -> {
  SpecificRecord avroRecord = consumerRec.value(); (1)
  if (avroRecord instanceof Purchase) {    (2)
      Purchase purchase = (Purchase) avroRecord;  (3)
      eventTracker.add(purchase.getItem());
  } else if (avroRecord instanceof PageView) {
      PageView pageView = (PageView) avroRecord;
      eventTracker.add(pageView.getUrl());
  }
  // details left out for clarity
});
1 Getting the record
2 Doing an instanceof check to determine the type
3 Casting to the appropriate concrete type

With the Avro consumer, you’ll need to use the Java instanceof operator to determine the concrete type of the record. Notice that here you’re using the SpecificRecord interface, which every Avro-generated object implements. Once you find the correct concrete type, you cast the record to that type and extract the required information.
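If the number of event types grows, an alternative to the instanceof chain is a lookup keyed by the record’s class. The sketch below shows that alternative, not the approach this tutorial uses; `PurchaseEvent` and `PageViewEvent` are hypothetical plain classes standing in for the generated Avro types, and `EventDispatcher` is an illustrative helper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-ins for the generated Avro classes.
final class PurchaseEvent {
    final String item;
    PurchaseEvent(String item) { this.item = item; }
}

final class PageViewEvent {
    final String url;
    PageViewEvent(String url) { this.url = url; }
}

// Maps each concrete record class to a handler instead of chaining instanceof checks.
final class EventDispatcher {
    private final Map<Class<?>, Function<Object, String>> handlers = new HashMap<>();

    // Register a handler for one concrete type; the cast happens once, here.
    <T> EventDispatcher on(Class<T> type, Function<? super T, String> handler) {
        handlers.put(type, value -> handler.apply(type.cast(value)));
        return this;
    }

    // Exact-class lookup; empty for null values or types without a handler.
    Optional<String> dispatch(Object record) {
        if (record == null) {
            return Optional.empty();
        }
        Function<Object, String> handler = handlers.get(record.getClass());
        return handler == null ? Optional.empty() : Optional.of(handler.apply(record));
    }
}
```

In the consumer loop this would read roughly as `dispatcher.dispatch(consumerRec.value()).ifPresent(eventTracker::add)`, with unmatched types logged separately. The lookup matches exact classes only, so unlike instanceof it ignores subclasses; that is usually acceptable for generated classes.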

Before you go on to create the application, let’s take a quick look at the deserialization configurations you need to set:

Configurations needed by the Avro Consumer
 avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class); (1)
 avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); (2)
1 Specifying to use the Avro deserializer for the Avro consumer
2 Setting the Avro deserializer to use the specific reader

It should come as no surprise that you need to set the deserializer class to KafkaAvroDeserializer for the Avro consumer. But even when working with multiple types, you still need to tell the deserializer to produce specific records: with Avro, even with the union schema, you need to set SPECIFIC_AVRO_READER_CONFIG to true to get the concrete generated types rather than generic records.
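Leaving this setting out does not fail when the consumer is created; the problem typically surfaces later, as a ClassCastException when a generic record reaches code expecting a generated class. A small check before building the consumer can catch it early. This is a sketch under assumptions: it relies on `SPECIFIC_AVRO_READER_CONFIG` resolving to the `specific.avro.reader` key, and `ConsumerConfigChecks.requireSpecificReader` is a hypothetical helper, not part of any Confluent API.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerConfigChecks {
    // String key assumed to sit behind KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG.
    static final String SPECIFIC_AVRO_READER = "specific.avro.reader";

    // Hypothetical helper: fail fast unless the specific reader is enabled.
    // Accepts Boolean true or the string "true" (as loaded from a .properties file).
    static Map<String, Object> requireSpecificReader(Map<String, Object> configs) {
        Object value = configs.get(SPECIFIC_AVRO_READER);
        boolean enabled = value instanceof Boolean
                ? (Boolean) value
                : value != null && Boolean.parseBoolean(value.toString().trim());
        if (!enabled) {
            throw new IllegalArgumentException(
                    SPECIFIC_AVRO_READER + " must be true to receive generated Avro classes");
        }
        return configs;
    }
}
```

You could wrap the map returned by avroConsumeConfigs(commonConfigs) with this check before passing it to new KafkaConsumer<>(...).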

Now go ahead and create the src/main/java/io/confluent/developer/MultiEventAvroProduceConsumeApp.java file:

package io.confluent.developer;

import io.confluent.developer.avro.PageView;
import io.confluent.developer.avro.Purchase;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class MultiEventAvroProduceConsumeApp implements AutoCloseable {

    public static final String CUSTOMER_ID = "wilecoyote";
    private static final Logger LOG = LoggerFactory.getLogger(MultiEventAvroProduceConsumeApp.class);
    private volatile boolean keepConsumingAvro = true;
    final ExecutorService executorService = Executors.newFixedThreadPool(1);

    public void produceAvroEvents(final Supplier<Producer<String, SpecificRecordBase>> producerSupplier,
                                  final String topic,
                                  final List<SpecificRecordBase> avroEvents) {

        try (Producer<String, SpecificRecordBase> producer = producerSupplier.get()) {
            avroEvents.stream()
                    .map((event -> new ProducerRecord<>(topic, (String) event.get("customer_id"), event)))
                    .forEach(producerRecord -> producer.send(producerRecord, ((metadata, exception) -> {
                        if (exception != null) {
                            LOG.error("Error Avro producing message", exception);
                        } else {
                            LOG.debug("Produced Avro record offset {} timestamp {}", metadata.offset(), metadata.timestamp());
                        }
                    })));
        }
    }

    public void consumeAvroEvents(final Supplier<Consumer<String, SpecificRecordBase>> consumerSupplier,
                                  final String topic,
                                  final List<String> eventTracker) {
        try (Consumer<String, SpecificRecordBase> eventConsumer = consumerSupplier.get()) {
            eventConsumer.subscribe(Collections.singletonList(topic));
            while (keepConsumingAvro) {
                ConsumerRecords<String, SpecificRecordBase> consumerRecords = eventConsumer.poll(Duration.ofSeconds(1));
                consumerRecords.forEach(consumerRec -> {
                    SpecificRecord avroRecord = consumerRec.value();
                    if (avroRecord instanceof Purchase) {
                        Purchase purchase = (Purchase) avroRecord;
                        eventTracker.add(String.format("Avro Purchase event -> %s",purchase.getItem()));
                    } else if (avroRecord instanceof PageView) {
                        PageView pageView = (PageView) avroRecord;
                        eventTracker.add(String.format("Avro Pageview event -> %s",pageView.getUrl()));
                    } else {
                        LOG.error("Unexpected type - this shouldn't happen");
                    }
                });
            }
        }
    }


    public List<SpecificRecordBase> avroEvents() {
        PageView.Builder pageViewBuilder = PageView.newBuilder();
        Purchase.Builder purchaseBuilder = Purchase.newBuilder();
        List<SpecificRecordBase> events = new ArrayList<>();

        PageView pageView = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/traps").setIsSpecial(false).build();
        PageView pageViewII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bombs").setIsSpecial(false).build();
        PageView pageViewIII = pageViewBuilder.setCustomerId(CUSTOMER_ID).setUrl("http://acme/bait").setIsSpecial(true).build();
        Purchase purchase = purchaseBuilder.setCustomerId(CUSTOMER_ID).setItem("road-runner-bait").setAmount(99.99).build();

        events.add(pageView);
        events.add(pageViewII);
        events.add(pageViewIII);
        events.add(purchase);

        return events;
    }

    @Override
    public void close() {
        keepConsumingAvro = false;
        executorService.shutdown();
    }


    public void createTopics(final Properties allProps) {
        try (final AdminClient client = AdminClient.create(allProps)) {

            final List<NewTopic> topics = new ArrayList<>();

            topics.add(new NewTopic(
                    allProps.getProperty("avro.topic.name"),
                    Integer.parseInt(allProps.getProperty("avro.topic.partitions")),
                    Short.parseShort(allProps.getProperty("avro.topic.replication.factor"))));

            client.createTopics(topics);
        }
    }


    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            LOG.error("Must provide the path to the properties");
            return;
        }
        Properties properties = new Properties();
        try (FileInputStream fis = new FileInputStream(args[0])) {
            properties.load(fis);
        }

        Map<String, Object> commonConfigs = new HashMap<>();
        properties.forEach((key, value) -> commonConfigs.put((String) key, value));


        try (MultiEventAvroProduceConsumeApp multiEventApp = new MultiEventAvroProduceConsumeApp()) {
            multiEventApp.createTopics(properties);
            String avroTopic = (String) commonConfigs.get("avro.topic.name");

            LOG.info("Producing Avro events");
            multiEventApp.produceAvroEvents(() -> new KafkaProducer<>(avroProduceConfigs(commonConfigs)), avroTopic, multiEventApp.avroEvents());

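            // The consumer thread appends to this list while main reads it, so synchronize access
            List<String> avroEvents = Collections.synchronizedList(new ArrayList<>());

            multiEventApp.executorService.submit(() -> multiEventApp.consumeAvroEvents(() -> new KafkaConsumer<>(avroConsumeConfigs(commonConfigs)), avroTopic, avroEvents));
            // Wait until all four produced events have been consumed
            while (avroEvents.size() < 4) {
                Thread.sleep(100);
            }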

            LOG.info("Consumed Avro Events {}", avroEvents);
        }
    }


    @NotNull
    static Map<String, Object> avroConsumeConfigs(Map<String, Object> commonConfigs) {
        Map<String, Object> avroConsumeConfigs = new HashMap<>(commonConfigs);
        avroConsumeConfigs.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-consumer-group");
        avroConsumeConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        avroConsumeConfigs.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        return avroConsumeConfigs;
    }

    @NotNull
    static Map<String, Object> avroProduceConfigs(Map<String, Object> commonConfigs) {
        Map<String, Object> avroProduceConfigs = new HashMap<>(commonConfigs);
        avroProduceConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        avroProduceConfigs.put(AbstractKafkaSchemaSerDeConfig.USE_LATEST_VERSION, true);
        return avroProduceConfigs;
    }
}
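The main method above waits for the consumer thread by sleeping in a loop until the shared list is large enough. A CountDownLatch with a timeout is a common alternative that also bounds how long main can wait. The JDK-only sketch below shows the idea; `LatchedTracker` is a hypothetical class, the expected count is supplied by the caller, and wiring it in would mean passing it (or its add method) to consumeAvroEvents in place of the List<String>, which is not shown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical thread-safe tracker that signals once the expected events arrive.
final class LatchedTracker {
    private final List<String> events = Collections.synchronizedList(new ArrayList<>());
    private final CountDownLatch remaining;

    LatchedTracker(int expected) {
        this.remaining = new CountDownLatch(expected);
    }

    // Called from the consumer thread for each processed record.
    void add(String event) {
        events.add(event);
        remaining.countDown();
    }

    // Called from main; returns false if the timeout elapses first.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return remaining.await(timeout, unit);
    }

    // Copy taken under the list's lock so iteration is safe.
    List<String> snapshot() {
        synchronized (events) {
            return new ArrayList<>(events);
        }
    }
}
```

In main you would then replace the sleep loop with a single await call, treat a false return as a timeout, and log snapshot() afterwards.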

10
Compile and run both multi-event applications

In your terminal, run:

./gradlew shadowJar

Now that you have an uberjar, you can launch each application locally. When you run the following, you’ll see some output as the producer sends records to the Kafka broker and you’ll also see the results of the multiple event consumer.

We’ll pipe the output through grep to filter out the clients’ logging, which would otherwise make the results harder to see.

First, run the multi-event application for Protobuf:

java -cp build/libs/multiple-event-types-standalone-0.0.1.jar io.confluent.developer.MultiEventProtobufProduceConsumeApp configuration/dev.properties 2>&1 | grep 'io.confluent.developer'

The output should look something like this:

[main] INFO io.confluent.developer.MultiEventProtobufProduceConsumeApp - Producing Protobuf events now
	specific.protobuf.value.type = class io.confluent.developer.proto.CustomerEventProto$CustomerEvent
[main] INFO io.confluent.developer.MultiEventProtobufProduceConsumeApp - Consumed Proto Events [Protobuf Pageview event -> http://acme/traps, Protobuf Pageview event -> http://acme/bombs, Protobuf Pageview event -> http://acme/bait, Protobuf Purchase event -> road-runner-bait]

Then run the multi-event application for Avro:

java -cp build/libs/multiple-event-types-standalone-0.0.1.jar io.confluent.developer.MultiEventAvroProduceConsumeApp configuration/dev.properties 2>&1 | grep 'io.confluent.developer'

The output should look something like this:

[main] INFO io.confluent.developer.MultiEventAvroProduceConsumeApp - Producing Avro events
[main] INFO io.confluent.developer.MultiEventAvroProduceConsumeApp - Consumed Avro Events [Avro Pageview event -> http://acme/traps, Avro Pageview event -> http://acme/bombs, Avro Pageview event -> http://acme/bait, Avro Purchase event -> road-runner-bait]

Test it

1
Create a test configuration file

First, create a test file at configuration/test.properties:

proto.topic=proto-records
avro.topic=avro-records
schema.registry.url=mock://multi-event-produce-consume-test

2
Write tests for the Avro and Protobuf applications

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Testing a KafkaProducer and KafkaConsumer used in an application is fairly easy thanks to the MockProducer and the MockConsumer. Since both the KafkaProducer and KafkaConsumer are well tested, we don’t need to test the clients themselves; instead we’ll use the mocks to verify that our logic executes as expected.

There are two test classes: MultiEventAvroProduceConsumeAppTest for the Avro application and MultiEventProtobufProduceConsumeAppTest for the Protobuf application. Before you create the tests, let’s look at some of the key parts of using a mock producer and consumer.

Replaying the history of produced records
// Details left out for clarity

MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer
                = new MockProducer<>(true, stringSerializer, protobufSerializer); (1)
List<CustomerEventProto.CustomerEvent> events = produceConsumeApp.protobufEvents();
produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, (String) commonConfigs.get("proto.topic"), events); (2)

List<KeyValue<String, CustomerEventProto.CustomerEvent>> actualKeyValues = mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList()); (3)
assertThat(actualKeyValues, equalTo(expectedKeyValues));
1 Creating the MockProducer
2 Executing the produce of Protobuf records with the mock producer
3 Replaying the history of the producer

At annotation 3 above, the test replays the mock producer’s history to verify that every record we expected to produce was actually sent to the producer, with the expected key and value. The test for the Avro producer uses identical logic, so we won’t review it here, but you can view the full source code if you’d like to see it.
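The replay works because the application receives its producer through a Supplier instead of constructing a KafkaProducer itself, so a test can hand it any producer that records what was sent. The JDK-only sketch below shows that pattern with hypothetical `RecordSender`, `RecordingSender`, and `EventPublisher` types; `history()` here only mimics the role MockProducer.history() plays in the test and is not Kafka’s API.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical minimal producer abstraction.
interface RecordSender<K, V> extends AutoCloseable {
    void send(String topic, K key, V value);

    @Override
    default void close() {}
}

// Test double that keeps every sent key/value pair, in send order.
final class RecordingSender<K, V> implements RecordSender<K, V> {
    private final List<Map.Entry<K, V>> history = new ArrayList<>();

    @Override
    public void send(String topic, K key, V value) {
        history.add(new AbstractMap.SimpleEntry<>(key, value));
    }

    List<Map.Entry<K, V>> history() {
        return Collections.unmodifiableList(history);
    }
}

final class EventPublisher {
    // Same shape as produceAvroEvents: the caller decides which sender to supply.
    static <V> void publish(Supplier<? extends RecordSender<String, V>> senderSupplier,
                            String topic, List<V> events, Function<V, String> keyFn) {
        try (RecordSender<String, V> sender = senderSupplier.get()) {
            events.forEach(e -> sender.send(topic, keyFn.apply(e), e));
        }
    }
}
```

Production code would supply a real client behind the same interface, while the test keeps a reference to the recording double so it can inspect the history after the publisher has closed it.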

Testing the consumer is a little trickier, because the consumer polls for records and keeps polling until you close the application. The MockConsumer provides a schedulePollTask method where you supply the action to take on a poll call; each call to poll runs one scheduled task.

Driving the behavior of a consumer poll
  mockConsumer.schedulePollTask(() -> {  (1)
        addTopicPartitionsAssignment(topic, mockConsumer);
        addConsumerRecords(mockConsumer, produceConsumeApp.protobufEvents(), CustomerEventProto.CustomerEvent::getId, topic);
    });
  mockConsumer.schedulePollTask(() -> produceConsumeApp.close()); (2)
1 Assigning the topic-partitions and records in the first poll call
2 Shutting down the application in the next call

In our case, on the first poll call we assign the topic partitions and then provide the records for the consumer to process. On the next poll call we simply shut the application down. Note that the methods called in the first schedulePollTask are internal to the test, so to fully understand what’s going on you’ll need to look at the test’s source code. The test for the Avro multi-event application uses essentially the same logic, so we won’t review it here.
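The test terminates because each poll call runs one scheduled task, and the last task flips the flag that the consume loop checks. The JDK-only sketch below models that sequence; `ScriptedPoller` and `LoopingConsumer` are hypothetical classes that simplify the behavior described above and are not how MockConsumer is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a consumer whose poll() is driven by a script of tasks.
final class ScriptedPoller<V> {
    private final Queue<Runnable> pollTasks = new ArrayDeque<>();
    private final List<V> pending = new ArrayList<>();

    void schedulePollTask(Runnable task) {
        pollTasks.add(task);
    }

    void addRecord(V record) {
        pending.add(record);
    }

    // Runs at most one scheduled task, then hands back whatever records are pending.
    List<V> poll() {
        Runnable task = pollTasks.poll();
        if (task != null) {
            task.run();
        }
        List<V> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}

// A consume loop shaped like consumeAvroEvents: it runs until told to stop.
final class LoopingConsumer {
    private volatile boolean keepConsuming = true;
    private int polls;

    void stop() {
        keepConsuming = false;
    }

    int pollCount() {
        return polls;
    }

    <V> void consume(ScriptedPoller<V> poller, List<V> tracker) {
        while (keepConsuming) {
            polls++;
            tracker.addAll(poller.poll());
        }
    }
}
```

If the stop task were never scheduled, consume would spin forever in this model, which is why the second schedulePollTask call matters.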

Now create the following file for the Protobuf application test at src/test/java/io/confluent/developer/MultiEventProtobufProduceConsumeAppTest.java.

package io.confluent.developer;

import io.confluent.developer.proto.CustomerEventProto;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class MultiEventProtobufProduceConsumeAppTest {
    private static final Map<String, Object> commonConfigs = new HashMap<>();
    private static final Properties properties = new Properties();
    private final Serializer<String> stringSerializer = new StringSerializer();
    private MultiEventProtobufProduceConsumeApp produceConsumeApp;

    @BeforeClass
    public static void beforeAllTests() throws IOException {
        try (FileInputStream fis = new FileInputStream("configuration/test.properties")) {
            properties.load(fis);
            properties.forEach((key, value) -> commonConfigs.put((String) key, value));
        }
    }


    @Before
    public void setup() {
        produceConsumeApp = new MultiEventProtobufProduceConsumeApp();
    }

    @Test
    public void testProduceProtobufMultipleEvents() {
        KafkaProtobufSerializer<CustomerEventProto.CustomerEvent> protobufSerializer
                = new KafkaProtobufSerializer<>();
        protobufSerializer.configure(commonConfigs, false);
        MockProducer<String, CustomerEventProto.CustomerEvent> mockProtoProducer
                = new MockProducer<>(true, stringSerializer, protobufSerializer);
        List<CustomerEventProto.CustomerEvent> events = produceConsumeApp.protobufEvents();
        produceConsumeApp.produceProtobufEvents(() -> mockProtoProducer, (String) commonConfigs.get("proto.topic"), events);
        List<KeyValue<String, CustomerEventProto.CustomerEvent>> expectedKeyValues =
                produceConsumeApp.protobufEvents().stream().map((e -> KeyValue.pair(e.getId(), e))).collect(Collectors.toList());

        List<KeyValue<String, CustomerEventProto.CustomerEvent>> actualKeyValues =
                mockProtoProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());
        assertThat(actualKeyValues, equalTo(expectedKeyValues));
    }

    @Test
    public void testConsumeProtobufEvents() {
        MockConsumer<String, CustomerEventProto.CustomerEvent> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        String topic = (String) commonConfigs.get("proto.topic");
        List<String> expectedProtoResults = Arrays.asList("Protobuf Pageview event -> http://acme/traps", "Protobuf Pageview event -> http://acme/bombs", "Protobuf Pageview event -> http://acme/bait", "Protobuf Purchase event -> road-runner-bait");
        List<String> actualProtoResults = new ArrayList<>();
        mockConsumer.schedulePollTask(()-> {
            addTopicPartitionsAssignment(topic, mockConsumer);
            addConsumerRecords(mockConsumer, produceConsumeApp.protobufEvents(), CustomerEventProto.CustomerEvent::getId, topic);
        });
        mockConsumer.schedulePollTask(() -> produceConsumeApp.close());
        produceConsumeApp.consumeProtoEvents(() -> mockConsumer, topic, actualProtoResults);
        assertThat(actualProtoResults, equalTo(expectedProtoResults));
    }

    private <K, V> KeyValue<K, V> toKeyValue(final ProducerRecord<K, V> producerRecord) {
        return KeyValue.pair(producerRecord.key(), producerRecord.value());
    }

    private <V> void addTopicPartitionsAssignment(final String topic,
                                                  final MockConsumer<String, V> mockConsumer) {
        final TopicPartition topicPartition = new TopicPartition(topic, 0);
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(topicPartition, 0L);
        mockConsumer.rebalance(Collections.singletonList(topicPartition));
        mockConsumer.updateBeginningOffsets(beginningOffsets);
    }

    private <V> void addConsumerRecords(final MockConsumer<String, V> mockConsumer,
                                        final List<V> records,
                                        final Function<V, String> keyFunction,
                                        final String topic) {
        AtomicInteger offset = new AtomicInteger(0);
        records.stream()
                .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r))
                .forEach(mockConsumer::addRecord);
    }
}

Then create the file for the Avro application test at src/test/java/io/confluent/developer/MultiEventAvroProduceConsumeAppTest.java.

package io.confluent.developer;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

public class MultiEventAvroProduceConsumeAppTest {
    private static final Map<String, Object> commonConfigs = new HashMap<>();
    private static final Properties properties = new Properties();
    private final Serializer<String> stringSerializer = new StringSerializer();
    private MultiEventAvroProduceConsumeApp produceConsumeApp;

    @BeforeClass
    public static void beforeAllTests() throws IOException {
        try (FileInputStream fis = new FileInputStream("configuration/test.properties")) {
            properties.load(fis);
            properties.forEach((key, value) -> commonConfigs.put((String) key, value));
        }
    }


    @Before
    public void setup() {
        produceConsumeApp = new MultiEventAvroProduceConsumeApp();
    }

    @Test
    @SuppressWarnings("unchecked")
    public void testProduceAvroMultipleEvents() {
        KafkaAvroSerializer avroSerializer
                = new KafkaAvroSerializer();
        avroSerializer.configure(commonConfigs, false);
        MockProducer<String, SpecificRecordBase> mockAvroProducer
                = new MockProducer<String, SpecificRecordBase>(true, stringSerializer, (Serializer) avroSerializer);
        produceConsumeApp.produceAvroEvents(() -> mockAvroProducer, (String) commonConfigs.get("avro.topic"), produceConsumeApp.avroEvents());
        List<KeyValue<String, SpecificRecordBase>> expectedKeyValues =
                produceConsumeApp.avroEvents().stream().map((e -> KeyValue.pair((String) e.get("customer_id"), e))).collect(Collectors.toList());

        List<KeyValue<String, SpecificRecordBase>> actualKeyValues =
                mockAvroProducer.history().stream().map(this::toKeyValue).collect(Collectors.toList());
        assertThat(actualKeyValues, equalTo(expectedKeyValues));
    }

    @Test
    public void testConsumeAvroEvents() {
        MockConsumer<String, SpecificRecordBase> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        String topic = (String) commonConfigs.get("avro.topic");
        List<String> expectedAvroResults = Arrays.asList("Avro Pageview event -> http://acme/traps", "Avro Pageview event -> http://acme/bombs", "Avro Pageview event -> http://acme/bait", "Avro Purchase event -> road-runner-bait");
        List<String> actualAvroResults = new ArrayList<>();
        mockConsumer.schedulePollTask(() -> {
            addTopicPartitionsAssignment(topic, mockConsumer);
            addConsumerRecords(mockConsumer, produceConsumeApp.avroEvents(), (SpecificRecordBase r) -> (String) r.get("customer_id"), topic);
        });
        mockConsumer.schedulePollTask(() -> produceConsumeApp.close());
        produceConsumeApp.consumeAvroEvents(() -> mockConsumer, topic, actualAvroResults);
        assertThat(actualAvroResults, equalTo(expectedAvroResults));
    }

    private <K, V> KeyValue<K, V> toKeyValue(final ProducerRecord<K, V> producerRecord) {
        return KeyValue.pair(producerRecord.key(), producerRecord.value());
    }

    private <V> void addTopicPartitionsAssignment(final String topic,
                                                  final MockConsumer<String, V> mockConsumer) {
        final TopicPartition topicPartition = new TopicPartition(topic, 0);
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(topicPartition, 0L);
        mockConsumer.rebalance(Collections.singletonList(topicPartition));
        mockConsumer.updateBeginningOffsets(beginningOffsets);
    }

    private <V> void addConsumerRecords(final MockConsumer<String, V> mockConsumer,
                                        final List<V> records,
                                        final Function<V, String> keyFunction,
                                        final String topic) {
        AtomicInteger offset = new AtomicInteger(0);
        records.stream()
                .map(r -> new ConsumerRecord<>(topic, 0, offset.getAndIncrement(), keyFunction.apply(r), r))
                .forEach(mockConsumer::addRecord);
    }
}

3
Invoke the tests

Now run the tests, which is as simple as:

./gradlew test

Take it to production

1
Create a production configuration file

First, create a new configuration file at configuration/prod.properties with the following content. Be sure to fill in the addresses of your production hosts and change any other parameters that make sense for your setup.

application.id=multiple-event-types
bootstrap.servers=<<FILL ME IN>>
schema.registry.url=<<FILL ME IN>>

example.topic.name=<<FILL ME IN>>
example.topic.partitions=<<FILL ME IN>>
example.topic.replication.factor=<<FILL ME IN>>

2
Build a Docker image

In your terminal, execute the following to invoke the Jib plugin to build an image:

./gradlew jibDockerBuild --image=io.confluent.developer/multiple-event-types:0.0.1

3
Launch the container

Finally, launch the container using your preferred container orchestration service. If you want to run it locally, you can execute the following:

docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/multiple-event-types:0.0.1 config.properties

Deploy on Confluent Cloud

1
Run your app with Confluent Cloud

Instead of running a local Kafka cluster, you may use Confluent Cloud, a fully managed Apache Kafka service.

  1. Sign up for Confluent Cloud, a fully managed Apache Kafka service.

  2. After you log in to Confluent Cloud Console, click on Add cloud environment and name the environment learn-kafka. Using a new environment keeps your learning resources separate from your other Confluent Cloud resources.

  3. From the Billing & payment section in the Menu, apply the promo code CC100KTS to receive an additional $100 free usage on Confluent Cloud (details).

  4. Click on LEARN and follow the instructions to launch a Kafka cluster and to enable Schema Registry.


Next, from the Confluent Cloud Console, click on Clients to get the cluster-specific configurations, e.g. Kafka cluster bootstrap servers and credentials, Confluent Cloud Schema Registry and credentials, etc., and set the appropriate parameters in your client application. In the case of this tutorial, add the following properties to the client application’s input properties file, substituting all curly braces with your Confluent Cloud values.

# Required connection configs for Kafka producer, consumer, and admin
bootstrap.servers={{ BROKER_ENDPOINT }}
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ CLUSTER_API_KEY }}' password='{{ CLUSTER_API_SECRET }}';
sasl.mechanism=PLAIN
# Required for correctness in Apache Kafka clients prior to 2.6
client.dns.lookup=use_all_dns_ips

# Best practice for Kafka producer to prevent data loss
acks=all

# Required connection configs for Confluent Cloud Schema Registry
schema.registry.url=https://{{ SR_ENDPOINT }}
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }}

Now you’re all set to run your streaming application locally, backed by a Kafka cluster fully managed by Confluent Cloud.