Skip to content

Commit a11fc67

Browse files
authored
fix: Add specific error for increased message rates (#1453)
1 parent 51a7533 commit a11fc67

File tree

5 files changed

+44
-61
lines changed

5 files changed

+44
-61
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ This is the list of operational codes that can help you understand your deployme
227227
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
228228
| ConnectionRateLimitReached | The number of connected clients as reached its limit |
229229
| ClientJoinRateLimitReached | The rate of joins per second from your clients as reached the channel limits |
230+
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients as reached the channel limits |
230231
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
231232
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
232233
| DatabaseLackOfConnections | Realtime was not able to connect to the tenant's database due to not having enough available connections |

lib/realtime/tenants.ex

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,17 @@ defmodule Realtime.Tenants do
196196
event_name: [:channel, :events],
197197
measurements: %{limit: max_events_per_second},
198198
metadata: %{tenant: tenant_id}
199-
}
199+
},
200+
limit: [
201+
value: max_events_per_second,
202+
log: true,
203+
log_fn: fn ->
204+
Logger.error("MessagePerSecondRateLimitReached: Too many messages per second",
205+
external_id: tenant_id,
206+
project: tenant_id
207+
)
208+
end
209+
]
200210
]
201211

202212
%RateCounter.Args{id: events_per_second_key(tenant_id), opts: opts}

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ defmodule RealtimeWeb.RealtimeChannel do
138138
Logging.log_error(socket, "ConnectionRateLimitReached", msg)
139139

140140
{:error, :too_many_joins} ->
141-
msg = "Too many joins per second"
141+
msg = "ClientJoinRateLimitReached: Too many joins per second"
142142
{:error, %{reason: msg}}
143143

144144
{:error, :increase_connection_pool} ->
@@ -203,7 +203,6 @@ defmodule RealtimeWeb.RealtimeChannel do
203203

204204
if rate_counter.avg > max do
205205
message = "Too many messages per second"
206-
207206
shutdown_response(socket, message)
208207
else
209208
{:noreply, socket}
@@ -367,13 +366,6 @@ defmodule RealtimeWeb.RealtimeChannel do
367366
PresenceHandler.handle(payload, socket)
368367
end
369368

370-
def handle_in(_, _, %{assigns: %{rate_counter: %{avg: avg}, limits: %{max_events_per_second: max}}} = socket)
371-
when avg > max do
372-
message = "Too many messages per second"
373-
374-
shutdown_response(socket, message)
375-
end
376-
377369
def handle_in("access_token", %{"access_token" => "sb_" <> _}, socket) do
378370
{:noreply, socket}
379371
end
@@ -592,10 +584,9 @@ defmodule RealtimeWeb.RealtimeChannel do
592584
end
593585

594586
defp shutdown_response(socket, message) when is_binary(message) do
595-
%{assigns: %{channel_name: channel_name, access_token: access_token}} = socket
596-
metadata = log_metadata(access_token)
587+
%{assigns: %{channel_name: channel_name}} = socket
597588
push_system_message("system", socket, "error", message, channel_name)
598-
log_warning("ChannelShutdown", message, metadata)
589+
Logging.maybe_log_warning(socket, "ChannelShutdown", message)
599590
{:stop, :normal, socket}
600591
end
601592

@@ -753,17 +744,4 @@ defmodule RealtimeWeb.RealtimeChannel do
753744
do: {:error, :private_only},
754745
else: :ok
755746
end
756-
757-
defp log_metadata(access_token) do
758-
access_token
759-
|> Joken.peek_claims()
760-
|> then(fn
761-
{:ok, claims} -> Map.get(claims, "sub")
762-
_ -> nil
763-
end)
764-
|> then(fn
765-
nil -> []
766-
sub -> [sub: sub]
767-
end)
768-
end
769747
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.42.3",
7+
version: "2.42.4",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 28 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,21 +1204,24 @@ defmodule Realtime.Integration.RtChannelTest do
12041204
# token expires in between joins so it needs to be handled by the channel and not the socket
12051205
Process.sleep(1000)
12061206
realtime_topic = "realtime:#{topic}"
1207-
WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token})
12081207

1209-
assert_receive %Message{
1210-
event: "phx_reply",
1211-
payload: %{
1212-
"status" => "error",
1213-
"response" => %{
1214-
"reason" => "InvalidJWTToken: Token has expired 0 seconds ago"
1215-
}
1216-
},
1217-
topic: ^realtime_topic
1218-
},
1219-
500
1208+
log =
1209+
capture_log(fn ->
1210+
WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token})
1211+
1212+
assert_receive %Message{
1213+
event: "phx_reply",
1214+
payload: %{
1215+
"status" => "error",
1216+
"response" => %{"reason" => "InvalidJWTToken: Token has expired 0 seconds ago"}
1217+
},
1218+
topic: ^realtime_topic
1219+
},
1220+
500
1221+
end)
12201222

12211223
assert_receive %Message{event: "phx_close"}
1224+
assert log =~ "#{tenant.external_id}"
12221225
end
12231226

12241227
test "token loses claims in between joins", %{tenant: tenant, topic: topic} do
@@ -1759,36 +1762,27 @@ defmodule Realtime.Integration.RtChannelTest do
17591762
test "max_events_per_second limit respected", %{tenant: tenant} do
17601763
%{max_events_per_second: max_concurrent_users} = Tenants.get_tenant_by_external_id(tenant.external_id)
17611764
on_exit(fn -> change_tenant_configuration(tenant, :max_events_per_second, max_concurrent_users) end)
1762-
change_tenant_configuration(tenant, :max_events_per_second, 1)
17631765

17641766
{socket, _} = get_connection(tenant, "authenticated")
1765-
config = %{broadcast: %{self: true}, private: false}
1767+
config = %{broadcast: %{self: true}, private: false, presence: %{enabled: false}}
17661768
realtime_topic = "realtime:#{random_string()}"
17671769

17681770
WebsocketClient.join(socket, realtime_topic, %{config: config})
1771+
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^realtime_topic}, 500
17691772

1770-
for _ <- 1..1000 do
1771-
try do
1772-
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
1773-
catch
1774-
_, _ -> :ok
1775-
end
1776-
1777-
1..5 |> Enum.random() |> Process.sleep()
1778-
end
1773+
log =
1774+
capture_log(fn ->
1775+
for _ <- 1..1000, Process.alive?(socket) do
1776+
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
1777+
Process.sleep(10)
1778+
end
17791779

1780-
assert_receive %Message{
1781-
event: "system",
1782-
payload: %{
1783-
"status" => "error",
1784-
"extension" => "system",
1785-
"message" => "Too many messages per second"
1786-
}
1787-
},
1788-
2000
1780+
# Wait for the rate counter to run logger function
1781+
Process.sleep(1500)
1782+
end)
17891783

17901784
assert_receive %Message{event: "phx_close"}
1791-
assert_process_down(socket)
1785+
assert log =~ "MessagePerSecondRateLimitReached"
17921786
end
17931787

17941788
test "max_channels_per_client limit respected", %{tenant: tenant} do
@@ -1849,7 +1843,7 @@ defmodule Realtime.Integration.RtChannelTest do
18491843
event: "phx_reply",
18501844
payload: %{
18511845
"response" => %{
1852-
"reason" => "Too many joins per second"
1846+
"reason" => "ClientJoinRateLimitReached: Too many joins per second"
18531847
},
18541848
"status" => "error"
18551849
}

0 commit comments

Comments
 (0)