Skip to content

Commit 596b885

Browse files
committed
Refactor rabbit_channel from gen_server2 to gen_server
1 parent 8cc26c4 commit 596b885

File tree

1 file changed

+34
-22
lines changed

1 file changed

+34
-22
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444

4545
-include("amqqueue.hrl").
4646

47-
-behaviour(gen_server2).
47+
-behaviour(gen_server).
4848

4949
-export([start_link/11, start_link/12, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
5050
-export([send_command/2]).
@@ -221,6 +221,8 @@
221221
put({Type, Key}, none)
222222
end).
223223

224+
-define(HIBERNATE_AFTER, 6_000).
225+
224226
%%----------------------------------------------------------------------------
225227

226228
-export_type([channel_number/0]).
@@ -258,9 +260,10 @@ start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
258260

259261
start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
260262
VHost, Capabilities, CollectorPid, Limiter, AmqpParams) ->
261-
gen_server2:start_link(
263+
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
264+
gen_server:start_link(
262265
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
263-
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], []).
266+
User, VHost, Capabilities, CollectorPid, Limiter, AmqpParams], Opts).
264267

265268
-spec do(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
266269

@@ -286,17 +289,17 @@ do_flow(Pid, Method, Content) ->
286289
-spec flush(pid()) -> 'ok'.
287290

288291
flush(Pid) ->
289-
gen_server2:call(Pid, flush, infinity).
292+
gen_server:call(Pid, flush, infinity).
290293

291294
-spec shutdown(pid()) -> 'ok'.
292295

293296
shutdown(Pid) ->
294-
gen_server2:cast(Pid, terminate).
297+
gen_server:cast(Pid, terminate).
295298

296299
-spec send_command(pid(), rabbit_framing:amqp_method_record()) -> 'ok'.
297300

298301
send_command(Pid, Msg) ->
299-
gen_server2:cast(Pid, {command, Msg}).
302+
gen_server:cast(Pid, {command, Msg}).
300303

301304

302305
-spec deliver_reply(binary(), mc:state()) -> 'ok'.
@@ -317,7 +320,7 @@ deliver_reply(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>, Message) ->
317320

318321
deliver_reply_local(Pid, Key, Message) ->
319322
case pg_local:in_group(rabbit_channels, Pid) of
320-
true -> gen_server2:cast(Pid, {deliver_reply, Key, Message});
323+
true -> gen_server:cast(Pid, {deliver_reply, Key, Message});
321324
false -> ok
322325
end.
323326

@@ -330,13 +333,25 @@ declare_fast_reply_to(<<"amq.rabbitmq.reply-to.", EncodedBin/binary>>) ->
330333
Msg = {declare_fast_reply_to, Key},
331334
rabbit_misc:with_exit_handler(
332335
rabbit_misc:const(not_found),
333-
fun() -> gen_server2:call(Pid, Msg, infinity) end);
336+
fun() -> gen_server:call(Pid, Msg, infinity) end);
334337
{error, _} ->
335338
not_found
336339
end;
337340
declare_fast_reply_to(_) ->
338341
not_found.
339342

343+
declare_fast_reply_to_v1(EncodedBin) ->
344+
%% the the original encoding function
345+
case rabbit_direct_reply_to:decode_reply_to_v1(EncodedBin) of
346+
{ok, V1Pid, V1Key} ->
347+
Msg = {declare_fast_reply_to, V1Key},
348+
rabbit_misc:with_exit_handler(
349+
rabbit_misc:const(not_found),
350+
fun() -> gen_server:call(V1Pid, Msg, infinity) end);
351+
{error, _} ->
352+
not_found
353+
end.
354+
340355
-spec list() -> [pid()].
341356

