Implement 'produce' aliases in the Producer trait, making producer composition simpler
soujiro32167 committed Sep 12, 2023
1 parent 6cddf50 commit 95b921c
Showing 2 changed files with 42 additions and 94 deletions.
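
In short: every produce, produceAsync and produceChunk* overload in the trait now has a default implementation that funnels into produceChunkAsyncWithFailures, so a wrapping producer only has to forward the members that stay abstract. Below is a minimal sketch of such a decorator, assuming produceAll, flush and metrics are the remaining abstract members and that produceChunkAsyncWithFailures returns Task[UIO[Chunk[Either[Throwable, RecordMetadata]]]]; the CountingProducer name is hypothetical:

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.kafka.common.{ Metric, MetricName }
import zio._
import zio.stream.ZPipeline
import zio.kafka.producer.{ ByteRecord, Producer }

// Hypothetical decorator: counts outgoing records, forwards everything else.
final class CountingProducer(underlying: Producer, counter: Ref[Long]) extends Producer {

  // This single override now covers every produce/produceAsync/produceChunk* alias.
  override def produceChunkAsyncWithFailures(
    records: Chunk[ByteRecord]
  ): Task[UIO[Chunk[Either[Throwable, RecordMetadata]]]] =
    counter.update(_ + records.size) *> underlying.produceChunkAsyncWithFailures(records)

  // Assumed remaining abstract members, forwarded unchanged.
  override def produceAll: ZPipeline[Any, Throwable, ByteRecord, RecordMetadata] = underlying.produceAll
  override def flush: Task[Unit]                                                 = underlying.flush
  override def metrics: Task[Map[MetricName, Metric]]                            = underlying.metrics
}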
124 changes: 30 additions & 94 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
@@ -20,7 +20,8 @@ trait Producer {
*/
def produce(
record: ProducerRecord[Array[Byte], Array[Byte]]
): Task[RecordMetadata]
): Task[RecordMetadata] =
produceAsync(record).flatten

/**
* Produces a single record and awaits broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for a version
@@ -30,7 +31,8 @@ trait Producer {
record: ProducerRecord[K, V],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, RecordMetadata]
): RIO[R, RecordMetadata] =
produceAsync(record, keySerializer, valueSerializer).flatten

/**
* Produces a single record and awaits broker acknowledgement. See [[produceAsync[R,K,V](topic:String*]] for a version
@@ -42,7 +44,8 @@ trait Producer {
value: V,
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, RecordMetadata]
): RIO[R, RecordMetadata] =
produce(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)

/**
* A stream pipeline that produces all records from the stream.
@@ -63,9 +66,11 @@ trait Producer {
* and await all of their acknowledgements at once. That amortizes the cost of sending requests to Kafka and increases
* throughput. See [[produce[R,K,V](record*]] for a version that awaits broker acknowledgement.
*/
// noinspection YieldingZIOEffectInspection
def produceAsync(
record: ProducerRecord[Array[Byte], Array[Byte]]
): Task[Task[RecordMetadata]]
): Task[Task[RecordMetadata]] =
produceChunkAsyncWithFailures(Chunk.single(record)).map(done => done.flatMap(c => ZIO.fromEither(c.head)))

/**
* Produces a single record. The effect returned from this method has two layers and describes the completion of two
@@ -81,7 +86,8 @@ trait Producer {
record: ProducerRecord[K, V],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, Task[RecordMetadata]]
): RIO[R, Task[RecordMetadata]] =
serialize(record, keySerializer, valueSerializer).flatMap(produceAsync)

/**
* Produces a single record. The effect returned from this method has two layers and describes the completion of two
@@ -99,15 +105,17 @@ trait Producer {
value: V,
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, Task[RecordMetadata]]
): RIO[R, Task[RecordMetadata]] =
produceAsync(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)

/**
* Produces a chunk of records. See [[produceChunkAsync(records*]] for a version that avoids the round-trip-time
* penalty for each chunk.
*/
def produceChunk(
records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
): Task[Chunk[RecordMetadata]]
): Task[Chunk[RecordMetadata]] =
produceChunkAsync(records).flatten

/**
* Produces a chunk of records. See [[produceChunkAsync(records*]] for a version that avoids the round-trip-time
@@ -117,7 +125,8 @@ trait Producer {
records: Chunk[ProducerRecord[K, V]],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, Chunk[RecordMetadata]]
): RIO[R, Chunk[RecordMetadata]] =
produceChunkAsync(records, keySerializer, valueSerializer).flatten

/**
* Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
@@ -129,9 +138,17 @@ trait Producer {
* the transmission of part of the chunk. Regardless, awaiting the inner layer guarantees the transmission of the
* entire chunk.
*/
// noinspection YieldingZIOEffectInspection
def produceChunkAsync(
records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
): Task[Task[Chunk[RecordMetadata]]]
): Task[Task[Chunk[RecordMetadata]]] =
produceChunkAsyncWithFailures(records).map(_.flatMap { chunkResults =>
val (errors, success) = chunkResults.partitionMap(identity)
errors.headOption match {
case Some(error) => ZIO.fail(error) // Only the first failure is returned.
case None => ZIO.succeed(success)
}
})
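
As the comment notes, the default above collapses per-record outcomes into all-metadata-or-the-first-error; the per-record view stays available on produceChunkAsyncWithFailures. A small sketch of the difference (the compare helper and its log messages are illustrative only):

import org.apache.kafka.clients.producer.RecordMetadata
import zio._
import zio.kafka.producer.{ ByteRecord, Producer }

def compare(producer: Producer, records: Chunk[ByteRecord]): Task[Unit] =
  for {
    // One Either per record: failures and successes side by side.
    perRecord <- producer.produceChunkAsyncWithFailures(records).flatten
    _         <- ZIO.logInfo(s"${perRecord.count(_.isRight)} of ${records.size} records acknowledged")
    // The alias instead fails the inner task with the first error only.
    _         <- producer.produceChunkAsync(records).flatten
                   .catchAll(first => ZIO.logWarning(s"first failure: $first"))
  } yield ()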

/**
* Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
@@ -147,7 +164,10 @@ trait Producer {
records: Chunk[ProducerRecord[K, V]],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, Task[Chunk[RecordMetadata]]]
): RIO[R, Task[Chunk[RecordMetadata]]] =
ZIO
.foreach(records)(serialize(_, keySerializer, valueSerializer))
.flatMap(produceChunkAsync)

/**
* Produces a chunk of records. The effect returned from this method has two layers and describes the completion of
@@ -346,7 +366,6 @@ object Producer {
* Accessor method
*/
val metrics: RIO[Producer, Map[MetricName, Metric]] = ZIO.serviceWithZIO(_.metrics)

}

private[producer] final class ProducerLive(
@@ -355,79 +374,6 @@ private[producer] final class ProducerLive(
sendQueue: Queue[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])]
) extends Producer {

override def produce(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[RecordMetadata] =
produceAsync(record).flatten

override def produce[R, K, V](
record: ProducerRecord[K, V],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, RecordMetadata] =
produceAsync(record, keySerializer, valueSerializer).flatten

override def produce[R, K, V](
topic: String,
key: K,
value: V,
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, RecordMetadata] =
produce(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)

// noinspection YieldingZIOEffectInspection
override def produceAsync(record: ProducerRecord[Array[Byte], Array[Byte]]): Task[Task[RecordMetadata]] =
for {
done <- Promise.make[Nothing, Chunk[Either[Throwable, RecordMetadata]]]
_ <- sendQueue.offer((Chunk.single(record), done))
} yield done.await.flatMap(result => ZIO.fromEither(result.head))

override def produceAsync[R, K, V](
record: ProducerRecord[K, V],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, Task[RecordMetadata]] =
serialize(record, keySerializer, valueSerializer).flatMap(produceAsync)

override def produceAsync[R, K, V](
topic: String,
key: K,
value: V,
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, Task[RecordMetadata]] =
produceAsync(new ProducerRecord(topic, key, value), keySerializer, valueSerializer)

override def produceChunk(records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]): Task[Chunk[RecordMetadata]] =
produceChunkAsync(records).flatten

override def produceChunk[R, K, V](
records: Chunk[ProducerRecord[K, V]],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, Chunk[RecordMetadata]] =
produceChunkAsync(records, keySerializer, valueSerializer).flatten

// noinspection YieldingZIOEffectInspection
override def produceChunkAsync(
records: Chunk[ProducerRecord[Array[Byte], Array[Byte]]]
): Task[Task[Chunk[RecordMetadata]]] =
produceChunkAsyncWithFailures(records).map(_.flatMap { chunkResults =>
val (errors, success) = chunkResults.partitionMap(identity)
errors.headOption match {
case Some(error) => ZIO.fail(error) // Only the first failure is returned.
case None => ZIO.succeed(success)
}
})

override def produceChunkAsync[R, K, V](
records: Chunk[ProducerRecord[K, V]],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, Task[Chunk[RecordMetadata]]] =
ZIO
.foreach(records)(serialize(_, keySerializer, valueSerializer))
.flatMap(produceChunkAsync)

// noinspection YieldingZIOEffectInspection
override def produceChunkAsyncWithFailures(
records: Chunk[ByteRecord]
@@ -492,16 +438,6 @@
}
.runDrain

private def serialize[R, K, V](
r: ProducerRecord[K, V],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, ByteRecord] =
for {
key <- keySerializer.serialize(r.topic, r.headers, r.key())
value <- valueSerializer.serialize(r.topic, r.headers, r.value())
} yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)

/** Used to prevent warnings about not using the result of an expression. */
@inline private def exec[A](f: => A): Unit = { val _ = f }
}
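
Taken together, the new defaults make the two-layer contract from the scaladoc visible at a use site. A sketch (topic name and payloads are placeholders):

import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import zio._
import zio.kafka.producer.Producer

val record = new ProducerRecord[Array[Byte], Array[Byte]]("my-topic", "key".getBytes, "value".getBytes)

val awaitAck: RIO[Producer, RecordMetadata] =
  ZIO.serviceWithZIO[Producer] { p =>
    for {
      ack      <- p.produceAsync(record) // outer layer completes: the record is buffered for sending
      metadata <- ack                    // inner layer completes: the broker acknowledged the record
    } yield metadata                     // with the new default, equivalent to p.produce(record)
  }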
12 changes: 12 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/producer/package.scala
@@ -1,7 +1,19 @@
package zio.kafka

import org.apache.kafka.clients.producer.ProducerRecord
import zio.RIO
import zio.kafka.serde.Serializer

package object producer {
type ByteRecord = ProducerRecord[Array[Byte], Array[Byte]]

def serialize[R, K, V](
r: ProducerRecord[K, V],
keySerializer: Serializer[R, K],
valueSerializer: Serializer[R, V]
): RIO[R, ByteRecord] =
for {
key <- keySerializer.serialize(r.topic, r.headers, r.key())
value <- valueSerializer.serialize(r.topic, r.headers, r.value())
} yield new ProducerRecord(r.topic, r.partition(), r.timestamp(), key, value, r.headers)
}
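
With serialize in the package object rather than private to ProducerLive, code outside the live implementation can turn a typed record into a ByteRecord. A sketch, assuming the built-in Serde.string and Serde.long from zio.kafka.serde (topic, key and value are placeholders):

import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.producer._
import zio.kafka.serde.Serde

val typed: ProducerRecord[String, Long] = new ProducerRecord("user-events", "user-1", 42L)

// Serdes are serializers, so they can be passed directly.
val asBytes: Task[ByteRecord] = serialize(typed, Serde.string, Serde.long)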
