Skip to content

Commit

Permalink
Forward unhandled messages to optional subscriber handle_info callback
Browse files Browse the repository at this point in the history
The motivation for adding handle_info callbacks is to allow subscriber
worker processes which are spawned by brod to participate in message
passing, supporting custom setups for acking and committing.

An example use case:
* Start a group subscriber using `brod_group_subscriber_v2`
* In a partition worker spawn a new process for every message under a
  supervisor specific to the worker's topic-partition
* When the supervisor has <= N processes, ack last seen offset to fetch
  new messages. When the supervisor has > N processes, messages are not
  acked to apply backpressure
* When all processes up to offset O1 have completed, commit offset O1

Allowing arbitrary message passing in the topic and group subscriber
workers supports not only that use case but many others.
  • Loading branch information
urmastalimaa committed Jun 6, 2024
1 parent c34fa87 commit bacc9ef
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
8 changes: 7 additions & 1 deletion src/brod_group_subscriber_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-include("brod_int.hrl").

%% brod_topic_subscriber callbacks
-export([init/2, handle_message/3, terminate/2]).
-export([init/2, handle_message/3, handle_info/2, terminate/2]).

-type start_options() ::
#{ group_id := brod:group_id()
Expand Down Expand Up @@ -91,6 +91,12 @@ handle_message(_Partition, Msg, State) ->
{ok, NewState}
end.

handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], ok),
{noreply, State}.

terminate(Reason, #state{cb_module = CbModule, cb_state = State}) ->
brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok).

Expand Down
5 changes: 4 additions & 1 deletion src/brod_topic_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ handle_info({'DOWN', _Mref, process, Pid, Reason},
%% not a consumer pid
{noreply, State}
end;
handle_info(_Info, State) ->
handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], ok),
{noreply, State}.

%% @private
Expand Down
44 changes: 44 additions & 0 deletions test/brod_topic_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
-export([ init/2
, terminate/2
, handle_message/3
, handle_info/2
]).

%% Test cases
Expand All @@ -40,6 +41,7 @@
, t_callback_crash/1
, t_begin_offset/1
, t_cb_fun/1
, t_consumer_ack_via_message_passing/1
]).

-include("brod_test_setup.hrl").
Expand Down Expand Up @@ -107,6 +109,10 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck
false -> {ok, ack, State}
end.

handle_info({ack_offset, Partition, Offset}, State) ->
ok = brod_topic_subscriber:ack(self(), Partition, Offset),
{noreply, State}.

terminate(Reason, #state{worker_id = Ref, counter = Counter}) ->
?tp(topic_subscriber_terminate,
#{ worker_id => Ref
Expand Down Expand Up @@ -184,6 +190,44 @@ t_async_acks(Config) when is_list(Config) ->
check_init_terminate(Trace)
end).

t_consumer_ack_via_message_passing(Config) when is_list(Config) ->
% Process messages one by one with no prefetch
ConsumerConfig = [ {prefetch_count, 0}
, {prefetch_bytes, 0}
, {sleep_timeout, 0}
, {max_bytes, 0}
],
Partition = 0,
SendFun =
fun(I) ->
produce({?topic, Partition}, <<I>>)
end,
?check_trace(
%% Run stage:
begin
O0 = SendFun(0),
%% Send two messages
Offset0 = SendFun(1),
_Offset1 = SendFun(2),
InitArgs = {_IsAsyncAck = true,
_ConsumerOffsets = [{0, O0}]},
{ok, SubscriberPid} =
brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig,
?MODULE, InitArgs),
{ok, _} = wait_message(<<1>>),
%% ack_offset allows consumer to proceed to message 2
SubscriberPid ! {ack_offset, 0, Offset0},
{ok, _} = wait_message(<<2>>),
ok = brod_topic_subscriber:stop(SubscriberPid),
_Expected = [<<1>>, <<2>>]
end,
%% Check stage:
fun(Expected, Trace) ->
check_received_messages(Expected, Trace),
check_state_continuity(Trace),
check_init_terminate(Trace)
end).

t_begin_offset(Config) when is_list(Config) ->
ConsumerConfig = [ {prefetch_count, 100}
, {prefetch_bytes, 0} %% as discard
Expand Down

0 comments on commit bacc9ef

Please sign in to comment.