Building a Kafka Serde is Easy

Peter McIntyre Senior DevOps Engineer at Versent

Peter McIntyre

Principal Engineer

November 20, 2024

The what, why, and how of building a Serde

🚧 Serde 101

tl;dr: a Serde is the symmetric process of converting objects into bytes and back again.

What is a Serde?

A Serde (or SerDe, short for Serializer/Deserializer) in Kafka is a symmetric mechanism for converting in-memory representations of data into a desired format and back again.
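To make the symmetry concrete, here is a minimal sketch that uses only the JDK, with a String standing in for the object and UTF-8 bytes as the format. The class and method names are illustrative assumptions, not Kafka APIs.

```java
import java.nio.charset.StandardCharsets;

final class RoundTripSketch {

    // "Serialize": turn the in-memory String into bytes
    static byte[] toBytes(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // "Deserialize": rebuild the String from those same bytes
    static String fromBytes(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "bob";
        String restored = fromBytes(toBytes(original));
        System.out.println(original.equals(restored)); // prints "true"
    }
}
```

The two methods only make sense as a pair: whatever one side writes, the other must be able to read.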

When publishing to and consuming from a Kafka topic, the producer and consumer must agree on a Serde for both the message key and value. For example, you could serialize the key as a String and the value as Avro.

Why a custom Serde?

In most cases, you will want to use industry-standard formats like Avro, Protobuf, or JSON. However, there are real cases for custom serializers—adding end-to-end encryption, masking sensitive values, wrapping a message with metadata, decorating a message with company-standard headers, etc.

Core Concepts

To implement a custom Serde in Java, you will want to familiarize yourself with the following Java interfaces:

  • Serializer: An interface for converting objects to bytes.
  • Deserializer: An interface for converting bytes to objects.
  • Serde: The interface for wrapping a serializer and deserializer pair.
  • AbstractConfig: A base class to extend for handling configuration.

To build a custom Serde, you will need to use each of these.

The Ecosystem

The Kafka ecosystem is vast, and many documents blur the lines between Kafka, Spring Boot, Confluent, and other related terms. Let’s clarify:

  • Apache Kafka: The open-source distributed event streaming platform. It provides the core software for deploying a cluster and the SDKs to interface with it.
  • Spring: A framework that provides features to help you build and test applications — like Inversion of Control (IoC) and Dependency Injection (DI).
  • Spring Boot: Provides auto-configuration and convenient abstractions on top of Spring.
  • Confluent: A company founded by the creators of Apache Kafka. They offer Kafka as a service and provide add-ons like Schema Registry, pre-built connectors for Kafka Connect (itself part of Apache Kafka), and lots of Serdes!

🛠️ Building a custom Serde

I will describe the more interesting parts of building a custom serde. The full code and demos are in this accompanying repo.

For demonstration, we will build a Base64 serde, which is configurable to wrap another serde — we will choose JSON:

Figure: The data flow of a Java object from a producer to a consumer via Kafka

As a result, messages on the Kafka cluster will be base64 encoded — here is a view from the Confluent web UI:

Figure: Confluent web UI with base64 encoded messages

Understanding how a Serde is configured

There are many ways to load configuration and instantiate the Kafka classes. Below, I document the simplest approach using Spring Boot, which abstracts away undifferentiated plumbing. For a deeper understanding, consider reviewing the other options in the accompanying repo above.

To build a Serde, you should understand how applications typically configure and use one. Below, we use application properties to configure a JSON Serde. Spring Boot reads these and instantiates Kafka’s internal Consumer and Producer.

# Kafka
kafka.persons.topic.name: springboot-simple-persons

# Kafka Producer
spring.kafka.producer:
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

# Kafka Consumer
spring.kafka.consumer:
  group-id: ${random.uuid}
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
  properties:
    '[spring.json.value.default.type]': com.pwmcintyre.dto.Person
    '[spring.json.trusted.packages]': com.pwmcintyre.*

Once the above config is set, Spring Boot can instantiate the Kafka classes. Below, we autowire a KafkaTemplate for sending messages, e.g.

@Autowired
private KafkaTemplate<String, Person> kafkaTemplate;

@Value("${kafka.persons.topic.name}")
private String topic;

public void send() {
    kafkaTemplate.send(topic, new Person("bob"));
}

Similarly, we can begin listening to the topic with the @KafkaListener annotation — e.g.

@KafkaListener(topics = "${kafka.persons.topic.name}")
public void listen(final Person person) {
    log.debug("Received: {}", person);
}

FYI — The magic behind these can be found in KafkaAutoConfiguration.java and KafkaListenerAnnotationBeanPostProcessor.java.

What’s nice about this approach is that you have complete control over the serialization via config alone! This is evaluated at startup and can be swapped at deployment time (e.g., per environment).

You can read more about Spring Boot properties, and other more flexible approaches, in the official guide.

Implementing Serializer

Note: The code below is similar to the Deserializer and Serde classes; I will show just the Serializer for brevity.

The first step is to implement the required parts of the Serializer and Deserializer interfaces — below, we implement our Base64Serializer:

import java.util.Base64;
import org.apache.kafka.common.serialization.Serializer;

public class Base64Serializer<T> implements Serializer<T> {

    private Serializer<T> inner;

    /**
     * Args constructor used for programmatic instantiation
     * @param inner used to serialize data prior to encoding
     */
    public Base64Serializer(Serializer<T> inner) {
        this.inner = inner;
    }

    /**
     * @param topic topic associated with data
     * @param data typed data
     * @return Base64 encoded serialized data, or null if there is nothing to encode
     */
    @Override
    public byte[] serialize(String topic, T data) {

        if (data == null) {
            return null;
        }

        // use inner serializer first
        byte[] serialized = inner.serialize(topic, data);

        // the inner serializer may itself return null; pass that through
        if (serialized == null) {
            return null;
        }

        // encode
        return Base64.getEncoder().encode(serialized);
    }
}

NOTE: we could stop here. An application can programmatically instantiate this serializer and use it to send messages, nice! 👌
(but we can do better)
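For completeness, here is a rough sketch of the decoding direction, which the article skips for brevity. It is not the code from the accompanying repo. To stay compilable without the kafka-clients jar, it declares a local stand-in interface whose single method mirrors the shape of Kafka's Deserializer#deserialize(String, byte[]); in a real project you would implement the real interface instead. The UTF-8 inner deserializer and the topic name are assumptions made up for the demo.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class Base64DeserializerSketch {

    // Stand-in for org.apache.kafka.common.serialization.Deserializer<T>,
    // declared locally only so this sketch compiles on its own
    interface Deserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Mirror image of Base64Serializer: decode first, then delegate to the inner deserializer
    static final class Base64Deserializer<T> implements Deserializer<T> {

        private final Deserializer<T> inner;

        Base64Deserializer(Deserializer<T> inner) {
            this.inner = inner;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            // undo the Base64 encoding applied by the serializer
            byte[] decoded = Base64.getDecoder().decode(data);
            // hand the raw bytes to the inner deserializer
            return inner.deserialize(topic, decoded);
        }
    }

    public static void main(String[] args) {
        // hypothetical inner deserializer: UTF-8 bytes to String
        Deserializer<String> utf8 = (topic, bytes) -> new String(bytes, StandardCharsets.UTF_8);
        Deserializer<String> base64 = new Base64Deserializer<>(utf8);

        // "Ym9i" is the Base64 encoding of "bob"
        byte[] onTheWire = "Ym9i".getBytes(StandardCharsets.US_ASCII);
        System.out.println(base64.deserialize("persons", onTheWire)); // prints "bob"
    }
}
```

Note the order of operations is the reverse of the serializer: the serializer runs the inner serializer and then encodes, so the deserializer decodes and then runs the inner deserializer.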

Extending AbstractConfig

AbstractConfig is the true magic behind making the Kafka ecosystem so extensible. It is woefully undocumented and the primary driver behind my writing this article—although we will only scratch the surface!

If your Serde has configurable attributes, this is how to provide that mechanism. Our base64 serde has two configurable parts:

  1. the class name of the inner deserializer used after base64 decoding.
  2. the class name of the inner serializer used before base64 encoding.

We define the configurable parts using ConfigDef and supplying the name, type, and defaults — like so:

import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

public class Base64SerDeConfig extends AbstractConfig {

    public static final String DESERIALIZER_CLASS_CONFIG = "base64.inner.deserializer";
    public static final String DESERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
    public static final String DESERIALIZER_CLASS_CONFIG_DOC = "The class which is used to deserialize after decoding";

    public static final String SERIALIZER_CLASS_CONFIG = "base64.inner.serializer";
    public static final String SERIALIZER_CLASS_CONFIG_DEFAULT = "org.apache.kafka.common.serialization.ByteArraySerializer";
    public static final String SERIALIZER_CLASS_CONFIG_DOC = "The class which is used to serialize before encoding";

    private static final ConfigDef definition = new ConfigDef()
        .define(
            DESERIALIZER_CLASS_CONFIG,
            ConfigDef.Type.CLASS,
            DESERIALIZER_CLASS_CONFIG_DEFAULT,
            ConfigDef.Importance.HIGH,
            DESERIALIZER_CLASS_CONFIG_DOC)
        .define(
            SERIALIZER_CLASS_CONFIG,
            ConfigDef.Type.CLASS,
            SERIALIZER_CLASS_CONFIG_DEFAULT,
            ConfigDef.Importance.HIGH,
            SERIALIZER_CLASS_CONFIG_DOC);

    public Base64SerDeConfig(Map<?, ?> config) {
        super(definition, config);
    }

}
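As an illustration of where these keys end up, the sketch below builds the kind of flat property map an application might hand to a Kafka producer. It uses only java.util so it runs without Kafka on the classpath. The broker address and the com.example.serde package are assumptions; the article does not state the package of Base64Serializer.

```java
import java.util.HashMap;
import java.util.Map;

final class Base64ProducerPropsSketch {

    static Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        // standard Kafka producer keys
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // hypothetical package name for the article's Base64Serializer
        props.put("value.serializer", "com.example.serde.Base64Serializer");
        // our own key, as declared by Base64SerDeConfig.SERIALIZER_CLASS_CONFIG
        props.put("base64.inner.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().get("base64.inner.serializer"));
    }
}
```

The custom key lives in the same map as Kafka's own keys. If it is left out, the default declared in the ConfigDef (ByteArraySerializer) applies.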

Making use of Config

Once the configs are defined, we can use them to make the serde configurable.

To make your class fully configurable, consider implementing two slightly different configure methods:

  1. Serializer#configure(Map, boolean), from the Serializer interface
  2. Configurable#configure(Map), from the Configurable interface
    (optional, but it makes your class more flexible; read on for why)

Here are the new parts to our Base64Serializer:

public class Base64Serializer<T> implements Serializer<T>, Configurable {

    ...

    /**
     * Empty args constructor used in conjunction with {@link #configure}
     */
    public Base64Serializer() {
    }

    /**
     * Overrides {@link Serializer#configure}
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(configs);
    }

    /**
     * Declares this class as Configurable, used by {@link AbstractConfig#getConfiguredInstance}
     */
    @Override
    public void configure(Map<String, ?> configs) {
        configure(new Base64SerDeConfig(configs));
    }

    /**
     * Apply configs declared by {@link Base64SerDeConfig}
     */
    @SuppressWarnings("unchecked")
    protected void configure(Base64SerDeConfig configs) {
        this.inner = configs.getConfiguredInstance(Base64SerDeConfig.SERIALIZER_CLASS_CONFIG, Serializer.class);
    }

    ...
}

The real workhorse of this new code is the call to AbstractConfig.getConfiguredInstance().

This method instantiates a given class by name and attempts to configure it. This, in turn, calls the Configurable#configure method of that class—it’s Configurables all the way down! All the configuration data specified by the root application is passed down (so be sure to pick unique-ish keys for your Serde).
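The instantiate-then-configure pattern described above can be imitated in a few lines of plain Java. This is a simplified sketch of the idea, not Kafka's actual implementation: the Configurable interface here is a local stand-in for org.apache.kafka.common.Configurable, and Greeter, its config key, and configuredInstance are hypothetical names made up for the demo.

```java
import java.util.HashMap;
import java.util.Map;

final class ConfiguredInstanceSketch {

    // Local stand-in for org.apache.kafka.common.Configurable, so the sketch needs no Kafka jar
    interface Configurable {
        void configure(Map<String, ?> configs);
    }

    // Hypothetical component that picks its own key out of the shared config map
    static final class Greeter implements Configurable {
        String name = "nobody";

        @Override
        public void configure(Map<String, ?> configs) {
            Object value = configs.get("greeter.name"); // made-up key for this demo
            if (value != null) {
                name = value.toString();
            }
        }
    }

    // Simplified imitation: create the class by name, check its type, then configure it
    static <T> T configuredInstance(String className, Class<T> type, Map<String, ?> configs) {
        try {
            Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
            T typed = type.cast(instance);
            if (instance instanceof Configurable) {
                // the whole map is handed down, not just the keys this class cares about
                ((Configurable) instance).configure(configs);
            }
            return typed;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("greeter.name", "bob");
        configs.put("some.other.key", 42); // ignored by Greeter, but still passed along

        Greeter greeter = configuredInstance(Greeter.class.getName(), Greeter.class, configs);
        System.out.println(greeter.name); // prints "bob"
    }
}
```

Because every nested component receives the same map, a key such as "name" could easily collide with another component's setting, which is why namespaced keys like base64.inner.serializer are the safer choice.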

In our demo app, where we specify JsonSerializer as the inner serializer, you can see all the config is available at startup:

Figure: example config values passed to getConfiguredInstance when JsonSerializer is set as our inner Serializer

Note that there are other getter methods of AbstractConfig to assist in defining and validating other types — e.g., getPassword, getString, getInt, etc. — which all do exactly as you expect.

✅ And that’s it — Easy!

Building a custom serde doesn’t have to be difficult — above I describe the parts of building a custom Serde: Implementing interfaces and enabling configuration.

When I first attempted this process, I found some aspects were documented many times over while others were not documented at all. I had to dig through a mountain of internal code and build demos to understand how everything worked. Hopefully, this article will make the journey smoother.

You can run this for yourself by cloning the supplementary repo!
