How to convert a stream's serialization format

Problem:

You have a Kafka topic with the data serialized in a particular format, and you want to change the format to something else.

Example use case:

Consider a topic with events that represent movie releases. The events in the topic are formatted with JSON. In this tutorial, we'll write a program that creates a new topic with the same events, but formatted with Avro.

Code example:

Try it

1
Initialize the project

To get started, make a new directory anywhere you’d like for this project:

mkdir kstreams-serialization && cd kstreams-serialization

2
Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN

And launch it by running:

docker-compose up

3
Configure the project

Create the following build file, named build.gradle:

buildscript {
  repositories {
    jcenter()
  }
  dependencies {
    classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.15.1"
  }
}

plugins {
  id "java"
  id "application"
  id "com.google.cloud.tools.jib" version "1.1.1"
  id "com.github.johnrengelman.shadow" version "5.1.0"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
  jcenter()

  maven {
    url "http://packages.confluent.io/maven"
  }
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: "com.github.johnrengelman.shadow"

dependencies {
  compile "org.apache.avro:avro:1.8.2"
  implementation "org.slf4j:slf4j-simple:1.7.26"
  implementation "org.apache.kafka:kafka-streams:2.2.0"
  implementation "io.confluent:kafka-streams-avro-serde:5.2.0"
  implementation 'com.google.code.gson:gson:2.8.5'

  testCompile "org.apache.kafka:kafka-streams-test-utils:2.2.0"
  testImplementation 'junit:junit:4.12'
}

test {
  testLogging {
    outputs.upToDateWhen { false }
    showStandardStreams = true
    exceptionFormat = "full"
  }
}

jar {
  manifest {
    attributes(
        "Class-Path": configurations.compile.collect { it.getName() }.join(" "),
        "Main-Class": "io.confluent.developer.SerializationTutorial"
    )
  }
}

shadowJar {
  baseName = "kstreams-serialization"
  classifier = "standalone"
}

// Define the main class for the application
mainClassName = 'io.confluent.developer.serialization.SerializationTutorial'

And be sure to run the following command to obtain the Gradle wrapper:

gradle wrapper

Next, create a directory for configuration data:

mkdir configuration

Then create a development file at configuration/dev.properties:

application.id=serialization-app
bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081

input.json.movies.topic.name=json-movies
input.json.movies.topic.partitions=1
input.json.movies.topic.replication.factor=1

output.avro.movies.topic.name=avro-movies
output.avro.movies.topic.partitions=1
output.avro.movies.topic.replication.factor=1

4
Create an Avro schema for the output events

Since we’ll be converting events into Avro, we’ll need to specify a schema for them. In this case, our events represent movies with a few attributes, such as the release year. Go ahead and create a directory for your schemas:

mkdir -p src/main/avro

Next, create an Avro schema file at src/main/avro/movie.avsc for the stream of movies:

{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "Movie",
  "fields": [
    {
      "name": "movie_id",
      "type": "long"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    }
  ]
}

Because we will use this Avro schema in our Java code, we’ll need to compile it. The Gradle Avro plugin is a part of the build, so it will see your new Avro files, generate Java code for them, and compile those and all other Java sources. Run this command to get it all done:

./gradlew build
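
If you’re curious about what the plugin generated, the compiled Movie class is an ordinary Avro SpecificRecord that you can construct and inspect directly. Here is a minimal sketch (the class name GeneratedMovieExample is just illustrative and not part of the tutorial; the all-args constructor follows the field order in movie.avsc):

import io.confluent.developer.avro.Movie;

public class GeneratedMovieExample {

  public static void main(String[] args) {
    // The generated all-args constructor follows the field order in movie.avsc:
    // movie_id (long), title (string), release_year (int).
    Movie movie = new Movie(1L, "Lethal Weapon", 1992);

    // Generated accessors and the embedded Avro schema come along for free.
    System.out.println(movie.getTitle());
    System.out.println(Movie.SCHEMA$);
  }
}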

5
Create the Kafka Streams topology

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer/serialization/serde

Then create the following file at src/main/java/io/confluent/developer/serialization/SerializationTutorial.java.

package io.confluent.developer.serialization;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import io.confluent.developer.avro.Movie;
import io.confluent.developer.serialization.serde.MovieJsonSerde;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import static java.lang.Integer.parseInt;
import static java.lang.Short.parseShort;
import static org.apache.kafka.common.serialization.Serdes.Long;
import static org.apache.kafka.common.serialization.Serdes.String;

public class SerializationTutorial {

  protected Properties buildStreamsProperties(Properties envProps) {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, String().getClass());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));

    return props;
  }

  private void createTopics(Properties envProps) {
    Map<String, Object> config = new HashMap<>();

    config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
    AdminClient client = AdminClient.create(config);

    List<NewTopic> topics = new ArrayList<>();

    topics.add(new NewTopic(
        envProps.getProperty("input.json.movies.topic.name"),
        parseInt(envProps.getProperty("input.json.movies.topic.partitions")),
        parseShort(envProps.getProperty("input.json.movies.topic.replication.factor"))));

    topics.add(new NewTopic(
        envProps.getProperty("output.avro.movies.topic.name"),
        parseInt(envProps.getProperty("output.avro.movies.topic.partitions")),
        parseShort(envProps.getProperty("output.avro.movies.topic.replication.factor"))));

    client.createTopics(topics);
    client.close();
  }

  private SpecificAvroSerde<Movie> movieAvroSerde(Properties envProps) {
    SpecificAvroSerde<Movie> movieAvroSerde = new SpecificAvroSerde<>();

    final HashMap<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                    envProps.getProperty("schema.registry.url"));

    movieAvroSerde.configure(serdeConfig, false);
    return movieAvroSerde;
  }

  protected Topology buildTopology(Properties envProps, final SpecificAvroSerde<Movie> movieSpecificAvroSerde) {
    final String inputJsonTopicName = envProps.getProperty("input.json.movies.topic.name");
    final String outAvroTopicName = envProps.getProperty("output.avro.movies.topic.name");

    final StreamsBuilder builder = new StreamsBuilder();

    // topic contains values in json format
    final KStream<Long, Movie> jsonMovieStream =
        builder.stream(inputJsonTopicName, Consumed.with(Long(), new MovieJsonSerde()));

    // write movie data in avro format
    jsonMovieStream.to(outAvroTopicName, Produced.with(Long(), movieSpecificAvroSerde));

    return builder.build();
  }

  protected Properties loadEnvProperties(String fileName) throws IOException {
    Properties envProps = new Properties();
    FileInputStream input = new FileInputStream(fileName);
    envProps.load(input);
    input.close();
    return envProps;
  }

  private void runTutorial(String configPath) throws IOException {
    Properties envProps = this.loadEnvProperties(configPath);
    Properties streamProps = this.buildStreamsProperties(envProps);
    Topology topology = this.buildTopology(envProps, this.movieAvroSerde(envProps));

    this.createTopics(envProps);

    final KafkaStreams streams = new KafkaStreams(topology, streamProps);
    final CountDownLatch latch = new CountDownLatch(1);

    // Attach shutdown handler to catch Control-C.
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
      @Override
      public void run() {
        streams.close();
        latch.countDown();
      }
    });

    try {
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }
    System.exit(0);
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 1) {
      throw new IllegalArgumentException(
          "This program takes one argument: the path to an environment configuration file.");
    }

    new SerializationTutorial().runTutorial(args[0]);
  }
}

Let’s take a close look at the buildTopology() method, which uses the Kafka Streams DSL. This particular topology is pretty simple. The first thing the method does is create an instance of StreamsBuilder, which is the helper object that lets us build our topology. We call the stream() method to create a KStream<Long, Movie> object. Lastly, we call to() to send the events to another topic.

All of the work to convert the events between JSON and Avro happens through parameterized serializers. Even though we specified default serdes with StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG in the Streams configuration, the Kafka Streams DSL lets us use a specific serializer/deserializer each time we interact with a topic.

In this case, Consumed.with() allows us to consume the events with our custom JSON serde (that we’ll be implementing in the next step!), and Produced.with() allows us to produce the events back to a topic with Avro.
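
Stripped to its essentials, the override pattern from buildTopology() above looks like this sketch, with the serde roles spelled out in comments:

// Without an explicit serde, Kafka Streams falls back to the defaults set in
// buildStreamsProperties() (String serdes here), which can't handle Long keys
// or Movie values. Consumed.with() and Produced.with() override the defaults
// for just this topic interaction.
final KStream<Long, Movie> jsonMovieStream =
    builder.stream(inputJsonTopicName,
                   Consumed.with(Long(), new MovieJsonSerde()));     // read Long keys, JSON-encoded Movie values

jsonMovieStream.to(outAvroTopicName,
                   Produced.with(Long(), movieSpecificAvroSerde));   // write Long keys, Avro-encoded Movie values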

6
Implement a MovieJsonSerde class

Although Confluent Platform ships with Avro serializers, it doesn’t provide every format that we might need out of the box. In this case, we need to implement our own JSON serializer/deserializer pair, also known as a serde. This will let us read the JSON-formatted messages off the incoming topic. Create the following at src/main/java/io/confluent/developer/serialization/serde/MovieJsonSerde.java:

package io.confluent.developer.serialization.serde;

import com.google.gson.Gson;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import io.confluent.developer.avro.Movie;

public class MovieJsonSerde extends Serdes.WrapperSerde<Movie> {

  public MovieJsonSerde() {
    super(new Serializer<Movie>() {
      private Gson gson = new Gson();

      @Override
      public void configure(Map<String, ?> map, boolean b) {
      }

      @Override
      public byte[] serialize(String topic, Movie data) {
        return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public void close() {
      }
    }, new Deserializer<Movie>() {
      private Gson gson = new Gson();

      @Override
      public void configure(Map<String, ?> configs, boolean isKey) {

      }

      @Override
      public Movie deserialize(String topic, byte[] data) {

        return gson.fromJson(new String(data), Movie.class);
      }

      @Override
      public void close() {

      }
    });
  }
}
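
To see the serde in isolation, here is a minimal round-trip sketch (not part of the tutorial code; MovieJsonSerdeExample is just an illustrative name). It serializes a Movie to JSON bytes and reads it back, which mirrors what the streams application does when it consumes the JSON-formatted input topic:

import java.nio.charset.StandardCharsets;

import io.confluent.developer.avro.Movie;
import io.confluent.developer.serialization.serde.MovieJsonSerde;

public class MovieJsonSerdeExample {

  public static void main(String[] args) {
    MovieJsonSerde serde = new MovieJsonSerde();

    Movie movie = new Movie(3L, "Predator", 1987);

    // Serialize to the same JSON shape that lands on the json-movies topic,
    // e.g. {"movie_id":3,"title":"Predator","release_year":1987}
    byte[] bytes = serde.serializer().serialize("json-movies", movie);
    System.out.println(new String(bytes, StandardCharsets.UTF_8));

    // Deserialize back into a Movie instance, as the topology does when it
    // reads the input topic.
    Movie roundTripped = serde.deserializer().deserialize("json-movies", bytes);
    System.out.println(roundTripped.getTitle());
  }
}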

7
Compile and run the Kafka Streams program

In your terminal, run:

./gradlew shadowJar

This will produce an uberjar, which is a jar that contains your application code and all its dependencies.

Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.

java -jar build/libs/kstreams-serialization-0.0.1-standalone.jar configuration/dev.properties

8
Get ready to observe the Avro-formatted movies in the output topic

Before you start producing input data, it’s a good idea to set up the consumer on the output topic. This way, as soon as you produce movies in the new format (Avro), you’ll see the results right away. Notice that we’re using the kafka-avro-console-consumer tool to do this: Confluent Platform ships with this specialized command line consumer, which can read Avro-formatted messages out of the box. Run this to get ready to consume the records:

docker exec -it schema-registry /usr/bin/kafka-avro-console-consumer --topic avro-movies --bootstrap-server broker:9092 --from-beginning --property value.schema="$(< src/main/avro/movie.avsc)"

You won’t see any results until the next step.

9
Produce some JSON-formatted movies to the input topic

When the console producer starts, it will log some text and hang, waiting for your input. You can copy and paste all of the test data at once to see the results.

Start the console producer with this command in a terminal window of its own:

docker exec -i broker /usr/bin/kafka-console-producer --topic json-movies --broker-list broker:9092

When the producer starts up, copy and paste these JSON lines into the terminal:

{"movie_id":1,"title":"Lethal Weapon","release_year":1992}
{"movie_id":2,"title":"Die Hard","release_year":1988}
{"movie_id":3,"title":"Predator","release_year":1987}

Looking back in the consumer terminal, these are the results you should see if you paste in all the movies above:

{"movie_id":1,"title":"Lethal Weapon","release_year":1992}
{"movie_id":2,"title":"Die Hard","release_year":1988}
{"movie_id":3,"title":"Predator","release_year":1987}

You’ll notice that they look identical to the input that you produced. The contents are in fact the same, but since Avro isn’t a human-readable format, the kafka-avro-console-consumer tool helpfully renders them as something we can read, which happens to be JSON.

Congrats! You’ve converted formats across two topics.

Test it

1
Create a test configuration file

First, create a test file at configuration/test.properties:

application.id=serialization-app
bootstrap.servers=127.0.0.1:9092
schema.registry.url=http://SR_DUMMY_URL:8081

input.json.movies.topic.name=json-movies
input.json.movies.topic.partitions=1
input.json.movies.topic.replication.factor=1

output.avro.movies.topic.name=avro-movies
output.avro.movies.topic.partitions=1
output.avro.movies.topic.replication.factor=1

2
Test the streams topology

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer/serialization

Now create the following file at src/test/java/io/confluent/developer/serialization/SerializationTutorialTest.java. Testing a Kafka Streams application requires a bit of test harness code, but the org.apache.kafka.streams.TopologyTestDriver class makes this easy.

There is only one method in SerializationTutorialTest annotated with @Test, and that is shouldChangeSerializationFormat(). This method actually runs our Streams topology using the TopologyTestDriver and some mocked data that is set up inside the test method.

package io.confluent.developer.serialization;

import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

import io.confluent.developer.avro.Movie;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class SerializationTutorialTest {

  private final static String TEST_CONFIG_FILE = "configuration/test.properties";

  private SpecificAvroSerde<Movie> makeAvroSerDe(Properties envProps) throws IOException, RestClientException {

    // MockSchemaRegistryClient doesn't require connection to Schema Registry which is perfect for unit test
    final MockSchemaRegistryClient client = new MockSchemaRegistryClient();
    String outputTopic = envProps.getProperty("output.avro.movies.topic.name");
    client.register(outputTopic + "-value", Movie.SCHEMA$);
    final SpecificAvroSerde<Movie> movieAvroSerde = new SpecificAvroSerde<>(client);

    final HashMap<String, String> map = new HashMap<>();

    // this should be unnecessary because we use MockSchemaRegistryClient
    // but still required in order to avoid `io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.`
    map.put("schema.registry.url", envProps.getProperty("schema.registry.url"));

    movieAvroSerde.configure(map, false);
    return movieAvroSerde;
  }

  @Test
  public void shouldChangeSerializationFormat() throws IOException, RestClientException {
    SerializationTutorial sr = new SerializationTutorial();
    final Properties envProps = sr.loadEnvProperties(TEST_CONFIG_FILE);
    final Properties streamsProps = sr.buildStreamsProperties(envProps);

    String inputTopic = envProps.getProperty("input.json.movies.topic.name");
    String outputTopic = envProps.getProperty("output.avro.movies.topic.name");

    final SpecificAvroSerde<Movie> avroSerde = this.makeAvroSerDe(envProps);
    Topology topology = sr.buildTopology(envProps, avroSerde);
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamsProps);

    final Serializer<Long> keySerializer = new LongSerializer();
    // Json serializer
    final Serializer<byte[]> valueSerializer = Serdes.ByteArray().serializer();

    final Deserializer<Long> keyDeserializer = new LongDeserializer();
    // Avro deserializer
    final SpecificAvroSerde<Movie> movieSpecificAvroSerde = makeAvroSerDe(envProps);

    final ConsumerRecordFactory<Long, byte[]>
        inputFactory =
        new ConsumerRecordFactory<>(keySerializer, valueSerializer);

    final List<JsonObject> input = prepareInputFixture();
    final List<Movie> expectedMovies = prepareExpectedFixture();

    for (JsonObject json : input) {
      testDriver.pipeInput(inputFactory.create(inputTopic,
                                               json.getAsJsonPrimitive("movie_id").getAsLong(),
                                               json.toString().getBytes(Charset.forName("UTF-8"))));
    }

    for (int i = 0; i < 3; i++) {
      final ProducerRecord<Long, Movie>
          actual =
          testDriver.readOutput(outputTopic, keyDeserializer, movieSpecificAvroSerde.deserializer());
      OutputVerifier.compareKeyValue(actual, new ProducerRecord<>(outputTopic, (long) i + 1, expectedMovies.get(i)));
    }
  }

  /**
   * Prepares expected movies in avro format
   *
   * @return a list of three (3) movies
   */
  private List<Movie> prepareExpectedFixture() {
    final Movie lethalWeaponExpected = new Movie(1L, "Lethal Weapon", 1992);
    final Movie dieHardExpected = new Movie(2L, "Die Hard", 1988);
    final Movie predatorExpected = new Movie(3L, "Predator", 1987);

    return Arrays.asList(lethalWeaponExpected, dieHardExpected, predatorExpected);
  }

  /**
   * Prepares test data in JSON format
   *
   * @return a list of three (3) movies
   */
  private List<JsonObject> prepareInputFixture() {
    final JsonObject lethalWeaponJson = new JsonObject();
    lethalWeaponJson.add("movie_id", new JsonPrimitive(1));
    lethalWeaponJson.add("title", new JsonPrimitive("Lethal Weapon"));
    lethalWeaponJson.add("release_year", new JsonPrimitive(1992));

    final JsonObject dieHardJson = new JsonObject();
    dieHardJson.add("movie_id", new JsonPrimitive(2));
    dieHardJson.add("title", new JsonPrimitive("Die Hard"));
    dieHardJson.add("release_year", new JsonPrimitive(1988));

    final JsonObject predatorJson = new JsonObject();
    predatorJson.add("movie_id", new JsonPrimitive(3));
    predatorJson.add("title", new JsonPrimitive("Predator"));
    predatorJson.add("release_year", new JsonPrimitive(1987));

    return Arrays.asList(lethalWeaponJson, dieHardJson, predatorJson);
  }
}

3
Invoke the tests

Now run the test, which is as simple as:

./gradlew test

Take it to production

1
Create a production configuration file

First, create a new configuration file at configuration/prod.properties with the following content.

application.id=serialization-app
bootstrap.servers=<< FILL ME IN >>
schema.registry.url=<< FILL ME IN >>

input.json.movies.topic.name=json-movies
input.json.movies.topic.partitions=<< FILL ME IN >>
input.json.movies.topic.replication.factor=<< FILL ME IN >>

output.avro.movies.topic.name=avro-movies
output.avro.movies.topic.partitions=<< FILL ME IN >>
output.avro.movies.topic.replication.factor=<< FILL ME IN >>

2
Build a Docker image

In your terminal, execute the following to invoke the Jib plugin to build an image:

./gradlew jibDockerBuild --image=io.confluent.developer/kstreams-serialization:0.0.1

3
Launch the container

Finally, launch the container using your preferred container orchestration service. If you want to run it locally, you can execute the following:

docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/kstreams-serialization:0.0.1 config.properties