Skip to content

Commit

Permalink
chore: updated existing implementation to rely on interface instead o…
Browse files Browse the repository at this point in the history
…f implementation
  • Loading branch information
devsprint committed Aug 30, 2023
1 parent 4196f74 commit b956d3f
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 36 deletions.
17 changes: 0 additions & 17 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package zio.kafka.consumer
import org.apache.kafka.clients.consumer.{
Consumer => JConsumer,
ConsumerRecord,
KafkaConsumer,
OffsetAndMetadata,
OffsetAndTimestamp
}
Expand Down Expand Up @@ -192,22 +191,6 @@ object Consumer {
*
* You are responsible for creating and closing the KafkaConsumer
*/
def fromJavaConsumer(
javaConsumer: KafkaConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
diagnostics: Diagnostics = Diagnostics.NoOp
): ZIO[Scope, Throwable, Consumer] =
for {
_ <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
consumerAccess <- ConsumerAccess.make(javaConsumer)
runloopAccess <- RunloopAccess.make(settings, consumerAccess, diagnostics)
} yield new ConsumerLive(consumerAccess, runloopAccess)

/**
* Create a zio-kafka Consumer from an org.apache.kafka Consumer
*
* You are responsible for creating and closing the Consumer
*/
def fromJavaConsumer(
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
settings: ConsumerSettings,
Expand Down
19 changes: 0 additions & 19 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,25 +211,6 @@ object Producer {
*
* You are responsible for creating and closing the KafkaProducer
*/
def fromJavaProducer(
javaProducer: KafkaProducer[Array[Byte], Array[Byte]],
settings: ProducerSettings
): ZIO[Scope, Throwable, Producer] =
for {
runtime <- ZIO.runtime[Any]
sendQueue <-
Queue.bounded[(Chunk[ByteRecord], Promise[Nothing, Chunk[Either[Throwable, RecordMetadata]]])](
settings.sendBufferSize
)
producer = new ProducerLive(javaProducer, runtime, sendQueue)
_ <- ZIO.blocking(producer.sendFromQueue).forkScoped
} yield producer

/**
* Create a zio-kafka Producer from an existing org.apache.kafka Producer
*
* You are responsible for creating and closing the Producer
*/
def fromJavaProducer(
javaProducer: JProducer[Array[Byte], Array[Byte]],
settings: ProducerSettings
Expand Down

0 comments on commit b956d3f

Please sign in to comment.