Skip to content

Commit b7c2458

Browse files
committed
fix: Add specific error for increased message rates
Add specific error for increased message rate errors
1 parent 9686617 commit b7c2458

File tree

5 files changed

+45
-54
lines changed

5 files changed

+45
-54
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ This is the list of operational codes that can help you understand your deployme
227227
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
228228
| ConnectionRateLimitReached | The number of connected clients as reached its limit |
229229
| ClientJoinRateLimitReached | The rate of joins per second from your clients as reached the channel limits |
230+
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients as reached the channel limits |
230231
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
231232
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
232233
| DatabaseLackOfConnections | Realtime was not able to connect to the tenant's database due to not having enough available connections |

lib/realtime/tenants.ex

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,17 @@ defmodule Realtime.Tenants do
196196
event_name: [:channel, :events],
197197
measurements: %{limit: max_events_per_second},
198198
metadata: %{tenant: tenant_id}
199-
}
199+
},
200+
limit: [
201+
value: max_events_per_second,
202+
log: true,
203+
log_fn: fn ->
204+
Logger.error("MessagePerSecondRateLimitReached: Too many messages per second",
205+
external_id: tenant_id,
206+
project: tenant_id
207+
)
208+
end
209+
]
200210
]
201211

202212
%RateCounter.Args{id: events_per_second_key(tenant_id), opts: opts}

lib/realtime_web/channels/realtime_channel.ex

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ defmodule RealtimeWeb.RealtimeChannel do
203203

204204
if rate_counter.avg > max do
205205
message = "Too many messages per second"
206-
207206
shutdown_response(socket, message)
208207
else
209208
{:noreply, socket}
@@ -370,7 +369,7 @@ defmodule RealtimeWeb.RealtimeChannel do
370369
def handle_in(_, _, %{assigns: %{rate_counter: %{avg: avg}, limits: %{max_events_per_second: max}}} = socket)
371370
when avg > max do
372371
message = "Too many messages per second"
373-
372+
Logging.log_error(socket, "MessagePerSecondRateLimitReached", message)
374373
shutdown_response(socket, message)
375374
end
376375

@@ -592,10 +591,9 @@ defmodule RealtimeWeb.RealtimeChannel do
592591
end
593592

594593
defp shutdown_response(socket, message) when is_binary(message) do
595-
%{assigns: %{channel_name: channel_name, access_token: access_token}} = socket
596-
metadata = log_metadata(access_token)
594+
%{assigns: %{channel_name: channel_name}} = socket
597595
push_system_message("system", socket, "error", message, channel_name)
598-
log_warning("ChannelShutdown", message, metadata)
596+
Logging.maybe_log_warning(socket, "ChannelShutdown", message)
599597
{:stop, :normal, socket}
600598
end
601599

@@ -753,17 +751,4 @@ defmodule RealtimeWeb.RealtimeChannel do
753751
do: {:error, :private_only},
754752
else: :ok
755753
end
756-
757-
defp log_metadata(access_token) do
758-
access_token
759-
|> Joken.peek_claims()
760-
|> then(fn
761-
{:ok, claims} -> Map.get(claims, "sub")
762-
_ -> nil
763-
end)
764-
|> then(fn
765-
nil -> []
766-
sub -> [sub: sub]
767-
end)
768-
end
769754
end

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.41.25",
7+
version: "2.41.26",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 29 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,21 +1204,25 @@ defmodule Realtime.Integration.RtChannelTest do
12041204
# token expires in between joins so it needs to be handled by the channel and not the socket
12051205
Process.sleep(1000)
12061206
realtime_topic = "realtime:#{topic}"
1207-
WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token})
12081207

1209-
assert_receive %Message{
1210-
event: "phx_reply",
1211-
payload: %{
1212-
"status" => "error",
1213-
"response" => %{
1214-
"reason" => "InvalidJWTToken: Token has expired 0 seconds ago"
1215-
}
1216-
},
1217-
topic: ^realtime_topic
1218-
},
1219-
500
1208+
log =
1209+
capture_log(fn ->
1210+
WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token})
1211+
1212+
assert_receive %Message{
1213+
event: "phx_reply",
1214+
payload: %{
1215+
"status" => "error",
1216+
"response" => %{"reason" => "InvalidJWTToken: Token has expired 0 seconds ago"}
1217+
},
1218+
topic: ^realtime_topic
1219+
},
1220+
500
1221+
end)
12201222

12211223
assert_receive %Message{event: "phx_close"}
1224+
assert log =~ "#{tenant.external_id}"
1225+
assert log =~ "#{tenant.external_id}"
12221226
end
12231227

12241228
test "token loses claims in between joins", %{tenant: tenant, topic: topic} do
@@ -1759,36 +1763,27 @@ defmodule Realtime.Integration.RtChannelTest do
17591763
test "max_events_per_second limit respected", %{tenant: tenant} do
17601764
%{max_events_per_second: max_concurrent_users} = Tenants.get_tenant_by_external_id(tenant.external_id)
17611765
on_exit(fn -> change_tenant_configuration(tenant, :max_events_per_second, max_concurrent_users) end)
1762-
change_tenant_configuration(tenant, :max_events_per_second, 1)
17631766

17641767
{socket, _} = get_connection(tenant, "authenticated")
1765-
config = %{broadcast: %{self: true}, private: false}
1768+
config = %{broadcast: %{self: true}, private: false, presence: %{enabled: false}}
17661769
realtime_topic = "realtime:#{random_string()}"
17671770

17681771
WebsocketClient.join(socket, realtime_topic, %{config: config})
1772+
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^realtime_topic}, 500
17691773

1770-
for _ <- 1..1000 do
1771-
try do
1772-
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
1773-
catch
1774-
_, _ -> :ok
1775-
end
1776-
1777-
1..5 |> Enum.random() |> Process.sleep()
1778-
end
1774+
log =
1775+
capture_log(fn ->
1776+
for _ <- 1..1000, Process.alive?(socket) do
1777+
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
1778+
Process.sleep(10)
1779+
end
17791780

1780-
assert_receive %Message{
1781-
event: "system",
1782-
payload: %{
1783-
"status" => "error",
1784-
"extension" => "system",
1785-
"message" => "Too many messages per second"
1786-
}
1787-
},
1788-
2000
1781+
# Wait for the rate counter to run logger function
1782+
Process.sleep(1500)
1783+
end)
17891784

17901785
assert_receive %Message{event: "phx_close"}
1791-
assert_process_down(socket)
1786+
assert log =~ "MessagePerSecondRateLimitReached"
17921787
end
17931788

17941789
test "max_channels_per_client limit respected", %{tenant: tenant} do
@@ -1849,7 +1844,7 @@ defmodule Realtime.Integration.RtChannelTest do
18491844
event: "phx_reply",
18501845
payload: %{
18511846
"response" => %{
1852-
"reason" => "Too many joins per second"
1847+
"reason" => "ClientJoinRateLimitReached: Too many joins per second"
18531848
},
18541849
"status" => "error"
18551850
}

0 commit comments

Comments
 (0)