Skip to content

Commit

Permalink
Remove attemptBlocking from AdminClient (#1359)
Browse files Browse the repository at this point in the history
Fixes #492
  • Loading branch information
svroonland authored Nov 3, 2024
1 parent d59e8f6 commit 42b5401
Showing 1 changed file with 33 additions and 33 deletions.
66 changes: 33 additions & 33 deletions zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ object AdminClient {
val asJava = newTopics.map(_.asJava).asJavaCollection

fromKafkaFutureVoid {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.createTopics(asJava))(opts => adminClient.createTopics(asJava, opts.asJava))
.all()
Expand All @@ -376,7 +376,7 @@ object AdminClient {
): Task[Unit] = {
val asJava = groupIds.asJavaCollection
fromKafkaFutureVoid {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.deleteConsumerGroups(asJava))(opts =>
adminClient.deleteConsumerGroups(asJava, opts.asJava)
Expand All @@ -395,7 +395,7 @@ object AdminClient {
): Task[Unit] = {
val asJava = topics.asJavaCollection
fromKafkaFutureVoid {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.deleteTopics(asJava))(opts => adminClient.deleteTopics(asJava, opts.asJava))
.all()
Expand All @@ -418,7 +418,7 @@ object AdminClient {
): Task[Unit] = {
val records = recordsToDelete.map { case (k, v) => k.asJava -> v }.asJava
fromKafkaFutureVoid {
ZIO.attemptBlocking(
ZIO.attempt(
deleteRecordsOptions
.fold(adminClient.deleteRecords(records))(opts => adminClient.deleteRecords(records, opts.asJava))
.all()
Expand All @@ -431,7 +431,7 @@ object AdminClient {
*/
override def listTopics(listTopicsOptions: Option[ListTopicsOptions] = None): Task[Map[String, TopicListing]] =
fromKafkaFuture {
ZIO.attemptBlocking(
ZIO.attempt(
listTopicsOptions
.fold(adminClient.listTopics())(opts => adminClient.listTopics(opts.asJava))
.namesToListings()
Expand All @@ -447,7 +447,7 @@ object AdminClient {
): Task[Map[String, TopicDescription]] = {
val asJava = topicNames.asJavaCollection
fromKafkaFuture {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.describeTopics(asJava))(opts => adminClient.describeTopics(asJava, opts.asJava))
.allTopicNames()
Expand All @@ -469,7 +469,7 @@ object AdminClient {
): Task[Map[ConfigResource, KafkaConfig]] = {
val asJava = configResources.map(_.asJava).asJavaCollection
fromKafkaFuture {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.describeConfigs(asJava))(opts => adminClient.describeConfigs(asJava, opts.asJava))
.all()
Expand All @@ -490,7 +490,7 @@ object AdminClient {
): Task[Map[ConfigResource, Task[KafkaConfig]]] = {
val asJava = configResources.map(_.asJava).asJavaCollection
ZIO
.attemptBlocking(
.attempt(
options
.fold(adminClient.describeConfigs(asJava))(opts => adminClient.describeConfigs(asJava, opts.asJava))
.values()
Expand All @@ -509,7 +509,7 @@ object AdminClient {
}

private def describeCluster(options: Option[DescribeClusterOptions]): Task[DescribeClusterResult] =
ZIO.attemptBlocking(
ZIO.attempt(
options.fold(adminClient.describeCluster())(opts => adminClient.describeCluster(opts.asJava))
)

Expand Down Expand Up @@ -568,7 +568,7 @@ object AdminClient {
): Task[Unit] = {
val asJava = newPartitions.map { case (k, v) => k -> v.asJava }.asJava
fromKafkaFutureVoid {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.createPartitions(asJava))(opts => adminClient.createPartitions(asJava, opts.asJava))
.all()
Expand All @@ -585,7 +585,7 @@ object AdminClient {
): Task[Map[TopicPartition, ListOffsetsResultInfo]] = {
val asJava = topicPartitionOffsets.bimap(_.asJava, _.asJava).asJava
fromKafkaFuture {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.listOffsets(asJava))(opts => adminClient.listOffsets(asJava, opts.asJava))
.all()
Expand All @@ -603,7 +603,7 @@ object AdminClient {
val topicPartitionOffsetsAsJava = topicPartitionOffsets.bimap(_.asJava, _.asJava)
val topicPartitionsAsJava = topicPartitionOffsetsAsJava.keySet
val asJava = topicPartitionOffsetsAsJava.asJava
ZIO.attemptBlocking {
ZIO.attempt {
val listOffsetsResult = options
.fold(adminClient.listOffsets(asJava))(opts => adminClient.listOffsets(asJava, opts.asJava))
topicPartitionsAsJava.map(tp => tp -> listOffsetsResult.partitionResult(tp))
Expand All @@ -625,7 +625,7 @@ object AdminClient {
options: Option[ListConsumerGroupOffsetsOptions] = None
): Task[Map[TopicPartition, OffsetAndMetadata]] =
fromKafkaFuture {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.listConsumerGroupOffsets(groupId))(opts =>
adminClient.listConsumerGroupOffsets(groupId, opts.asJava)
Expand All @@ -642,7 +642,7 @@ object AdminClient {
groupSpecs: Map[String, ListConsumerGroupOffsetsSpec]
): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] =
fromKafkaFuture {
ZIO.attemptBlocking(
ZIO.attempt(
adminClient
.listConsumerGroupOffsets(groupSpecs.map { case (groupId, offsetsSpec) =>
(groupId, offsetsSpec.asJava)
Expand All @@ -663,7 +663,7 @@ object AdminClient {
options: ListConsumerGroupOffsetsOptions
): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] =
fromKafkaFuture {
ZIO.attemptBlocking(
ZIO.attempt(
adminClient
.listConsumerGroupOffsets(
groupSpecs.map { case (groupId, offsetsSpec) => (groupId, offsetsSpec.asJava) }.asJava,
Expand All @@ -690,7 +690,7 @@ object AdminClient {
): Task[Unit] = {
val asJava = offsets.bimap(_.asJava, _.asJava).asJava
fromKafkaFutureVoid {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.alterConsumerGroupOffsets(groupId, asJava))(opts =>
adminClient.alterConsumerGroupOffsets(groupId, asJava, opts.asJava)
Expand All @@ -704,7 +704,7 @@ object AdminClient {
* Retrieves metrics for the underlying AdminClient
*/
override def metrics: Task[Map[MetricName, Metric]] =
ZIO.attemptBlocking(
ZIO.attempt(
adminClient.metrics().asScala.toMap.map { case (metricName, metric) =>
(MetricName(metricName), Metric(metric))
}
Expand All @@ -717,7 +717,7 @@ object AdminClient {
options: Option[ListConsumerGroupsOptions] = None
): Task[List[ConsumerGroupListing]] =
fromKafkaFuture {
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.listConsumerGroups())(opts => adminClient.listConsumerGroups(opts.asJava))
.all()
Expand All @@ -738,7 +738,7 @@ object AdminClient {
options: Option[DescribeConsumerGroupsOptions]
): Task[Map[String, ConsumerGroupDescription]] =
fromKafkaFuture(
ZIO.attemptBlocking(
ZIO.attempt(
options
.fold(adminClient.describeConsumerGroups(groupIds.asJavaCollection))(opts =>
adminClient.describeConsumerGroups(groupIds.asJavaCollection, opts.asJava)
Expand All @@ -755,7 +755,7 @@ object AdminClient {
membersToRemove.map(new MemberToRemove(_)).asJavaCollection
)
fromKafkaFuture(
ZIO.attemptBlocking(
ZIO.attempt(
adminClient.removeMembersFromConsumerGroup(groupId, options).all()
)
).unit
Expand All @@ -767,7 +767,7 @@ object AdminClient {
override def removeMembersFromConsumerGroup(groupId: String): Task[Unit] = {
val options = new RemoveMembersFromConsumerGroupOptions()
fromKafkaFuture(
ZIO.attemptBlocking(
ZIO.attempt(
adminClient.removeMembersFromConsumerGroup(groupId, options).all()
)
).unit
Expand All @@ -777,7 +777,7 @@ object AdminClient {
brokersId: Iterable[Int]
): ZIO[Any, Throwable, Map[Int, Map[String, LogDirDescription]]] =
fromKafkaFuture(
ZIO.attemptBlocking(
ZIO.attempt(
adminClient.describeLogDirs(brokersId.map(Int.box).asJavaCollection).allDescriptions()
)
).map {
Expand All @@ -791,7 +791,7 @@ object AdminClient {
brokersId: Iterable[Int]
): ZIO[Any, Throwable, Map[Int, Task[Map[String, LogDirDescription]]]] =
ZIO
.attemptBlocking(
.attempt(
adminClient.describeLogDirs(brokersId.map(Int.box).asJavaCollection).descriptions()
)
.map {
Expand All @@ -811,7 +811,7 @@ object AdminClient {
): Task[Unit] =
fromKafkaFutureVoid(
ZIO
.attemptBlocking(
.attempt(
adminClient
.incrementalAlterConfigs(
configs.map { case (configResource, alterConfigOps) =>
Expand All @@ -828,7 +828,7 @@ object AdminClient {
options: AlterConfigsOptions
): Task[Map[ConfigResource, Task[Unit]]] =
ZIO
.attemptBlocking(
.attempt(
adminClient
.incrementalAlterConfigs(
configs.map { case (configResource, alterConfigOps) =>
Expand All @@ -846,7 +846,7 @@ object AdminClient {
override def alterConfigs(configs: Map[ConfigResource, KafkaConfig], options: AlterConfigsOptions): Task[Unit] =
fromKafkaFutureVoid(
ZIO
.attemptBlocking(
.attempt(
adminClient
.alterConfigs(
configs.map { case (configResource, kafkaConfig) =>
Expand All @@ -864,7 +864,7 @@ object AdminClient {
options: AlterConfigsOptions
): Task[Map[ConfigResource, Task[Unit]]] =
ZIO
.attemptBlocking(
.attempt(
adminClient
.alterConfigs(
configs.map { case (configResource, kafkaConfig) =>
Expand All @@ -884,7 +884,7 @@ object AdminClient {
): Task[Set[AclBinding]] =
fromKafkaFuture(
ZIO
.attemptBlocking(
.attempt(
options
.fold(adminClient.describeAcls(filter.asJava))(opt => adminClient.describeAcls(filter.asJava, opt.asJava))
.values()
Expand All @@ -894,7 +894,7 @@ object AdminClient {
override def createAcls(acls: Set[AclBinding], options: Option[CreateAclOptions]): Task[Unit] =
fromKafkaFutureVoid(
ZIO
.attemptBlocking(
.attempt(
options
.fold(adminClient.createAcls(acls.map(_.asJava).asJava))(opt =>
adminClient.createAcls(acls.map(_.asJava).asJava, opt.asJava)
Expand All @@ -908,7 +908,7 @@ object AdminClient {
options: Option[CreateAclOptions]
): Task[Map[AclBinding, Task[Unit]]] =
ZIO
.attemptBlocking(
.attempt(
options
.fold(adminClient.createAcls(acls.map(_.asJava).asJava))(opt =>
adminClient.createAcls(acls.map(_.asJava).asJava, opt.asJava)
Expand All @@ -922,7 +922,7 @@ object AdminClient {
override def deleteAcls(filters: Set[AclBindingFilter], options: Option[DeleteAclsOptions]): Task[Set[AclBinding]] =
fromKafkaFuture(
ZIO
.attemptBlocking(
.attempt(
options
.fold(adminClient.deleteAcls(filters.map(_.asJava).asJava))(opt =>
adminClient.deleteAcls(filters.map(_.asJava).asJava, opt.asJava)
Expand All @@ -936,7 +936,7 @@ object AdminClient {
options: Option[DeleteAclsOptions]
): Task[Map[AclBindingFilter, Task[Map[AclBinding, Option[Throwable]]]]] =
ZIO
.attemptBlocking(
.attempt(
options
.fold(adminClient.deleteAcls(filters.map(_.asJava).asJava))(opt =>
adminClient.deleteAcls(filters.map(_.asJava).asJava, opt.asJava)
Expand Down Expand Up @@ -1548,7 +1548,7 @@ object AdminClient {
.validateEndpoint(settings.driverSettings)

endpointCheck *> ZIO.attempt(JAdmin.create(settings.driverSettings.asJava))
}(client => ZIO.attemptBlocking(client.close(settings.closeTimeout)).orDie)
}(client => ZIO.attempt(client.close(settings.closeTimeout)).orDie)

implicit final class MapOps[K1, V1](private val v: Map[K1, V1]) extends AnyVal {
def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map { case (k, v) => fk(k) -> fv(v) }
Expand Down

0 comments on commit 42b5401

Please sign in to comment.