342357
list() ->
@@ -357,7 +372,7 @@ info_keys() -> ?INFO_KEYS.
357372
info(Pid) ->
358373
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
359374
try
360-
case gen_server2:call(Pid, {info, Deadline}, Timeout) of
375+
case gen_server:call(Pid, {info, Deadline}, Timeout) of
361376
{ok, Res} -> Res;
362377
{error, Error} -> throw(Error)
363378
end
@@ -372,7 +387,7 @@ info(Pid) ->
372387
info(Pid, Items) ->
373388
{Timeout, Deadline} = get_operation_timeout_and_deadline(),
374389
try
375-
case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of
390+
case gen_server:call(Pid, {{info, Items}, Deadline}, Timeout) of
376391
{ok, Res} -> Res;
377392
{error, Error} -> throw(Error)
378393
end
@@ -412,7 +427,7 @@ refresh_config_local() ->
412427
_ = rabbit_misc:upmap(
413428
fun (C) ->
414429
try
415-
gen_server2:call(C, refresh_config, infinity)
430+
gen_server:call(C, refresh_config, infinity)
416431
catch _:Reason ->
417432
rabbit_log:error("Failed to refresh channel config "
418433
"for channel ~tp. Reason ~tp",
@@ -426,7 +441,7 @@ refresh_interceptors() ->
426441
_ = rabbit_misc:upmap(
427442
fun (C) ->
428443
try
429-
gen_server2:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
444+
gen_server:call(C, refresh_interceptors, ?REFRESH_TIMEOUT)
430445
catch _:Reason ->
431446
rabbit_log:error("Failed to refresh channel interceptors "
432447
"for channel ~tp. Reason ~tp",
@@ -447,11 +462,11 @@ ready_for_close(Pid) ->
447462
% This event is necessary for the stats timer to be initialized with
448463
% the correct values once the management agent has started
449464
force_event_refresh(Ref) ->
450-
[gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
465+
[gen_server:cast(C, {force_event_refresh, Ref}) || C <- list()],
451466
ok.
452467

453468
list_queue_states(Pid) ->
454-
gen_server2:call(Pid, list_queue_states).
469+
gen_server:call(Pid, list_queue_states).
455470

456471
-spec update_user_state(pid(), rabbit_types:user()) -> 'ok' | {error, channel_terminated}.
457472

@@ -467,8 +482,6 @@ update_user_state(Pid, UserState) when is_pid(Pid) ->
467482
init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
468483
Capabilities, CollectorPid, LimiterPid, AmqpParams]) ->
469484
process_flag(trap_exit, true),
470-
rabbit_process_flag:adjust_for_message_handling_proc(),
471-
472485
?LG_PROCESS_TYPE(channel),
473486
?store_proc_name({ConnName, Channel}),
474487
ok = pg_local:join(rabbit_channels, self()),
@@ -542,8 +555,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
542555
fun() -> emit_stats(State2) end),
543556
put_operation_timeout(),
544557
State3 = init_tick_timer(State2),
545-
{ok, State3, hibernate,
546-
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
558+
{ok, State3}.
547559

548560
prioritise_call(Msg, _From, _Len, _State) ->
549561
case Msg of
@@ -722,7 +734,7 @@ handle_info(emit_stats, State) ->
722734
State1 = rabbit_event:reset_stats_timer(State, #ch.stats_timer),
723735
%% NB: don't call noreply/1 since we don't want to kick off the
724736
%% stats timer.
725-
{noreply, send_confirms_and_nacks(State1), hibernate};
737+
{noreply, send_confirms_and_nacks(State1)};
726738

727739
handle_info({{'DOWN', QName}, _MRef, process, QPid, Reason},
728740
#ch{queue_states = QStates0} = State0) ->
@@ -822,14 +834,14 @@ get_consumer_timeout() ->
822834

823835
%%---------------------------------------------------------------------------
824836

825-
reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
837+
reply(Reply, NewState) -> {reply, Reply, next_state(NewState)}.
826838

827-
noreply(NewState) -> {noreply, next_state(NewState), hibernate}.
839+
noreply(NewState) -> {noreply, next_state(NewState)}.
828840

829841
next_state(State) -> ensure_stats_timer(send_confirms_and_nacks(State)).
830842

831843
noreply_coalesce(#ch{confirmed = [], rejected = []} = State) ->
832-
{noreply, ensure_stats_timer(State), hibernate};
844+
{noreply, ensure_stats_timer(State)};
833845
noreply_coalesce(#ch{} = State) ->
834846
% Immediately process 'timeout' info message
835847
{noreply, ensure_stats_timer(State), 0}.

0 commit comments

Comments
 (0)