Skip to content

Commit 80a687f

Browse files
committed
Simplify splitting large messages
1 parent cf3bbe9 commit 80a687f

File tree

6 files changed

+16
-13
lines changed

6 files changed

+16
-13
lines changed

deps/amqp10_client/src/amqp10_client_internal.hrl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
-define(AMQP_PROTOCOL_HEADER, <<"AMQP", 0, 1, 0, 0>>).
99
-define(SASL_PROTOCOL_HEADER, <<"AMQP", 3, 1, 0, 0>>).
10-
-define(FRAME_HEADER_SIZE, 8).
1110

1211
-define(TIMEOUT, 5000).
1312

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ send_transfer(Transfer0, Sections0, FooterOpt, MaxMessageSize,
636636
true ->
637637
% TODO: this does not take the extended header into account
638638
% see: 2.3
639-
MaxPayloadSize = OutMaxFrameSize - TransferSize - ?FRAME_HEADER_SIZE,
639+
MaxPayloadSize = OutMaxFrameSize - ?FRAME_HEADER_SIZE - TransferSize,
640640
Frames = build_frames(Channel, Transfer, SectionsBin, MaxPayloadSize, []),
641641
ok = socket_send(Socket, Frames),
642642
{ok, length(Frames)}

deps/amqp10_common/include/amqp10_types.hrl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
% [1.6.5]
44
-type uint() :: 0..?UINT_MAX.
5+
6+
% [2.3.1]
7+
-define(FRAME_HEADER_SIZE, 8).
8+
59
% [2.8.4]
610
-type link_handle() :: uint().
711
% [2.8.8]

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -827,14 +827,9 @@ send_to_new_session(
827827
container_id = ContainerId,
828828
name = ConnName},
829829
writer = WriterPid} = State) ->
830-
%% Subtract fixed frame header size.
831-
OutgoingMaxFrameSize = case MaxFrame of
832-
unlimited -> unlimited;
833-
_ -> MaxFrame - 8
834-
end,
835830
ChildArgs = [WriterPid,
836831
ChannelNum,
837-
OutgoingMaxFrameSize,
832+
MaxFrame,
838833
User,
839834
Vhost,
840835
ContainerId,

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3163,19 +3163,20 @@ transfer_frames(Transfer, Sections, unlimited) ->
31633163
[[Transfer, Sections]];
31643164
transfer_frames(Transfer, Sections, MaxFrameSize) ->
31653165
PerformativeSize = iolist_size(amqp10_framing:encode_bin(Transfer)),
3166-
encode_frames(Transfer, Sections, MaxFrameSize - PerformativeSize, []).
3166+
MaxPayloadSize = MaxFrameSize - ?FRAME_HEADER_SIZE - PerformativeSize,
3167+
split_msg(Transfer, Sections, MaxPayloadSize, []).
31673168

3168-
encode_frames(_T, _Msg, MaxPayloadSize, _Transfers) when MaxPayloadSize =< 0 ->
3169+
split_msg(_T, _Msg, MaxPayloadSize, _Transfers) when MaxPayloadSize =< 0 ->
31693170
protocol_error(?V_1_0_AMQP_ERROR_FRAME_SIZE_TOO_SMALL,
31703171
"Frame size is too small by ~b bytes",
31713172
[-MaxPayloadSize]);
3172-
encode_frames(T, Msg, MaxPayloadSize, Transfers) ->
3173+
split_msg(T, Msg, MaxPayloadSize, Transfers) ->
31733174
case iolist_size(Msg) > MaxPayloadSize of
31743175
true ->
31753176
MsgBin = iolist_to_binary(Msg),
31763177
{Chunk, Rest} = split_binary(MsgBin, MaxPayloadSize),
31773178
T1 = T#'v1_0.transfer'{more = true},
3178-
encode_frames(T, Rest, MaxPayloadSize, [[T1, Chunk] | Transfers]);
3179+
split_msg(T, Rest, MaxPayloadSize, [[T1, Chunk] | Transfers]);
31793180
false ->
31803181
lists:reverse([[T, Msg] | Transfers])
31813182
end.

deps/rabbit/test/amqp_dotnet_SUITE_data/fsharp-tests/Program.fs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,11 @@ module Test =
279279

280280
let fragmentation uri =
281281
for frameSize, size in
282-
[1024u, 1024
282+
[1024u, 990
283+
1024u, 1000
284+
1024u, 1010
285+
1024u, 1020
286+
1024u, 1024
283287
1024u, 1100
284288
1024u, 2048
285289
2048u, 2048] do

0 commit comments

Comments
 (0)