From da49ac701e0ad06ce36b0ed2027817019bf5170a Mon Sep 17 00:00:00 2001 From: urmastalimaa Date: Mon, 27 May 2024 23:33:37 +0300 Subject: [PATCH] Forward unhandled messages to optional subscriber handle_info callback The motivation for adding handle_info callbacks is to allow subscriber worker processes which are spawned by brod to participate in message passing, supporting a variety of use cases utilizing async acking and committing. An example use case: * Start a group subscriber using `brod_group_subscriber_v2` * In a partition worker spawn a new process for every message under a supervisor specific to the worker's topic-partition * When the supervisor has <= N processes, ack last seen offset to fetch new messages. When the supervisor has > N processes, messages are not acked to apply backpressure * When all processes up to offset O1 have completed, commit offset O1 Allowing arbitrary message passing in the topic and group subscriber workers supports not only that use case but many others. --- src/brod_group_subscriber_worker.erl | 8 ++++- src/brod_topic_subscriber.erl | 5 +++- test/brod_topic_subscriber_SUITE.erl | 44 ++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/src/brod_group_subscriber_worker.erl b/src/brod_group_subscriber_worker.erl index 4f0c40ae..2cd60f9d 100644 --- a/src/brod_group_subscriber_worker.erl +++ b/src/brod_group_subscriber_worker.erl @@ -22,7 +22,7 @@ -include("brod_int.hrl"). %% brod_topic_subscriber callbacks --export([init/2, handle_message/3, terminate/2]). +-export([init/2, handle_message/3, handle_info/2, terminate/2]). -type start_options() :: #{ group_id := brod:group_id() @@ -91,6 +91,12 @@ handle_message(_Partition, Msg, State) -> {ok, NewState} end. +handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) -> + %% Any unhandled messages are forwarded to the callback module to + %% support arbitrary message-passing. + brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], ok), + {noreply, State}. + terminate(Reason, #state{cb_module = CbModule, cb_state = State}) -> brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok). diff --git a/src/brod_topic_subscriber.erl b/src/brod_topic_subscriber.erl index 08f6512d..1fb106b6 100644 --- a/src/brod_topic_subscriber.erl +++ b/src/brod_topic_subscriber.erl @@ -357,7 +357,10 @@ handle_info({'DOWN', _Mref, process, Pid, Reason}, %% not a consumer pid {noreply, State} end; -handle_info(_Info, State) -> +handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) -> + %% Any unhandled messages are forwarded to the callback module to + %% support arbitrary message-passing. + brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], ok), {noreply, State}. %% @private diff --git a/test/brod_topic_subscriber_SUITE.erl b/test/brod_topic_subscriber_SUITE.erl index 9c022a9d..ce504f2f 100644 --- a/test/brod_topic_subscriber_SUITE.erl +++ b/test/brod_topic_subscriber_SUITE.erl @@ -30,6 +30,7 @@ -export([ init/2 , terminate/2 , handle_message/3 + , handle_info/2 ]). %% Test cases @@ -40,6 +41,7 @@ , t_callback_crash/1 , t_begin_offset/1 , t_cb_fun/1 + , t_consumer_ack_via_message_passing/1 ]). -include("brod_test_setup.hrl"). @@ -107,6 +109,10 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck false -> {ok, ack, State} end. +handle_info({ack_offset, Partition, Offset}, State) -> + ok = brod_topic_subscriber:ack(self(), Partition, Offset), + {noreply, State}. + terminate(Reason, #state{worker_id = Ref, counter = Counter}) -> ?tp(topic_subscriber_terminate, #{ worker_id => Ref @@ -184,6 +190,44 @@ t_async_acks(Config) when is_list(Config) -> check_init_terminate(Trace) end). +t_consumer_ack_via_message_passing(Config) when is_list(Config) -> + % Process messages one by one with no prefetch + ConsumerConfig = [ {prefetch_count, 0} + , {prefetch_bytes, 0} + , {sleep_timeout, 0} + , {max_bytes, 0} + ], + Partition = 0, + SendFun = + fun(I) -> + produce({?topic, Partition}, <>) + end, + ?check_trace( + %% Run stage: + begin + O0 = SendFun(0), + %% Send two messages + Offset0 = SendFun(1), + _Offset1 = SendFun(2), + InitArgs = {_IsAsyncAck = true, + _ConsumerOffsets = [{0, O0}]}, + {ok, SubscriberPid} = + brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig, + ?MODULE, InitArgs), + {ok, _} = wait_message(<<1>>), + %% ack_offset allows consumer to proceed to message 2 + SubscriberPid ! {ack_offset, 0, Offset0}, + {ok, _} = wait_message(<<2>>), + ok = brod_topic_subscriber:stop(SubscriberPid), + _Expected = [<<1>>, <<2>>] + end, + %% Check stage: + fun(Expected, Trace) -> + check_received_messages(Expected, Trace), + check_state_continuity(Trace), + check_init_terminate(Trace) + end). + t_begin_offset(Config) when is_list(Config) -> ConsumerConfig = [ {prefetch_count, 100} , {prefetch_bytes, 0} %% as discard