diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index f0d9be5..6171dd7 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -80,7 +80,7 @@ defmodule BroadwayKafka.BrodClient do offset_reset_policy: offset_reset_policy, begin_offset: begin_offset, group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], - fetch_config: Map.new(fetch_config || []), + fetch_config: Map.new(fetch_config), client_config: client_config, shared_client: shared_client, shared_client_id: build_shared_client_id(opts) @@ -109,10 +109,12 @@ defmodule BroadwayKafka.BrodClient do @impl true def ack(group_coordinator, generation_id, topic, partition, offset, config) do - :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) + if group_coordinator do + :brod_group_coordinator.ack(group_coordinator, generation_id, topic, partition, offset) - if group_coordinator && config.offset_commit_on_ack do - :brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) + if config.offset_commit_on_ack do + :brod_group_coordinator.commit_offsets(group_coordinator, [{{topic, partition}, offset}]) + end end :ok @@ -188,7 +190,11 @@ defmodule BroadwayKafka.BrodClient do @impl true def update_topics(group_coordinator, topics) do - :brod_group_coordinator.update_topics(group_coordinator, topics) + if group_coordinator do + :brod_group_coordinator.update_topics(group_coordinator, topics) + end + + :ok end defp start_link_group_coordinator(stage_pid, client_id, callback_module, config) do diff --git a/lib/broadway_kafka/kafka_client.ex b/lib/broadway_kafka/kafka_client.ex index dda9c3e..4ae70fd 100644 --- a/lib/broadway_kafka/kafka_client.ex +++ b/lib/broadway_kafka/kafka_client.ex @@ -15,6 +15,7 @@ defmodule BroadwayKafka.KafkaClient do } @typep offset_reset_policy :: :earliest | :latest + @typep brod_group_coordinator :: pid() | nil @callback init(opts :: any) :: {:ok, config} | {:error, any} @callback setup( @@ -23,9 +24,9 @@ defmodule BroadwayKafka.KafkaClient do callback_module :: module, config ) :: - {:ok, group_coordinator :: pid} | {:error, any} + {:ok, group_coordinator :: brod_group_coordinator()} | {:error, any} @callback ack( - group_coordinator :: pid, + group_coordinator :: brod_group_coordinator(), generation_id :: integer, topic :: binary, partition :: integer, @@ -51,7 +52,7 @@ defmodule BroadwayKafka.KafkaClient do ) :: offset :: integer | no_return() - @callback update_topics(:brod.group_coordinator(), [:brod.topic()]) :: :ok + @callback update_topics(brod_group_coordinator(), [:brod.topic()]) :: :ok @callback connected?(:brod.client()) :: boolean @callback disconnect(:brod.client()) :: :ok end