Let’s create a directory for the tests to live in:
mkdir -p src/test/scala/io/confluent/developer
Before we go further, we need to discuss what and how to test in these applications.
What to test? We don’t want to test the Kafka clients themselves. Instead, we’d like to test the functions that carry business behavior, even when they have side effects from calls to Kafka producers and consumers.
How to test? Unlike most tutorials on this site, we are not testing a streaming application here, which means we cannot extract a streaming topology and test it separately. Instead, we have to spawn a real Kafka broker to test our functions, and for that we will use the Testcontainers Kafka module.
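If your build does not already declare them, the test-scoped dependencies look roughly like this (a build.sbt sketch; the version numbers are assumptions, align them with your project):
// build.sbt (sketch) -- versions are illustrative only
libraryDependencies ++= Seq(
  "org.scalatest"      %% "scalatest" % "3.2.15" % Test, // AnyFlatSpec, Matchers, Eventually
  "org.testcontainers"  % "kafka"     % "1.17.6" % Test  // spawns a Kafka broker in Docker
)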
First, we create a trait that extends AnyFlatSpec, Matchers, and other test traits. Because these tests might be asynchronous, we will use the ScalaTest functions from Eventually.
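As a reminder, eventually retries its block until the assertions inside it pass, failing the test once the timeout expires. A minimal sketch (the assertion is a placeholder):
// Retry the block every `interval` until it succeeds,
// or fail the test once `timeout` has elapsed.
eventually(timeout(5.seconds), interval(1.second)) {
  consumedRecords should have length 3 // placeholder assertion
}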
All Kafka-client-related tests will extend this trait, so let’s call it KafkaFlatSpec:
src/test/scala/io/confluent/developer/KafkaFlatSpec.scala
package io.confluent.developer

import io.confluent.developer.Configuration.TopicConf.TopicSpec
import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.scalatest.concurrent.Eventually
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen, Inspectors}
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

import scala.jdk.CollectionConverters._

trait KafkaFlatSpec extends AnyFlatSpec
  with Matchers
  with Inspectors
  with BeforeAndAfterAll
  with GivenWhenThen
  with Eventually {

  // The topics a concrete suite needs; they are created before any test runs.
  val testTopics: Vector[TopicSpec]

  // The Kafka broker runs in a Docker container managed by Testcontainers.
  val kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.3.0"))

  // Lazy, so the bootstrap servers are only resolved once the container is up.
  lazy val admin: Admin = Admin.create(Map[String, AnyRef]("bootstrap.servers" -> kafka.getBootstrapServers).asJava)

  override def beforeAll(): Unit = {
    super.beforeAll()
    kafka.start()
    admin.createTopics(
      testTopics.map { topic =>
        new NewTopic(
          topic.name,
          topic.partitions,
          topic.replicationFactor
        )
      }.asJava
    ).all().get() // block until all topics have actually been created
  }

  override def afterAll(): Unit = {
    admin.close()
    kafka.stop()
    super.afterAll()
  }
}
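For illustration, a concrete suite only has to supply testTopics; the trait takes care of the container, the topic creation, and the Admin client. A hypothetical sketch (SmokeSpec and the TopicSpec field order are assumptions based on how the trait uses them, not part of the tutorial’s code base):
package io.confluent.developer

import io.confluent.developer.Configuration.TopicConf.TopicSpec

// Hypothetical example suite, for illustration only.
class SmokeSpec extends KafkaFlatSpec {

  override val testTopics: Vector[TopicSpec] =
    Vector(TopicSpec("smoke-test", partitions = 1, replicationFactor = 1))

  "the broker" should "expose the topics created by the trait" in {
    admin.listTopics().names().get() should contain ("smoke-test")
  }
}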
Next, create the test directories for both applications:
mkdir -p app-producer/src/test/scala/io/confluent/developer/produce/
mkdir -p app-consumer/src/test/scala/io/confluent/developer/consume/
Add the produce function test class app-producer/src/test/scala/io/confluent/developer/produce/ProducerSpec.scala:
package io.confluent.developer.produce

import java.time.LocalDate

import io.confluent.developer.Configuration.{ProducerConf, TopicConf}
import io.confluent.developer.KafkaFlatSpec
import io.confluent.developer.schema.BookType.{Novel, Other, Tech}
import io.confluent.developer.schema.{Book, ScalaReflectionSerde}
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{Deserializer, Serdes, Serializer}
import org.apache.kafka.common.utils.Bytes
import pureconfig.ConfigSource
import pureconfig.generic.auto._

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._

class ProducerSpec extends KafkaFlatSpec with ScalaReflectionSerde {

  var testConsumer: KafkaConsumer[Bytes, Book] = _

  val bookSerializer: Serializer[Book] = ScalaReflectionSerde.reflectionSerializer4S[Book]
  val bookDeserializer: Deserializer[Book] = ScalaReflectionSerde.reflectionDeserializer4S[Book]

  val conf: ProducerConf = ConfigSource.default.at("producer").loadOrThrow[ProducerConf]

  override val testTopics: Vector[TopicConf.TopicSpec] = conf.topics.all

  // A mock:// URL points the Confluent serdes at an in-memory schema registry.
  bookSerializer.configure(Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava, false)
  bookDeserializer.configure(Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava, false)

  override def beforeAll(): Unit = {
    super.beforeAll()
    val config = Map[String, AnyRef]("group.id" -> "test", "bootstrap.servers" -> kafka.getBootstrapServers)
    testConsumer = new KafkaConsumer[Bytes, Book](config.asJava, Serdes.Bytes().deserializer(), bookDeserializer)
  }

  "produce" should "write a series of new books to kafka" in {
    Given("a producer config")
    val config = Map[String, AnyRef]("client.id" -> "test", "bootstrap.servers" -> kafka.getBootstrapServers)
    val producer = new KafkaProducer[Bytes, Book](config.asJava, Serdes.Bytes().serializer(), bookSerializer)

    And("a collection of books")
    val newBook1 = Book("book1", "title1", Tech, 20, LocalDate.of(2020, 1, 1))
    val newBook2 = Book("book2", "title2", Novel, 300, LocalDate.of(2020, 2, 1))
    val newBook3 = Book("book3", "title3", Other, 888, LocalDate.of(2020, 3, 1))

    When("the books get produced")
    val maybeMetadata1 = Producer.produce(producer, conf.topics.bookTopic.name, newBook1)
    val maybeMetadata2 = Producer.produce(producer, conf.topics.bookTopic.name, newBook2)
    val maybeMetadata3 = Producer.produce(producer, conf.topics.bookTopic.name, newBook3)

    val topicPartitions: Seq[TopicPartition] = (0 until conf.topics.bookTopic.partitions)
      .map(new TopicPartition(conf.topics.bookTopic.name, _))
    testConsumer.assign(topicPartitions.asJava)

    Then("records can be fetched from Kafka")
    eventually(timeout(5.seconds), interval(1.second)) {
      testConsumer.seekToBeginning(topicPartitions.asJava)
      val records: List[Book] = testConsumer.poll(1.second.toJava).asScala.map(_.value()).toList
      records should have length 3
      records should contain theSameElementsAs (newBook1 :: newBook2 :: newBook3 :: Nil)
      forAll(maybeMetadata1 :: maybeMetadata2 :: maybeMetadata3 :: Nil) { metadata =>
        metadata.isDone shouldBe true
      }
    }
    producer.flush()
    producer.close()
  }
}
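Note that the assertions on maybeMetadata1.isDone and friends only make sense if Producer.produce hands back the java Future returned by KafkaProducer.send. For orientation, a sketch of that shape (an assumption about the application code, not the tutorial’s actual implementation):
// Sketch only: produce presumably forwards to send() and returns its Future,
// which is why the test can check metadata.isDone.
import java.util.concurrent.{Future => JFuture}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

object Producer {
  def produce[K, V](producer: KafkaProducer[K, V], topic: String, value: V): JFuture[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}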
Add the consume function test class app-consumer/src/test/scala/io/confluent/developer/consume/ConsumerSpec.scala:
package io.confluent.developer.consume

import java.time.LocalDate

import io.confluent.developer.Configuration.{ProducerConf, TopicConf}
import io.confluent.developer.KafkaFlatSpec
import io.confluent.developer.schema.BookType.{Novel, Other, Tech}
import io.confluent.developer.schema.{Book, ScalaReflectionSerde}
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.{Deserializer, Serdes, Serializer}
import org.apache.kafka.common.utils.Bytes
import pureconfig.ConfigSource
import pureconfig.generic.auto._

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._

class ConsumerSpec extends KafkaFlatSpec with ScalaReflectionSerde {

  var testProducer: KafkaProducer[Bytes, Book] = _

  // The "consumer" config section is loaded with the same shape as the producer config.
  val conf: ProducerConf = ConfigSource.default.at("consumer").loadOrThrow[ProducerConf]

  val bookSerializer: Serializer[Book] = ScalaReflectionSerde.reflectionSerializer4S[Book]
  val bookDeserializer: Deserializer[Book] = ScalaReflectionSerde.reflectionDeserializer4S[Book]

  override val testTopics: Vector[TopicConf.TopicSpec] = conf.topics.all

  // A mock:// URL points the Confluent serdes at an in-memory schema registry.
  bookSerializer.configure(Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava, false)
  bookDeserializer.configure(Map(SCHEMA_REGISTRY_URL_CONFIG -> "mock://unused:8081").asJava, false)

  override def beforeAll(): Unit = {
    super.beforeAll()
    val config = Map[String, AnyRef]("group.id" -> "test", "bootstrap.servers" -> kafka.getBootstrapServers)
    testProducer = new KafkaProducer[Bytes, Book](config.asJava, Serdes.Bytes().serializer(), bookSerializer)
  }

  override def afterAll(): Unit = {
    testProducer.close()
    super.afterAll()
  }

  "consume" should "fetch the existing records from kafka" in {
    Given("a consumer config")
    val config = Map[String, AnyRef]("client.id" -> "test", "bootstrap.servers" -> kafka.getBootstrapServers)
    val consumer = new KafkaConsumer[Bytes, Book](config.asJava, Serdes.Bytes().deserializer(), bookDeserializer)

    And("a collection of books")
    val newBook1 = Book("book1", "title1", Tech, 20, LocalDate.of(2020, 4, 1))
    val newBook2 = Book("book2", "title2", Novel, 300, LocalDate.of(2020, 5, 1))
    val newBook3 = Book("book3", "title3", Other, 888, LocalDate.of(2020, 6, 1))
    testProducer.send(new ProducerRecord(conf.topics.bookTopic.name, newBook1))
    testProducer.send(new ProducerRecord(conf.topics.bookTopic.name, newBook2))
    testProducer.send(new ProducerRecord(conf.topics.bookTopic.name, newBook3))
    testProducer.flush()

    When("we consume back the records")
    val topicPartitions: Seq[TopicPartition] = (0 until conf.topics.bookTopic.partitions)
      .map(new TopicPartition(conf.topics.bookTopic.name, _))
    consumer.assign(topicPartitions.asJava)

    Then("a collection of books is returned")
    eventually(timeout(5.seconds), interval(1.second)) {
      consumer.seekToBeginning(topicPartitions.asJava)
      val records: List[Book] = consumer.poll(1.second.toJava).asScala.map(_.value()).toList
      records should have length 3
      records should contain theSameElementsAs (newBook1 :: newBook2 :: newBook3 :: Nil)
    }
  }
}
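With both suites in place, you can run them from the project root (Docker must be running, since Testcontainers starts the broker in a container):
sbt test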