How to rekey a stream with a function

Problem:

You have a Kafka topic and you want to change the key of its messages. The new key is a variation of data already present in the messages, and a small fragment of code can convert the existing data into the new key.


Example use case:

Consider a stream of customer information events keyed by id. Each event contains a few attributes, including the customer's phone number. In this tutorial, we'll write a program that rekeys the topic by the area code of the phone number. Customers with the same area code will be placed into the same partition of the new topic.

Code example:

Try it

1. Initialize the project

KSQL has many built-in functions that help with processing records in streaming data, like ABS and SUM. Extracting the area code from a phone number, however, is most easily done with a regular expression, and for that you can implement a custom function in Java that goes beyond the built-in functions.

Get started by making a new directory anywhere you’d like for this project:

mkdir rekey-with-function && cd rekey-with-function

Then make the following directories:

mkdir src extensions

A Gradle build file compiles your Java code into a jar file that is supplied to KSQL. Create the following Gradle build file, named build.gradle, in the root of the project:

buildscript {
    repositories {
        jcenter()
    }
}

plugins {
    id 'java'
}

sourceCompatibility = '1.8'
targetCompatibility = '1.8'
version = '0.0.1'

repositories {
    mavenCentral()
    jcenter()

    maven {
        url 'http://packages.confluent.io/maven'
    }
}

dependencies {
    compile 'io.confluent.ksql:ksql-udf:5.3.0'
    testCompile 'junit:junit:4.12'
}

task copyJar(type: Copy) {
    from jar
    into "extensions/"
}

build.dependsOn copyJar

test {
    testLogging {
        outputs.upToDateWhen { false }
        showStandardStreams = true
        exceptionFormat = 'full'
    }
}

The build.gradle also contains a copyJar step to copy the jar file to the extensions/ directory, where it will be picked up by KSQL. This is convenient when you are iterating on a function. For example, you might have tested your UDF against your suite of unit tests and are now ready to test it against streams in KSQL. With the jar in the correct place, a restart of KSQL will load your updated jar.

Then run the following command to obtain the Gradle wrapper:

gradle wrapper

2. Implement the KSQL User-defined Function

Create a directory for the Java files in this project:

mkdir -p src/main/java/io/confluent/developer

Then create the following file at src/main/java/io/confluent/developer/RegexReplace.java. This file contains the Java logic of your custom function. Read through the code to familiarize yourself with it. Notice that the code checks each parameter for null. We do this because the function could be invoked with unpopulated data, which would pass a null to the input parameter. As an extra sanity check, we also verify that the regex and replacement parameters are not null.

package io.confluent.developer;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

@UdfDescription(name = "regexReplace", description = "Replace string using a regex")
public class RegexReplace {

  @Udf(description = "regexReplace string using a regex")
  public String regexReplace(
    @UdfParameter(value = "input", description = "If null, then function returns null.") final String input,
    @UdfParameter(value = "regex", description = "If null, then function returns null.") final String regex,
    @UdfParameter(value = "replacement", description = "If null, then function returns null.") final String replacement) {
      if (input == null || regex == null || replacement == null) {
        return null;
      }
      return input.replaceAll(regex, replacement);
  }
}
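
To get a feel for what the replaceAll call produces, here is a small, self-contained Java illustration (the RegexReplaceDemo class is not part of the tutorial project) that applies the same pattern and replacement used later in this tutorial to a few of the sample phone numbers:

package io.confluent.developer;

// Standalone illustration only: the pattern optionally matches a leading "(",
// captures the first three digits, and consumes the rest of the string; the
// "$1" replacement keeps just the captured area code.
public class RegexReplaceDemo {

  public static void main(String[] args) {
    RegexReplace udf = new RegexReplace();
    String regex = "\\(?(\\d{3}).*";

    String[] phoneNumbers = {"(360) 555-8909", "206-555-1272", "425.555.6940", "360 555 6952"};
    for (String phoneNumber : phoneNumbers) {
      // e.g. "(360) 555-8909" -> "360"
      System.out.println(phoneNumber + " -> " + udf.regexReplace(phoneNumber, regex, "$1"));
    }
  }
}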

See more about KSQL User-Defined Functions at the KSQL Custom Function Reference.

3. Build and Copy the KSQL User-defined Function

In your terminal, run:

./gradlew build

The copyJar Gradle task will automatically deliver the jar to the extensions/ directory.

4. Get Confluent Platform

Next, create the following docker-compose.yml file to obtain Confluent Platform:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:5.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  ksql-server:
    image: confluentinc/cp-ksql-server:5.3.0
    hostname: ksql-server
    container_name: ksql-server
    depends_on:
      - broker
      - schema-registry
    volumes:
      - ./extensions:/etc/ksql/ext
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_KSQL_EXTENSION_DIR: "/etc/ksql/ext/"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"

  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.3.0
    container_name: ksql-cli
    depends_on:
      - broker
      - ksql-server
    entrypoint: /bin/sh
    tty: true
    volumes:
      - ./src:/opt/app/src
      - ./test:/opt/app/test

Note that docker-compose.yml configures the ksql-server container with KSQL_KSQL_EXTENSION_DIR: "/etc/ksql/ext/" and mounts the local extensions directory at /etc/ksql/ext inside the container. KSQL is thereby configured to look in this location for extensions such as your custom function.

Launch the platform by running:

docker-compose up

5. Write the program interactively using the CLI

To begin developing interactively, open up the KSQL CLI:

docker exec -it ksql-cli ksql http://ksql-server:8088

First, let’s confirm that the UDF jar has been loaded correctly. You should see REGEXREPLACE in the list of functions.

SHOW FUNCTIONS;

You can see some additional detail about the function with DESCRIBE FUNCTION.

DESCRIBE FUNCTION REGEXREPLACE;

The result gives you a description of the function, including its input parameters and return type, drawn from the annotations in RegexReplace.java.

Name        : REGEXREPLACE
Overview    : Replace string using a regex
Type        : scalar
Jar         : /etc/ksql/ext/rekey-with-function-0.0.1.jar
Variations  :

	Variation   : REGEXREPLACE(input VARCHAR, regex VARCHAR, replacement VARCHAR)
	Returns     : VARCHAR
	Description : regexReplace string using a regex
	input       : If null, then function returns null.
	regex       : If null, then function returns null.
	replacement : If null, then function returns null.

Create a Kafka topic and stream of customers.

CREATE STREAM customers (id int, firstname string, lastname string, phonenumber string)
  WITH (kafka_topic='customers',
        partitions=2,
        key='id',
        value_format = 'avro');

Then insert the customer data.

INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (1, 'Sleve', 'McDichael', '(360) 555-8909');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (2, 'Onson', 'Sweemey', '206-555-1272');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (3, 'Darryl', 'Archideld', '425.555.6940');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (4, 'Anatoli', 'Smorin', '509.555.8033');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (5, 'Rey', 'McSriff', '360 555 6952');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (6, 'Glenallen', 'Mixon', '(253) 555-7050');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (7, 'Mario', 'McRlwain', '360 555 7598');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (8, 'Kevin', 'Nogilny', '206.555.8090');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (9, 'Tony', 'Smehrik', '425-555-7926');
INSERT INTO customers (id, firstname, lastname, phonenumber) VALUES (10, 'Bobson', 'Dugnutt', '509.555.8795');

Now that you have a stream with some events in it, let’s read them out. The first thing to do is set the following property to ensure that you’re reading from the beginning of the stream:

SET 'auto.offset.reset' = 'earliest';

We can view the results of applying the newly created REGEXREPLACE function to the messages:

SELECT ROWKEY, ID, FIRSTNAME, LASTNAME, PHONENUMBER, REGEXREPLACE(phonenumber, '\\(?(\\d{3}).*', '$1')
FROM CUSTOMERS
LIMIT 10;

This should yield roughly the following output. The order will differ because the messages are spread across 2 partitions:

1 | 1 | Sleve | McDichael | (360) 555-8909 | 360
10 | 10 | Bobson | Dugnutt | 509.555.8795 | 509
2 | 2 | Onson | Sweemey | 206-555-1272 | 206
3 | 3 | Darryl | Archideld | 425.555.6940 | 425
4 | 4 | Anatoli | Smorin | 509.555.8033 | 509
5 | 5 | Rey | McSriff | 360 555 6952 | 360
6 | 6 | Glenallen | Mixon | (253) 555-7050 | 253
7 | 7 | Mario | McRlwain | 360 555 7598 | 360
8 | 8 | Kevin | Nogilny | 206.555.8090 | 206
9 | 9 | Tony | Smehrik | 425-555-7926 | 425
Limit Reached
Query terminated

Note that REGEXREPLACE has extracted the area code from several differently formatted phone numbers. The pattern matches an optional opening parenthesis, captures the first three digits, and consumes the rest of the string, while the $1 replacement keeps only the captured area code.

Using KSQL’s appropriately named PARTITION BY clause, we can apply a new key to the messages and write them to a new stream. Here we’ll key by the output of REGEXREPLACE, exposed in the column area_code. Issue the following to create a new stream that is continuously populated by its query:

CREATE STREAM customers_by_area_code
  WITH (KAFKA_TOPIC='customers_by_area_code') AS
    SELECT
      id,
      firstname,
      lastname,
      phonenumber,
      REGEXREPLACE(phonenumber, '\\(?(\\d{3}).*', '$1') as area_code
    FROM customers
    PARTITION BY area_code;

To check that it’s working, query ROWKEY as before and confirm that the key now matches the AREA_CODE field:

SELECT ROWKEY, ID, FIRSTNAME, LASTNAME, AREA_CODE, PHONENUMBER
FROM CUSTOMERS_BY_AREA_CODE
LIMIT 10;

This should yield roughly the following output. The order might vary from what you see here, but the data has been repartitioned such that all customers with the same area code are now in exactly one partition. Note that the ROWKEY column now contains the area code.

206 | 2 | Onson | Sweemey | 206 | 206-555-1272
206 | 8 | Kevin | Nogilny | 206 | 206.555.8090
253 | 6 | Glenallen | Mixon | 253 | (253) 555-7050
360 | 1 | Sleve | McDichael | 360 | (360) 555-8909
360 | 5 | Rey | McSriff | 360 | 360 555 6952
360 | 7 | Mario | McRlwain | 360 | 360 555 7598
425 | 3 | Darryl | Archideld | 425 | 425.555.6940
425 | 9 | Tony | Smehrik | 425 | 425-555-7926
509 | 10 | Bobson | Dugnutt | 509 | 509.555.8795
509 | 4 | Anatoli | Smorin | 509 | 509.555.8033
Limit Reached
Query terminated

6. Write your statements to a file

Now that you have a series of statements that’s doing the right thing, the last step is to put them into a file so that they can be used outside the CLI session. Create a file at src/statements.sql with the following content:

CREATE STREAM customers (id int, firstname string, lastname string, phonenumber string)
  WITH (kafka_topic='customers',
        partitions=2,
        key='id',
        value_format = 'avro');

CREATE STREAM customers_by_area_code
  WITH (KAFKA_TOPIC='customers_by_area_code') AS
    SELECT
      id,
      firstname,
      lastname,
      phonenumber,
      REGEXREPLACE(phonenumber, '\\(?(\\d{3}).*', '$1') as area_code
    FROM customers
    PARTITION BY area_code;

Test it

1. Write a test

Create a directory for the tests to live in:

mkdir -p src/test/java/io/confluent/developer

Create the following test file at src/test/java/io/confluent/developer/RegexReplaceTest.java:

package io.confluent.developer;

import static org.junit.Assert.*;
import io.confluent.developer.RegexReplace;
import org.junit.Test;

public class RegexReplaceTest {

    @Test
    public void testRegexReplace() {
        RegexReplace udf = new RegexReplace();
        String regEx = "\\(?(\\d{3}).*";
        assertEquals("206", udf.regexReplace("206-555-1272", regEx, "$1"));
        assertEquals("425", udf.regexReplace("425.555.6940", regEx, "$1"));
        assertEquals("360", udf.regexReplace("360 555 6952", regEx, "$1"));
        assertEquals("253", udf.regexReplace("(253) 555-7050", regEx, "$1"));
        assertEquals("425", udf.regexReplace("425-555-7926", regEx, "$1"));

        // test null parameters return null
        assertNull(udf.regexReplace(null, regEx, "$1"));
        assertNull(udf.regexReplace("425-555-7926", null, "$1"));
        assertNull(udf.regexReplace("425-555-7926", regEx, null));
    }
}

2. Invoke the tests

Now run the test, which is as simple as:

./gradlew test

Take it to production

1. Send the statements to the REST API

Launch your statements into production by sending them to the REST API with the following command:

statements=$(< src/statements.sql) && \
    echo '{"ksql":"'$statements'", "streamsProperties": {}}' | \
        curl -X "POST" "http://localhost:8088/ksql" \
             -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
             -d @- | \
        jq
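
If you prefer to submit the statements from Java rather than from the shell, the sketch below does the equivalent POST using only the JDK's HttpURLConnection. The SubmitStatements class name and the naive JSON escaping are illustrative assumptions, not part of the tutorial; the endpoint, content type, and request body shape are the same ones used by the curl command above.

package io.confluent.developer;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

// Illustrative helper (not part of the tutorial project): POSTs the contents of
// src/statements.sql to the KSQL REST API, mirroring the curl command above.
public class SubmitStatements {

  public static void main(String[] args) throws Exception {
    // Collapse the file to a single line, as the shell's word splitting does,
    // and escape the characters that would break the JSON string literal.
    String statements = String.join(" ", Files.readAllLines(Paths.get("src/statements.sql")))
        .replace("\\", "\\\\")
        .replace("\"", "\\\"");
    String body = "{\"ksql\": \"" + statements + "\", \"streamsProperties\": {}}";

    HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8088/ksql").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(body.getBytes(StandardCharsets.UTF_8));
    }

    System.out.println("HTTP " + conn.getResponseCode());
  }
}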