Skip to content

Commit 321a511

Browse files
authored
fix: update postgrex to 0.20.0 (#1474)
1 parent b9aa992 commit 321a511

File tree

7 files changed

+87
-57
lines changed

7 files changed

+87
-57
lines changed

lib/realtime/tenants/replication_connection.ex

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,20 @@ defmodule Realtime.Tenants.ReplicationConnection do
102102
}
103103

104104
case DynamicSupervisor.start_child(supervisor_spec, child_spec) do
105-
{:ok, pid} -> {:ok, pid}
106-
{:error, {:already_started, pid}} -> {:ok, pid}
107-
{:error, {:bad_return_from_init, {:stop, error, _}}} -> {:error, error}
108-
{:error, %Postgrex.Error{postgres: %{pg_code: "53300"}}} -> {:error, :max_wal_senders_reached}
109-
error -> error
105+
{:ok, pid} ->
106+
{:ok, pid}
107+
108+
{:error, {:already_started, pid}} ->
109+
{:ok, pid}
110+
111+
{:error, {:bad_return_from_init, {:stop, error, _}}} ->
112+
{:error, error}
113+
114+
{:error, %Postgrex.Error{postgres: %{pg_code: pg_code}}} when pg_code in ~w(53300 53400) ->
115+
{:error, :max_wal_senders_reached}
116+
117+
error ->
118+
error
110119
end
111120
end
112121

@@ -215,7 +224,6 @@ defmodule Realtime.Tenants.ReplicationConnection do
215224
{:query, "SELECT 1", %{state | step: :start_replication_slot}}
216225
end
217226

