Skip to content

Commit

Permalink
Prevent users from enabling auto commit (#1290)
Browse files Browse the repository at this point in the history
Zio-kafka applications always pre-fetch data so that user streams can
process the data asynchronously. This is not compatible with auto
commit. When auto commit is enabled, the consumer will automatically
commit batches _before_ they are processed by the user streams.

An unaware user might accidentally enable auto commit and lose data
during rebalances.

Solves #1289.
  • Loading branch information
erikvanoosten authored Jul 24, 2024
1 parent fd40816 commit 092de53
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package zio.kafka.consumer

import zio._
import zio.kafka.ZIOSpecDefaultSlf4j
import zio.test._
import zio.test.Assertion._

object ConsumerSettingsSpec extends ZIOSpecDefaultSlf4j {

override def spec: Spec[TestEnvironment with Scope, Throwable] =
suite("ConsumerSettingsSpec")(
test("accepts no auto.commit") {
ZIO.attempt(ConsumerSettings(List("host"))) *> assertCompletesZIO
},
test("accepts disabled auto.commit") {
ZIO.attempt(ConsumerSettings(List("host")).withProperty("enable.auto.commit", "false")) *> assertCompletesZIO
},
test("rejects auto.commit") {
val settings = ZIO.attempt(ConsumerSettings(List("host")).withProperty("enable.auto.commit", "true")).exit
assertZIO(settings)(failsWithA[IllegalArgumentException])
}
)

}
2 changes: 1 addition & 1 deletion zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ object Consumer {
/**
* Create a zio-kafka Consumer from an org.apache.kafka KafkaConsumer
*
* You are responsible for creating and closing the KafkaConsumer
* You are responsible for creating and closing the KafkaConsumer. Make sure auto.commit is disabled.
*/
def fromJavaConsumer(
javaConsumer: JConsumer[Array[Byte], Array[Byte]],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ final case class ConsumerSettings(
runloopMetricsSchedule: Schedule[Any, Unit, Long] = Schedule.fixed(500.millis),
authErrorRetrySchedule: Schedule[Any, Throwable, Any] = Schedule.recurs(5) && Schedule.spaced(500.millis)
) {
// Parse booleans in a way compatible with how Kafka does this in org.apache.kafka.common.config.ConfigDef.parseType:
require(
properties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).forall(_.toString.trim.equalsIgnoreCase("false")),
"Because zio-kafka does pre-fetching, auto commit is not supported"
)

/**
* Tunes the consumer for high throughput.
Expand Down

0 comments on commit 092de53

Please sign in to comment.