Skip to content

Commit

Permalink
Forward unhandled messages to optional subscriber handle_info callback
Browse files Browse the repository at this point in the history
The motivation for adding handle_info callbacks is to allow subscriber
worker processes which are spawned by brod to participate in message
passing, supporting a variety of use cases utilizing async acking and
committing.

An example use case:
* Start a group subscriber using `brod_group_subscriber_v2`
* In a partition worker spawn a new process for every message under a
  supervisor specific to the worker's topic-partition
* When the supervisor has <= N processes, ack last seen offset to fetch
  new messages. When the supervisor has > N processes, messages are not
  acked to apply backpressure
* When all processes up to offset O1 have completed, commit offset O1

Allowing arbitrary message passing in the topic and group subscriber
workers supports not only that use case but many others.
  • Loading branch information
urmastalimaa committed Jun 6, 2024
1 parent 7840a60 commit 5eab124
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 3 deletions.
12 changes: 11 additions & 1 deletion src/brod_group_subscriber_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-include("brod_int.hrl").

%% brod_topic_subscriber callbacks
-export([init/2, handle_message/3, terminate/2]).
-export([init/2, handle_message/3, handle_info/2, terminate/2]).

-type start_options() ::
#{ group_id := brod:group_id()
Expand Down Expand Up @@ -91,6 +91,16 @@ handle_message(_Partition, Msg, State) ->
{ok, NewState}
end.

handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
%% Only the {noreply, State} return value is supported.
case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of
{noreply, NewCbState} ->
NewState = State#state{cb_state = NewCbState},
{noreply, NewState}
end.

terminate(Reason, #state{cb_module = CbModule, cb_state = State}) ->
brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok).

Expand Down
11 changes: 9 additions & 2 deletions src/brod_topic_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,15 @@ handle_info({'DOWN', _Mref, process, Pid, Reason},
%% not a consumer pid
{noreply, State}
end;
handle_info(_Info, State) ->
{noreply, State}.
handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
%% Only the {noreply, State} return value is supported.
case brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], {noreply, CbState}) of
{noreply, NewCbState} ->
NewState = State#state{cb_state = NewCbState},
{noreply, NewState}
end.

%% @private
handle_call(Call, _From, State) ->
Expand Down
55 changes: 55 additions & 0 deletions test/brod_topic_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
-export([ init/2
, terminate/2
, handle_message/3
, handle_info/2
]).

%% Test cases
Expand All @@ -40,6 +41,7 @@
, t_callback_crash/1
, t_begin_offset/1
, t_cb_fun/1
, t_consumer_ack_via_message_passing/1
]).

-include("brod_test_setup.hrl").
Expand Down Expand Up @@ -107,6 +109,21 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck
false -> {ok, ack, State}
end.

handle_info({ack_offset, Partition, Offset} = Msg, #state{ counter = Counter
, worker_id = Ref
} = State0) ->
%% Participate in state continuity checks
?tp(topic_subscriber_seen_info,
#{ partition => Partition
, offset => Offset
, msg => Msg
, state => Counter
, worker_id => Ref
}),
State = State0#state{counter = Counter + 1},
ok = brod_topic_subscriber:ack(self(), Partition, Offset),
{noreply, State}.

terminate(Reason, #state{worker_id = Ref, counter = Counter}) ->
?tp(topic_subscriber_terminate,
#{ worker_id => Ref
Expand Down Expand Up @@ -184,6 +201,44 @@ t_async_acks(Config) when is_list(Config) ->
check_init_terminate(Trace)
end).

t_consumer_ack_via_message_passing(Config) when is_list(Config) ->
% Process messages one by one with no prefetch
ConsumerConfig = [ {prefetch_count, 0}
, {prefetch_bytes, 0}
, {sleep_timeout, 0}
, {max_bytes, 0}
],
Partition = 0,
SendFun =
fun(I) ->
produce({?topic, Partition}, <<I>>)
end,
?check_trace(
%% Run stage:
begin
O0 = SendFun(0),
%% Send two messages
Offset0 = SendFun(1),
_Offset1 = SendFun(2),
InitArgs = {_IsAsyncAck = true,
_ConsumerOffsets = [{0, O0}]},
{ok, SubscriberPid} =
brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig,
?MODULE, InitArgs),
{ok, _} = wait_message(<<1>>),
%% ack_offset allows consumer to proceed to message 2
SubscriberPid ! {ack_offset, 0, Offset0},
{ok, _} = wait_message(<<2>>),
ok = brod_topic_subscriber:stop(SubscriberPid),
_Expected = [<<1>>, <<2>>]
end,
%% Check stage:
fun(Expected, Trace) ->
check_received_messages(Expected, Trace),
check_state_continuity(Trace),
check_init_terminate(Trace)
end).

t_begin_offset(Config) when is_list(Config) ->
ConsumerConfig = [ {prefetch_count, 100}
, {prefetch_bytes, 0} %% as discard
Expand Down

0 comments on commit 5eab124

Please sign in to comment.