Suppose you have a Kafka topic with data serialized in a particular format, and you want to change that format to something else.
To get started, make a new directory anywhere you’d like for this project:
mkdir kstreams-serialization && cd kstreams-serialization
Next, create the following docker-compose.yml file to obtain Confluent Platform:
---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN
And launch it by running:
docker-compose up
Create the following build file, named build.gradle:
buildscript {
  repositories {
    jcenter()
  }
  dependencies {
    classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:0.15.1"
  }
}

plugins {
  id "java"
  id "application"
  id "com.google.cloud.tools.jib" version "1.1.1"
  id "com.github.johnrengelman.shadow" version "5.1.0"
}

sourceCompatibility = "1.8"
targetCompatibility = "1.8"
version = "0.0.1"

repositories {
  jcenter()

  maven {
    url "http://packages.confluent.io/maven"
  }
}

apply plugin: "com.commercehub.gradle.plugin.avro"
apply plugin: "com.github.johnrengelman.shadow"

dependencies {
  compile "org.apache.avro:avro:1.8.2"
  implementation "org.slf4j:slf4j-simple:1.7.26"
  implementation "org.apache.kafka:kafka-streams:2.2.0"
  implementation "io.confluent:kafka-streams-avro-serde:5.2.0"
  implementation 'com.google.code.gson:gson:2.8.5'
  testCompile "org.apache.kafka:kafka-streams-test-utils:2.2.0"
  testImplementation 'junit:junit:4.12'
}

test {
  testLogging {
    outputs.upToDateWhen { false }
    showStandardStreams = true
    exceptionFormat = "full"
  }
}

jar {
  manifest {
    attributes(
        "Class-Path": configurations.compile.collect { it.getName() }.join(" "),
        "Main-Class": "io.confluent.developer.serialization.SerializationTutorial"
    )
  }
}

shadowJar {
  baseName = "kstreams-serialization"
  classifier = "standalone"
}

// Define the main class for the application
mainClassName = 'io.confluent.developer.serialization.SerializationTutorial'
And be sure to run the following command to obtain the Gradle wrapper:
gradle wrapper
Next, create a directory for configuration data:
mkdir configuration
Then create a development file at configuration/dev.properties:
application.id=serialization-app
bootstrap.servers=localhost:29092
schema.registry.url=http://localhost:8081
input.json.movies.topic.name=json-movies
input.json.movies.topic.partitions=1
input.json.movies.topic.replication.factor=1
output.avro.movies.topic.name=avro-movies
output.avro.movies.topic.partitions=1
output.avro.movies.topic.replication.factor=1
Since we’ll be converting events into Avro, we’ll need to specify a schema for them. In this case, our events represent movies with a few attributes, such as the release year. Go ahead and create a directory for your schemas:
mkdir -p src/main/avro
Next, create an Avro schema file at src/main/avro/movie.avsc
for the stream of movies:
{
  "namespace": "io.confluent.developer.avro",
  "type": "record",
  "name": "Movie",
  "fields": [
    {
      "name": "movie_id",
      "type": "long"
    },
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "release_year",
      "type": "int"
    }
  ]
}
Because we will use this Avro schema in our Java code, we’ll need to compile it. The Gradle Avro plugin is a part of the build, so it will see your new Avro files, generate Java code for them, and compile those and all other Java sources. Run this command to get it all done:
./gradlew build
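If you’d like a quick sanity check that code generation worked, a tiny throwaway class like the one below will compile against the generated Movie class. The class name GeneratedClassCheck is only an illustration and isn’t part of the tutorial; the all-args constructor shown here is the same one the test code uses later.
import io.confluent.developer.avro.Movie;

public class GeneratedClassCheck {

  public static void main(String[] args) {
    // Build an instance of the generated class with its all-args constructor
    Movie movie = new Movie(1L, "Lethal Weapon", 1992);

    // The generated toString() renders the record fields, and getSchema()
    // returns the Avro schema that was compiled into the class
    System.out.println(movie);
    System.out.println(movie.getSchema());
  }
}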
Create a directory for the Java files in this project:
mkdir -p src/main/java/io/confluent/developer/serialization/serde
Then create the following file at src/main/java/io/confluent/developer/serialization/SerializationTutorial.java.
package io.confluent.developer.serialization;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import io.confluent.developer.avro.Movie;
import io.confluent.developer.serialization.serde.MovieJsonSerde;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import static java.lang.Integer.parseInt;
import static java.lang.Short.parseShort;
import static org.apache.kafka.common.serialization.Serdes.Long;
import static org.apache.kafka.common.serialization.Serdes.String;

public class SerializationTutorial {

  protected Properties buildStreamsProperties(Properties envProps) {
    Properties props = new Properties();

    props.put(StreamsConfig.APPLICATION_ID_CONFIG, envProps.getProperty("application.id"));
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, envProps.getProperty("bootstrap.servers"));
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, String().getClass());
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));

    return props;
  }

  private void createTopics(Properties envProps) {
    Map<String, Object> config = new HashMap<>();
    config.put("bootstrap.servers", envProps.getProperty("bootstrap.servers"));
    AdminClient client = AdminClient.create(config);

    List<NewTopic> topics = new ArrayList<>();
    topics.add(new NewTopic(
        envProps.getProperty("input.json.movies.topic.name"),
        parseInt(envProps.getProperty("input.json.movies.topic.partitions")),
        parseShort(envProps.getProperty("input.json.movies.topic.replication.factor"))));
    topics.add(new NewTopic(
        envProps.getProperty("output.avro.movies.topic.name"),
        parseInt(envProps.getProperty("output.avro.movies.topic.partitions")),
        parseShort(envProps.getProperty("output.avro.movies.topic.replication.factor"))));

    client.createTopics(topics);
    client.close();
  }

  private SpecificAvroSerde<Movie> movieAvroSerde(Properties envProps) {
    SpecificAvroSerde<Movie> movieAvroSerde = new SpecificAvroSerde<>();

    final HashMap<String, String> serdeConfig = new HashMap<>();
    serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                    envProps.getProperty("schema.registry.url"));

    movieAvroSerde.configure(serdeConfig, false);
    return movieAvroSerde;
  }

  protected Topology buildTopology(Properties envProps, final SpecificAvroSerde<Movie> movieSpecificAvroSerde) {
    final String inputJsonTopicName = envProps.getProperty("input.json.movies.topic.name");
    final String outAvroTopicName = envProps.getProperty("output.avro.movies.topic.name");

    final StreamsBuilder builder = new StreamsBuilder();

    // topic contains values in json format
    final KStream<Long, Movie> jsonMovieStream =
        builder.stream(inputJsonTopicName, Consumed.with(Long(), new MovieJsonSerde()));

    // write movie data in avro format
    jsonMovieStream.to(outAvroTopicName, Produced.with(Long(), movieSpecificAvroSerde));

    return builder.build();
  }

  protected Properties loadEnvProperties(String fileName) throws IOException {
    Properties envProps = new Properties();
    FileInputStream input = new FileInputStream(fileName);
    envProps.load(input);
    input.close();

    return envProps;
  }

  private void runTutorial(String configPath) throws IOException {
    Properties envProps = this.loadEnvProperties(configPath);
    Properties streamProps = this.buildStreamsProperties(envProps);
    Topology topology = this.buildTopology(envProps, this.movieAvroSerde(envProps));

    this.createTopics(envProps);

    final KafkaStreams streams = new KafkaStreams(topology, streamProps);
    final CountDownLatch latch = new CountDownLatch(1);

    // Attach shutdown handler to catch Control-C.
    Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
      @Override
      public void run() {
        streams.close();
        latch.countDown();
      }
    });

    try {
      streams.start();
      latch.await();
    } catch (Throwable e) {
      System.exit(1);
    }
    System.exit(0);
  }

  public static void main(String[] args) throws IOException {
    if (args.length < 1) {
      throw new IllegalArgumentException(
          "This program takes one argument: the path to an environment configuration file.");
    }

    new SerializationTutorial().runTutorial(args[0]);
  }
}
Let’s take a close look at the buildTopology()
method, which uses the Kafka Streams DSL. This particular topology is pretty simple.
The first thing the method does is create an instance of StreamsBuilder
, which is the helper object that lets us build our topology.
We call the stream()
method to create a KStream<Long, Movie>
object. Lastly, we call to()
to send the events to another topic.
All of the work to convert the events between JSON and Avro happens through parameterized serializers. You see, even though we specified default serdes with StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG
and StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG
in the Streams configuration, the Kafka Streams DSL allows us to use a specific serializer/deserializer each time we interact with a topic.
In this case, Consumed.with()
allows us to consume the events with our custom JSON serde (that we’ll be implementing in the next step!), and Produced.with()
allows us to produce the events back to a topic with Avro.
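To make that concrete, here is a short illustrative fragment, not part of the tutorial’s source. It reuses the builder, inputJsonTopicName, outAvroTopicName, and movieSpecificAvroSerde names from buildTopology(), and the topic in the first line is a made-up placeholder:
// 1. No serdes given: Kafka Streams falls back to the defaults from the Streams
//    configuration, which this tutorial sets to String serdes for both key and value.
KStream<String, String> rawValues = builder.stream("some-other-topic");

// 2. Serdes given explicitly: Consumed.with() deserializes each record into a typed
//    Movie as it is read, and Produced.with() serializes it to Avro as it is written.
KStream<Long, Movie> typedMovies =
    builder.stream(inputJsonTopicName, Consumed.with(Long(), new MovieJsonSerde()));
typedMovies.to(outAvroTopicName, Produced.with(Long(), movieSpecificAvroSerde));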
Although Confluent Platform ships with Avro serializers, it doesn’t provide every format that we might need out of the box.
In this case, we need to implement our own JSON serializer/deserializer pair, also known as a serde. This will let us read the JSON-formatted messages off the incoming topic.
Create the following at src/main/java/io/confluent/developer/serialization/serde/MovieJsonSerde.java:
package io.confluent.developer.serialization.serde;

import com.google.gson.Gson;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.StandardCharsets;
import java.util.Map;

import io.confluent.developer.avro.Movie;

public class MovieJsonSerde extends Serdes.WrapperSerde<Movie> {

  public MovieJsonSerde() {
    super(new Serializer<Movie>() {
      private Gson gson = new Gson();

      @Override
      public void configure(Map<String, ?> map, boolean b) {
      }

      @Override
      public byte[] serialize(String topic, Movie data) {
        return gson.toJson(data).getBytes(StandardCharsets.UTF_8);
      }

      @Override
      public void close() {
      }
    }, new Deserializer<Movie>() {
      private Gson gson = new Gson();

      @Override
      public void configure(Map<String, ?> configs, boolean isKey) {
      }

      @Override
      public Movie deserialize(String topic, byte[] data) {
        return gson.fromJson(new String(data, StandardCharsets.UTF_8), Movie.class);
      }

      @Override
      public void close() {
      }
    });
  }
}
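As a side note on the design, extending Serdes.WrapperSerde is just one way to package the serializer and deserializer together; Serdes.serdeFrom(serializer, deserializer) builds an equivalent serde. A minimal sketch of that alternative, assuming a Kafka clients version (2.1 or later) where Serializer and Deserializer can be written as lambdas, might look like the class below. The class name MovieJsonSerdeFactory is hypothetical and isn’t used anywhere else in this tutorial:
package io.confluent.developer.serialization.serde;

import com.google.gson.Gson;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

import java.nio.charset.StandardCharsets;

import io.confluent.developer.avro.Movie;

// Hypothetical alternative to MovieJsonSerde, shown only for comparison
public class MovieJsonSerdeFactory {

  // Builds a Serde<Movie> equivalent to MovieJsonSerde without subclassing WrapperSerde
  public static Serde<Movie> movieJsonSerde() {
    final Gson gson = new Gson();
    return Serdes.serdeFrom(
        // serializer: Movie -> UTF-8 JSON bytes
        (topic, movie) -> gson.toJson(movie).getBytes(StandardCharsets.UTF_8),
        // deserializer: UTF-8 JSON bytes -> Movie
        (topic, bytes) -> gson.fromJson(new String(bytes, StandardCharsets.UTF_8), Movie.class));
  }
}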
In your terminal, run:
./gradlew shadowJar
This will produce an uberjar, which is a jar that contains your application code and all its dependencies.
Now that you have an uberjar for the Kafka Streams application, you can launch it locally. When you run the following, the prompt won’t return, because the application will run until you exit it. There is always another message to process, so streaming applications don’t exit until you force them.
java -jar build/libs/kstreams-serialization-0.0.1-standalone.jar configuration/dev.properties
Before you start producing input data, it’s a good idea to set up a consumer on the output topic.
This way, as soon as the application writes movies in the new format (Avro), you’ll see the results right away.
Notice that we’re using the kafka-avro-console-consumer
tool to do that. Confluent Platform ships with a specialized
command-line consumer out of the box for reading Avro-formatted messages. Run this to get ready to consume the records:
docker exec -it schema-registry /usr/bin/kafka-avro-console-consumer --topic avro-movies --bootstrap-server broker:9092 --from-beginning --property value.schema="$(< src/main/avro/movie.avsc)"
You won’t see any results until the next step.
When the console producer starts, it will log some text and hang, waiting for your input. You can copy and paste all of the test data at once to see the results.
Start the console producer with this command in a terminal window of its own:
docker exec -i broker /usr/bin/kafka-console-producer --topic json-movies --broker-list broker:9092
When the producer starts up, copy and paste these JSON lines into the terminal:
{"movie_id":1,"title":"Lethal Weapon","release_year":1992}
{"movie_id":2,"title":"Die Hard","release_year":1988}
{"movie_id":3,"title":"Predator","release_year":1987}
Looking back in the consumer terminal, these are the results you should see if you paste in all the movies above:
{"movie_id":1,"title":"Lethal Weapon","release_year":1992}
{"movie_id":2,"title":"Die Hard","release_year":1988}
{"movie_id":3,"title":"Predator","release_year":1987}
You’ll notice that they look identical to the input that you produced. The contents are in fact the same. But since Avro isn’t a human-readable format, the kafka-avro-console-consumer
tool has helpfully rendered the contents as something we can read, which happens to be JSON.
Congrats! You’ve converted formats across two topics.
First, create a test file at configuration/test.properties:
application.id=serialization-app
bootstrap.servers=127.0.0.1:9092
schema.registry.url=http://SR_DUMMY_URL:8081
input.json.movies.topic.name=json-movies
input.json.movies.topic.partitions=1
input.json.movies.topic.replication.factor=1
output.avro.movies.topic.name=avro-movies
output.avro.movies.topic.partitions=1
output.avro.movies.topic.replication.factor=1
Create a directory for the tests to live in:
mkdir -p src/test/java/io/confluent/developer/serialization
Now create the following file at src/test/java/io/confluent/developer/serialization/SerializationTutorialTest.java.
Testing a Kafka Streams application requires a bit of test harness code, but the org.apache.kafka.streams.TopologyTestDriver class makes this easy.
There is only one method in SerializationTutorialTest
annotated with @Test
, and that is shouldChangeSerializationFormat()
.
This method actually runs our Streams topology using the TopologyTestDriver
and some mocked data that is set up inside the test method.
package io.confluent.developer.serialization;

import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.streams.test.OutputVerifier;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;

import io.confluent.developer.avro.Movie;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

public class SerializationTutorialTest {

  private final static String TEST_CONFIG_FILE = "configuration/test.properties";

  private SpecificAvroSerde<Movie> makeAvroSerDe(Properties envProps) throws IOException, RestClientException {
    // MockSchemaRegistryClient doesn't require a connection to Schema Registry, which is perfect for a unit test
    final MockSchemaRegistryClient client = new MockSchemaRegistryClient();
    String outputTopic = envProps.getProperty("output.avro.movies.topic.name");
    client.register(outputTopic + "-value", Movie.SCHEMA$);

    final SpecificAvroSerde<Movie> movieAvroSerde = new SpecificAvroSerde<>(client);

    final HashMap<String, String> map = new HashMap<>();
    // this should be unnecessary because we use MockSchemaRegistryClient,
    // but it's still required in order to avoid `io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.`
    map.put("schema.registry.url", envProps.getProperty("schema.registry.url"));
    movieAvroSerde.configure(map, false);
    return movieAvroSerde;
  }

  @Test
  public void shouldChangeSerializationFormat() throws IOException, RestClientException {
    SerializationTutorial sr = new SerializationTutorial();
    final Properties envProps = sr.loadEnvProperties(TEST_CONFIG_FILE);
    final Properties streamsProps = sr.buildStreamsProperties(envProps);

    String inputTopic = envProps.getProperty("input.json.movies.topic.name");
    String outputTopic = envProps.getProperty("output.avro.movies.topic.name");

    final SpecificAvroSerde<Movie> avroSerde = this.makeAvroSerDe(envProps);
    Topology topology = sr.buildTopology(envProps, avroSerde);
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamsProps);

    final Serializer<Long> keySerializer = new LongSerializer();
    // JSON serializer
    final Serializer<byte[]> valueSerializer = Serdes.ByteArray().serializer();

    final Deserializer<Long> keyDeserializer = new LongDeserializer();
    // Avro deserializer
    final SpecificAvroSerde<Movie> movieSpecificAvroSerde = makeAvroSerDe(envProps);

    final ConsumerRecordFactory<Long, byte[]> inputFactory =
        new ConsumerRecordFactory<>(keySerializer, valueSerializer);

    final List<JsonObject> input = prepareInputFixture();
    final List<Movie> expectedMovies = prepareExpectedFixture();

    for (JsonObject json : input) {
      testDriver.pipeInput(inputFactory.create(inputTopic,
                                               json.getAsJsonPrimitive("movie_id").getAsLong(),
                                               json.toString().getBytes(Charset.forName("UTF-8"))));
    }

    for (int i = 0; i < 3; i++) {
      final ProducerRecord<Long, Movie> actual =
          testDriver.readOutput(outputTopic, keyDeserializer, movieSpecificAvroSerde.deserializer());
      OutputVerifier.compareKeyValue(actual, new ProducerRecord<>(outputTopic, (long) i + 1, expectedMovies.get(i)));
    }
  }

  /**
   * Prepares expected movies in Avro format
   *
   * @return a list of three (3) movies
   */
  private List<Movie> prepareExpectedFixture() {
    final Movie lethalWeaponExpected = new Movie(1L, "Lethal Weapon", 1992);
    final Movie dieHardExpected = new Movie(2L, "Die Hard", 1988);
    final Movie predatorExpected = new Movie(3L, "Predator", 1987);
    return Arrays.asList(lethalWeaponExpected, dieHardExpected, predatorExpected);
  }

  /**
   * Prepares test data in JSON format
   *
   * @return a list of three (3) movies
   */
  private List<JsonObject> prepareInputFixture() {
    final JsonObject lethalWeaponJson = new JsonObject();
    lethalWeaponJson.add("movie_id", new JsonPrimitive(1));
    lethalWeaponJson.add("title", new JsonPrimitive("Lethal Weapon"));
    lethalWeaponJson.add("release_year", new JsonPrimitive(1992));

    final JsonObject dieHardJson = new JsonObject();
    dieHardJson.add("movie_id", new JsonPrimitive(2));
    dieHardJson.add("title", new JsonPrimitive("Die Hard"));
    dieHardJson.add("release_year", new JsonPrimitive(1988));

    final JsonObject predatorJson = new JsonObject();
    predatorJson.add("movie_id", new JsonPrimitive(3));
    predatorJson.add("title", new JsonPrimitive("Predator"));
    predatorJson.add("release_year", new JsonPrimitive(1987));

    return Arrays.asList(lethalWeaponJson, dieHardJson, predatorJson);
  }
}
Now run the test, which is as simple as:
./gradlew test
First, create a new configuration file at configuration/prod.properties
with the following content.
application.id=serialization-app
bootstrap.servers=<< FILL ME IN >>
schema.registry.url=<< FILL ME IN >>
input.json.movies.topic.name=json-movies
input.json.movies.topic.partitions=<< FILL ME IN >>
input.json.movies.topic.replication.factor=<< FILL ME IN >>
output.avro.movies.topic.name=avro-movies
output.avro.movies.topic.partitions=<< FILL ME IN >>
output.avro.movies.topic.replication.factor=<< FILL ME IN >>
In your terminal, execute the following to invoke the Jib
plugin to build an image:
./gradlew jibDockerBuild --image=io.confluent.developer/kstreams-serialization:0.0.1
Finally, launch the container using your preferred container orchestration service. If you want to run it locally, you can execute the following:
docker run -v $PWD/configuration/prod.properties:/config.properties io.confluent.developer/kstreams-serialization:0.0.1 config.properties