Skip to content

fix: Add specific error for increased message rates #1453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ This is the list of operational codes that can help you understand your deployme
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
| ConnectionRateLimitReached | The number of connected clients has reached its limit |
| ClientJoinRateLimitReached | The rate of joins per second from your clients has reached the channel limits |
| MessagePerSecondRateLimitReached | The rate of messages per second from your clients has reached the channel limits |
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
| DatabaseLackOfConnections | Realtime was not able to connect to the tenant's database due to not having enough available connections |
Expand Down
12 changes: 11 additions & 1 deletion lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,17 @@ defmodule Realtime.Tenants do
event_name: [:channel, :events],
measurements: %{limit: max_events_per_second},
metadata: %{tenant: tenant_id}
}
},
limit: [
value: max_events_per_second,
log: true,
log_fn: fn ->
Logger.error("MessagePerSecondRateLimitReached: Too many messages per second",
external_id: tenant_id,
project: tenant_id
)
end
]
]

%RateCounter.Args{id: events_per_second_key(tenant_id), opts: opts}
Expand Down
28 changes: 3 additions & 25 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ defmodule RealtimeWeb.RealtimeChannel do
Logging.log_error(socket, "ConnectionRateLimitReached", msg)

{:error, :too_many_joins} ->
msg = "Too many joins per second"
msg = "ClientJoinRateLimitReached: Too many joins per second"
{:error, %{reason: msg}}

{:error, :increase_connection_pool} ->
Expand Down Expand Up @@ -203,7 +203,6 @@ defmodule RealtimeWeb.RealtimeChannel do

if rate_counter.avg > max do
message = "Too many messages per second"

shutdown_response(socket, message)
else
{:noreply, socket}
Expand Down Expand Up @@ -367,13 +366,6 @@ defmodule RealtimeWeb.RealtimeChannel do
PresenceHandler.handle(payload, socket)
end

# Rate-limit clause: matches any event and payload when the socket's
# `rate_counter` average exceeds the tenant's `max_events_per_second`
# limit, and shuts the channel down with an explanatory message.
def handle_in(
      _event,
      _payload,
      %{
        assigns: %{
          rate_counter: %{avg: observed_rate},
          limits: %{max_events_per_second: allowed_rate}
        }
      } = socket
    )
    when observed_rate > allowed_rate do
  shutdown_response(socket, "Too many messages per second")
end

# "access_token" events whose token starts with "sb_" are accepted as a no-op:
# the clause matches and returns the socket unchanged.
def handle_in("access_token", %{"access_token" => "sb_" <> _}, socket) do
{:noreply, socket}
end
Expand Down Expand Up @@ -592,10 +584,9 @@ defmodule RealtimeWeb.RealtimeChannel do
end

# Pushes an "error" system message carrying `message` to the channel, logs a
# "ChannelShutdown" warning, and stops the channel process with reason :normal.
#
# NOTE(review): this listing contains two socket matches and two logging calls
# (`log_warning/3` with `log_metadata/1` metadata, and
# `Logging.maybe_log_warning/3`). They look like the before/after sides of a
# diff rendered without markers — confirm which path is the live one.
defp shutdown_response(socket, message) when is_binary(message) do
%{assigns: %{channel_name: channel_name, access_token: access_token}} = socket
metadata = log_metadata(access_token)
%{assigns: %{channel_name: channel_name}} = socket
push_system_message("system", socket, "error", message, channel_name)
log_warning("ChannelShutdown", message, metadata)
Logging.maybe_log_warning(socket, "ChannelShutdown", message)
# Returning :normal as the stop reason ends the channel process.
{:stop, :normal, socket}
end

Expand Down Expand Up @@ -753,17 +744,4 @@ defmodule RealtimeWeb.RealtimeChannel do
do: {:error, :private_only},
else: :ok
end

# Builds logger metadata from an access token: returns `[sub: sub]` when the
# token's claims can be peeked and contain a non-nil "sub" claim, otherwise
# returns an empty list.
defp log_metadata(access_token) do
  case Joken.peek_claims(access_token) do
    {:ok, claims} ->
      case Map.get(claims, "sub") do
        nil -> []
        subject -> [sub: subject]
      end

    _ ->
      []
  end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.42.3",
version: "2.42.4",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
62 changes: 28 additions & 34 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1204,21 +1204,24 @@ defmodule Realtime.Integration.RtChannelTest do
# token expires in between joins so it needs to be handled by the channel and not the socket
Process.sleep(1000)
realtime_topic = "realtime:#{topic}"
WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token})

assert_receive %Message{
event: "phx_reply",
payload: %{
"status" => "error",
"response" => %{
"reason" => "InvalidJWTToken: Token has expired 0 seconds ago"
}
},
topic: ^realtime_topic
},
500
log =
capture_log(fn ->
WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token})

assert_receive %Message{
event: "phx_reply",
payload: %{
"status" => "error",
"response" => %{"reason" => "InvalidJWTToken: Token has expired 0 seconds ago"}
},
topic: ^realtime_topic
},
500
end)

assert_receive %Message{event: "phx_close"}
assert log =~ "#{tenant.external_id}"
end

test "token loses claims in between joins", %{tenant: tenant, topic: topic} do
Expand Down Expand Up @@ -1759,36 +1762,27 @@ defmodule Realtime.Integration.RtChannelTest do
test "max_events_per_second limit respected", %{tenant: tenant} do
%{max_events_per_second: max_concurrent_users} = Tenants.get_tenant_by_external_id(tenant.external_id)
on_exit(fn -> change_tenant_configuration(tenant, :max_events_per_second, max_concurrent_users) end)
change_tenant_configuration(tenant, :max_events_per_second, 1)

{socket, _} = get_connection(tenant, "authenticated")
config = %{broadcast: %{self: true}, private: false}
config = %{broadcast: %{self: true}, private: false, presence: %{enabled: false}}
realtime_topic = "realtime:#{random_string()}"

WebsocketClient.join(socket, realtime_topic, %{config: config})
assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^realtime_topic}, 500

for _ <- 1..1000 do
try do
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
catch
_, _ -> :ok
end

1..5 |> Enum.random() |> Process.sleep()
end
log =
capture_log(fn ->
for _ <- 1..1000, Process.alive?(socket) do
WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{})
Process.sleep(10)
end

assert_receive %Message{
event: "system",
payload: %{
"status" => "error",
"extension" => "system",
"message" => "Too many messages per second"
}
},
2000
# Wait for the rate counter to run logger function
Process.sleep(1500)
end)

assert_receive %Message{event: "phx_close"}
assert_process_down(socket)
assert log =~ "MessagePerSecondRateLimitReached"
end

test "max_channels_per_client limit respected", %{tenant: tenant} do
Expand Down Expand Up @@ -1849,7 +1843,7 @@ defmodule Realtime.Integration.RtChannelTest do
event: "phx_reply",
payload: %{
"response" => %{
"reason" => "Too many joins per second"
"reason" => "ClientJoinRateLimitReached: Too many joins per second"
},
"status" => "error"
}
Expand Down
Loading