diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSettingsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSettingsSpec.scala new file mode 100644 index 000000000..eb8ee74a5 --- /dev/null +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSettingsSpec.scala @@ -0,0 +1,24 @@ +package zio.kafka.consumer + +import zio._ +import zio.kafka.ZIOSpecDefaultSlf4j +import zio.test._ +import zio.test.Assertion._ + +object ConsumerSettingsSpec extends ZIOSpecDefaultSlf4j { + + override def spec: Spec[TestEnvironment with Scope, Throwable] = + suite("ConsumerSettingsSpec")( + test("accepts no auto.commit") { + ZIO.attempt(ConsumerSettings(List("host"))) *> assertCompletesZIO + }, + test("accepts disabled auto.commit") { + ZIO.attempt(ConsumerSettings(List("host")).withProperty("enable.auto.commit", "false")) *> assertCompletesZIO + }, + test("rejects auto.commit") { + val settings = ZIO.attempt(ConsumerSettings(List("host")).withProperty("enable.auto.commit", "true")).exit + assertZIO(settings)(failsWithA[IllegalArgumentException]) + } + ) + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index c1b66da1d..776e403e8 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -197,7 +197,7 @@ object Consumer { /** * Create a zio-kafka Consumer from an org.apache.kafka KafkaConsumer * - * You are responsible for creating and closing the KafkaConsumer + * You are responsible for creating and closing the KafkaConsumer. Make sure auto.commit is disabled. */ def fromJavaConsumer( javaConsumer: JConsumer[Array[Byte], Array[Byte]], diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 934454132..7e5fc01b5 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -33,6 +33,11 @@ final case class ConsumerSettings( runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis), authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis) ) { + // Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType: + require( + properties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).forall(_.toString.trim.equalsIgnoreCase("false")), + "Because zio-kafka does pre-fetching, auto commit is not supported" + ) /** * Tunes the consumer for high throughput.