From 530cfe33883e7b0810074da9716668433af2310f Mon Sep 17 00:00:00 2001 From: Holger Weiss Date: Fri, 10 Jun 2022 19:43:20 +0200 Subject: [PATCH] Optionally queue outgoing data Support queueing outgoing stanzas and stream management elements for up to a configurable number of milliseconds (with a configurable queue size limit). This allows for batching up multiple XML elements into a single TCP packet in order to reduce the TCP/IP overhead. --- src/xmpp_socket.erl | 7 ++ src/xmpp_stream_in.erl | 152 +++++++++++++++++++++++++++++++++------- src/xmpp_stream_out.erl | 137 ++++++++++++++++++++++++++++++------ 3 files changed, 249 insertions(+), 47 deletions(-) diff --git a/src/xmpp_socket.erl b/src/xmpp_socket.erl index 08c12531..5bb6282c 100644 --- a/src/xmpp_socket.erl +++ b/src/xmpp_socket.erl @@ -28,6 +28,7 @@ compress/1, compress/2, reset_stream/1, + send_elements/2, send_element/2, send_header/2, send_trailer/1, @@ -196,6 +197,12 @@ reset_stream(#socket_state{xml_stream = XMLStream, SocketData#socket_state{socket = Socket1} end. +-spec send_elements(socket_state(), [fxml:xmlel()]) -> ok | {error, inet:posix()}. +send_elements(#socket_state{xml_stream = undefined}, _Els) -> + erlang:error(not_implemented); +send_elements(SocketData, Els) -> + send(SocketData, list_to_binary([fxml:element_to_binary(El) || El <- Els])). + -spec send_element(socket_state(), fxml:xmlel()) -> ok | {error, inet:posix()}. send_element(#socket_state{xml_stream = undefined} = SocketData, El) -> send_xml(SocketData, {xmlstreamelement, El}); diff --git a/src/xmpp_stream_in.erl b/src/xmpp_stream_in.erl index 48dd2d7b..705be640 100644 --- a/src/xmpp_stream_in.erl +++ b/src/xmpp_stream_in.erl @@ -25,8 +25,8 @@ %% API -export([start/3, start_link/3, call/3, cast/2, reply/2, stop/1, stop_async/1, accept/1, send/2, close/1, close/2, send_error/3, establish/1, - get_transport/1, change_shaper/2, set_timeout/2, format_error/1, - send_ws_ping/1]). + get_transport/1, change_shaper/2, configure_queue/3, set_timeout/2, + format_error/1, send_ws_ping/1]). %% gen_server callbacks -export([init/1, handle_cast/2, handle_call/3, handle_info/2, @@ -58,6 +58,9 @@ stream_encrypted => boolean(), stream_version => {non_neg_integer(), non_neg_integer()}, stream_authenticated => boolean(), + stream_queue := [xmpp_element() | xmlel()], + stream_queue_max := non_neg_integer(), + stream_queue_timeout => {non_neg_integer(), integer()}, ip => {inet:ip_address(), inet:port_number()}, codec_options => [xmpp:decode_option()], xmlns => binary(), @@ -226,7 +229,21 @@ close(Pid, Reason) -> establish(State) -> process_stream_established(State). --spec set_timeout(state(), non_neg_integer() | infinity) -> state(). +-spec configure_queue(state(), non_neg_integer(), non_neg_integer()) -> state(). +configure_queue(#{owner := Owner} = State, MaxSize, MaxDelay) + when Owner == self() -> + flush_queue(State), % Support reconfiguration. + if MaxSize == 0; MaxDelay == 0 -> + State#{stream_queue_max => 0}; + true -> + CurrentTime = p1_time_compat:monotonic_time(milli_seconds), + State#{stream_queue_max => MaxSize, + stream_queue_timeout => {MaxDelay, CurrentTime}} + end; +configure_queue(_, _, _) -> + erlang:error(badarg). + +-spec set_timeout(state(), timeout()) -> state(). set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() -> case Timeout of infinity -> State#{stream_timeout => infinity}; @@ -280,7 +297,9 @@ init([Mod, {SockMod, Socket}, Opts]) -> socket_mod => SockMod, socket_opts => Opts, stream_timeout => {Timeout, Time}, - stream_state => accepting}, + stream_state => accepting, + stream_queue => [], + stream_queue_max => 0}, {ok, State, Timeout}. -spec handle_cast(term(), state()) -> next_state(). @@ -424,6 +443,8 @@ handle_info({'$gen_all_state_event', {xmlstreamcdata, Data}}, noreply(try callback(handle_cdata, Data, State) catch _:{?MODULE, undef} -> State end); +handle_info(timeout, #{stream_queue := [_|_]} = State) -> + noreply(flush_queue(State)); handle_info(timeout, #{lang := Lang} = State) -> Disconnected = is_disconnected(State), noreply(try callback(handle_timeout, State) @@ -522,15 +543,32 @@ init_state(#{socket := Socket, mod := Mod} = State, Opts) -> end. -spec noreply(state()) -> noreply(); - ({stop, state()}) -> {stop, normal, state()}. + ({stop, state()}) -> {stop, normal, state()}; + ({stop, normal, state()}) -> {stop, normal, state()}. noreply({stop, State}) -> {stop, normal, State}; -noreply(#{stream_timeout := infinity} = State) -> - {noreply, State, infinity}; -noreply(#{stream_timeout := {MSecs, StartTime}} = State) -> +noreply({stop, normal, State}) -> + {stop, normal, State}; +noreply(State) -> + {noreply, State, get_timeout(State)}. + +-spec get_timeout(state()) -> timeout(). +get_timeout(State) -> + min(get_stream_timeout(State), get_queue_timeout(State)). + +-spec get_stream_timeout(state()) -> timeout(). +get_stream_timeout(#{stream_timeout := infinity}) -> + infinity; +get_stream_timeout(#{stream_timeout := {MSecs, StartTime}}) -> CurrentTime = p1_time_compat:monotonic_time(milli_seconds), - Timeout = max(0, MSecs - CurrentTime + StartTime), - {noreply, State, Timeout}. + max(0, MSecs - CurrentTime + StartTime). + +-spec get_queue_timeout(state()) -> timeout(). +get_queue_timeout(#{stream_queue := []}) -> + infinity; +get_queue_timeout(#{stream_queue_timeout := {MSecs, StartTime}}) -> + CurrentTime = p1_time_compat:monotonic_time(milli_seconds), + max(0, MSecs - CurrentTime + StartTime). -spec is_disconnected(state()) -> boolean(). is_disconnected(#{stream_state := StreamState}) -> @@ -1193,21 +1231,29 @@ send_header(State, _) -> -spec send_pkt(state(), xmpp_element() | xmlel()) -> state(). send_pkt(State, Pkt) -> - Result = socket_send(State, Pkt), - State1 = try callback(handle_send, Pkt, Result, State) - catch _:{?MODULE, undef} -> State - end, - case Result of - _ when is_record(Pkt, stream_error) -> - process_stream_end({stream, {out, Pkt}}, State1); - ok -> - State1; - {error, _Why} -> - % Queue process_stream_end instead of calling it directly, - % so we have opportunity to process incoming queued messages before - % terminating session. - self() ! {'$gen_event', closed}, - State1 + case check_queue(State, Pkt) of + flush -> + flush_queue(State, Pkt); + queue -> + queue_pkt(State, Pkt); + noqueue -> + State1 = flush_queue(State), + Result = socket_send(State1, Pkt), + State2 = try callback(handle_send, Pkt, Result, State1) + catch _:{?MODULE, undef} -> State1 + end, + case Result of + _ when is_record(Pkt, stream_error) -> + process_stream_end({stream, {out, Pkt}}, State2); + ok -> + State2; + {error, _Why} -> + % Queue process_stream_end instead of calling it directly, + % so we have the opportunity to process incoming queued + % messages before terminating the session. + self() ! {'$gen_event', closed}, + State2 + end end. -spec send_error(state(), xmpp_element() | xmlel(), stanza_error()) -> state(). @@ -1258,6 +1304,62 @@ close_socket(#{socket := Socket} = State) -> close_socket(State) -> State. +-spec check_queue(state(), xmpp_element() | xmlel()) -> flush | queue | noqueue. +check_queue(#{stream_queue_max := 0}, _Pkt) -> + noqueue; +check_queue(#{stream_state := StreamState}, _Pkt) + when StreamState /= established-> + noqueue; +check_queue(_State, Pkt) + when not ?is_stanza(Pkt), + not is_record(Pkt, sm_a), + not is_record(Pkt, sm_r) -> + noqueue; +check_queue(#{stream_queue := Q, stream_queue_max := MaxQueue}, _Pkt) + when length(Q) >= MaxQueue -> + flush; +check_queue(_State, _Pkt) -> + queue. + +-spec queue_pkt(state(), xmpp_element() | xmlel()) -> state(). +queue_pkt(#{stream_queue := [], + stream_queue_timeout := {MSecs, _PrevTime}} = State, Pkt) -> + CurrentTime = p1_time_compat:monotonic_time(milli_seconds), + State#{stream_queue := [Pkt], + stream_queue_timeout := {MSecs, CurrentTime}}; +queue_pkt(#{stream_queue := Q} = State, Pkt) -> + State#{stream_queue := [Pkt|Q]}. + +-spec flush_queue(state(), xmpp_element() | xmlel()) -> state(). +flush_queue(State, Pkt) -> + flush_queue(queue_pkt(State, Pkt)). + +-spec flush_queue(state()) -> state(). +flush_queue(#{stream_queue := []} = State) -> + State; +flush_queue(#{stream_queue := Q0, + socket := Sock, + xmlns := NS} = State0) -> + Q = lists:reverse(Q0), + Els = [xmpp:encode(Pkt, NS) || Pkt <- Q], + Result = xmpp_socket:send_elements(Sock, Els), + State1 = State0#{stream_queue := []}, + State2 = try lists:foldl( + fun(Pkt, State) -> + callback(handle_send, Pkt, Result, State) + end, State1, Q) + catch _:{?MODULE, undef} -> State1 + end, + case Result of + ok -> + State2; + {error, _Why} -> + self() ! {'$gen_event', closed}, + State2 + end; +flush_queue(#{stream_queue := _Q} = State) -> % Socket has been released. + State#{stream_queue := []}. + -spec select_lang(binary(), binary()) -> binary(). select_lang(Lang, <<"">>) -> Lang; select_lang(_, Lang) -> Lang. diff --git a/src/xmpp_stream_out.erl b/src/xmpp_stream_out.erl index a1122e05..0af1b8d9 100644 --- a/src/xmpp_stream_out.erl +++ b/src/xmpp_stream_out.erl @@ -26,7 +26,7 @@ %% API -export([start/3, start_link/3, call/3, cast/2, reply/2, connect/1, stop/1, stop_async/1, send/2, close/1, close/2, bind/2, establish/1, format_error/1, - set_timeout/2, get_transport/1, change_shaper/2]). + configure_queue/3, set_timeout/2, get_transport/1, change_shaper/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -67,6 +67,9 @@ stream_restarted := boolean(), stream_state := stream_state(), stream_remote_id => binary(), + stream_queue := [xmpp_element() | xmlel()], + stream_queue_max := non_neg_integer(), + stream_queue_timeout => {non_neg_integer(), integer()}, ip => {inet:ip_address(), inet:port_number()}, socket => xmpp_socket:socket(), socket_monitor => reference(), @@ -243,6 +246,19 @@ bind(#{stream_authenticated := true} = State, StreamFeatures) -> establish(State) -> process_stream_established(State). +-spec configure_queue(state(), non_neg_integer(), non_neg_integer()) -> state(). +configure_queue(#{owner := Owner} = State, MaxSize, MaxDelay) + when Owner == self() -> + flush_queue(State), % Support reconfiguration. + if MaxSize == 0; MaxDelay == 0 -> + State#{stream_queue_max => 0}; + true -> + State#{stream_queue_max => MaxSize, + stream_queue_timeout => {MaxDelay, current_time()}} + end; +configure_queue(_, _, _) -> + erlang:error(badarg). + -spec set_timeout(state(), timeout()) -> state(). set_timeout(#{owner := Owner} = State, Timeout) when Owner == self() -> case Timeout of @@ -318,7 +334,9 @@ init([Mod, From, To, Opts]) -> stream_verified => false, stream_authenticated => false, stream_restarted => false, - stream_state => connecting}, + stream_state => connecting, + stream_queue => [], + stream_queue_max => 0}, case try Mod:init([State, Opts]) catch _:undef -> {ok, State} end of @@ -451,6 +469,8 @@ handle_info({'$gen_event', {xmlstreamend, _}}, State) -> noreply(process_stream_end({stream, reset}, State)); handle_info({'$gen_event', closed}, State) -> noreply(process_stream_end({socket, closed}, State)); +handle_info(timeout, #{stream_queue := [_|_]} = State) -> + noreply(flush_queue(State)); handle_info(timeout, #{lang := Lang} = State) -> Disconnected = is_disconnected(State), try noreply(callback(handle_timeout, State)) @@ -936,21 +956,29 @@ send_header(#{remote_server := RemoteServer, -spec send_pkt(state(), xmpp_element() | xmlel()) -> state(). send_pkt(State, Pkt) -> - Result = socket_send(State, Pkt), - State1 = try callback(handle_send, Pkt, Result, State) - catch _:{?MODULE, undef} -> State - end, - case Result of - _ when is_record(Pkt, stream_error) -> - process_stream_end({stream, {out, Pkt}}, State1); - ok -> - State1; - {error, _Why} -> - % Queue process_stream_end instead of calling it directly, - % so we have opurtunity to process incoming queued messages before - % terminating session. - self() ! {'$gen_event', closed}, - State1 + case check_queue(State, Pkt) of + flush -> + flush_queue(State, Pkt); + queue -> + queue_pkt(State, Pkt); + noqueue -> + State1 = flush_queue(State), + Result = socket_send(State1, Pkt), + State2 = try callback(handle_send, Pkt, Result, State1) + catch _:{?MODULE, undef} -> State1 + end, + case Result of + _ when is_record(Pkt, stream_error) -> + process_stream_end({stream, {out, Pkt}}, State2); + ok -> + State2; + {error, _Why} -> + % Queue process_stream_end instead of calling it directly, + % so we have the opportunity to process incoming queued + % messages before terminating the session. + self() ! {'$gen_event', closed}, + State2 + end end. -spec send_error(state(), xmpp_element() | xmlel(), stanza_error()) -> state(). @@ -1015,6 +1043,62 @@ starttls(Socket, #{xmlns := NS, end, xmpp_socket:starttls(Socket, [connect, {sni, SNI}, {alpn, [ALPN]}|TLSOpts]). +-spec check_queue(state(), xmpp_element() | xmlel()) -> flush | queue | noqueue. +check_queue(#{stream_queue_max := 0}, _Pkt) -> + noqueue; +check_queue(#{stream_state := StreamState}, _Pkt) + when StreamState /= established-> + noqueue; +check_queue(_State, Pkt) + when not ?is_stanza(Pkt), + not is_record(Pkt, sm_a), + not is_record(Pkt, sm_r) -> + noqueue; +check_queue(#{stream_queue := Q, stream_queue_max := MaxQueue}, _Pkt) + when length(Q) >= MaxQueue -> + flush; +check_queue(_State, _Pkt) -> + queue. + +-spec queue_pkt(state(), xmpp_element() | xmlel()) -> state(). +queue_pkt(#{stream_queue := [], + stream_queue_timeout := {MSecs, _PrevTime}} = State, Pkt) -> + CurrentTime = current_time(), + State#{stream_queue := [Pkt], + stream_queue_timeout := {MSecs, CurrentTime}}; +queue_pkt(#{stream_queue := Q} = State, Pkt) -> + State#{stream_queue := [Pkt|Q]}. + +-spec flush_queue(state(), xmpp_element() | xmlel()) -> state(). +flush_queue(State, Pkt) -> + flush_queue(queue_pkt(State, Pkt)). + +-spec flush_queue(state()) -> state(). +flush_queue(#{stream_queue := []} = State) -> + State; +flush_queue(#{stream_queue := Q0, + socket := Sock, + xmlns := NS} = State0) -> + Q = lists:reverse(Q0), + Els = [xmpp:encode(Pkt, NS) || Pkt <- Q], + Result = xmpp_socket:send_elements(Sock, Els), + State1 = State0#{stream_queue := []}, + State2 = try lists:foldl( + fun(Pkt, State) -> + callback(handle_send, Pkt, Result, State) + end, State1, Q) + catch _:{?MODULE, undef} -> State1 + end, + case Result of + ok -> + State2; + {error, _Why} -> + self() ! {'$gen_event', closed}, + State2 + end; +flush_queue(#{stream_queue := _Q} = State) -> % Socket has been released. + State#{stream_queue := []}. + -spec select_lang(binary(), binary()) -> binary(). select_lang(Lang, <<"">>) -> Lang; select_lang(_, Lang) -> Lang. @@ -1071,11 +1155,20 @@ current_time() -> p1_time_compat:monotonic_time(milli_seconds). -spec get_timeout(state()) -> timeout(). -get_timeout(#{stream_timeout := ExpireTime}) -> - case ExpireTime of - infinity -> infinity; - _ -> max(0, ExpireTime - current_time()) - end. +get_timeout(State) -> + min(get_stream_timeout(State), get_queue_timeout(State)). + +-spec get_stream_timeout(state()) -> timeout(). +get_stream_timeout(#{stream_timeout := infinity}) -> + infinity; +get_stream_timeout(#{stream_timeout := ExpireTime}) -> + max(0, ExpireTime - current_time()). + +-spec get_queue_timeout(state()) -> timeout(). +get_queue_timeout(#{stream_queue := []}) -> + infinity; +get_queue_timeout(#{stream_queue_timeout := {MSecs, StartTime}}) -> + max(0, MSecs - current_time() + StartTime). %%%=================================================================== %%% State resets