Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import Config

config :realtime,
websocket_fullsweep_after: 20,
ecto_repos: [Realtime.Repo],
version: Mix.Project.config()[:version]

Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/gen_rpc/pub_sub.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ defmodule Realtime.GenRpcPubSub.Worker do
@impl true
def init(pubsub) do
Process.flag(:message_queue_data, :off_heap)
Process.flag(:fullsweep_after, 100)
Process.flag(:fullsweep_after, 20)
{:ok, pubsub}
end

Expand Down
5 changes: 5 additions & 0 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ defmodule RealtimeWeb.RealtimeChannel do
alias RealtimeWeb.RealtimeChannel.Tracker

@confirm_token_ms_interval :timer.minutes(5)
@fullsweep_after Application.compile_env!(:realtime, :websocket_fullsweep_after)

@impl true
def join("realtime:", _params, socket) do
Expand All @@ -42,6 +43,8 @@ defmodule RealtimeWeb.RealtimeChannel do
transport_pid: transport_pid
} = socket

Process.flag(:max_heap_size, max_heap_size())
Process.flag(:fullsweep_after, @fullsweep_after)
Tracker.track(socket.transport_pid)
Logger.metadata(external_id: tenant_id, project: tenant_id)
Logger.put_process_level(self(), log_level)
Expand Down Expand Up @@ -799,4 +802,6 @@ defmodule RealtimeWeb.RealtimeChannel do
end

defp maybe_replay_messages(_, _, _, _), do: {:ok, MapSet.new()}

defp max_heap_size(), do: Application.fetch_env!(:realtime, :websocket_max_heap_size)
end
4 changes: 3 additions & 1 deletion lib/realtime_web/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ defmodule RealtimeWeb.Endpoint do
signing_salt: "5OUq5X4H"
]

@fullsweep_after Application.compile_env!(:realtime, :websocket_fullsweep_after)

socket "/socket", RealtimeWeb.UserSocket,
websocket: [
connect_info: [:peer_data, :uri, :x_headers],
fullsweep_after: 20,
fullsweep_after: @fullsweep_after,
max_frame_size: 5_000_000,
# https://github.com/ninenines/cowboy/blob/24d32de931a0c985ff7939077463fc8be939f0e9/doc/src/manual/cowboy_websocket.asciidoc#L228
# active_n: The number of packets Cowboy will request from the socket at once.
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.54.0",
version: "2.54.1",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/gen_rpc_pub_sub_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ defmodule Realtime.GenRpcPubSubTest do
test "it sets fullsweep_after flag on the workers" do
assert Realtime.PubSubElixir.Realtime.PubSub.Adapter_1
|> Process.whereis()
|> Process.info(:fullsweep_after) == {:fullsweep_after, 100}
|> Process.info(:fullsweep_after) == {:fullsweep_after, 20}
end
end
17 changes: 16 additions & 1 deletion test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,27 @@ defmodule RealtimeWeb.RealtimeChannelTest do

setup :rls_context

test "max heap size is set", %{tenant: tenant} do
test "max heap size is set for both transport and channel processes", %{tenant: tenant} do
jwt = Generators.generate_jwt_token(tenant)
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))

assert Process.info(socket.transport_pid, :max_heap_size) ==
{:max_heap_size, %{error_logger: true, include_shared_binaries: false, kill: true, size: 6_250_000}}

assert {:ok, _, socket} = subscribe_and_join(socket, "realtime:test", %{})

assert Process.info(socket.channel_pid, :max_heap_size) ==
{:max_heap_size, %{error_logger: true, include_shared_binaries: false, kill: true, size: 6_250_000}}
end

# We don't test the socket because on unit tests Phoenix is not setting the fullsweep_after config
test "fullsweep_after is set on channel process", %{tenant: tenant} do
jwt = Generators.generate_jwt_token(tenant)
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant, jwt))

assert {:ok, _, socket} = subscribe_and_join(socket, "realtime:test", %{})

assert Process.info(socket.channel_pid, :fullsweep_after) == {:fullsweep_after, 20}
end

describe "broadcast" do
Expand Down