diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index d772f9088..da5023b23 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -9,7 +9,7 @@ import zio.kafka.security.KafkaCredentialStore /** * Settings for the consumer. * - * To stay source compatible with future releases, you are recommended to construct the settings as follows: + * Construct the settings as follows: * {{{ * ConsumerSettings(bootstrapServers) * .withGroupId(groupId) @@ -20,17 +20,17 @@ import zio.kafka.security.KafkaCredentialStore * @param bootstrapServers * the Kafka bootstrap servers */ -final case class ConsumerSettings( +final case class ConsumerSettings private ( bootstrapServers: List[String], - properties: Map[String, AnyRef] = Map.empty, - closeTimeout: Duration = 30.seconds, - pollTimeout: Duration = 50.millis, - commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, - offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), - rebalanceListener: RebalanceListener = RebalanceListener.noop, - restartStreamOnRebalancing: Boolean = false, - runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout, - fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy() + properties: Map[String, AnyRef], + closeTimeout: Duration, + pollTimeout: Duration, + commitTimeout: Duration, + offsetRetrieval: OffsetRetrieval, + rebalanceListener: RebalanceListener, + restartStreamOnRebalancing: Boolean, + runloopTimeout: Duration, + fetchStrategy: FetchStrategy ) { private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match { case OffsetRetrieval.Auto(reset) => Map(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> reset.toConfig) @@ -138,6 +138,27 @@ final case class ConsumerSettings( } object ConsumerSettings { + + /** + * Consumer settings with all defaults. + * + * @param bootstrapServers + * the Kafka bootstrap servers + */ + def apply(bootstrapServers: List[String]): ConsumerSettings = + ConsumerSettings( + bootstrapServers, + properties = Map.empty, + closeTimeout = 30.seconds, + pollTimeout = 50.millis, + commitTimeout = ConsumerSettings.defaultCommitTimeout, + offsetRetrieval = OffsetRetrieval.Auto(), + rebalanceListener = RebalanceListener.noop, + restartStreamOnRebalancing = false, + runloopTimeout = ConsumerSettings.defaultRunloopTimeout, + fetchStrategy = QueueSizeBasedFetchStrategy() + ) + val defaultRunloopTimeout: Duration = 4.minutes val defaultCommitTimeout: Duration = 15.seconds }