From 42b5401d7772ff1149566a0b0333e4c448efa919 Mon Sep 17 00:00:00 2001 From: svroonland Date: Sun, 3 Nov 2024 11:06:25 +0100 Subject: [PATCH] Remove attemptBlocking from AdminClient (#1359) Fixes #492 --- .../scala/zio/kafka/admin/AdminClient.scala | 66 +++++++++---------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala index 1d408a0ef..2af993639 100644 --- a/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala +++ b/zio-kafka/src/main/scala/zio/kafka/admin/AdminClient.scala @@ -353,7 +353,7 @@ object AdminClient { val asJava = newTopics.map(_.asJava).asJavaCollection fromKafkaFutureVoid { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.createTopics(asJava))(opts => adminClient.createTopics(asJava, opts.asJava)) .all() @@ -376,7 +376,7 @@ object AdminClient { ): Task[Unit] = { val asJava = groupIds.asJavaCollection fromKafkaFutureVoid { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.deleteConsumerGroups(asJava))(opts => adminClient.deleteConsumerGroups(asJava, opts.asJava) @@ -395,7 +395,7 @@ object AdminClient { ): Task[Unit] = { val asJava = topics.asJavaCollection fromKafkaFutureVoid { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.deleteTopics(asJava))(opts => adminClient.deleteTopics(asJava, opts.asJava)) .all() @@ -418,7 +418,7 @@ object AdminClient { ): Task[Unit] = { val records = recordsToDelete.map { case (k, v) => k.asJava -> v }.asJava fromKafkaFutureVoid { - ZIO.attemptBlocking( + ZIO.attempt( deleteRecordsOptions .fold(adminClient.deleteRecords(records))(opts => adminClient.deleteRecords(records, opts.asJava)) .all() @@ -431,7 +431,7 @@ object AdminClient { */ override def listTopics(listTopicsOptions: Option[ListTopicsOptions] = None): Task[Map[String, TopicListing]] = fromKafkaFuture { - ZIO.attemptBlocking( + ZIO.attempt( listTopicsOptions .fold(adminClient.listTopics())(opts => adminClient.listTopics(opts.asJava)) .namesToListings() @@ -447,7 +447,7 @@ object AdminClient { ): Task[Map[String, TopicDescription]] = { val asJava = topicNames.asJavaCollection fromKafkaFuture { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.describeTopics(asJava))(opts => adminClient.describeTopics(asJava, opts.asJava)) .allTopicNames() @@ -469,7 +469,7 @@ object AdminClient { ): Task[Map[ConfigResource, KafkaConfig]] = { val asJava = configResources.map(_.asJava).asJavaCollection fromKafkaFuture { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.describeConfigs(asJava))(opts => adminClient.describeConfigs(asJava, opts.asJava)) .all() @@ -490,7 +490,7 @@ object AdminClient { ): Task[Map[ConfigResource, Task[KafkaConfig]]] = { val asJava = configResources.map(_.asJava).asJavaCollection ZIO - .attemptBlocking( + .attempt( options .fold(adminClient.describeConfigs(asJava))(opts => adminClient.describeConfigs(asJava, opts.asJava)) .values() @@ -509,7 +509,7 @@ object AdminClient { } private def describeCluster(options: Option[DescribeClusterOptions]): Task[DescribeClusterResult] = - ZIO.attemptBlocking( + ZIO.attempt( options.fold(adminClient.describeCluster())(opts => adminClient.describeCluster(opts.asJava)) ) @@ -568,7 +568,7 @@ object AdminClient { ): Task[Unit] = { val asJava = newPartitions.map { case (k, v) => k -> v.asJava }.asJava fromKafkaFutureVoid { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.createPartitions(asJava))(opts => adminClient.createPartitions(asJava, opts.asJava)) .all() @@ -585,7 +585,7 @@ object AdminClient { ): Task[Map[TopicPartition, ListOffsetsResultInfo]] = { val asJava = topicPartitionOffsets.bimap(_.asJava, _.asJava).asJava fromKafkaFuture { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.listOffsets(asJava))(opts => adminClient.listOffsets(asJava, opts.asJava)) .all() @@ -603,7 +603,7 @@ object AdminClient { val topicPartitionOffsetsAsJava = topicPartitionOffsets.bimap(_.asJava, _.asJava) val topicPartitionsAsJava = topicPartitionOffsetsAsJava.keySet val asJava = topicPartitionOffsetsAsJava.asJava - ZIO.attemptBlocking { + ZIO.attempt { val listOffsetsResult = options .fold(adminClient.listOffsets(asJava))(opts => adminClient.listOffsets(asJava, opts.asJava)) topicPartitionsAsJava.map(tp => tp -> listOffsetsResult.partitionResult(tp)) @@ -625,7 +625,7 @@ object AdminClient { options: Option[ListConsumerGroupOffsetsOptions] = None ): Task[Map[TopicPartition, OffsetAndMetadata]] = fromKafkaFuture { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.listConsumerGroupOffsets(groupId))(opts => adminClient.listConsumerGroupOffsets(groupId, opts.asJava) @@ -642,7 +642,7 @@ object AdminClient { groupSpecs: Map[String, ListConsumerGroupOffsetsSpec] ): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] = fromKafkaFuture { - ZIO.attemptBlocking( + ZIO.attempt( adminClient .listConsumerGroupOffsets(groupSpecs.map { case (groupId, offsetsSpec) => (groupId, offsetsSpec.asJava) @@ -663,7 +663,7 @@ object AdminClient { options: ListConsumerGroupOffsetsOptions ): Task[Map[String, Map[TopicPartition, OffsetAndMetadata]]] = fromKafkaFuture { - ZIO.attemptBlocking( + ZIO.attempt( adminClient .listConsumerGroupOffsets( groupSpecs.map { case (groupId, offsetsSpec) => (groupId, offsetsSpec.asJava) }.asJava, @@ -690,7 +690,7 @@ object AdminClient { ): Task[Unit] = { val asJava = offsets.bimap(_.asJava, _.asJava).asJava fromKafkaFutureVoid { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.alterConsumerGroupOffsets(groupId, asJava))(opts => adminClient.alterConsumerGroupOffsets(groupId, asJava, opts.asJava) @@ -704,7 +704,7 @@ object AdminClient { * Retrieves metrics for the underlying AdminClient */ override def metrics: Task[Map[MetricName, Metric]] = - ZIO.attemptBlocking( + ZIO.attempt( adminClient.metrics().asScala.toMap.map { case (metricName, metric) => (MetricName(metricName), Metric(metric)) } @@ -717,7 +717,7 @@ object AdminClient { options: Option[ListConsumerGroupsOptions] = None ): Task[List[ConsumerGroupListing]] = fromKafkaFuture { - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.listConsumerGroups())(opts => adminClient.listConsumerGroups(opts.asJava)) .all() @@ -738,7 +738,7 @@ object AdminClient { options: Option[DescribeConsumerGroupsOptions] ): Task[Map[String, ConsumerGroupDescription]] = fromKafkaFuture( - ZIO.attemptBlocking( + ZIO.attempt( options .fold(adminClient.describeConsumerGroups(groupIds.asJavaCollection))(opts => adminClient.describeConsumerGroups(groupIds.asJavaCollection, opts.asJava) @@ -755,7 +755,7 @@ object AdminClient { membersToRemove.map(new MemberToRemove(_)).asJavaCollection ) fromKafkaFuture( - ZIO.attemptBlocking( + ZIO.attempt( adminClient.removeMembersFromConsumerGroup(groupId, options).all() ) ).unit @@ -767,7 +767,7 @@ object AdminClient { override def removeMembersFromConsumerGroup(groupId: String): Task[Unit] = { val options = new RemoveMembersFromConsumerGroupOptions() fromKafkaFuture( - ZIO.attemptBlocking( + ZIO.attempt( adminClient.removeMembersFromConsumerGroup(groupId, options).all() ) ).unit @@ -777,7 +777,7 @@ object AdminClient { brokersId: Iterable[Int] ): ZIO[Any, Throwable, Map[Int, Map[String, LogDirDescription]]] = fromKafkaFuture( - ZIO.attemptBlocking( + ZIO.attempt( adminClient.describeLogDirs(brokersId.map(Int.box).asJavaCollection).allDescriptions() ) ).map { @@ -791,7 +791,7 @@ object AdminClient { brokersId: Iterable[Int] ): ZIO[Any, Throwable, Map[Int, Task[Map[String, LogDirDescription]]]] = ZIO - .attemptBlocking( + .attempt( adminClient.describeLogDirs(brokersId.map(Int.box).asJavaCollection).descriptions() ) .map { @@ -811,7 +811,7 @@ object AdminClient { ): Task[Unit] = fromKafkaFutureVoid( ZIO - .attemptBlocking( + .attempt( adminClient .incrementalAlterConfigs( configs.map { case (configResource, alterConfigOps) => @@ -828,7 +828,7 @@ object AdminClient { options: AlterConfigsOptions ): Task[Map[ConfigResource, Task[Unit]]] = ZIO - .attemptBlocking( + .attempt( adminClient .incrementalAlterConfigs( configs.map { case (configResource, alterConfigOps) => @@ -846,7 +846,7 @@ object AdminClient { override def alterConfigs(configs: Map[ConfigResource, KafkaConfig], options: AlterConfigsOptions): Task[Unit] = fromKafkaFutureVoid( ZIO - .attemptBlocking( + .attempt( adminClient .alterConfigs( configs.map { case (configResource, kafkaConfig) => @@ -864,7 +864,7 @@ object AdminClient { options: AlterConfigsOptions ): Task[Map[ConfigResource, Task[Unit]]] = ZIO - .attemptBlocking( + .attempt( adminClient .alterConfigs( configs.map { case (configResource, kafkaConfig) => @@ -884,7 +884,7 @@ object AdminClient { ): Task[Set[AclBinding]] = fromKafkaFuture( ZIO - .attemptBlocking( + .attempt( options .fold(adminClient.describeAcls(filter.asJava))(opt => adminClient.describeAcls(filter.asJava, opt.asJava)) .values() @@ -894,7 +894,7 @@ object AdminClient { override def createAcls(acls: Set[AclBinding], options: Option[CreateAclOptions]): Task[Unit] = fromKafkaFutureVoid( ZIO - .attemptBlocking( + .attempt( options .fold(adminClient.createAcls(acls.map(_.asJava).asJava))(opt => adminClient.createAcls(acls.map(_.asJava).asJava, opt.asJava) @@ -908,7 +908,7 @@ object AdminClient { options: Option[CreateAclOptions] ): Task[Map[AclBinding, Task[Unit]]] = ZIO - .attemptBlocking( + .attempt( options .fold(adminClient.createAcls(acls.map(_.asJava).asJava))(opt => adminClient.createAcls(acls.map(_.asJava).asJava, opt.asJava) @@ -922,7 +922,7 @@ object AdminClient { override def deleteAcls(filters: Set[AclBindingFilter], options: Option[DeleteAclsOptions]): Task[Set[AclBinding]] = fromKafkaFuture( ZIO - .attemptBlocking( + .attempt( options .fold(adminClient.deleteAcls(filters.map(_.asJava).asJava))(opt => adminClient.deleteAcls(filters.map(_.asJava).asJava, opt.asJava) @@ -936,7 +936,7 @@ object AdminClient { options: Option[DeleteAclsOptions] ): Task[Map[AclBindingFilter, Task[Map[AclBinding, Option[Throwable]]]]] = ZIO - .attemptBlocking( + .attempt( options .fold(adminClient.deleteAcls(filters.map(_.asJava).asJava))(opt => adminClient.deleteAcls(filters.map(_.asJava).asJava, opt.asJava) @@ -1548,7 +1548,7 @@ object AdminClient { .validateEndpoint(settings.driverSettings) endpointCheck *> ZIO.attempt(JAdmin.create(settings.driverSettings.asJava)) - }(client => ZIO.attemptBlocking(client.close(settings.closeTimeout)).orDie) + }(client => ZIO.attempt(client.close(settings.closeTimeout)).orDie) implicit final class MapOps[K1, V1](private val v: Map[K1, V1]) extends AnyVal { def bimap[K2, V2](fk: K1 => K2, fv: V1 => V2): Map[K2, V2] = v.map { case (k, v) => fk(k) -> fv(v) }