Skip to content

Commit

Permalink
Refactor rabbit_channel from gen_server2 to gen_server
Browse files Browse the repository at this point in the history
  • Loading branch information
mkuratczyk committed Sep 4, 2024
1 parent 94baa7c commit 8360eae
Showing 1 changed file with 24 additions and 21 deletions.
45 changes: 24 additions & 21 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

-include("amqqueue.hrl").

-behaviour(gen_server2).
-behaviour(gen_server).

-export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2]).
Expand Down Expand Up @@ -217,6 +217,8 @@
put({Type, Key}, none)
end).

-define(HIBERNATE_AFTER, 6_000).

%%----------------------------------------------------------------------------

-export_type([channel_number/0]).
Expand Down Expand Up @@ -254,9 +256,10 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,

start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, CollectorPid, Limiter, AmqpParams) ->
gen_server2:start_link(
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []).
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], Opts).

-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.

Expand All @@ -282,17 +285,17 @@ do_flow(Pid, Method, Content) ->
-spec flush(pid()) -> 'ok'.

flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
gen_server:call(Pid, flush, infinity).

-spec shutdown(pid()) -> 'ok'.

shutdown(Pid) ->
gen_server2:cast(Pid, terminate).
gen_server:cast(Pid, terminate).

-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.

send_command(Pid, Msg) ->
gen_server2:cast(Pid, {command, Msg}).
gen_server:cast(Pid, {command, Msg}).


-spec deliver_reply(binary(), mc:state()) -> 'ok'.
Expand Down Expand Up @@ -324,7 +327,7 @@ deliver_reply_v1(EncodedBin, Message) ->

deliver_reply_local(Pid, Key, Message) ->
case pg_local:in_group(rabbit_channels, Pid) of
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
true -> gen_server:cast(Pid, {deliver_reply, Key, Message});
false -> ok
end.

Expand All @@ -338,7 +341,7 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
Msg = {declare_fast_reply_to, Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server2:call(Pid, Msg, infinity) end)
fun() -> gen_server:call(Pid, Msg, infinity) end)
end;
declare_fast_reply_to(_) ->
not_found.
Expand All @@ -350,7 +353,7 @@ declare_fast_reply_to_v1(EncodedBin) ->
Msg = {declare_fast_reply_to, V1Key},
rabbit_misc:with_exit_handler(
rabbit_misc:const(not_found),
fun() -> gen_server2:call(V1Pid, Msg, infinity) end);
fun() -> gen_server:call(V1Pid, Msg, infinity) end);
{error, _} ->
not_found
end.
Expand All @@ -375,7 +378,7 @@ info_keys() -> ?INFO_KEYS.
info(Pid) ->
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
try
case gen_server2:call(Pid, {info, Deadline}, Timeout) of
case gen_server:call(Pid, {info, Deadline}, Timeout) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end
Expand All @@ -390,7 +393,7 @@ info(Pid) ->
info(Pid, Items) ->
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
try
case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of
case gen_server:call(Pid, {{info, Items}, Deadline}, Timeout) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end
Expand Down Expand Up @@ -430,7 +433,7 @@ refresh_config_local() ->
_ = rabbit_misc:upmap(
fun (C) ->
try
gen_server2:call(C, refresh_config, infinity)
gen_server:call(C, refresh_config, infinity)
catch _:Reason ->
rabbit_log:error("Failed to refresh channel config "
"for channel ~tp. Reason ~tp",
Expand All @@ -444,7 +447,7 @@ refresh_interceptors() ->
_ = rabbit_misc:upmap(
fun (C) ->
try
gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
gen_server:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
catch _:Reason ->
rabbit_log:error("Failed to refresh channel interceptors "
"for channel ~tp. Reason ~tp",
Expand All @@ -465,11 +468,11 @@ ready_for_close(Pid) ->
% This event is necessary for the stats timer to be initialized with
% the correct values once the management agent has started
force_event_refresh(Ref) ->
[gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
[gen_server:cast(C, {force_event_refresh, Ref}) || C <- list()],
ok.

list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
gen_server:call(Pid, list_queue_states).

-spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}.

Expand All @@ -485,6 +488,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
process_flag(trap_exit, true),
process_flag(message_queue_data, off_heap),
?LG_PROCESS_TYPE(channel),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
Expand Down Expand Up @@ -549,8 +553,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
fun() -> emit_stats(State2) end),
put_operation_timeout(),
State3 = init_tick_timer(State2),
{ok, State3, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
{ok, State3}.

prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
Expand Down Expand Up @@ -726,7 +729,7 @@ handle_info(emit_stats, State) ->
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
%% NB: don't call noreply/1 since we don't want to kick off the
%% stats timer.
{noreply, send_confirms_and_nacks(State1), hibernate};
{noreply, send_confirms_and_nacks(State1)};

handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason},
#ch{queue_states = QStates0} = State0) ->
Expand Down Expand Up @@ -821,14 +824,14 @@ get_consumer_timeout() ->
end.
%%---------------------------------------------------------------------------

reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
reply(Reply, NewState) -> {reply, Reply, next_state(NewState)}.

noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
noreply(NewState) -> {noreply, next_state(NewState)}.

next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).

noreply_coalesce(#ch{confirmed = [], rejected = []} = State) ->
{noreply, ensure_stats_timer(State), hibernate};
{noreply, ensure_stats_timer(State)};
noreply_coalesce(#ch{} = State) ->
% Immediately process 'timeout' info message
{noreply, ensure_stats_timer(State), 0}.
Expand Down

0 comments on commit 8360eae

Please sign in to comment.