Skip to content

Commit a24aef6

Browse files
Merge pull request #13307 from rabbitmq/recover-exch-rk-from-amqp
Mc: introduce new function in mc_amqp to init mc from stream data
2 parents 82eb311 + 32615bf commit a24aef6

File tree

2 files changed

+61
-43
lines changed

2 files changed

+61
-43
lines changed

deps/rabbit/src/mc_amqp.erl

Lines changed: 56 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
prepare/2
1818
]).
1919

20+
-export([init_from_stream/2]).
21+
2022
-import(rabbit_misc,
2123
[maps_put_truthy/3]).
2224

@@ -99,10 +101,26 @@
99101

100102
-export_type([state/0]).
101103

104+
%% API
105+
106+
-spec init_from_stream(binary(), mc:annotations()) ->
107+
mc:state().
108+
init_from_stream(Payload, #{} = Anns0) ->
109+
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
110+
Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}),
111+
%% when initalising from stored stream data the recovered
112+
%% annotations take precendence over the ones provided
113+
Anns = maps:merge(Anns0, essential_properties(Msg, recover)),
114+
mc:init(?MODULE, Msg, Anns).
115+
116+
%% CALLBACKS
117+
118+
init(#msg_body_encoded{} = Msg) ->
119+
{Msg, #{}};
102120
init(Payload) ->
103121
Sections = amqp10_framing:decode_bin(Payload, [server_mode]),
104122
Msg = msg_body_encoded(Sections, Payload, #msg_body_encoded{}),
105-
Anns = essential_properties(Msg),
123+
Anns = essential_properties(Msg, new),
106124
{Msg, Anns}.
107125

108126
convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
@@ -622,16 +640,44 @@ encode_deaths(Deaths) ->
622640
{map, Map}
623641
end, Deaths).
624642

625-
essential_properties(Msg) ->
643+
essential_properties(#msg_body_encoded{} = Msg, new) ->
626644
Durable = get_property(durable, Msg),
627645
Priority = get_property(priority, Msg),
628646
Timestamp = get_property(timestamp, Msg),
629647
Ttl = get_property(ttl, Msg),
630-
Anns = #{?ANN_DURABLE => Durable},
631-
maps_put_truthy(
632-
?ANN_PRIORITY, Priority,
633-
maps_put_truthy(
634-
?ANN_TIMESTAMP, Timestamp,
635-
maps_put_truthy(
636-
ttl, Ttl,
637-
Anns))).
648+
Anns0 = #{?ANN_DURABLE => Durable},
649+
Anns = maps_put_truthy(
650+
?ANN_PRIORITY, Priority,
651+
maps_put_truthy(
652+
?ANN_TIMESTAMP, Timestamp,
653+
maps_put_truthy(
654+
ttl, Ttl,
655+
Anns0))),
656+
Anns;
657+
essential_properties(#msg_body_encoded{message_annotations = MA} = Msg, recover) ->
658+
Anns = essential_properties(Msg, new),
659+
case MA of
660+
[] ->
661+
Anns;
662+
_ ->
663+
lists:foldl(
664+
fun ({{symbol, <<"x-routing-key">>},
665+
{utf8, Key}}, Acc) ->
666+
maps:update_with(?ANN_ROUTING_KEYS,
667+
fun(L) -> [Key | L] end,
668+
[Key],
669+
Acc);
670+
({{symbol, <<"x-cc">>},
671+
{list, CCs0}}, Acc) ->
672+
CCs = [CC || {_T, CC} <- CCs0],
673+
maps:update_with(?ANN_ROUTING_KEYS,
674+
fun(L) -> L ++ CCs end,
675+
CCs,
676+
Acc);
677+
({{symbol, <<"x-exchange">>},
678+
{utf8, Exchange}}, Acc) ->
679+
Acc#{?ANN_EXCHANGE => Exchange};
680+
(_, Acc) ->
681+
Acc
682+
end, Anns, MA)
683+
end.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,39 +1305,11 @@ parse_uncompressed_subbatch(
13051305
parse_uncompressed_subbatch(Rem, Offset + 1, StartOffset, QName,
13061306
Name, LocalPid, Filter, Acc).
13071307

1308-
entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName}, Name, LocalPid, Filter) ->
1309-
Mc0 = mc:init(mc_amqp, Entry, #{}),
1310-
%% If exchange or routing keys annotation isn't present the entry most likely came
1311-
%% from the rabbitmq-stream plugin so we'll choose defaults that simulate use
1312-
%% of the direct exchange.
1313-
XHeaders = mc:x_headers(Mc0),
1314-
Exchange = case XHeaders of
1315-
#{<<"x-exchange">> := {utf8, X}} ->
1316-
X;
1317-
_ ->
1318-
<<>>
1319-
end,
1320-
RKeys0 = case XHeaders of
1321-
#{<<"x-cc">> := {list, CCs}} ->
1322-
[CC || {utf8, CC} <- CCs];
1323-
_ ->
1324-
[]
1325-
end,
1326-
RKeys1 = case XHeaders of
1327-
#{<<"x-routing-key">> := {utf8, RK}} ->
1328-
[RK | RKeys0];
1329-
_ ->
1330-
RKeys0
1331-
end,
1332-
RKeys = case RKeys1 of
1333-
[] ->
1334-
[QName];
1335-
_ ->
1336-
RKeys1
1337-
end,
1338-
Mc1 = mc:set_annotation(?ANN_EXCHANGE, Exchange, Mc0),
1339-
Mc2 = mc:set_annotation(?ANN_ROUTING_KEYS, RKeys, Mc1),
1340-
Mc = mc:set_annotation(<<"x-stream-offset">>, Offset, Mc2),
1308+
entry_to_msg(Entry, Offset, #resource{kind = queue, name = QName},
1309+
Name, LocalPid, Filter) ->
1310+
Mc = mc_amqp:init_from_stream(Entry, #{?ANN_EXCHANGE => <<>>,
1311+
?ANN_ROUTING_KEYS => [QName],
1312+
<<"x-stream-offset">> => Offset}),
13411313
case rabbit_amqp_filtex:filter(Filter, Mc) of
13421314
true ->
13431315
{Name, LocalPid, Offset, false, Mc};

0 commit comments

Comments
 (0)