From 8360eae895c03a7ce4e5693738d67521aced6601 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Wed, 4 Sep 2024 12:58:36 +0200 Subject: [PATCH] Refactor rabbit_channel from gen_server2 to gen_server --- deps/rabbit/src/rabbit_channel.erl | 45 ++++++++++++++++-------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 908892781574..6473a662ea31 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -42,7 +42,7 @@ -include("amqqueue.hrl"). --behaviour(gen_server2). +-behaviour(gen_server). -export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2]). @@ -217,6 +217,8 @@ put({Type, Key}, none) end). +-define(HIBERNATE_AFTER, 6_000). + %%---------------------------------------------------------------------------- -export_type([channel_number/0]). @@ -254,9 +256,10 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams) -> - gen_server2:start_link( + Opts = [{hibernate_after, ?HIBERNATE_AFTER}], + gen_server:start_link( ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, - User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []). + User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], Opts). -spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. @@ -282,17 +285,17 @@ do_flow(Pid, Method, Content) -> -spec flush(pid()) -> 'ok'. flush(Pid) -> - gen_server2:call(Pid, flush, infinity). + gen_server:call(Pid, flush, infinity). -spec shutdown(pid()) -> 'ok'. shutdown(Pid) -> - gen_server2:cast(Pid, terminate). + gen_server:cast(Pid, terminate). -spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'. send_command(Pid, Msg) -> - gen_server2:cast(Pid, {command, Msg}). + gen_server:cast(Pid, {command, Msg}). -spec deliver_reply(binary(), mc:state()) -> 'ok'. @@ -324,7 +327,7 @@ deliver_reply_v1(EncodedBin, Message) -> deliver_reply_local(Pid, Key, Message) -> case pg_local:in_group(rabbit_channels, Pid) of - true -> gen_server2:cast(Pid, {deliver_reply, Key, Message}); + true -> gen_server:cast(Pid, {deliver_reply, Key, Message}); false -> ok end. @@ -338,7 +341,7 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) -> Msg = {declare_fast_reply_to, Key}, rabbit_misc:with_exit_handler( rabbit_misc:const(not_found), - fun() -> gen_server2:call(Pid, Msg, infinity) end) + fun() -> gen_server:call(Pid, Msg, infinity) end) end; declare_fast_reply_to(_) -> not_found. @@ -350,7 +353,7 @@ declare_fast_reply_to_v1(EncodedBin) -> Msg = {declare_fast_reply_to, V1Key}, rabbit_misc:with_exit_handler( rabbit_misc:const(not_found), - fun() -> gen_server2:call(V1Pid, Msg, infinity) end); + fun() -> gen_server:call(V1Pid, Msg, infinity) end); {error, _} -> not_found end. @@ -375,7 +378,7 @@ info_keys() -> ?INFO_KEYS. info(Pid) -> {Timeout, Deadline} = get_operation_timeout_and_deadline(), try - case gen_server2:call(Pid, {info, Deadline}, Timeout) of + case gen_server:call(Pid, {info, Deadline}, Timeout) of {ok, Res} -> Res; {error, Error} -> throw(Error) end @@ -390,7 +393,7 @@ info(Pid) -> info(Pid, Items) -> {Timeout, Deadline} = get_operation_timeout_and_deadline(), try - case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of + case gen_server:call(Pid, {{info, Items}, Deadline}, Timeout) of {ok, Res} -> Res; {error, Error} -> throw(Error) end @@ -430,7 +433,7 @@ refresh_config_local() -> _ = rabbit_misc:upmap( fun (C) -> try - gen_server2:call(C, refresh_config, infinity) + gen_server:call(C, refresh_config, infinity) catch _:Reason -> rabbit_log:error("Failed to refresh channel config " "for channel ~tp. Reason ~tp", @@ -444,7 +447,7 @@ refresh_interceptors() -> _ = rabbit_misc:upmap( fun (C) -> try - gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) + gen_server:call(C, refresh_interceptors, ?REFRESH_TIMEOUT) catch _:Reason -> rabbit_log:error("Failed to refresh channel interceptors " "for channel ~tp. Reason ~tp", @@ -465,11 +468,11 @@ ready_for_close(Pid) -> % This event is necessary for the stats timer to be initialized with % the correct values once the management agent has started force_event_refresh(Ref) -> - [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], + [gen_server:cast(C, {force_event_refresh, Ref}) || C <- list()], ok. list_queue_states(Pid) -> - gen_server2:call(Pid, list_queue_states). + gen_server:call(Pid, list_queue_states). -spec update_user_state(pid(), rabbit_types:auth_user()) -> 'ok' | {error, channel_terminated}. @@ -485,6 +488,7 @@ update_user_state(Pid, UserState) when is_pid(Pid) -> init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, CollectorPid, LimiterPid, AmqpParams]) -> process_flag(trap_exit, true), + process_flag(message_queue_data, off_heap), ?LG_PROCESS_TYPE(channel), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), @@ -549,8 +553,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, fun() -> emit_stats(State2) end), put_operation_timeout(), State3 = init_tick_timer(State2), - {ok, State3, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + {ok, State3}. prioritise_call(Msg, _From, _Len, _State) -> case Msg of @@ -726,7 +729,7 @@ handle_info(emit_stats, State) -> State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer), %% NB: don't call noreply/1 since we don't want to kick off the %% stats timer. - {noreply, send_confirms_and_nacks(State1), hibernate}; + {noreply, send_confirms_and_nacks(State1)}; handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason}, #ch{queue_states = QStates0} = State0) -> @@ -821,14 +824,14 @@ get_consumer_timeout() -> end. %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. +reply(Reply, NewState) -> {reply, Reply, next_state(NewState)}. -noreply(NewState) -> {noreply, next_state(NewState), hibernate}. +noreply(NewState) -> {noreply, next_state(NewState)}. next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)). noreply_coalesce(#ch{confirmed = [], rejected = []} = State) -> - {noreply, ensure_stats_timer(State), hibernate}; + {noreply, ensure_stats_timer(State)}; noreply_coalesce(#ch{} = State) -> % Immediately process 'timeout' info message {noreply, ensure_stats_timer(State), 0}.