Skip to content

Commit

Permalink
feat: added new integration points with java kafka producer and consu…
Browse files Browse the repository at this point in the history
…mer that rely on interface instead of implementation
  • Loading branch information
devsprint committed Aug 30, 2023
1 parent 7bfee4c commit 4196f74
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
24 changes: 23 additions & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package zio.kafka.consumer

import org.apache.kafka.clients.consumer.{ ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetAndTimestamp }
import org.apache.kafka.clients.consumer.{
Consumer => JConsumer,
ConsumerRecord,
KafkaConsumer,
OffsetAndMetadata,
OffsetAndTimestamp
}
import org.apache.kafka.common._
import zio._
import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
Expand Down Expand Up @@ -197,6 +203,22 @@ object Consumer {
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
* Create a zio-kafka Consumer from an org.apache.kafka Consumer
*
* You are responsible for creating and closing the Consumer
*/
def fromJavaConsumer(
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
consumerAccess <- ConsumerAccess.make(javaConsumer)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
* Accessor method
*/
Expand Down
19 changes: 19 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,25 @@ object Producer {
_ <- ZIO.blocking(producer.sendFromQueue).forkScoped
} yield producer

/**
* Create a zio-kafka Producer from an existing org.apache.kafka Producer
*
* You are responsible for creating and closing the Producer
*/
def fromJavaProducer(
javaProducer: JProducer[Array[Byte], Array[Byte]],
settings: ProducerSettings
): ZIO[Scope, Throwable, Producer] =
for {
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.sendBufferSize
)
producer = new ProducerLive(javaProducer, runtime, sendQueue)
_ <- ZIO.blocking(producer.sendFromQueue).forkScoped
} yield producer

/**
* Accessor method
*/
Expand Down

0 comments on commit 4196f74

Please sign in to comment.