Skip to content

Commit

Permalink
Document Serdes in more depth (#1337)
Browse files Browse the repository at this point in the history
Fixes #1331.
  • Loading branch information
erikvanoosten authored Oct 8, 2024
1 parent da2ec46 commit 9cc273a
Showing 1 changed file with 123 additions and 8 deletions.
131 changes: 123 additions & 8 deletions docs/serialization-and-deserialization.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,63 @@
---
id: serialization-and-deserialization
title: "Serialization And Deserialization"
title: "Serialization and Deserialization"
---

Zio-kafka deserializes incoming data, and deserializes outgoing data (both keys and values) from byte arrays to any
other type and back. This works by providing a key and value `Deserializer` while constructing a `Consumer`,
and a key and value `Serializer` while constructing the `Producer`.

A `Serde` combines a `Deserializer` and a `Serializer`. Common serdes are provided in the `Serdes` object, e.g.
`Serdes.byteArray`, `Serdes.string` and `Serdes.long`. A serde can be converted to other serdes, or you can create a
custom serde by implementing the `Serde` trait directly.

This document contains:

- Handling failures in a serde
- How to create a custom serde
- How to create and use a custom serde that wraps invalid data
- How to do deserialization in the consumer stream
- A warning about using `mapZIO`

## Handling failures in a serde

Ideally, a serde can not fail serializing and deserializing. This is for example the case with the provided
`Serdes.byteArray` and `Serdes.string`. This is not the case for any serde that needs to handle invalid input,
(for example `Serdes.long`), or a serde that needs to do a remote lookup.

By default, a consumer stream will fail if it encounters a deserialization error in the serde. Unfortunately, the
resulting failure might not clearly indicate that the cause is in the serde.

There are 2 solutions for improving this:

- Wrap the result of the serde in a `Try` with the `Serde.asTry` method.
- Use `Serdes.byteArray`, put the deserialization code in the consumer stream, or do serialization before handing the
data to zio-kafka. This way you can handle failures any way you want.

Both approaches are discussed further below.

## Custom Data Type Serdes

Serializers and deserializers (serdes) for custom data types can be constructed from scratch or by converting existing serdes. For example, to create a serde for an `Instant`:
Serializers and deserializers for custom data types can be created from scratch, or by converting existing
serdes. For example, to create a serde for an `Instant` from a serde for a `Long`:

```scala
import java.time.Instant
import zio.kafka.serde._

val instantSerde: Serde[Any, Instant] = Serde.long.inmap(java.time.Instant.ofEpochMilli)(_.toEpochMilli)
val instantSerde: Serde[Any, Instant] =
Serdes.long.inmap(java.time.Instant.ofEpochMilli)(_.toEpochMilli)
```

## Handling deserialization failures
To handle missing data (an empty key or value), you can use the `Serde.asOption` transformer. For example:
`Serdes.string.asOption`. This results in a `None` if the key or value is empty, and in a `Some(string)` otherwise.

The default behavior for a consumer stream when encountering a deserialization failure is to fail the stream. In many cases you may want to handle this situation differently, e.g. by skipping the message that failed to deserialize or by executing an alternative effect. For this purpose, any `Deserializer[T]` for some type `T` can be easily converted into a `Deserializer[Try[T]]` where deserialization failures are converted to a `Failure` using the `asTry` method.
## Custom serdes that wraps invalid data

Below is an example of skipping messages that fail to deserialize. The offset is passed downstream to be committed.
Any `Deserializer[A]` for a given type `A` can be converted into a `Deserializer[Try[A]]` where deserialization
failures are converted to a `Failure` using the `asTry` method. (Method `asTry` is also available on `Serde`.)

Below is an example of skipping records that fail to deserialize. The offset is passed downstream to be committed.

```scala
import zio._, stream._
Expand All @@ -28,11 +67,13 @@ import scala.util.{Try, Success, Failure}

val consumer = ZLayer.scoped(Consumer.make(consumerSettings))

val keySerde = Serdes.string
val valueSerde = Serdes.string.asTry // <-- using `.asTry`
val stream = Consumer
.plainStream(Subscription.topics("topic150"), Serde.string, Serde.string.asTry)
.plainStream(Subscription.topics("topic150"), keySerde, valueSerde)

stream
.mapZIO { record =>
.mapZIO { record => // ⚠️ see section about `mapZIO` below!
val tryValue: Try[String] = record.record.value()
val offset: Offset = record.offset

Expand All @@ -50,3 +91,77 @@ stream
.runDrain
.provideSomeLayer(consumer)
```

## Deserialization in the consumer stream

In this approach we provide zio-kafka with the `Serdes.byteArray` serde (which is a pass-through serde) and do the
deserialization in the consumer stream. The deserialization can be done with regular ZIO operators.

This approach provides more freedom at the cost of having to write more code. It also allows for optimizations such as
operating on chunks of records (see next section), and more contextual failure handling.

Here is an example:

```scala
import zio._, stream._
import zio.kafka.consumer._

val consumer = ZLayer.scoped(Consumer.make(consumerSettings))

val stream = Consumer
.plainStream(Subscription.topics("topic150"), Serde.byteArray, Serde.byteArray)

def deserialize(value: Array[Byte]): ZIO[Any, Throwable, Message] = ???

stream
.mapZIO { record => // ⚠️ see section about `mapZIO` below!
val value: Array[Byte] = record.record.value()
val offset: Offset = record.offset

deserialize(value)
// possible action to take if deserialization fails:
.recoverWith(_ => someEffect(value))
.flatMap(processMessage)
.as(offset)
}
.aggregateAsync(Consumer.offsetBatches)
.mapZIO(_.commit)
.runDrain
.provideSomeLayer(consumer)
```

## A warning about `mapZIO`

Be careful with using `mapZIO` as it breaks the chunking structure of the stream (or more precisely, the resulting
stream has chunks with a single element). Throughput can be considerably lower than with the chunking structure intact.

If your application requirements allow all elements of a chunk to be processed in one go, then you can use one of these
techniques to preserve the chunking structure:

### Use `chunksWith`

Use `chunksWith` when you have a single processing step that needs to work on a chunk.

```scala
def f(a: A): ZIO[R, E, B]

stream // ZStream[R, E, A]
.chunksWith { stream => stream.mapZIO(f) } // ZStream[R, E, B]
```

### Expose chunking structure with `chunks`

Use `chunks` when you have multiple processing steps that can all work on a chunk at a time. Since `chunks` exposes the
chunking structure explicitly, the program can no longer accidentally break the chunking structure (unless
`flattenChunks` is also used).

```scala
def f(a: A): ZIO[R, E, B]
def g(b: B): ZIO[R, E, C]

stream // ZStream[R, E, A]
.chunks // ZStream[R, E, Chunk[A]]
.mapZIO { chunk => ZIO.foreach(chunk)(f) } // ZStream[R, E, Chunk[B]]
.mapZIO { chunk => ZIO.foreach(chunk)(g) } // ZStream[R, E, Chunk[C]]
.flattenChunks // ZStream[R, E, C]
```

0 comments on commit 9cc273a

Please sign in to comment.