Skip to content

Commit d6b9fa0

Browse files
committed
wip: add feature flag and put RaUids in nodes
1 parent f3b7638 commit d6b9fa0

File tree

3 files changed

+74
-37
lines changed

3 files changed

+74
-37
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,3 +211,10 @@
211211
stability => stable,
212212
depends_on => ['rabbitmq_4.0.0']
213213
}}).
214+
215+
-rabbit_feature_flag(
216+
{'track_qq_members_uids',
217+
#{desc => "Track queue members UIDs in the metadata store",
218+
stability => stable,
219+
depends_on => []
220+
}}).

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -279,9 +279,14 @@ start_cluster(Q) ->
279279
UIDs = maps:from_list([{Node, ra:new_uid(ra_lib:to_binary(RaName))}
280280
|| Node <- [LeaderNode | FollowerNodes]]),
281281
NewQ0 = amqqueue:set_pid(Q, LeaderId),
282-
NewQ1 = amqqueue:set_type_state(NewQ0,
283-
#{nodes => [LeaderNode | FollowerNodes],
284-
uids => UIDs}),
282+
NewQ1 = case rabbit_feature_flags:is_enabled(track_qq_members_uids) of
283+
false ->
284+
amqqueue:set_type_state(NewQ0,
285+
#{nodes => [LeaderNode | FollowerNodes]});
286+
true ->
287+
amqqueue:set_type_state(NewQ0,
288+
#{nodes => UIDs})
289+
end,
285290

286291
Versions = [V || {ok, V} <- erpc:multicall(FollowerNodes,
287292
rabbit_fifo, version, [],
@@ -726,7 +731,7 @@ repair_amqqueue_nodes(Q0) ->
726731
{Name, _} = amqqueue:get_pid(Q0),
727732
Members = ra_leaderboard:lookup_members(Name),
728733
RaNodes = [N || {_, N} <- Members],
729-
#{nodes := Nodes} = amqqueue:get_type_state(Q0),
734+
Nodes = get_nodes(Q0),
730735
case lists:sort(RaNodes) =:= lists:sort(Nodes) of
731736
true ->
732737
%% up to date
@@ -735,7 +740,18 @@ repair_amqqueue_nodes(Q0) ->
735740
%% update amqqueue record
736741
Fun = fun (Q) ->
737742
TS0 = amqqueue:get_type_state(Q),
738-
TS = TS0#{nodes => RaNodes},
743+
TS = case rabbit_feature_flags:is_enabled(track_qq_members_uids)
744+
andalso has_uuid_tracking(TS0)
745+
of
746+
false ->
747+
TS0#{nodes => RaNodes};
748+
true ->
749+
RaUids = maps:from_list([{N, erpc:call(N, ra_directory, uid_of,
750+
[?RA_SYSTEM, Name],
751+
?RPC_TIMEOUT)}
752+
|| N <- RaNodes]),
753+
TS0#{nodes => RaUids}
754+
end,
739755
amqqueue:set_type_state(Q, TS)
740756
end,
741757
_ = rabbit_amqqueue:update(QName, Fun),
@@ -796,10 +812,9 @@ recover(_Vhost, Queues) ->
796812
QName = amqqueue:get_name(Q0),
797813
MutConf = make_mutable_config(Q0),
798814
RaUId = ra_directory:uid_of(?RA_SYSTEM, Name),
799-
QTypeState0 = amqqueue:get_type_state(Q0),
800-
RaUIds = maps:get(uids, QTypeState0, undefined),
801-
QTypeState = case RaUIds of
802-
undefined ->
815+
#{nodes := Nodes} = QTypeState0 = amqqueue:get_type_state(Q0),
816+
QTypeState = case Nodes of
817+
List when is_list(List) ->
803818
%% Queue is not aware of node to uid mapping, do nothing
804819
QTypeState0;
805820
#{node() := RaUId} ->
@@ -810,7 +825,7 @@ recover(_Vhost, Queues) ->
810825
%% does not match the one returned by ra_directory, regen uid
811826
maybe_delete_data_dir(RaUId),
812827
NewRaUId = ra:new_uid(ra_lib:to_binary(Name)),
813-
QTypeState0#{uids := RaUIds#{node() => NewRaUId}}
828+
QTypeState0#{nodes := Nodes#{node() => NewRaUId}}
814829
end,
815830
Q = amqqueue:set_type_state(Q0, QTypeState),
816831
Res = case ra:restart_server(?RA_SYSTEM, ServerId, MutConf) of
@@ -1413,21 +1428,20 @@ do_add_member(Q0, Node, Membership, Timeout)
14131428
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
14141429
ServerId = {RaName, Node},
14151430
Members = members(Q0),
1416-
QTypeState0 = amqqueue:get_type_state(Q0),
1417-
RaUIds = maps:get(uids, QTypeState0, undefined),
1418-
QTypeState = case RaUIds of
1419-
undefined ->
1420-
%% Queue is not aware of node to uid mapping, do nothing
1421-
QTypeState0;
1422-
#{Node := _} ->
1423-
%% Queue is aware and uid for targeted node exists, do nothing
1424-
QTypeState0;
1425-
_ ->
1426-
%% Queue is aware but current node has no UId, regen uid
1427-
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1428-
QTypeState0#{uids := RaUIds#{Node => NewRaUId}}
1429-
end,
1430-
Q = amqqueue:set_type_state(Q0, QTypeState),
1431+
QTypeState0 = #{nodes := _Nodes}= amqqueue:get_type_state(Q0),
1432+
NewRaUId = ra:new_uid(ra_lib:to_binary(RaName)),
1433+
%QTypeState = case Nodes of
1434+
% L when is_list(L) ->
1435+
% %% Queue is not aware of node to uid mapping, just add the new node
1436+
% QTypeState0#{nodes := lists:usort([Node | Nodes])};
1437+
% #{Node := _} ->
1438+
% %% Queue is aware and uid for targeted node exists, do nothing
1439+
% QTypeState0;
1440+
% _ ->
1441+
% %% Queue is aware but current node has no UId, regen uid
1442+
% QTypeState0#{nodes := Nodes#{Node => NewRaUId}}
1443+
%end,
1444+
Q = amqqueue:set_type_state(Q0, QTypeState0),
14311445
MachineVersion = erpc_call(Node, rabbit_fifo, version, [], infinity),
14321446
Conf = make_ra_conf(Q, ServerId, Membership, MachineVersion),
14331447
case ra:start_server(?RA_SYSTEM, Conf) of
@@ -1443,8 +1457,12 @@ do_add_member(Q0, Node, Membership, Timeout)
14431457
{ok, {RaIndex, RaTerm}, Leader} ->
14441458
Fun = fun(Q1) ->
14451459
Q2 = update_type_state(
1446-
Q1, fun(#{nodes := Nodes} = Ts) ->
1447-
Ts#{nodes => lists:usort([Node | Nodes])}
1460+
Q1, fun(#{nodes := NodesList} = Ts) when is_list(NodesList) ->
1461+
Ts#{nodes => lists:usort([Node | NodesList])};
1462+
(#{nodes := #{Node := _} = _NodesMap} = Ts) ->
1463+
Ts;
1464+
(#{nodes := NodesMap} = Ts) when is_map(NodesMap) ->
1465+
Ts#{nodes => maps:put(Node, NewRaUId, NodesMap)}
14481466
end),
14491467
amqqueue:set_pid(Q2, Leader)
14501468
end,
@@ -1517,12 +1535,10 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
15171535
Fun = fun(Q1) ->
15181536
update_type_state(
15191537
Q1,
1520-
fun(#{nodes := Nodes,
1521-
uids := UIds} = Ts) ->
1522-
Ts#{nodes => lists:delete(Node, Nodes),
1523-
uids => maps:remove(Node, UIds)};
1524-
(#{nodes := Nodes} = Ts) ->
1525-
Ts#{nodes => lists:delete(Node, Nodes)}
1538+
fun(#{nodes := Nodes} = Ts) when is_list(Nodes) ->
1539+
Ts#{nodes => lists:delete(Node, Nodes)};
1540+
(#{nodes := Nodes} = Ts) when is_map(Nodes) ->
1541+
Ts#{nodes => maps:remove(Node, Nodes)}
15261542
end)
15271543
end,
15281544
_ = rabbit_amqqueue:update(QName, Fun),
@@ -2069,7 +2085,12 @@ make_mutable_config(Q) ->
20692085

20702086
get_nodes(Q) when ?is_amqqueue(Q) ->
20712087
#{nodes := Nodes} = amqqueue:get_type_state(Q),
2072-
Nodes.
2088+
case Nodes of
2089+
List when is_list(List) ->
2090+
List;
2091+
Map when is_map(Map) ->
2092+
maps:keys(Map)
2093+
end.
20732094

20742095
get_connected_nodes(Q) when ?is_amqqueue(Q) ->
20752096
ErlangNodes = [node() | nodes()],
@@ -2425,3 +2446,8 @@ queue_vm_stats_sups() ->
24252446
queue_vm_ets() ->
24262447
{[quorum_ets],
24272448
[[ra_log_ets]]}.
2449+
2450+
has_uuid_tracking(#{nodes := Nodes}) when is_map(Nodes) ->
2451+
true;
2452+
has_uuid_tracking(_QTypeState) ->
2453+
false.

deps/rabbit/test/quorum_queue_SUITE.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2852,10 +2852,14 @@ add_member_2(Config) ->
28522852
{<<"x-quorum-initial-group-size">>, long, 1}])),
28532853
?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
28542854
[<<"/">>, QQ, Server0, 5000])),
2855-
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
2856-
[rabbit_misc:r(<<"/">>, queue, QQ)]),
2855+
#{online := Onlines} = ?awaitMatch(#{online := [_One, _Two]},
2856+
maps:from_list(rpc:call(Server0,
2857+
rabbit_quorum_queue,
2858+
infos,
2859+
[rabbit_misc:r(<<"/">>, queue, QQ)])),
2860+
3000),
28572861
Servers = lists:sort([Server0, Server1]),
2858-
?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
2862+
?assertEqual(Servers, lists:sort(Onlines)).
28592863

28602864
delete_member_not_running(Config) ->
28612865
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),

0 commit comments

Comments
 (0)