From 3a007946281e0cb94fbc2813b50da144ba6791ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Wed, 9 Jul 2025 12:11:34 +0100 Subject: [PATCH 1/3] fix: Add specific error for increased message rates Add specific error for increased message rate errors --- README.md | 1 + lib/realtime/tenants.ex | 12 +++- lib/realtime_web/channels/realtime_channel.ex | 21 +------ test/integration/rt_channel_test.exs | 63 +++++++++---------- 4 files changed, 44 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index e62227f84..5fbbfb585 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,7 @@ This is the list of operational codes that can help you understand your deployme | ChannelRateLimitReached | The number of channels you can create has reached its limit | | ConnectionRateLimitReached | The number of connected clients as reached its limit | | ClientJoinRateLimitReached | The rate of joins per second from your clients as reached the channel limits | +| MessagePerSecondRateLimitReached | The rate of messages per second from your clients as reached the channel limits | | RealtimeDisabledForTenant | Realtime has been disabled for the tenant | | UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database | | DatabaseLackOfConnections | Realtime was not able to connect to the tenant's database due to not having enough available connections | diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 4c5fdcf7a..550492450 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -196,7 +196,17 @@ defmodule Realtime.Tenants do event_name: [:channel, :events], measurements: %{limit: max_events_per_second}, metadata: %{tenant: tenant_id} - } + }, + limit: [ + value: max_events_per_second, + log: true, + log_fn: fn -> + Logger.error("MessagePerSecondRateLimitReached: Too many messages per second", + external_id: tenant_id, + project: tenant_id + ) + end + ] ] %RateCounter.Args{id: events_per_second_key(tenant_id), opts: opts} diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 0a0762627..ccbd7c5de 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -203,7 +203,6 @@ defmodule RealtimeWeb.RealtimeChannel do if rate_counter.avg > max do message = "Too many messages per second" - shutdown_response(socket, message) else {:noreply, socket} @@ -370,7 +369,7 @@ defmodule RealtimeWeb.RealtimeChannel do def handle_in(_, _, %{assigns: %{rate_counter: %{avg: avg}, limits: %{max_events_per_second: max}}} = socket) when avg > max do message = "Too many messages per second" - + Logging.log_error(socket, "MessagePerSecondRateLimitReached", message) shutdown_response(socket, message) end @@ -592,10 +591,9 @@ defmodule RealtimeWeb.RealtimeChannel do end defp shutdown_response(socket, message) when is_binary(message) do - %{assigns: %{channel_name: channel_name, access_token: access_token}} = socket - metadata = log_metadata(access_token) + %{assigns: %{channel_name: channel_name}} = socket push_system_message("system", socket, "error", message, channel_name) - log_warning("ChannelShutdown", message, metadata) + Logging.maybe_log_warning(socket, "ChannelShutdown", message) {:stop, :normal, socket} end @@ -753,17 +751,4 @@ defmodule RealtimeWeb.RealtimeChannel do do: {:error, :private_only}, else: :ok end - - defp log_metadata(access_token) do - access_token - |> Joken.peek_claims() - |> then(fn - {:ok, claims} -> Map.get(claims, "sub") - _ -> nil - end) - |> then(fn - nil -> [] - sub -> [sub: sub] - end) - end end diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index f06afdba5..16569c746 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -1204,21 +1204,25 @@ defmodule Realtime.Integration.RtChannelTest do # token expires in between joins so it needs to be handled by the channel and not the socket Process.sleep(1000) realtime_topic = "realtime:#{topic}" - WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token}) - assert_receive %Message{ - event: "phx_reply", - payload: %{ - "status" => "error", - "response" => %{ - "reason" => "InvalidJWTToken: Token has expired 0 seconds ago" - } - }, - topic: ^realtime_topic - }, - 500 + log = + capture_log(fn -> + WebsocketClient.join(socket, realtime_topic, %{config: config, access_token: access_token}) + + assert_receive %Message{ + event: "phx_reply", + payload: %{ + "status" => "error", + "response" => %{"reason" => "InvalidJWTToken: Token has expired 0 seconds ago"} + }, + topic: ^realtime_topic + }, + 500 + end) assert_receive %Message{event: "phx_close"} + assert log =~ "#{tenant.external_id}" + assert log =~ "#{tenant.external_id}" end test "token loses claims in between joins", %{tenant: tenant, topic: topic} do @@ -1759,36 +1763,27 @@ defmodule Realtime.Integration.RtChannelTest do test "max_events_per_second limit respected", %{tenant: tenant} do %{max_events_per_second: max_concurrent_users} = Tenants.get_tenant_by_external_id(tenant.external_id) on_exit(fn -> change_tenant_configuration(tenant, :max_events_per_second, max_concurrent_users) end) - change_tenant_configuration(tenant, :max_events_per_second, 1) {socket, _} = get_connection(tenant, "authenticated") - config = %{broadcast: %{self: true}, private: false} + config = %{broadcast: %{self: true}, private: false, presence: %{enabled: false}} realtime_topic = "realtime:#{random_string()}" WebsocketClient.join(socket, realtime_topic, %{config: config}) + assert_receive %Message{event: "phx_reply", payload: %{"status" => "ok"}, topic: ^realtime_topic}, 500 - for _ <- 1..1000 do - try do - WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{}) - catch - _, _ -> :ok - end - - 1..5 |> Enum.random() |> Process.sleep() - end + log = + capture_log(fn -> + for _ <- 1..1000, Process.alive?(socket) do + WebsocketClient.send_event(socket, realtime_topic, "broadcast", %{}) + Process.sleep(10) + end - assert_receive %Message{ - event: "system", - payload: %{ - "status" => "error", - "extension" => "system", - "message" => "Too many messages per second" - } - }, - 2000 + # Wait for the rate counter to run logger function + Process.sleep(1500) + end) assert_receive %Message{event: "phx_close"} - assert_process_down(socket) + assert log =~ "MessagePerSecondRateLimitReached" end test "max_channels_per_client limit respected", %{tenant: tenant} do @@ -1849,7 +1844,7 @@ defmodule Realtime.Integration.RtChannelTest do event: "phx_reply", payload: %{ "response" => %{ - "reason" => "Too many joins per second" + "reason" => "ClientJoinRateLimitReached: Too many joins per second" }, "status" => "error" } From f8d9308c44ef9082cdb8efd6441d6e2af615297a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Thu, 14 Aug 2025 12:47:01 +0100 Subject: [PATCH 2/3] PR feedback --- lib/realtime_web/channels/realtime_channel.ex | 7 ------- mix.exs | 2 +- test/integration/rt_channel_test.exs | 1 - 3 files changed, 1 insertion(+), 9 deletions(-) diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index ccbd7c5de..92cd6ffae 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -366,13 +366,6 @@ defmodule RealtimeWeb.RealtimeChannel do PresenceHandler.handle(payload, socket) end - def handle_in(_, _, %{assigns: %{rate_counter: %{avg: avg}, limits: %{max_events_per_second: max}}} = socket) - when avg > max do - message = "Too many messages per second" - Logging.log_error(socket, "MessagePerSecondRateLimitReached", message) - shutdown_response(socket, message) - end - def handle_in("access_token", %{"access_token" => "sb_" <> _}, socket) do {:noreply, socket} end diff --git a/mix.exs b/mix.exs index 5c8d61d3a..71987d87e 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.42.3", + version: "2.42.4", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 16569c746..a9fbc8430 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -1222,7 +1222,6 @@ defmodule Realtime.Integration.RtChannelTest do assert_receive %Message{event: "phx_close"} assert log =~ "#{tenant.external_id}" - assert log =~ "#{tenant.external_id}" end test "token loses claims in between joins", %{tenant: tenant, topic: topic} do From 48b7355bf3a61a29292ea29ac8cab084644d17f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Mon, 18 Aug 2025 19:11:04 +0100 Subject: [PATCH 3/3] update error message --- lib/realtime_web/channels/realtime_channel.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 92cd6ffae..22dd21979 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -138,7 +138,7 @@ defmodule RealtimeWeb.RealtimeChannel do Logging.log_error(socket, "ConnectionRateLimitReached", msg) {:error, :too_many_joins} -> - msg = "Too many joins per second" + msg = "ClientJoinRateLimitReached: Too many joins per second" {:error, %{reason: msg}} {:error, :increase_connection_pool} ->