diff --git a/config/config.exs b/config/config.exs index cada8230f..7879c2d7a 100644 --- a/config/config.exs +++ b/config/config.exs @@ -8,6 +8,7 @@ import Config config :realtime, + websocket_fullsweep_after: 20, ecto_repos: [Realtime.Repo], version: Mix.Project.config()[:version] diff --git a/lib/realtime/gen_rpc/pub_sub.ex b/lib/realtime/gen_rpc/pub_sub.ex index 3ba9e053a..00bd127c1 100644 --- a/lib/realtime/gen_rpc/pub_sub.ex +++ b/lib/realtime/gen_rpc/pub_sub.ex @@ -67,7 +67,7 @@ defmodule Realtime.GenRpcPubSub.Worker do @impl true def init(pubsub) do Process.flag(:message_queue_data, :off_heap) - Process.flag(:fullsweep_after, 100) + Process.flag(:fullsweep_after, 20) {:ok, pubsub} end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 104d9a077..255b6edfd 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -28,6 +28,7 @@ defmodule RealtimeWeb.RealtimeChannel do alias RealtimeWeb.RealtimeChannel.Tracker @confirm_token_ms_interval :timer.minutes(5) + @fullsweep_after Application.compile_env!(:realtime, :websocket_fullsweep_after) @impl true def join("realtime:", _params, socket) do @@ -42,6 +43,8 @@ defmodule RealtimeWeb.RealtimeChannel do transport_pid: transport_pid } = socket + Process.flag(:max_heap_size, max_heap_size()) + Process.flag(:fullsweep_after, @fullsweep_after) Tracker.track(socket.transport_pid) Logger.metadata(external_id: tenant_id, project: tenant_id) Logger.put_process_level(self(), log_level) @@ -799,4 +802,6 @@ defmodule RealtimeWeb.RealtimeChannel do end defp maybe_replay_messages(_, _, _, _), do: {:ok, MapSet.new()} + + defp max_heap_size(), do: Application.fetch_env!(:realtime, :websocket_max_heap_size) end diff --git a/lib/realtime_web/endpoint.ex b/lib/realtime_web/endpoint.ex index 894911803..dd91e7664 100644 --- a/lib/realtime_web/endpoint.ex +++ b/lib/realtime_web/endpoint.ex @@ -11,10 +11,12 @@ defmodule RealtimeWeb.Endpoint do signing_salt: "5OUq5X4H" ] + @fullsweep_after Application.compile_env!(:realtime, :websocket_fullsweep_after) + socket "/socket", RealtimeWeb.UserSocket, websocket: [ connect_info: [:peer_data, :uri, :x_headers], - fullsweep_after: 20, + fullsweep_after: @fullsweep_after, max_frame_size: 5_000_000, # https://github.com/ninenines/cowboy/blob/24d32de931a0c985ff7939077463fc8be939f0e9/doc/src/manual/cowboy_websocket.asciidoc#L228 # active_n: The number of packets Cowboy will request from the socket at once. diff --git a/mix.exs b/mix.exs index 6d47591de..301b2fcd1 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.54.0", + version: "2.54.1", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/realtime/gen_rpc_pub_sub_test.exs b/test/realtime/gen_rpc_pub_sub_test.exs index 517c6c369..f86a98a73 100644 --- a/test/realtime/gen_rpc_pub_sub_test.exs +++ b/test/realtime/gen_rpc_pub_sub_test.exs @@ -13,6 +13,6 @@ defmodule Realtime.GenRpcPubSubTest do test "it sets fullsweep_after flag on the workers" do assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1 |> Process.whereis() - |> Process.info(:fullsweep_after) == {:fullsweep_after, 100} + |> Process.info(:fullsweep_after) == {:fullsweep_after, 20} end end diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 8022d6ebd..055516e64 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -28,12 +28,27 @@ defmodule RealtimeWeb.RealtimeChannelTest do setup :rls_context - test "max heap size is set", %{tenant: tenant} do + test "max heap size is set for both transport and channel processes", %{tenant: tenant} do jwt = Generators.generate_jwt_token(tenant) {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt)) assert Process.info(socket.transport_pid, :max_heap_size) == {:max_heap_size, %{error_logger: true, include_shared_binaries: false, kill: true, size: 6_250_000}} + + assert {:ok, _, socket} = subscribe_and_join(socket, "realtime:test", %{}) + + assert Process.info(socket.channel_pid, :max_heap_size) == + {:max_heap_size, %{error_logger: true, include_shared_binaries: false, kill: true, size: 6_250_000}} + end + + # We don't test the socket because on unit tests Phoenix is not setting the fullsweep_after config + test "fullsweep_after is set on channel process", %{tenant: tenant} do + jwt = Generators.generate_jwt_token(tenant) + {:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt)) + + assert {:ok, _, socket} = subscribe_and_join(socket, "realtime:test", %{}) + + assert Process.info(socket.channel_pid, :fullsweep_after) == {:fullsweep_after, 20} end describe "broadcast" do