Skip to content

Commit

Permalink
Forward unhandled messages to optional subscriber handle_info callback
Browse files Browse the repository at this point in the history
commit message
  • Loading branch information
urmastalimaa committed Jun 6, 2024
1 parent c34fa87 commit b81af2f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 2 deletions.
8 changes: 7 additions & 1 deletion src/brod_group_subscriber_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-include("brod_int.hrl").

%% brod_topic_subscriber callbacks
-export([init/2, handle_message/3, terminate/2]).
-export([init/2, handle_message/3, handle_info/2, terminate/2]).

-type start_options() ::
#{ group_id := brod:group_id()
Expand Down Expand Up @@ -91,6 +91,12 @@ handle_message(_Partition, Msg, State) ->
{ok, NewState}
end.

handle_info(Info, #state{cb_module = CbModule , cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], ok),
{noreply, State}.

terminate(Reason, #state{cb_module = CbModule, cb_state = State}) ->
brod_utils:optional_callback(CbModule, terminate, [Reason, State], ok).

Expand Down
5 changes: 4 additions & 1 deletion src/brod_topic_subscriber.erl
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,10 @@ handle_info({'DOWN', _Mref, process, Pid, Reason},
%% not a consumer pid
{noreply, State}
end;
handle_info(_Info, State) ->
handle_info(Info, #state{cb_module = CbModule, cb_state = CbState} = State) ->
%% Any unhandled messages are forwarded to the callback module to
%% support arbitrary message-passing.
brod_utils:optional_callback(CbModule, handle_info, [Info, CbState], ok),
{noreply, State}.

%% @private
Expand Down
44 changes: 44 additions & 0 deletions test/brod_topic_subscriber_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
-export([ init/2
, terminate/2
, handle_message/3
, handle_info/2
]).

%% Test cases
Expand All @@ -40,6 +41,7 @@
, t_callback_crash/1
, t_begin_offset/1
, t_cb_fun/1
, t_consumer_ack_via_message_passing/1
]).

-include("brod_test_setup.hrl").
Expand Down Expand Up @@ -107,6 +109,10 @@ handle_message(Partition, Message, #state{ is_async_ack = IsAsyncAck
false -> {ok, ack, State}
end.

handle_info({ack_offset, Partition, Offset}, State) ->
ok = brod_topic_subscriber:ack(self(), Partition, Offset),
{noreply, State}.

terminate(Reason, #state{worker_id = Ref, counter = Counter}) ->
?tp(topic_subscriber_terminate,
#{ worker_id => Ref
Expand Down Expand Up @@ -184,6 +190,44 @@ t_async_acks(Config) when is_list(Config) ->
check_init_terminate(Trace)
end).

t_consumer_ack_via_message_passing(Config) when is_list(Config) ->
% Process messages one by one with no prefetch
ConsumerConfig = [ {prefetch_count, 0}
, {prefetch_bytes, 0}
, {sleep_timeout, 0}
, {max_bytes, 0}
],
Partition = 0,
SendFun =
fun(I) ->
produce({?topic, Partition}, <<I>>)
end,
?check_trace(
%% Run stage:
begin
O0 = SendFun(0),
%% Send two messages
Offset0 = SendFun(1),
_Offset1 = SendFun(2),
InitArgs = {_IsAsyncAck = true,
_ConsumerOffsets = [{0, O0}]},
{ok, SubscriberPid} =
brod:start_link_topic_subscriber(?CLIENT_ID, ?topic, ConsumerConfig,
?MODULE, InitArgs),
{ok, _} = wait_message(<<1>>),
%% ack_offset allows consumer to proceed to message 2
SubscriberPid ! {ack_offset, 0, Offset0},
{ok, _} = wait_message(<<2>>),
ok = brod_topic_subscriber:stop(SubscriberPid),
_Expected = [<<1>>, <<2>>]
end,
%% Check stage:
fun(Expected, Trace) ->
check_received_messages(Expected, Trace),
check_state_continuity(Trace),
check_init_terminate(Trace)
end).

t_begin_offset(Config) when is_list(Config) ->
ConsumerConfig = [ {prefetch_count, 100}
, {prefetch_bytes, 0} %% as discard
Expand Down

0 comments on commit b81af2f

Please sign in to comment.