Community contribution ✨

Produce and Consume Records in non-Java languages

Question:

How can you produce events to, and consume events from, Kafka using a programming language other than Java?

Example use case:

In this tutorial, you will enrich and expose a list of books from a library. You will produce an event for each book acquisition (including its title, editor, release date, and so on), and then consume back the same events in order to serve the book collection over HTTP.

Hands-on code example:

Run it

Prerequisites

1

This tutorial installs Confluent Platform using Docker. Before proceeding:

  • Install Docker Desktop (version 4.0.0 or later) or Docker Engine (version 19.03.0 or later) if you don’t already have it.

  • Install the Docker Compose plugin if you don’t already have it. This isn’t necessary if you have Docker Desktop since it includes Docker Compose.

  • Start Docker if it’s not already running, either by starting Docker Desktop or, if you manage Docker Engine with systemd, via systemctl.

  • Verify that Docker is set up properly by ensuring no errors are output when you run docker info and docker compose version on the command line.

Get Confluent Platform

2

To get started, make a new directory anywhere you’d like for this project:

mkdir -p produce-consume-scala/ && cd produce-consume-scala

Next, create the following docker-compose.yml file to obtain Confluent Platform (for Kafka in the cloud, see Confluent Cloud):

version: '3.7'
services:
  broker:
    image: confluentinc/cp-kafka:7.4.1
    hostname: broker
    container_name: broker
    ports:
    - 29092:29092
    networks:
    - tutorial
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LOG_DIRS: /tmp/kraft-combined-logs
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
    - broker
    ports:
    - 8081:8081
    networks:
    - tutorial
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: broker:9092
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: WARN
networks:
  tutorial:
    name: tutorial

And launch it by running:

docker compose up -d

Initialize the project

3

Create the following Gradle build file, named build.gradle, for the project:

plugins {

    id "com.github.johnrengelman.shadow" version "6.1.0"
}

allprojects {
    version = "0.1.0-SNAPSHOT"
    group = "io.confluent.developer"
    apply plugin: 'java'
    apply plugin: 'scala'

    repositories {
        mavenCentral()

        maven {
            url "https://packages.confluent.io/maven"
        }
    }

    sourceCompatibility = JavaVersion.VERSION_17
    targetCompatibility = JavaVersion.VERSION_17

    tasks.withType(ScalaCompile) {
        scalaCompileOptions.additionalParameters = [
                '-deprecation',
                '-encoding', 'utf-8',
                '-language:postfixOps'
        ]
    }

    dependencies {
        implementation "org.scala-lang:scala-library:2.13.10"

        implementation "org.apache.kafka:kafka-clients:3.1.0"
        implementation "io.confluent:kafka-streams-avro-serde:7.3.0"
        implementation "com.sksamuel.avro4s:avro4s-core_2.13:4.1.1"
        testImplementation "org.scalatest:scalatest_2.13:3.3.0-SNAP3"
        testImplementation "org.testcontainers:kafka:1.18.0"

        // presentation boiler plate
        implementation "com.lihaoyi:cask_2.13:0.8.0"
        implementation "ch.qos.logback:logback-classic:1.2.11"
        implementation "com.github.pureconfig:pureconfig_2.13:0.17.2"
        implementation "com.nrinaudo:kantan.csv-generic_2.13:0.7.0"
        implementation "com.nrinaudo:kantan.csv-java8_2.13:0.7.0"
        implementation "com.nrinaudo:kantan.csv-enumeratum_2.13:0.7.0"
    }

    test {
        testLogging {
            events "passed", "skipped", "failed"
            exceptionFormat "full"
        }
    }
}

configure(subprojects) {
    apply plugin: 'application'
    apply plugin: 'com.github.johnrengelman.shadow'

    dependencies {
        implementation parent
        testImplementation parent.sourceSets.test.output
    }

    task spec(dependsOn: ['compileTestScala'], type: JavaExec) {
        group = 'verification'
        description = 'Runs the scala tests.'
        mainClass = 'org.scalatest.tools.Runner'
        args = ['-R', 'build/classes/scala/test', '-o']
        classpath = sourceSets.test.runtimeClasspath
    }

    test.dependsOn spec
}

configure(project('produce-consume-scala-producer-app')) {
    shadowJar.archiveBaseName = "app-producer"
    mainClassName = 'io.confluent.developer.produce.Producer'
}

configure(project('produce-consume-scala-consumer-app')) {
    shadowJar.archiveBaseName = "app-consumer"
    mainClassName = 'io.confluent.developer.consume.Consumer'
}

task topicCreation(type: JavaExec) {
    mainClass = 'io.confluent.developer.helper.TopicCreation'
    classpath = sourceSets.main.runtimeClasspath
}

task schemaPublication(type: JavaExec) {
    mainClass = 'io.confluent.developer.helper.SchemaPublication'
    classpath = sourceSets.main.runtimeClasspath
}

Create the following Gradle settings file, named settings.gradle, for the project:


rootProject.name = 'produce-consume-scala'

include 'app-producer'
include 'app-consumer'

project(':app-producer').name = 'produce-consume-scala-producer-app'
project(':app-consumer').name = 'produce-consume-scala-consumer-app'

Run the following command to obtain the Gradle wrapper:

gradle wrapper

Create a directory for the project resources:

mkdir -p src/main/resources

Add the file src/main/resources/application.conf to your project resources:

bootstrap.servers = "localhost:29092"
bootstrap.servers = ${?BOOTSTRAP_SERVERS}

schema.registry.url = "http://localhost:8081"
schema.registry.url = ${?SCHEMA_REGISTRY_URL}

producer {
  client-config {
    acks = all
    client.id = scala-tutorial
    bootstrap.servers = ${bootstrap.servers}
    schema.registry.url = ${schema.registry.url}
    max.in.flight.requests.per.connection = 1
    # See https://kafka.apache.org/documentation/#producerconfigs for more producer configs
  }

  topics = ${tutorial-topics}
}

consumer {
  client-config {
    group.id = scala-tutorial
    group.id = ${?CONSUMER_GRP}
    auto.offset.reset = earliest
    bootstrap.servers = ${bootstrap.servers}
    schema.registry.url = ${schema.registry.url}
    # See https://kafka.apache.org/documentation/#theconsumer for more consumer configs
  }

  topics = ${tutorial-topics}

  host = "0.0.0.0"
  host = ${?HTTP_HOST}
  port = 8080
  port = ${?HTTP_PORT}
}

tutorial-topics {
  book-topic = {
    name = "BOOKS"
    name = ${?BOOK_TOPIC}
    partitions = 1
    partitions = ${?BOOK_TOPIC_PARTITIONS}
    replication-factor = 1
    replication-factor = ${?BOOK_TOPIC_REPLICATION}
  }
}

helper {

  client-config {
    bootstrap.servers = ${bootstrap.servers}
    schema.registry.url = ${schema.registry.url}
  }

  topic-creation-timeout = 30 seconds
  schema-registry-retries-num = 5
  schema-registry-retries-interval = 5 seconds

  topics = ${tutorial-topics}
}
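
The repeated keys in application.conf rely on HOCON's override rule: a later definition replaces an earlier one, and an optional substitution such as ${?BOOTSTRAP_SERVERS} is simply skipped when the variable is not set. Below is a minimal sketch of that behavior using the Typesafe Config library that the Configuration class imports. The object name and the SKETCH_HTTP_PORT variable are hypothetical and only serve the illustration.

```scala
import com.typesafe.config.{Config, ConfigFactory}

object HoconOverrideSketch {

  // Same pattern as application.conf: a default, then an optional override.
  // SKETCH_HTTP_PORT is a hypothetical environment variable used only here.
  val hocon: String =
    """
      |port = 8080
      |port = ${?SKETCH_HTTP_PORT}
      |
      |consumer {
      |  port = ${port}
      |}
      |""".stripMargin

  // resolve() substitutes ${...} references; an optional reference that
  // cannot be resolved leaves the earlier value untouched.
  val config: Config = ConfigFactory.parseString(hocon).resolve()

  def main(args: Array[String]): Unit =
    println(config.getInt("consumer.port"))
}
```

Run without the variable set, the sketch should report the default of 8080; exporting SKETCH_HTTP_PORT=9090 beforehand should change the reported value without touching the file.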

Add the logging configuration to your project resources in the file src/main/resources/logback.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                %yellow(%d{yyyy-MM-dd HH:mm:ss}) %cyan(${HOSTNAME}) %highlight([%p]) %green((%file:%line\)) - %msg%n
            </pattern>
        </encoder>
    </appender>

    <logger name="org.apache.kafka.clients" level="${TUTORIAL_LOG_LVL:-INFO}" additivity="false">
        <appender-ref ref="STDOUT" />
    </logger>

    <root level="INFO">
        <appender-ref ref="STDOUT" />
    </root>

</configuration>

Create a directory for the project sources:

mkdir -p src/main/scala/io/confluent/developer/

Add the classes corresponding to the HOCON configuration in src/main/scala/io/confluent/developer/Configuration.scala:

package io.confluent.developer

import java.util.Properties

import com.typesafe.config.Config
import io.confluent.developer.Configuration.TopicConf.TopicSpec

import scala.concurrent.duration.Duration
import scala.jdk.CollectionConverters._

object Configuration {

  case class ProducerConf(clientConfig: Config, topics: TopicConf)

  case class ConsumerConf(clientConfig: Config, topics: TopicConf, host: String, port: Int)

  case class HelperConf(clientConfig: Config,
                        topics: TopicConf,
                        topicCreationTimeout: Duration,
                        schemaRegistryRetriesNum: Int,
                        schemaRegistryRetriesInterval: Duration)

  case class TopicConf(bookTopic: TopicSpec) {
    def all: Vector[TopicSpec] = Vector(bookTopic)
  }

  object TopicConf {
    case class TopicSpec(name: String, partitions: Int, replicationFactor: Short)
  }
}

trait Configuration {
  implicit class configMapperOps(config: Config) {

    def toMap: Map[String, AnyRef] = config
      .entrySet()
      .asScala
      .map(pair => (pair.getKey, config.getAnyRef(pair.getKey)))
      .toMap

    def toProperties: Properties = {
      val properties = new Properties()
      properties.putAll(config.toMap.asJava)
      properties
    }
  }
}

In addition, the producer application and the consumer application will both have their own submodule.

Create the following directories:

mkdir -p app-producer/src/main/scala/io/confluent/developer/produce/
mkdir -p app-consumer/src/main/scala/io/confluent/developer/consume/

Create a schema for the events

4

Create the schema package:

mkdir -p src/main/scala/io/confluent/developer/schema/

We will use case classes to represent Book records in our Scala code. Add the following case class in the schema package.

src/main/scala/io/confluent/developer/schema/Book.scala:

package io.confluent.developer.schema

case class Book(author: String, title: String, `type`: BookType, pages: Int, releaseDate: java.time.LocalDate)

And the following Enum type to represent the book type.

src/main/scala/io/confluent/developer/schema/BookType.scala:

package io.confluent.developer.schema

import enumeratum._

sealed trait BookType extends EnumEntry

object BookType extends Enum[BookType] {

  case object Tech extends BookType
  case object Comic extends BookType
  case object Novel extends BookType
  case object Romance extends BookType
  case object Other extends BookType

  override def values: IndexedSeq[BookType] = Vector(Tech, Comic, Novel, Romance, Other)
}
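
The Type column of the CSV input has to be matched against these case objects. The sketch below shows the enumeratum name lookups that make this possible, assuming the CSV decoder matches on entry names. It uses a hypothetical Genre enum so that it does not collide with BookType.

```scala
import enumeratum._

sealed trait Genre extends EnumEntry

object Genre extends Enum[Genre] {
  case object Tech extends Genre
  case object Novel extends Genre

  // findValues is an enumeratum macro that collects the case objects above.
  val values: IndexedSeq[Genre] = findValues
}

object GenreLookupSketch {
  def main(args: Array[String]): Unit = {
    println(Genre.withNameOption("Tech"))             // Some(Tech)
    println(Genre.withNameOption("Cookbook"))         // None
    println(Genre.withNameInsensitiveOption("novel")) // Some(Novel)
  }
}
```

An unknown name yields None rather than an exception, which is why a CSV line with an unexpected book type can be reported as a decoding error instead of crashing the producer.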

Set up the serialization method

5

In this tutorial, events will be serialized in Avro format. To do so, we need the appropriate serializers and deserializers. The following functions make it easy to create a Serializer[T] or a Deserializer[T], where T is the type of the key or the value we’d like to serialize. Add the class src/main/scala/io/confluent/developer/schema/ScalaReflectionSerde.scala:

package io.confluent.developer.schema

import com.sksamuel.avro4s.RecordFormat
import io.confluent.kafka.streams.serdes.avro.{GenericAvroDeserializer, GenericAvroSerializer}
import org.apache.kafka.common.serialization.{Deserializer, Serializer}

trait ScalaReflectionSerde {

  implicit lazy val bookFormat: RecordFormat[Book] = RecordFormat[Book]
}

object ScalaReflectionSerde {

  def reflectionSerializer4S[T: RecordFormat]: Serializer[T] = new Serializer[T] {
    val inner = new GenericAvroSerializer()

    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = inner.configure(configs, isKey)

    override def serialize(topic: String, maybeData: T): Array[Byte] = Option(maybeData)
      .map(data => inner.serialize(topic, implicitly[RecordFormat[T]].to(data)))
      .getOrElse(Array.emptyByteArray)

    override def close(): Unit = inner.close()
  }

  def reflectionDeserializer4S[T: RecordFormat]: Deserializer[T] = new Deserializer[T] {
    val inner = new GenericAvroDeserializer()

    override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = inner.configure(configs, isKey)

    override def deserialize(topic: String, maybeData: Array[Byte]): T = Option(maybeData)
      .filter(_.nonEmpty)
      .map(data => implicitly[RecordFormat[T]].from(inner.deserialize(topic, data)))
      .getOrElse(null.asInstanceOf[T])

    override def close(): Unit = inner.close()
  }
}

In this tutorial we use a reflection-based serialization method built on Avro4s, a Scala library for serializing and deserializing Avro. Avro4s performs its de/serialization in multiple stages:

  • For serialization: Case Class Instance → Generic Record → Avro Formatted Bytes

  • For deserialization: Avro Formatted Bytes → Generic Record → Case Class Instance
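
The case class to generic record stage can be tried in isolation, without a broker or Schema Registry. The following is a minimal sketch assuming the avro4s 4.x RecordFormat API already used in ScalaReflectionSerde; the Pamphlet class and the object name are hypothetical stand-ins for Book. The record to bytes stage is left out because, in this tutorial, it is delegated to the Confluent GenericAvroSerializer.

```scala
import com.sksamuel.avro4s.RecordFormat
import org.apache.avro.generic.GenericRecord

object RecordFormatSketch {

  // Hypothetical, simplified stand-in for the tutorial's Book class.
  case class Pamphlet(title: String, pages: Int)

  val format: RecordFormat[Pamphlet] = RecordFormat[Pamphlet]

  def main(args: Array[String]): Unit = {
    val original = Pamphlet("How to sharpen a knife", 10)

    // Case class instance -> generic record
    val record: GenericRecord = format.to(original)
    println(record.getSchema.getFields)

    // Generic record -> case class instance
    val restored: Pamphlet = format.from(record)
    println(restored == original)
  }
}
```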

Let’s add some helper functions to the code making our application more interactive.

Add src/main/scala/io/confluent/developer/schema/package.scala:

package io.confluent.developer

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import com.sksamuel.avro4s.AvroSchema
import kantan.csv.RowDecoder
import kantan.csv.enumeratum._
import kantan.csv.java8._
import org.apache.avro.Schema
import upickle.default
import upickle.default.{ReadWriter => JsonDecoder, macroRW => JsonMacro, _}

package object schema {

  implicit lazy val BookTypeAvroSchema: Schema = AvroSchema[BookType]

  implicit lazy val BookTypeJsonDecoder: JsonDecoder[BookType] = JsonMacro

  implicit lazy val BookAvroSchema: Schema = AvroSchema[Book]

  implicit lazy val BookJsonDecoder: JsonDecoder[Book] = JsonMacro

  implicit lazy val CsvDecoder: RowDecoder[Book] = RowDecoder.ordered(
    (author: String, title: String, `type`: BookType, pages: Int, releaseDate: java.time.LocalDate) =>
      Book(noLineBreak(author), noLineBreak(title), `type`, pages, releaseDate)
  )

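  // Write and read with the same formatter so that the codec round-trips (e.g. "20200609")
  implicit val LocalDateJson: default.ReadWriter[LocalDate] = readwriter[ujson.Value].bimap[LocalDate](
    date => date.format(DateTimeFormatter.BASIC_ISO_DATE),
    json => LocalDate.parse(json.str, DateTimeFormatter.BASIC_ISO_DATE)
  )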

  def noLineBreak(line: String): String = line.replace(util.Properties.lineSeparator, " ")
}
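
The LocalDateJson codec writes dates with DateTimeFormatter.BASIC_ISO_DATE, which omits the dashes. A value in that form can only be read back with the same formatter, because LocalDate.parse without an explicit formatter expects the dashed ISO form (2020-06-09). A small sketch using only java.time (the object and method names are hypothetical):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

object BasicIsoDateSketch {

  private val formatter: DateTimeFormatter = DateTimeFormatter.BASIC_ISO_DATE

  def write(date: LocalDate): String = date.format(formatter)

  // Parsing must use the same formatter that produced the text.
  def read(text: String): LocalDate = LocalDate.parse(text, formatter)

  def main(args: Array[String]): Unit = {
    val releaseDate = LocalDate.of(2020, 6, 9)
    println(write(releaseDate))                      // 20200609
    println(read(write(releaseDate)) == releaseDate) // true
  }
}
```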

Add the Gradle helper tasks

6

Topic creation and Avro schema declaration are often part of an external process. For the sake of clarity in this tutorial, we won’t include these steps as part of the main application, but isolate them in a dedicated package.

Create a directory for the package helper:

mkdir -p src/main/scala/io/confluent/developer/helper

Add the following class at src/main/scala/io/confluent/developer/helper/TopicCreation.scala

package io.confluent.developer.helper

import java.util.concurrent.ExecutionException

import io.confluent.developer.Configuration
import io.confluent.developer.Configuration.HelperConf
import org.apache.kafka.clients.admin.{Admin, CreateTopicsResult, NewTopic}
import org.apache.kafka.common.errors.TopicExistsException
import org.slf4j.{Logger, LoggerFactory}
import pureconfig.ConfigSource
import pureconfig.generic.auto._

import scala.concurrent.duration.TimeUnit
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

object TopicCreation extends App with Configuration {

  private val logger: Logger = LoggerFactory.getLogger(getClass)

  val helperConf = ConfigSource.default.at("helper").loadOrThrow[HelperConf]

  val client = Admin.create(helperConf.clientConfig.toMap.asJava)

  val newTopics = helperConf
    .topics
    .all
    .map { topic =>
      new NewTopic(topic.name, topic.partitions, topic.replicationFactor)
    }

  logger.info(s"Starting the topics creation for: ${helperConf.topics.all.map(_.name).mkString(", ")}")

  val allKFutures: CreateTopicsResult = client.createTopics(newTopics.asJava)

  allKFutures.values().asScala.foreach { case (topicName, kFuture) =>

    kFuture.whenComplete {

      case (_, throwable: Throwable) if Option(throwable).isDefined =>
        logger.warn("Topic creation didn't complete:", throwable)

      case _ =>
        newTopics.find(_.name() == topicName).map { topic =>
          logger.info(
            s"""|Topic ${topic.name}
                | has been successfully created with ${topic.numPartitions} partitions
                | and replicated ${topic.replicationFactor() - 1} times""".stripMargin.replaceAll("\n", "")
          )
        }
    }
  }

  val (timeOut, timeUnit): (Long, TimeUnit) = helperConf.topicCreationTimeout

  Try(allKFutures.all().get(timeOut, timeUnit)) match {

    case Failure(ex) if ex.getCause.isInstanceOf[TopicExistsException] =>
      logger info "Topic creation stage completed. (Topics already created)"

    case failure@Failure(_: InterruptedException | _: ExecutionException) =>
      logger error "The topic creation failed to complete"
      failure.exception.printStackTrace()
      sys.exit(2)

    case Failure(exception) =>
      logger error "The following exception occurred during the topic creation"
      exception.printStackTrace()
      sys.exit(3)

    case Success(_) =>
      logger info "Topic creation stage completed."
  }
}

Add the following class in the file src/main/scala/io/confluent/developer/helper/SchemaPublication.scala

package io.confluent.developer.helper

import java.io.IOException

import io.confluent.developer.{Configuration, schema}
import io.confluent.developer.Configuration.HelperConf
import io.confluent.developer.schema.{Book, BookAvroSchema}
import io.confluent.kafka.schemaregistry.avro.AvroSchema
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.slf4j.{Logger, LoggerFactory}
import pureconfig.ConfigSource
import pureconfig.generic.auto._

import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object SchemaPublication extends App with Configuration {

  private val logger: Logger = LoggerFactory.getLogger(getClass)

  @tailrec
  def retryCallSchemaRegistry(logger: Logger)(countdown: Int, interval: Duration, f: => Unit): Try[Unit] = {
    Try(f) match {
      case result@Success(_) =>
        logger info "Successfully call the Schema Registry."
        result
      case result@Failure(_) if countdown <= 0 =>
        logger error "Fail to call the Schema Registry for the last time."
        result
      case Failure(_) if countdown > 0 =>
        logger error s"Fail to call the Schema Registry, retry in ${interval.toSeconds} secs."
        Thread.sleep(interval.toMillis)
        retryCallSchemaRegistry(logger)(countdown - 1, interval, f)
    }
  }

  val helperConf = ConfigSource.default.at("helper").loadOrThrow[HelperConf]

  val schemaRegistryClient = new CachedSchemaRegistryClient(
    helperConf.clientConfig.getString(SCHEMA_REGISTRY_URL_CONFIG),
    200
  )

  retryCallSchemaRegistry(logger)(
    helperConf.schemaRegistryRetriesNum,
    helperConf.schemaRegistryRetriesInterval, {
      schemaRegistryClient.register(s"${helperConf.topics.bookTopic.name}-value", new AvroSchema(BookAvroSchema))
    }
  ) match {
    case failure@Failure(_: IOException | _: RestClientException) =>
      failure.exception.printStackTrace()
    case _ =>
      logger.info(s"Schemas publication at: ${helperConf.clientConfig.getString(SCHEMA_REGISTRY_URL_CONFIG)}")
  }
}
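
The retry helper in SchemaPublication is a bounded retry with a fixed pause, written as a tail-recursive function over Try. The same idea can be sketched without any Kafka dependency; the names below are hypothetical, and the failing call is simulated with a counter rather than a real Schema Registry request.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object RetrySketch {

  // Runs `call`, retrying up to `remaining` more times with a fixed pause.
  @tailrec
  def retry[A](remaining: Int, pauseMillis: Long)(call: () => A): Try[A] =
    Try(call()) match {
      case ok @ Success(_)                       => ok
      case failed @ Failure(_) if remaining <= 0 => failed
      case Failure(_) =>
        Thread.sleep(pauseMillis)
        retry(remaining - 1, pauseMillis)(call)
    }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    // Simulated flaky call: fails twice, then succeeds.
    val result = retry(remaining = 3, pauseMillis = 10) { () =>
      attempts += 1
      if (attempts < 3) throw new IllegalStateException("registry not ready")
      else "registered"
    }
    println(s"$result after $attempts attempts") // Success(registered) after 3 attempts
  }
}
```

Returning a Try instead of throwing lets the caller decide, as SchemaPublication does, which failures are worth a stack trace.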

Run the following command to create the topic and then the schemas:

./gradlew -q --console=plain topicCreation schemaPublication

These Gradle tasks are defined in the build.gradle file, each declared as a JavaExec task with the corresponding main class.

Write the producer

7

The following Scala class will be the entry point of the producer application. Add the following class in the file

app-producer/src/main/scala/io/confluent/developer/produce/Producer.scala

package io.confluent.developer.produce

import java.util.concurrent.Future

import io.confluent.developer.Configuration
import io.confluent.developer.Configuration.ProducerConf
import io.confluent.developer.schema.ScalaReflectionSerde.reflectionSerializer4S
import io.confluent.developer.schema.{Book, ScalaReflectionSerde}
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.{Serdes, Serializer}
import org.apache.kafka.common.utils.Bytes
import org.slf4j.{Logger, LoggerFactory}
import pureconfig.ConfigSource
import pureconfig.generic.auto._
import kantan.csv._
import kantan.csv.ops._

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._

object Producer extends App with ScalaReflectionSerde with Configuration {

  private def logger: Logger = LoggerFactory.getLogger(getClass)

  private val producerConf = ConfigSource.default.at("producer").loadOrThrow[ProducerConf]

  private val schemaRegistryConfigMap: Map[String, AnyRef] = Map[String, AnyRef](
    SCHEMA_REGISTRY_URL_CONFIG -> producerConf.clientConfig.getString(SCHEMA_REGISTRY_URL_CONFIG)
  )

  def produce(producer: KafkaProducer[Bytes, Book], topic: String, book: Book): Future[RecordMetadata] = {
    val record: ProducerRecord[Bytes, Book] = new ProducerRecord(topic, book)

    producer.send(record, new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = Option(exception)
        .map(ex => logger error s"fail to produce record due to: ${ex.getMessage}")
        .getOrElse(logger info s"successfully produced - ${printMetaData(metadata)}")
    })
  }

  def parseCsvLine(line: String) = line.readCsv[Vector, Book](rfc.withoutHeader.withCellSeparator(',').withQuote('\''))

  def printMetaData(metadata: RecordMetadata) =
    s"""topic: ${metadata.topic()},
       | partition: ${metadata.partition()},
       | offset: ${metadata.offset()}
       | (ts: ${metadata.timestamp()})""".stripMargin.replace("\n", "")

  logger debug "creating the serializer and configuration"
  private val bookSerializer: Serializer[Book] = reflectionSerializer4S[Book]
  bookSerializer.configure(schemaRegistryConfigMap.asJava, false)

  logger debug "creating the kafka producer"
  private val producer = new KafkaProducer[Bytes, Book](
    producerConf.clientConfig.toProperties,
    Serdes.Bytes().serializer(),
    bookSerializer
  )

  if (args.isEmpty) {
    var input = ""
    while (input != "exit") {
      Thread.sleep((2 second) toMillis)
      System.out.print("produce-a-book> ")
      input = scala.io.StdIn.readLine()
      if (input != "exit") {
        val books: Vector[ReadResult[Book]] = parseCsvLine(input)
        books.foreach { maybeBook =>
          maybeBook
            .map(book => produce(producer, producerConf.topics.bookTopic.name, book))
            .left.foreach(error => logger warn error.getMessage)
        }
      }
    }
  } else {
    parseCsvLine(args.mkString("\n")).foreach { maybeBook =>
      maybeBook
        .map(book => produce(producer, producerConf.topics.bookTopic.name, book))
        .left.foreach(error => logger warn error.getMessage)
    }
  }

  logger info "closing the book producer application"
  producer.flush()
  producer.close()
}

Let’s describe the key sections of the producer code.

The producer application loads its configuration from the producer block of the application.conf

A Producer#produce function covers most of the record production.

def produce(producer: KafkaProducer[Bytes, Book], topic: String, book: Book): Future[RecordMetadata] = { (1)
    val record: ProducerRecord[Bytes, Book] = new ProducerRecord(topic, book) (2)

    producer.send(record, new Callback { (3)
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = Option(exception) (4)
        .map(ex => logger error s"fail to produce record due to: ${ex.getMessage}")
        .getOrElse(logger info s"successfully produced - ${printMetaData(metadata)}")
    })
}
1 Producer#produce takes a KafkaProducer, a topic name, and an instance of book to send into a Kafka topic.
2 Producer#produce wraps our book with a ProducerRecord[K, V]. That’s where we attach the topic name to the book.
3 The KafkaProducer#send method is called on the producer instance, returning a Java future which will contain the broker response.
4 Producer#produce uses a Callback to respond to success or failure of the record production.

In order to utilize the Producer#produce function, we construct an instance of our KafkaProducer to pass to it.

  • The KafkaProducer has type parameters corresponding to its key and value types. Its constructor takes serializers of the same types.

  • As described above, we utilize the reflectionSerializer4S[T] for our Serializer[Book] which is configured to connect to the Schema Registry.

  • Our Kafka records have no keys (the key is always null), so we choose an arbitrary key type such as Bytes from the org.apache.kafka.common.utils package.

  • We instantiate the KafkaProducer by passing a Java Properties from the producer.client-config block of application.conf

That’s it! We are ready to produce records.

At the end of the program we call KafkaProducer#flush, which blocks until all buffered records have been sent, and then KafkaProducer#close to release the producer’s resources and connections.
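
As a hedged alternative to calling close by hand, the producer can be wrapped in scala.util.Using so that it is closed even if sending throws. The sketch below is not part of the tutorial code: it uses plain String serializers instead of the Avro serializer, a hypothetical topic name BOOKS-SKETCH, and assumes the broker from the docker-compose.yml file is reachable at localhost:29092.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import scala.util.Using

object FlushAndCloseSketch {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // Assumption: the broker started earlier in this tutorial.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29092")
    props.put(ProducerConfig.ACKS_CONFIG, "all")

    // Using.resource closes the producer when the block exits, normally or not.
    Using.resource(new KafkaProducer[String, String](props, new StringSerializer, new StringSerializer)) { producer =>
      // BOOKS-SKETCH is a hypothetical topic used only for this illustration.
      producer.send(new ProducerRecord[String, String]("BOOKS-SKETCH", "hello"))
      // Block until the buffered record has actually been sent.
      producer.flush()
    }
  }
}
```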

Run the producer

8

To make producing records easy, the producer application can accept book data in the form of a CSV line.

Build the producer app:

./gradlew shadowJar

We can create a book by starting the Producer app:

java -jar app-producer/build/libs/app-producer-0.1.0-SNAPSHOT.jar

And then typing the book data at the prompt (type "exit" to stop the program)

Loic D.,How to sharpen a knife,Other,10,2020-06-09

or, by directly passing the book as argument:

java -jar app-producer/build/libs/app-producer-0.1.0-SNAPSHOT.jar "'Loic D.','How to sharpen a knife (safely this time)',Other,10,2020-06-09"

Add the following file as an example dataset: data.csv

'Franz Kafka','Der Prozess',Novel,239,1925-09-01
'Franz Kafka','Die Verwandlung',Novel,144,1915-01-01
'Franz Kafka','Der Bau',Novel,37,1923-01-01
'Paul Chiusano','Functional Programming in Scala',Tech,320,2014-09-01
'Stendhal','Le Rouge et le Noir',Novel,640,1830-09-01
'Émile Zola','Au Bonheur des Dames',Novel,542,1883-11-01
'Loic D.','Not the worst ramen recipe',Other,3,2020-06-01
'Neha Narkhede','Kafka: The Definitive Guide',Tech,322,2017-07-07

The CSV file has no header row. Its columns are, in order:

| Author | Title | Type | Pages | Release Date |
|---|---|---|---|---|
| Franz Kafka | Der Prozess | Novel | 239 | 1925-09-01 |
| Franz Kafka | Die Verwandlung | Novel | 144 | 1915-01-01 |
| Franz Kafka | Der Bau | Novel | 37 | 1923-01-01 |
| Paul Chiusano | Functional Programming in Scala | Tech | 320 | 2014-09-01 |
| Stendhal | Le Rouge et le Noir | Novel | 640 | 1830-09-01 |
| Émile Zola | Au Bonheur des Dames | Novel | 542 | 1883-11-01 |
| Loic D. | Not the worst ramen recipe | Other | 3 | 2020-06-01 |
| Neha Narkhede | Kafka: The Definitive Guide | Tech | 322 | 2017-07-07 |
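
To check how a quoted line splits into fields before sending it to Kafka, the CSV dialect can be exercised on its own. This sketch assumes the same kantan.csv configuration as Producer#parseCsvLine, but decodes into a plain tuple instead of Book so that it needs no other tutorial class; the object and method names are hypothetical.

```scala
import kantan.csv._
import kantan.csv.ops._

object CsvLineSketch {

  // Same dialect as Producer#parseCsvLine: comma separator, single-quote quoting.
  val config: CsvConfiguration = rfc.withoutHeader.withCellSeparator(',').withQuote('\'')

  // Each input line yields either a decoding error (Left) or a decoded row (Right).
  def parse(line: String): Vector[ReadResult[(String, String, String, Int, String)]] =
    line.readCsv[Vector, (String, String, String, Int, String)](config)

  def main(args: Array[String]): Unit = {
    parse("'Franz Kafka','Der Prozess',Novel,239,1925-09-01").foreach(println)
    // A non-numeric page count is reported as a Left rather than thrown.
    parse("'Franz Kafka','Der Prozess',Novel,many,1925-09-01").foreach(println)
  }
}
```

Because every line decodes to an Either, one malformed line in data.csv only produces a warning in the producer while the remaining lines are still sent.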

Feel free to add your favorite book to the list, then run the following command to produce the prepared dataset:

java -jar app-producer/build/libs/app-producer-0.1.0-SNAPSHOT.jar "$(cat data.csv)"

Now let’s see if the records have been correctly written to Kafka and if the consumer app can poll them.

Write the consumer

9

The following Scala class defines our Consumer application. Add the following class to the file

app-consumer/src/main/scala/io/confluent/developer/consume/Consumer.scala

package io.confluent.developer.consume

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import io.confluent.developer.Configuration
import io.confluent.developer.Configuration.ConsumerConf
import io.confluent.developer.schema.ScalaReflectionSerde.reflectionDeserializer4S
import io.confluent.developer.schema.{Book, ScalaReflectionSerde}
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.common.serialization.{Deserializer, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.slf4j.{Logger, LoggerFactory}
import pureconfig.ConfigSource
import pureconfig.generic.auto._
import ujson.Obj

import scala.collection.mutable
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.util.Try

object Consumer extends cask.MainRoutes with ScalaReflectionSerde with Configuration {

  val bookMap: mutable.Map[String, Book] = mutable.Map[String, Book]()

  private val consumerConf = ConfigSource.default.at("consumer").loadOrThrow[ConsumerConf]

  private val schemaRegistryConfigMap: Map[String, AnyRef] = Map[String, AnyRef](
    SCHEMA_REGISTRY_URL_CONFIG -> consumerConf.clientConfig.getString(SCHEMA_REGISTRY_URL_CONFIG)
  )

  override def port: Int = consumerConf.port

  override def host: String = consumerConf.host

  private def logger: Logger = LoggerFactory.getLogger(getClass)

  @cask.get("/count")
  def getCount(): Obj = {
    ujson.Obj("count" -> bookMap.size)
  }

  @cask.get("/books")
  def getBooks(): Obj = {
    ujson.Obj("results" -> ujson.Arr(
      bookMap.toIndexedSeq.map { case (_: String, book: Book) =>
        upickle.default.writeJs(book)
      }: _*
    ))
  }

  def consume(consumer: KafkaConsumer[Bytes, Book]): Vector[Book] = {

    val books: ConsumerRecords[Bytes, Book] = consumer.poll((1 second) toJava)

    books.asScala.toVector.map(_.value())
  }

  val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
  scheduler.schedule(() => {
    logger debug "creating the deserializers and configuring"
    val bookDeserializer: Deserializer[Book] = reflectionDeserializer4S[Book]
    bookDeserializer.configure(schemaRegistryConfigMap.asJava, false)

    logger debug "creating the kafka consumer"
    val consumer = new KafkaConsumer[Bytes, Book](
      consumerConf.clientConfig.toProperties,
      Serdes.Bytes().deserializer(),
      bookDeserializer
    )

    consumer.subscribe(Vector(consumerConf.topics.bookTopic.name).asJava)

    while (!scheduler.isShutdown) {
      Thread.sleep((2 second) toMillis)

      logger debug s"polling the new events"
      val books: Vector[Book] = consume(consumer)

      if (books.nonEmpty) logger info s"just polled ${books.size} books from kafka"
      books.foreach { book =>
        bookMap += book.title -> book
      }
    }

    logger info "Closing the kafka consumer"
    Try(consumer.close()).recover {
      case error => logger.error("Failed to close the kafka consumer", error)
    }

  }, 0, TimeUnit.SECONDS)

  sys.addShutdownHook {
    scheduler.shutdown()
    scheduler.awaitTermination(10, TimeUnit.SECONDS)
  }

  logger info s"starting the HTTP server"
  initialize()
}

Let’s describe the key pieces of this program.

The Consumer application loads its configuration from the consumer block of the application.conf

A Consumer#consume function covers most of the record consumption.

def consume(consumer: KafkaConsumer[Bytes, Book]): Vector[Book] = { (1)

  val books: ConsumerRecords[Bytes, Book] = consumer.poll((1 second) toJava) (2)

  books.asScala.toVector.map(_.value()) (3)
}
1 Consumer#consume takes a KafkaConsumer and gives back a collection of Book.
2 We call KafkaConsumer#poll with a one-second timeout, which returns a ConsumerRecords containing Book instances.
3 For each returned value in the ConsumerRecords instance, the record value is extracted and passed back in a Vector.

Our Consumer application functions as an HTTP service defining two routes, /count and /books. These routes are mapped to the functions getCount and getBooks in our code.

As in the Producer code, we construct an instance of the KafkaConsumer and pass it to our Consumer#consume function.

  • The KafkaConsumer is constructed with the reflectionDeserializer4S[T] which is configured to connect to the Schema Registry.

  • The KafkaConsumer reads its configuration from the consumer.client-config block of application.conf.

  • Finally, we subscribe to the BOOK topic by calling KafkaConsumer#subscribe.

That’s it! We are ready to poll records in a loop, updating our map of books. The map of books is used to respond to queries on the HTTP routes.
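
To make the update step concrete, here is a minimal sketch of how a map keyed by title behaves as batches of events arrive. It assumes a simplified, hypothetical SimpleBook case class (not the tutorial's Book) and folds into an immutable Map, whereas the tutorial's bookMap is updated in place. Because the key is the title, a later event with the same title replaces the earlier one, so a count taken from the map reflects distinct titles rather than the number of events consumed.

```scala
// Hypothetical, simplified stand-in for the tutorial's Book class.
final case class SimpleBook(title: String, pages: Int)

object BookMapSketch {
  // Fold a batch of polled books into the map, keyed by title.
  def update(current: Map[String, SimpleBook], polled: Seq[SimpleBook]): Map[String, SimpleBook] =
    polled.foldLeft(current)((acc, book) => acc + (book.title -> book))

  def main(args: Array[String]): Unit = {
    val firstBatch = Vector(SimpleBook("title1", 20), SimpleBook("title2", 300))
    val secondBatch = Vector(SimpleBook("title1", 25)) // same title, revised page count

    val books = update(update(Map.empty, firstBatch), secondBatch)

    println(books.size)            // 2: the second "title1" replaced the first
    println(books("title1").pages) // 25
  }
}
```

If you need every acquisition event rather than the latest one per title, a different key (or a list of values per key) would be required.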

We’ve added a shutdown hook that stops the scheduler; the polling loop then exits and the consumer is closed by calling KafkaConsumer#close.
The KafkaConsumer is created and used in a new thread. It is really important to use it from a *single* thread, because KafkaConsumer is not safe for multi-threaded access. In our case it’s not the main thread only because we also run an HTTP server.
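
The threading model can be sketched without Kafka. In the example below, fakePoll is a hypothetical stand-in for Consumer#consume, and the shared state is a TrieMap, which is an assumption of this sketch (the tutorial's own bookMap may be declared differently). The point it illustrates is that only the scheduler's thread polls and writes, while other threads merely read the shared map.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

object SingleThreadPollingSketch {
  // Shared state: written by the polling thread only, readable from any thread.
  val titles: TrieMap[String, Int] = TrieMap.empty

  private val batches = new AtomicInteger(0)

  // Hypothetical stand-in for Consumer#consume: returns one fake record per call.
  def fakePoll(): Vector[(String, Int)] = {
    val n = batches.incrementAndGet()
    Vector(s"title$n" -> n)
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()

    // The whole poll loop runs on the scheduler's single thread.
    scheduler.schedule(new Runnable {
      def run(): Unit = {
        while (!scheduler.isShutdown) {
          fakePoll().foreach { case (title, pages) => titles += title -> pages }
          Thread.sleep(50)
        }
        // A real KafkaConsumer would be closed here, on the same thread that polled.
        println("polling loop finished")
      }
    }, 0, TimeUnit.SECONDS)

    Thread.sleep(300)    // the main thread is free to do other work meanwhile
    scheduler.shutdown() // flips isShutdown, so the loop exits after its current pass
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
    println(s"titles collected: ${titles.size}")
  }
}
```

Closing the consumer on the polling thread, after the loop exits, keeps every consumer call on one thread.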

Run the consumer

10

In another terminal, build and start the consumer application:

./gradlew -q --console=plain shadowJar
java -jar app-consumer/build/libs/app-consumer-0.1.0-SNAPSHOT.jar

In a third terminal, call your new service to get all the books:

curl http://localhost:8080/books | jq --color-output '{results: [.results[].title]}'
{
    "results": [
        "How to sharpen a knife",
        "How to sharpen a knife (safely this time)",
        "etc ..."
    ]
}

Let’s now play with these two programs and the Confluent Platform:

Test it

Create the test configuration files

1

Before we write any tests we will create the test resources used to configure the tests:

mkdir -p src/test/resources

Add the test config file with two test topics (TEST-BOOKS1 and TEST-BOOKS2) at src/test/resources/application.conf:

producer {
  client-config {}
  topics = {
    book-topic = {
      name = "TEST-BOOKS1"
      name = ${?BOOK_TOPIC}
      partitions = 1
      partitions = ${?BOOK_TOPIC_PARTITIONS}
      replication-factor = 1
      replication-factor = ${?BOOK_TOPIC_REPLICATION}
    }
  }
}

consumer {
  client-config {}
  topics = {
    book-topic = {
      name = "TEST-BOOKS2"
      name = ${?BOOK_TOPIC}
      partitions = 1
      partitions = ${?BOOK_TOPIC_PARTITIONS}
      replication-factor = 1
      replication-factor = ${?BOOK_TOPIC_REPLICATION}
    }
  }
}
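
Each setting in this file appears twice: a literal default followed by an optional substitution such as ${?BOOK_TOPIC}. In HOCON, the second assignment only takes effect when the variable is defined, so an environment variable can override the default. The following sketch mimics that resolution order in plain Scala; resolve and TopicSettings are hypothetical helpers for illustration and are not part of the tutorial or of the configuration library.

```scala
// Hypothetical helper type that mirrors the three settings of a book-topic block.
final case class TopicSettings(name: String, partitions: Int, replicationFactor: Int)

object OverrideSketch {
  // Return the override when the variable is defined, otherwise keep the default.
  def resolve(env: Map[String, String], variable: String, default: String): String =
    env.getOrElse(variable, default)

  def bookTopic(env: Map[String, String], defaultName: String): TopicSettings =
    TopicSettings(
      name = resolve(env, "BOOK_TOPIC", defaultName),
      partitions = resolve(env, "BOOK_TOPIC_PARTITIONS", "1").toInt,
      replicationFactor = resolve(env, "BOOK_TOPIC_REPLICATION", "1").toInt
    )

  def main(args: Array[String]): Unit = {
    // No variables set: the test defaults apply.
    println(bookTopic(Map.empty, "TEST-BOOKS1").name)                    // TEST-BOOKS1
    // BOOK_TOPIC set: the producer and consumer blocks would both resolve to it.
    println(bookTopic(Map("BOOK_TOPIC" -> "BOOKS"), "TEST-BOOKS2").name) // BOOKS
  }
}
```

One consequence worth noting: if BOOK_TOPIC is set when the tests run, both blocks resolve to the same topic name instead of TEST-BOOKS1 and TEST-BOOKS2.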

Then define the log level of the test with src/test/resources/logback-test.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
            <pattern>
                %-4relative [%thread] %-5level %logger{35} - %msg
            </pattern>
        </encoder>
    </appender>

    <root level="ERROR">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

Write the test classes

2

Let’s create a directory for the tests to live in:

mkdir -p src/test/scala/io/confluent/developer

Before we go further we need to discuss what and how to test in these applications.

What to test? We don’t want to test Kafka clients themselves. Instead, we’d like to test functions with business behavior, even if they have side effects due to the call to Kafka producers / consumers.

How to test? Unlike most tutorials on this site, we are not testing a streaming application here, which means we cannot extract a streaming topology and test it separately. We have to spawn a real Kafka broker to test our functions. To do this, we will use the Testcontainers Kafka module.

First we create a trait that will extend AnyFlatSpec, Matchers and other test traits. Because these tests might be asynchronous, we will use the ScalaTest functions from Eventually.
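
For readers unfamiliar with Eventually, the idea is to retry an assertion until it passes or a timeout expires. The sketch below is a simplified, hypothetical retryUntil helper written with the standard library only; it is not ScalaTest's implementation, and the real eventually offers more (such as configurable patience and richer failure reporting).

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // Hypothetical helper: re-run `block` until it succeeds or `timeout` has elapsed.
  def retryUntil[T](timeout: FiniteDuration, interval: FiniteDuration)(block: => T): T = {
    val deadline = timeout.fromNow

    @tailrec
    def attempt(): T = Try(block) match {
      case Success(value) => value
      case Failure(error) if deadline.isOverdue() => throw error // give up, surface the last failure
      case Failure(_) =>
        Thread.sleep(interval.toMillis)
        attempt()
    }

    attempt()
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    // Fails on the first two attempts, like a poll that has not received all records yet.
    val result = retryUntil(2.seconds, 10.millis) {
      calls += 1
      require(calls >= 3, s"only $calls attempts so far")
      calls
    }
    println(result) // 3
  }
}
```

In the specs that follow, the same pattern appears as eventually(timeout(...), interval(...)) wrapped around the poll and its assertions.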

All Kafka client tests will extend the base trait, so let’s call it KafkaFlatSpec and create it at src/test/scala/io/confluent/developer/KafkaFlatSpec.scala:

package io.confluent.developer

import io.confluent.developer.Configuration.TopicConf.TopicSpec
import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.junit.Rule
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen, Inspectors}
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

import scala.jdk.CollectionConverters._

trait KafkaFlatSpec extends AnyFlatSpec
  with Matchers
  with Inspectors
  with BeforeAndAfterAll
  with GivenWhenThen
  with Eventually {

  val testTopics: Vector[TopicSpec]

  @Rule
  val kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"))
  lazy val admin: Admin = Admin.create(Map[String, AnyRef]("bootstrap.servers" -> kafka.getBootstrapServers).asJava)

  override def beforeAll(): Unit = {
    super.beforeAll()
    kafka.start()
    admin.createTopics(
      testTopics.map { topic =>
        new NewTopic(
          topic.name,
          topic.partitions,
          topic.replicationFactor
        )
      }.asJava
    )
  }

  override def afterAll(): Unit = {
    admin.close()
    kafka.stop()
    super.afterAll()
  }
}

Next, create the directories for the producer and consumer tests:

mkdir -p app-producer/src/test/scala/io/confluent/developer/produce/
mkdir -p app-consumer/src/test/scala/io/confluent/developer/consume/

Add the produce function test class at app-producer/src/test/scala/io/confluent/developer/produce/ProducerSpec.scala:

package io.confluent.developer.produce

import java.time.LocalDate

import io.confluent.developer.Configuration.{ProducerConf, TopicConf}
import io.confluent.developer.KafkaFlatSpec
import io.confluent.developer.schema.BookType.{Novel, Other, Tech}
import io.confluent.developer.schema.{Book, ScalaReflectionSerde}
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{Deserializer, Serdes, Serializer}
import org.apache.kafka.common.utils.Bytes
import pureconfig.ConfigSource
import pureconfig.generic.auto._

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._

class ProducerSpec extends KafkaFlatSpec with ScalaReflectionSerde {

  var testConsumer: KafkaConsumer[Bytes, Book] = _
  val bookSerializer: Serializer[Book] = ScalaReflectionSerde.reflectionSerializer4S[Book]
  val bookDeserializer: Deserializer[Book] = ScalaReflectionSerde.reflectionDeserializer4S[Book]
  val conf: ProducerConf = ConfigSource.default.at("producer").loadOrThrow[ProducerConf]

  override val testTopics: Vector[TopicConf.TopicSpec] = conf.topics.all

  bookSerializer.configure(Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava, false)
  bookDeserializer.configure(Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava, false)

  override def beforeAll(): Unit = {
    super.beforeAll()
    val config = Map[String, AnyRef]("group.id" -> "test", "bootstrap.servers" -> kafka.getBootstrapServers)
    testConsumer = new KafkaConsumer[Bytes, Book](config.asJava, Serdes.Bytes().deserializer(), bookDeserializer)
  }

  "produce" should "write a series of new books to kafka" in {

    Given("a producer config")
    val config = Map[String, AnyRef]("client.id" -> "test", "bootstrap.servers" -> kafka.getBootstrapServers)
    val producer = new KafkaProducer[Bytes, Book](config.asJava, Serdes.Bytes().serializer(), bookSerializer)

    And("a collection of books")
    val newBook1 = Book("book1", "title1", Tech, 20, LocalDate.of(2020, 1, 1))
    val newBook2 = Book("book2", "title2", Novel, 300, LocalDate.of(2020, 2, 1))
    val newBook3 = Book("book3", "title3", Other, 888, LocalDate.of(2020, 3, 1))

    When("the books get produced")
    val maybeMetadata1 = Producer.produce(producer, conf.topics.bookTopic.name, newBook1)
    val maybeMetadata2 = Producer.produce(producer, conf.topics.bookTopic.name, newBook2)
    val maybeMetadata3 = Producer.produce(producer, conf.topics.bookTopic.name, newBook3)

    val topicPartitions: Seq[TopicPartition] = (0 until conf.topics.bookTopic.partitions)
      .map(new TopicPartition(conf.topics.bookTopic.name, _))

    testConsumer.assign(topicPartitions.asJava)

    Then("records can be fetched from Kafka")
    eventually(timeout(5 second), interval(1 second)) {
      testConsumer.seekToBeginning(topicPartitions.asJava)
      val records: List[Book] = testConsumer.poll((1 second) toJava).asScala.map(_.value()).toList

      records should have length 3
      records should contain theSameElementsAs(newBook1 :: newBook2 :: newBook3 :: Nil)

      forAll (maybeMetadata1 :: maybeMetadata2 :: maybeMetadata3 :: Nil) { metadata =>
        metadata.isDone shouldBe true
      }
    }

    producer.flush()
    producer.close()
  }
}

Add the consume function test class at app-consumer/src/test/scala/io/confluent/developer/consume/ConsumerSpec.scala:

package io.confluent.developer.consume

import java.time.LocalDate

import io.confluent.developer.Configuration.{ProducerConf, TopicConf}
import io.confluent.developer.KafkaFlatSpec
import io.confluent.developer.schema.BookType.{Novel, Other, Tech}
import io.confluent.developer.schema.{Book, ScalaReflectionSerde}
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{Deserializer, Serdes, Serializer}
import org.apache.kafka.common.utils.Bytes
import pureconfig.ConfigSource
import pureconfig.generic.auto._

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._

class ConsumerSpec extends KafkaFlatSpec with ScalaReflectionSerde {

  var testProducer: KafkaProducer[Bytes, Book] = _
  val conf: ProducerConf = ConfigSource.default.at("consumer").loadOrThrow[ProducerConf]

  val bookSerializer: Serializer[Book] = ScalaReflectionSerde.reflectionSerializer4S[Book]
  val bookDeserializer: Deserializer[Book] = ScalaReflectionSerde.reflectionDeserializer4S[Book]
  override val testTopics: Vector[TopicConf.TopicSpec] = conf.topics.all

  bookSerializer.configure(Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava, false)
  bookDeserializer.configure(Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava, false)

  override def beforeAll(): Unit = {
    super.beforeAll()
    val config = Map[String, AnyRef]("group.id" -> "test", "bootstrap.servers" -> kafka.getBootstrapServers)
    testProducer = new KafkaProducer[Bytes, Book](config.asJava, Serdes.Bytes().serializer(), bookSerializer)
  }

  override def afterAll(): Unit = {
    testProducer.close()
    super.afterAll()
  }

  "consume" should "fetch the existing records from kafka" in {

    Given("a consumer config")
    val config = Map[String, AnyRef]("client.id" -> "test", "bootstrap.servers" -> kafka.getBootstrapServers)
    val consumer = new KafkaConsumer[Bytes, Book](config.asJava, Serdes.Bytes().deserializer(), bookDeserializer)

    And("a collection of books")
    val newBook1 = Book("book1", "title1", Tech, 20, LocalDate.of(2020, 4, 1))
    val newBook2 = Book("book2", "title2", Novel, 300, LocalDate.of(2020, 5, 1))
    val newBook3 = Book("book3", "title3", Other, 888, LocalDate.of(2020, 6, 1))

    testProducer.send(new ProducerRecord(conf.topics.bookTopic.name, newBook1))
    testProducer.send(new ProducerRecord(conf.topics.bookTopic.name, newBook2))
    testProducer.send(new ProducerRecord(conf.topics.bookTopic.name, newBook3))

    testProducer.flush()

    When("we consume back the records")
    val topicPartitions: Seq[TopicPartition] = (0 until conf.topics.bookTopic.partitions)
      .map(new TopicPartition(conf.topics.bookTopic.name, _))

    consumer.assign(topicPartitions.asJava)

    Then("a collection of books is returned")
    eventually(timeout(5 second), interval(1 second)) {
      consumer.seekToBeginning(topicPartitions.asJava)
      val records: List[Book] = consumer.poll((1 second) toJava).asScala.map(_.value()).toList

      records should have length 3
      records should contain theSameElementsAs(newBook1 :: newBook2 :: newBook3 :: Nil)
    }
  }
}

Run the tests

3

Finally, run the tests by typing:

./gradlew test

It should output the following:

> Task :app-consumer:spec
Discovery starting.
Discovery completed in 2 seconds, 922 milliseconds.
Run starting. Expected test count is: 1
ConsumerSpec:
consume
- should fetch the existing records from kafka
  + Given a consumer config
  + And a collection of books
  + When we consume back the records
  + Then a collection of books is returned
Run completed in 1 minute, 11 seconds.
Total number of tests run: 1
Suites: completed 2, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

> Task :app-producer:spec
Discovery starting.
Discovery completed in 2 seconds, 373 milliseconds.
Run starting. Expected test count is: 1
ProducerSpec:
produce
- should write a series of new books to kafka
  + Given a producer config
  + And a collection of books
  + When the books get produced
  + Then records can be fetched from Kafka
Run completed in 17 seconds, 802 milliseconds.
Total number of tests run: 1
Suites: completed 2, aborted 0
Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
All tests passed.

BUILD SUCCESSFUL in 1m 34s
14 actionable tasks: 2 executed, 12 up-to-date