Skip to content

Commit 34193d6

Browse files
committed
Presence server
This adds a simple presence server on each rabbit node that sends heartbeats to all other configred rabbbit nodes and maintains a local view of which peers are present. This can be used to get the list of present nodes for functions where it is more important to return a result than to be 100% correct (which is very hard anyway). remove logs fix maybe maybe
1 parent d258626 commit 34193d6

File tree

4 files changed

+113
-0
lines changed

4 files changed

+113
-0
lines changed

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ def all_beam_files(name = "all_beam_files"):
196196
"src/rabbit_prelaunch_feature_flags.erl",
197197
"src/rabbit_prelaunch_logging.erl",
198198
"src/rabbit_prequeue.erl",
199+
"src/rabbit_presence.erl",
199200
"src/rabbit_priority_queue.erl",
200201
"src/rabbit_process.erl",
201202
"src/rabbit_queue_consumers.erl",
@@ -461,6 +462,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
461462
"src/rabbit_prelaunch_feature_flags.erl",
462463
"src/rabbit_prelaunch_logging.erl",
463464
"src/rabbit_prequeue.erl",
465+
"src/rabbit_presence.erl",
464466
"src/rabbit_priority_queue.erl",
465467
"src/rabbit_process.erl",
466468
"src/rabbit_queue_consumers.erl",
@@ -745,6 +747,7 @@ def all_srcs(name = "all_srcs"):
745747
"src/rabbit_prelaunch_feature_flags.erl",
746748
"src/rabbit_prelaunch_logging.erl",
747749
"src/rabbit_prequeue.erl",
750+
"src/rabbit_presence.erl",
748751
"src/rabbit_priority_queue.erl",
749752
"src/rabbit_process.erl",
750753
"src/rabbit_queue_consumers.erl",

deps/rabbit/src/rabbit.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,13 @@
229229
{requires, [core_initialized, recovery]},
230230
{enables, routing_ready}]}).
231231

232+
-rabbit_boot_step({rabbit_presence,
233+
[{description, "rabbit node presence server"},
234+
{mfa, {rabbit_sup, start_restartable_child,
235+
[rabbit_presence]}},
236+
{requires, [core_initialized, recovery]},
237+
{enables, routing_ready}]}).
238+
232239
-rabbit_boot_step({rabbit_looking_glass,
233240
[{description, "Looking Glass tracer and profiler"},
234241
{mfa, {rabbit_looking_glass, boot, []}},

deps/rabbit/src/rabbit_presence.erl

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_presence).
9+
10+
-behaviour(gen_server).
11+
12+
-export([list_present/0]).
13+
-export([start_link/0]).
14+
15+
-export([init/1,
16+
handle_call/3,
17+
handle_cast/2,
18+
handle_info/2,
19+
terminate/2,
20+
code_change/3]).
21+
22+
-define(SERVER, ?MODULE).
23+
-define(INTERVAL, 1000).
24+
25+
-record(?MODULE, {tbl :: ets:table(),
26+
nodes = [] :: [node()]}).
27+
28+
%%----------------------------------------------------------------------------
29+
%% A presence server that heartbeats all configured servers with the goal of
30+
%% providing a very quickly accessible idea of node availability without
31+
%% having to use rabbit_nodes:all_running/1 which can block for a long time.
32+
%%----------------------------------------------------------------------------
33+
34+
-spec list_present() -> [node()].
35+
list_present() ->
36+
case whereis(?MODULE) of
37+
undefined ->
38+
%% TODO: change return type to ok | error?
39+
exit(presence_server_not_running);
40+
_ ->
41+
Cutoff = erlang:system_time(millisecond) - 5000,
42+
[N || {N, SeenMs} <- ets:tab2list(?MODULE),
43+
%% if it hasn't been seen since the cutoff
44+
SeenMs > Cutoff,
45+
%% if not in nodes() it is also considered not present
46+
lists:member(N, nodes())]
47+
end.
48+
49+
-spec start_link() -> rabbit_types:ok_pid_or_error().
50+
start_link() ->
51+
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
52+
53+
init([]) ->
54+
process_flag(trap_exit, true),
55+
Ref = ets:new(?MODULE, [set, named_table, public]),
56+
Nodes = rabbit_nodes:list_members(),
57+
beat_all(Nodes),
58+
erlang:send_after(?INTERVAL, self(), beat),
59+
{ok, #?MODULE{tbl = Ref,
60+
nodes = Nodes}}.
61+
62+
handle_call(_Request, _From, State) ->
63+
{noreply, State}.
64+
65+
handle_cast(_Request, State) ->
66+
{noreply, State}.
67+
68+
handle_info(beat, #?MODULE{tbl = _Tbl,
69+
nodes = Nodes} = State) ->
70+
_ = erlang:send_after(?INTERVAL, self(), beat),
71+
_ = beat_all(Nodes),
72+
{noreply, State};
73+
handle_info({hb, Node}, #?MODULE{tbl = Tbl,
74+
nodes = _Nodes} = State) ->
75+
ets:insert(Tbl, {Node, erlang:system_time(millisecond)}),
76+
{noreply, State};
77+
handle_info({terminate, Node}, #?MODULE{tbl = Tbl,
78+
nodes = _Nodes} = State) ->
79+
ets:delete(Tbl, Node),
80+
{noreply, State};
81+
handle_info(_Msg, State) ->
82+
{noreply, State}.
83+
84+
terminate(_Reason, #?MODULE{nodes = Nodes}) ->
85+
%% only send terminate if reason is `shutdown`?
86+
_ = send_terminate(Nodes),
87+
ok.
88+
89+
code_change(_OldVsn, State, _Extra) ->
90+
{ok, State}.
91+
92+
93+
%% INTERNAL
94+
95+
beat_all(Nodes) ->
96+
[send(N, {hb, node()}) || N <- Nodes, N =/= node()].
97+
98+
send_terminate(Nodes) ->
99+
[send(N, {terminate, node()}) || N <- Nodes, N =/= node()].
100+
101+
send(Node, Msg) ->
102+
erlang:send({?SERVER, Node}, Msg, [noconnect, nosuspend]).

moduleindex.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,7 @@ rabbit:
803803
- rabbit_prelaunch_feature_flags
804804
- rabbit_prelaunch_logging
805805
- rabbit_prequeue
806+
- rabbit_presence
806807
- rabbit_priority_queue
807808
- rabbit_process
808809
- rabbit_queue_consumers

0 commit comments

Comments
 (0)