Skip to content

Commit

Permalink
Put back the ProducerSettings::driverSettings and `AdminClientSetti…
Browse files Browse the repository at this point in the history
…ngs::driverSettings` methods to avoid a useless breaking change and to provide a more unified interface through all the settings case classes (#1057)
  • Loading branch information
guizmaii authored Sep 27, 2023
1 parent 9e745f7 commit 95c29dd
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ object SslHelperSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

if (isValidAddress) SocketChannel.open(address)
else throw new java.net.ConnectException("Connection refused")
}(settingsWithDownNode.properties)
}(settingsWithDownNode.driverSettings)
} yield result
).provide(Kafka.embedded)

Expand All @@ -171,7 +171,7 @@ object SslHelperSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {

if (isValidAddress) SocketChannel.open(address)
else throw new java.net.ConnectException("Connection refused")
}(settingsWithDownNode.properties)
}(settingsWithDownNode.driverSettings)
} yield result
).provide(Kafka.sslEmbedded)

Expand Down Expand Up @@ -241,7 +241,7 @@ object SslHelperSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
array(2) = e
throw e
}
}(settings.properties)
}(settings.driverSettings)

// custom assertion. More readable with a proper name.
val hasNoMessage = not(hasMessage(anything))
Expand Down Expand Up @@ -285,7 +285,7 @@ object SslHelperSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
array(2) = e
throw e
}
}(settings.properties).fork.flatMap(fiber => fiber.interrupt.delay(1.seconds))
}(settings.driverSettings).fork.flatMap(fiber => fiber.interrupt.delay(1.seconds))

// custom assertion. More readable with a proper name.
val hasNoMessage = not(hasMessage(anything))
Expand All @@ -306,7 +306,7 @@ object SslHelperSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
) @@ withLiveClock @@ sequential

implicit class SettingsHelper(adminClientSettings: AdminClientSettings) {
def bootstrapServers: List[String] = adminClientSettings.properties
def bootstrapServers: List[String] = adminClientSettings.driverSettings
.getOrElse(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "")
.toString
.split(",")
Expand Down
4 changes: 2 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1513,9 +1513,9 @@ object AdminClient {
def javaClientFromSettings(settings: AdminClientSettings): ZIO[Scope, Throwable, JAdmin] =
ZIO.acquireRelease {
val endpointCheck = SslHelper
.validateEndpoint(settings.properties)
.validateEndpoint(settings.driverSettings)

endpointCheck *> ZIO.attempt(JAdmin.create(settings.properties.asJava))
endpointCheck *> ZIO.attempt(JAdmin.create(settings.driverSettings.asJava))
}(client => ZIO.attemptBlocking(client.close(settings.closeTimeout)).orDie)

implicit final class MapOps[K1, V1](private val v: Map[K1, V1]) extends AnyVal {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ final case class AdminClientSettings(
closeTimeout: Duration,
properties: Map[String, AnyRef]
) {
def driverSettings: Map[String, AnyRef] = properties

def withBootstrapServers(servers: List[String]): AdminClientSettings =
withProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers.mkString(","))

Expand Down
4 changes: 2 additions & 2 deletions zio-kafka/src/main/scala/zio/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ object Producer {

def make(settings: ProducerSettings): ZIO[Scope, Throwable, Producer] =
for {
_ <- SslHelper.validateEndpoint(settings.properties)
_ <- SslHelper.validateEndpoint(settings.driverSettings)
rawProducer <- ZIO.acquireRelease(
ZIO.attempt(
new KafkaProducer[Array[Byte], Array[Byte]](
settings.properties.asJava,
settings.driverSettings.asJava,
new ByteArraySerializer(),
new ByteArraySerializer()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ final case class ProducerSettings(
sendBufferSize: Int = 4096,
properties: Map[String, AnyRef] = Map.empty
) {
def driverSettings: Map[String, AnyRef] = properties

def withBootstrapServers(servers: List[String]): ProducerSettings =
withProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers.mkString(","))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ object TransactionalProducer {
rawProducer <- ZIO.acquireRelease(
ZIO.attempt(
new KafkaProducer[Array[Byte], Array[Byte]](
settings.producerSettings.properties.asJava,
settings.producerSettings.driverSettings.asJava,
new ByteArraySerializer(),
new ByteArraySerializer()
)
Expand Down

0 comments on commit 95c29dd

Please sign in to comment.