218-
@impl true
219227
def handle_result([%Postgrex.Result{}], %__MODULE__{step: :start_replication_slot} = state) do
220228
%__MODULE__{
221229
proto_version: proto_version,
@@ -233,6 +241,11 @@ defmodule Realtime.Tenants.ReplicationConnection do
233241
{:stream, query, [], %{state | step: :streaming}}
234242
end
235243

244+
# %Postgrex.Error{message: nil, postgres: %{code: :configuration_limit_exceeded, line: "291", message: "all replication slots are in use", file: "slot.c", unknown: "ERROR", severity: "ERROR", hint: "Free one or increase max_replication_slots.", routine: "ReplicationSlotCreate", pg_code: "53400"}, connection_id: 217538, query: nil}
245+
def handle_result(%Postgrex.Error{postgres: %{pg_code: pg_code}}, _state) when pg_code in ~w(53300 53400) do
246+
{:disconnect, :max_wal_senders_reached}
247+
end
248+
236249
def handle_result(%Postgrex.Error{postgres: %{message: message}}, _state) do
237250
{:disconnect, "Error starting replication: #{message}"}
238251
end

lib/realtime/tenants/repo/migrations/20240523004032_redefine_authorization_tables.ex

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ defmodule Realtime.Tenants.Migrations.RedefineAuthorizationTables do
44
use Ecto.Migration
55

66
def change do
7-
drop table(:broadcasts, mode: :cascade)
8-
drop table(:presences, mode: :cascade)
9-
drop table(:channels, mode: :cascade)
7+
drop table(:broadcasts), mode: :cascade
8+
drop table(:presences), mode: :cascade
9+
drop table(:channels), mode: :cascade
1010

1111
create_if_not_exists table(:messages) do
1212
add :topic, :text, null: false

mix.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.41.21",
7+
version: "2.41.22",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,
@@ -57,7 +57,7 @@ defmodule Realtime.MixProject do
5757
{:phoenix_ecto, "~> 4.4.0"},
5858
{:ecto_sql, "~> 3.11"},
5959
{:ecto_psql_extras, "~> 0.8"},
60-
{:postgrex, "~> 0.19.0"},
60+
{:postgrex, "~> 0.20.0"},
6161
{:phoenix_html, "~> 3.2"},
6262
{:phoenix_live_view, "~> 0.18"},
6363
{:phoenix_live_reload, "~> 1.2", only: :dev},

mix.lock

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
1313
"credo": {:hex, :credo, "1.7.11", "d3e805f7ddf6c9c854fd36f089649d7cf6ba74c42bc3795d587814e3c9847102", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "56826b4306843253a66e47ae45e98e7d284ee1f95d53d1612bb483f88a8cf219"},
1414
"ctx": {:hex, :ctx, "0.6.0", "8ff88b70e6400c4df90142e7f130625b82086077a45364a78d208ed3ed53c7fe", [:rebar3], [], "hexpm", "a14ed2d1b67723dbebbe423b28d7615eb0bdcba6ff28f2d1f1b0a7e1d4aa5fc2"},
15-
"db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"},
15+
"db_connection": {:hex, :db_connection, "2.8.0", "64fd82cfa6d8e25ec6660cea73e92a4cbc6a18b31343910427b702838c4b33b2", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "008399dae5eee1bf5caa6e86d204dcb44242c82b1ed5e22c881f2c34da201b15"},
1616
"decimal": {:hex, :decimal, "2.3.0", "3ad6255aa77b4a3c4f818171b12d237500e63525c2fd056699967a3e7ea20f62", [:mix], [], "hexpm", "a4d66355cb29cb47c3cf30e71329e58361cfcb37c34235ef3bf1d7bf3773aeac"},
1717
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
1818
"dialyxir": {:hex, :dialyxir, "1.4.5", "ca1571ac18e0f88d4ab245f0b60fa31ff1b12cbae2b11bd25d207f865e8ae78a", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "b0fb08bb8107c750db5c0b324fa2df5ceaa0f9307690ee3c1f6ba5b9eb5d35c3"},
19-
"ecto": {:hex, :ecto, "3.11.2", "e1d26be989db350a633667c5cda9c3d115ae779b66da567c68c80cfb26a8c9ee", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3c38bca2c6f8d8023f2145326cc8a80100c3ffe4dcbd9842ff867f7fc6156c65"},
20-
"ecto_psql_extras": {:hex, :ecto_psql_extras, "0.8.3", "0c1df205bd051eaf599b3671e75356b121aa71eac09b63ecf921cb1a080c072e", [:mix], [{:ecto_sql, "~> 3.7", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:postgrex, "> 0.16.0 and < 0.20.0", [hex: :postgrex, repo: "hexpm", optional: false]}, {:table_rex, "~> 3.1.1 or ~> 4.0.0", [hex: :table_rex, repo: "hexpm", optional: false]}], "hexpm", "d0e35ea160359e759a2993a00c3a5389a9ca7ece6df5d0753fa927f988c7351a"},
21-
"ecto_sql": {:hex, :ecto_sql, "3.11.3", "4eb7348ff8101fbc4e6bbc5a4404a24fecbe73a3372d16569526b0cf34ebc195", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e5f36e3d736b99c7fee3e631333b8394ade4bafe9d96d35669fca2d81c2be928"},
19+
"ecto": {:hex, :ecto, "3.13.2", "7d0c0863f3fc8d71d17fc3ad3b9424beae13f02712ad84191a826c7169484f01", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "669d9291370513ff56e7b7e7081b7af3283d02e046cf3d403053c557894a0b3e"},
20+
"ecto_psql_extras": {:hex, :ecto_psql_extras, "0.8.8", "aa02529c97f69aed5722899f5dc6360128735a92dd169f23c5d50b1f7fdede08", [:mix], [{:ecto_sql, "~> 3.7", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:postgrex, "> 0.16.0", [hex: :postgrex, repo: "hexpm", optional: false]}, {:table_rex, "~> 3.1.1 or ~> 4.0", [hex: :table_rex, repo: "hexpm", optional: false]}], "hexpm", "04c63d92b141723ad6fed2e60a4b461ca00b3594d16df47bbc48f1f4534f2c49"},
21+
"ecto_sql": {:hex, :ecto_sql, "3.13.2", "a07d2461d84107b3d037097c822ffdd36ed69d1cf7c0f70e12a3d1decf04e2e1", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.13.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.7", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.19 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "539274ab0ecf1a0078a6a72ef3465629e4d6018a3028095dc90f60a19c371717"},
2222
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
2323
"esbuild": {:hex, :esbuild, "0.8.2", "5f379dfa383ef482b738e7771daf238b2d1cfb0222bef9d3b20d4c8f06c7a7ac", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "558a8a08ed78eb820efbfda1de196569d8bfa9b51e8371a1934fbb31345feda7"},
2424
"eternal": {:hex, :eternal, "1.2.2", "d1641c86368de99375b98d183042dd6c2b234262b8d08dfd72b9eeaafc2a1abd", [:mix], [], "hexpm", "2c9fe32b9c3726703ba5e1d43a1d255a4f3f2d8f8f9bc19f094c7cb1a7a9e782"},
@@ -79,8 +79,8 @@
7979
"plug_cowboy": {:hex, :plug_cowboy, "2.7.2", "fdadb973799ae691bf9ecad99125b16625b1c6039999da5fe544d99218e662e4", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "245d8a11ee2306094840c000e8816f0cbed69a23fc0ac2bcf8d7835ae019bb2f"},
8080
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
8181
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
82-
"postgres_replication": {:git, "https://github.com/filipecabaco/postgres_replication.git", "951ff8b2d114504d0819b4cffaa075bee5bd66a5", []},
83-
"postgrex": {:hex, :postgrex, "0.19.3", "a0bda6e3bc75ec07fca5b0a89bffd242ca209a4822a9533e7d3e84ee80707e19", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d31c28053655b78f47f948c85bb1cf86a9c1f8ead346ba1aa0d0df017fa05b61"},
82+
"postgres_replication": {:git, "https://github.com/filipecabaco/postgres_replication.git", "69129221f0263aa13faa5fbb8af97c28aeb4f71c", []},
83+
"postgrex": {:hex, :postgrex, "0.20.0", "363ed03ab4757f6bc47942eff7720640795eb557e1935951c1626f0d303a3aed", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "d36ef8b36f323d29505314f704e21a1a038e2dc387c6409ee0cd24144e187c0f"},
8484
"prom_ex": {:hex, :prom_ex, "1.9.0", "63e6dda6c05cdeec1f26c48443dcc38ffd2118b3665ae8d2bd0e5b79f2aea03e", [:mix], [{:absinthe, ">= 1.6.0", [hex: :absinthe, repo: "hexpm", optional: true]}, {:broadway, ">= 1.0.2", [hex: :broadway, repo: "hexpm", optional: true]}, {:ecto, ">= 3.5.0", [hex: :ecto, repo: "hexpm", optional: true]}, {:finch, "~> 0.15", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:oban, ">= 2.4.0", [hex: :oban, repo: "hexpm", optional: true]}, {:octo_fetch, "~> 0.3", [hex: :octo_fetch, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, ">= 0.14.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, ">= 1.12.1", [hex: :plug, repo: "hexpm", optional: true]}, {:plug_cowboy, "~> 2.5", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry, ">= 1.0.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}, {:telemetry_poller, "~> 1.0", [hex: :telemetry_poller, repo: "hexpm", optional: false]}], "hexpm", "01f3d4f69ec93068219e686cc65e58a29c42bea5429a8ff4e2121f19db178ee6"},
8585
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
8686
"recon": {:hex, :recon, "2.5.6", "9052588e83bfedfd9b72e1034532aee2a5369d9d9343b61aeb7fbce761010741", [:mix, :rebar3], [], "hexpm", "96c6799792d735cc0f0fd0f86267e9d351e63339cbe03df9d162010cefc26bb0"},
@@ -91,7 +91,7 @@
9191
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
9292
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
9393
"syn": {:hex, :syn, "3.3.0", "4684a909efdfea35ce75a9662fc523e4a8a4e8169a3df275e4de4fa63f99c486", [:rebar3], [], "hexpm", "e58ee447bc1094bdd21bf0acc102b1fbf99541a508cd48060bf783c245eaf7d6"},
94-
"table_rex": {:hex, :table_rex, "4.0.0", "3c613a68ebdc6d4d1e731bc973c233500974ec3993c99fcdabb210407b90959b", [:mix], [], "hexpm", "c35c4d5612ca49ebb0344ea10387da4d2afe278387d4019e4d8111e815df8f55"},
94+
"table_rex": {:hex, :table_rex, "4.1.0", "fbaa8b1ce154c9772012bf445bfb86b587430fb96f3b12022d3f35ee4a68c918", [:mix], [], "hexpm", "95932701df195d43bc2d1c6531178fc8338aa8f38c80f098504d529c43bc2601"},
9595
"tailwind": {:hex, :tailwind, "0.2.4", "5706ec47182d4e7045901302bf3a333e80f3d1af65c442ba9a9eed152fb26c2e", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}], "hexpm", "c6e4a82b8727bab593700c998a4d98cf3d8025678bfde059aed71d0000c3e463"},
9696
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
9797
"telemetry_metrics": {:hex, :telemetry_metrics, "0.6.2", "2caabe9344ec17eafe5403304771c3539f3b6e2f7fb6a6f602558c825d0d0bfb", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9b43db0dc33863930b9ef9d27137e78974756f5f198cae18409970ed6fa5b561"},

test/realtime/tenants/connect_test.exs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -373,24 +373,30 @@ defmodule Realtime.Tenants.ConnectTest do
373373

374374
# This creates a loop of errors that occupies all WAL senders and lets us test the error handling
375375
pids =
376-
for i <- 0..10 do
376+
for i <- 0..4 do
377377
replication_slot_opts =
378378
%PostgresReplication{
379379
connection_opts: opts,
380380
table: :all,
381381
output_plugin: "pgoutput",
382-
output_plugin_options: [],
383-
handler_module: TestHandler,
382+
output_plugin_options: [proto_version: "1", publication_names: "test_#{i}_publication"],
383+
handler_module: Replication.TestHandler,
384384
publication_name: "test_#{i}_publication",
385385
replication_slot_name: "test_#{i}_slot"
386386
}
387387

388-
{:ok, pid} = PostgresReplication.start_link(replication_slot_opts)
389-
pid
388+
spawn(fn ->
389+
{:ok, pid} = PostgresReplication.start_link(replication_slot_opts)
390+
391+
receive do
392+
:stop -> Process.exit(pid, :kill)
393+
end
394+
end)
390395
end
391396

392397
on_exit(fn ->
393-
Enum.each(pids, &Process.exit(&1, :normal))
398+
Enum.each(pids, &send(&1, :stop))
399+
Process.sleep(2000)
394400
end)
395401

396402
log =

test/realtime/tenants/replication_connection_test.exs

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -284,30 +284,42 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
284284

285285
test "handle standby connections exceeds max_wal_senders", %{tenant: tenant} do
286286
opts = Database.from_tenant(tenant, "realtime_test", :stop) |> Database.opts()
287+
parent = self()
287288

288289
# This creates a loop of errors that occupies all WAL senders and lets us test the error handling
289290
pids =
290-
for i <- 0..4 do
291+
for i <- 0..5 do
291292
replication_slot_opts =
292293
%PostgresReplication{
293294
connection_opts: opts,
294295
table: :all,
295296
output_plugin: "pgoutput",
296-
output_plugin_options: [],
297-
handler_module: TestHandler,
297+
output_plugin_options: [proto_version: "1", publication_names: "test_#{i}_publication"],
298+
handler_module: Replication.TestHandler,
298299
publication_name: "test_#{i}_publication",
299300
replication_slot_name: "test_#{i}_slot"
300301
}
301302

302-
{:ok, pid} = PostgresReplication.start_link(replication_slot_opts)
303-
pid
303+
spawn(fn ->
304+
{:ok, pid} = PostgresReplication.start_link(replication_slot_opts)
305+
send(parent, :ready)
306+
307+
receive do
308+
:stop -> Process.exit(pid, :kill)
309+
end
310+
end)
304311
end
305312

306313
on_exit(fn ->
307-
Enum.each(pids, &Process.exit(&1, :kill))
314+
Enum.each(pids, &send(&1, :stop))
308315
Process.sleep(2000)
309316
end)
310317

318+
assert_receive :ready, 5000
319+
assert_receive :ready, 5000
320+
assert_receive :ready, 5000
321+
assert_receive :ready, 5000
322+
311323
assert {:error, :max_wal_senders_reached} = ReplicationConnection.start(tenant, self())
312324
end
313325
end
@@ -372,33 +384,6 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
372384
end
373385
end
374386

375-
defmodule TestHandler do
376-
@behaviour PostgresReplication.Handler
377-
import PostgresReplication.Protocol
378-
alias PostgresReplication.Protocol.KeepAlive
379-
380-
@impl true
381-
def call(message, _metadata) when is_write(message) do
382-
:noreply
383-
end
384-
385-
def call(message, _metadata) when is_keep_alive(message) do
386-
reply =
387-
case parse(message) do
388-
%KeepAlive{reply: :now, wal_end: wal_end} ->
389-
wal_end = wal_end + 1
390-
standby(wal_end, wal_end, wal_end, :now)
391-
392-
_ ->
393-
hold()
394-
end
395-
396-
{:reply, reply}
397-
end
398-
399-
def call(_, _), do: :noreply
400-
end
401-
402387
defp subscribe(tenant_topic, topic) do
403388
fastlane =
404389
RealtimeWeb.RealtimeChannel.MessageDispatcher.fastlane_metadata(
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
defmodule Replication.TestHandler do
2+
@behaviour PostgresReplication.Handler
3+
import PostgresReplication.Protocol
4+
alias PostgresReplication.Protocol.KeepAlive
5+
6+
@impl true
7+
def call(message, _metadata) when is_write(message) do
8+
:noreply
9+
end
10+
11+
def call(message, _metadata) when is_keep_alive(message) do
12+
reply =
13+
case parse(message) do
14+
%KeepAlive{reply: :now, wal_end: wal_end} ->
15+
wal_end = wal_end + 1
16+
standby(wal_end, wal_end, wal_end, :now)
17+
18+
_ ->
19+
hold()
20+
end
21+
22+
{:reply, reply}
23+
end
24+
25+
def call(_, _), do: :noreply
26+
end

0 commit comments

Comments
 (0)