Skip to content

Commit 22b3a62

Browse files
authored
fix: wait for Connect.lookup_or_start_connection to be ready (#1466)
This helps with cold starts, when multiple WebSocket connections may be established at the same time.
1 parent 1efdf78 commit 22b3a62

File tree

7 files changed

+112
-20
lines changed

7 files changed

+112
-20
lines changed

lib/realtime/syn_handler.ex

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ defmodule Realtime.SynHandler do
44
"""
55
require Logger
66
alias RealtimeWeb.Endpoint
7+
alias Realtime.Tenants.Connect
8+
9+
@behaviour :syn_event_handler
10+
11+
@doc """
Reacts to metadata updates of processes registered with :syn.

When the update belongs to the `Connect` scope, carries a pid under `:conn` and has
reason `:normal`, a `"ready"` event with that pid is sent through
`Endpoint.local_broadcast/3` on the topic returned by `Connect.syn_topic/1`.
Every other update is ignored and `:ok` is returned.
"""
@impl true
def on_registry_process_updated(Connect, tenant_id, _pid, %{conn: db_conn}, :normal)
    when is_pid(db_conn) do
  # Announce that a database connection is ready for this tenant
  tenant_id
  |> Connect.syn_topic()
  |> Endpoint.local_broadcast("ready", %{conn: db_conn})
end

def on_registry_process_updated(_scope, _name, _pid, _meta, _reason), do: :ok
718

819
@doc """
920
When processes registered with :syn are unregistered, either manually or by stopping, this
@@ -14,6 +25,7 @@ defmodule Realtime.SynHandler do
1425
We want to log conflict resolutions to know when more than one process on the cluster
1526
was started, and subsequently stopped because :syn handled the conflict.
1627
"""
28+
@impl true
1729
def on_process_unregistered(mod, name, _pid, _meta, reason) do
1830
case reason do
1931
:syn_conflict_resolution ->
@@ -27,6 +39,7 @@ defmodule Realtime.SynHandler do
2739
:ok
2840
end
2941

42+
@impl true
3043
def resolve_registry_conflict(mod, name, {pid1, %{region: region}, time1}, {pid2, _, time2}) do
3144
platform_region = Realtime.Nodes.platform_region_translator(region)
3245

lib/realtime/tenants/connect.ex

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ defmodule Realtime.Tenants.Connect do
6666
call_external_node(tenant_id, opts)
6767

6868
{:error, :tenant_database_connection_initializing} ->
69-
Process.sleep(100)
7069
call_external_node(tenant_id, opts)
7170

7271
{:error, :initializing} ->
@@ -84,8 +83,8 @@ defmodule Realtime.Tenants.Connect do
8483
| {:error, :tenant_database_connection_initializing}
8584
def get_status(tenant_id) do
8685
case :syn.lookup(__MODULE__, tenant_id) do
87-
{_, %{conn: nil}} ->
88-
{:error, :initializing}
86+
{_pid, %{conn: nil}} ->
87+
wait_for_connection(tenant_id)
8988

9089
{_, %{conn: conn}} ->
9190
{:ok, conn}
@@ -100,6 +99,28 @@ defmodule Realtime.Tenants.Connect do
10099
end
101100
end
102101

102+
@doc """
Builds the topic name on which the "ready" event for `tenant_id`'s database
connection is published and awaited.
"""
def syn_topic(tenant_id), do: "connect:" <> to_string(tenant_id)
103+
104+
# Blocks the calling process until the tenant's database connection shows up in the
# :syn metadata, or until 5 seconds have passed.
#
# Returns `{:ok, conn}` when the connection pid is available and
# `{:error, :initializing}` when the wait times out.
defp wait_for_connection(tenant_id) do
  topic = syn_topic(tenant_id)
  RealtimeWeb.Endpoint.subscribe(topic)

  # We do a lookup after subscribing because we could've missed a message while subscribing
  case :syn.lookup(__MODULE__, tenant_id) do
    {_pid, %{conn: conn}} when is_pid(conn) ->
      {:ok, conn}

    _ ->
      # Wait for up to 5 seconds for the ready event.
      # The topic is pinned so a "ready" event from another topic cannot match.
      receive do
        %{topic: ^topic, event: "ready", payload: %{conn: conn}} -> {:ok, conn}
      after
        5_000 -> {:error, :initializing}
      end
  end
after
  # Bindings from the body are not visible here, so the topic is rebuilt.
  topic = syn_topic(tenant_id)
  RealtimeWeb.Endpoint.unsubscribe(topic)

  # A "ready" broadcast may have been delivered between subscribe and unsubscribe
  # without being consumed above; remove it so it does not linger in the caller's mailbox.
  flush_ready_events(topic)
end

# Removes every pending "ready" event for `topic` from the calling process' mailbox
# without blocking.
defp flush_ready_events(topic) do
  receive do
    %{topic: ^topic, event: "ready"} -> flush_ready_events(topic)
  after
    0 -> :ok
  end
end
123+
103124
@doc """
104125
Connects to a tenant's database and stores the DBConnection in the process :syn metadata
105126
"""

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
44
def project do
55
[
66
app: :realtime,
7-
version: "2.41.5",
7+
version: "2.41.6",
88
elixir: "~> 1.17.3",
99
elixirc_paths: elixirc_paths(Mix.env()),
1010
start_permanent: Mix.env() == :prod,

test/integration/rt_channel_test.exs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,8 @@ defmodule Realtime.Integration.RtChannelTest do
660660
:syn.update_registry(Connect, tenant.external_id, fn _pid, meta -> %{meta | conn: nil} end)
661661
payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
662662
WebsocketClient.send_event(service_role_socket, topic, "broadcast", payload)
663-
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 500
663+
# Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready
664+
refute_receive %Message{event: "broadcast", payload: ^payload, topic: ^topic}, 6000
664665
end)
665666

666667
assert log =~ "UnableToHandleBroadcast"
@@ -837,7 +838,8 @@ defmodule Realtime.Integration.RtChannelTest do
837838
WebsocketClient.send_event(socket, topic, "presence", payload)
838839

839840
refute_receive %Message{event: "presence_diff"}, 500
840-
refute_receive %Message{event: "phx_leave", topic: ^topic}
841+
# Waiting more than 5 seconds as this is the amount of time we will wait for the Connection to be ready
842+
refute_receive %Message{event: "phx_leave", topic: ^topic}, 6000
841843
end)
842844

843845
assert log =~ "UnableToHandlePresence"
@@ -2251,11 +2253,10 @@ defmodule Realtime.Integration.RtChannelTest do
22512253
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
22522254
clean_table(db_conn, "realtime", "messages")
22532255
topic = Map.get(context, :topic, random_string())
2254-
message = message_fixture(tenant, %{topic: topic})
22552256

2256-
if policies = context[:policies], do: create_rls_policies(db_conn, policies, message)
2257+
if policies = context[:policies], do: create_rls_policies(db_conn, policies, %{topic: topic})
22572258

2258-
%{topic: message.topic}
2259+
%{topic: topic}
22592260
end
22602261

22612262
defp setup_trigger(%{tenant: tenant, topic: topic}) do

test/realtime/gen_rpc_test.exs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,15 @@ defmodule Realtime.GenRpcTest do
99
setup context do
1010
{:ok, node} = Clustered.start(nil, extra_config: context[:extra_config] || [])
1111

12-
:telemetry.attach(__MODULE__, [:realtime, :rpc], &__MODULE__.handle_telemetry/4, pid: self())
13-
on_exit(fn -> :telemetry.detach(__MODULE__) end)
1412
%{node: node}
1513
end
1614

1715
describe "call/5" do
16+
setup do
  # Attach this module's handle_telemetry/4 to [:realtime, :rpc] events, passing the
  # test process pid as handler config, and detach it again when the test exits.
  handler_id = __MODULE__
  handler_config = [pid: self()]

  :telemetry.attach(handler_id, [:realtime, :rpc], &__MODULE__.handle_telemetry/4, handler_config)
  on_exit(fn -> :telemetry.detach(handler_id) end)
end
20+
1821
test "returns the result calling local node" do
1922
current_node = node()
2023

@@ -172,6 +175,11 @@ defmodule Realtime.GenRpcTest do
172175
end
173176

174177
describe "multicall/4" do
178+
setup do
  # Attach this module's handle_telemetry/4 to [:realtime, :rpc] events, passing the
  # test process pid as handler config, and detach it again when the test exits.
  handler_id = __MODULE__
  handler_config = [pid: self()]

  :telemetry.attach(handler_id, [:realtime, :rpc], &__MODULE__.handle_telemetry/4, handler_config)
  on_exit(fn -> :telemetry.detach(handler_id) end)
end
182+
175183
test "returns the result of the function call per node", %{node: node} do
176184
current_node = node()
177185

test/realtime/tenants/connect_test.exs

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,62 @@ defmodule Realtime.Tenants.ConnectTest do
3535
refute_receive {:DOWN, ^ref, :process, ^pid, _reason}, timeout
3636
end
3737

38+
describe "handle cold start" do
  test "multiple processes succeed together", %{tenant: tenant} do
    parent = self()

    # Slow down Connect starting so the callers below arrive while it is still initializing
    expect(Database, :check_tenant_connection, fn t ->
      Process.sleep(1000)
      call_original(Database, :check_tenant_connection, [t])
    end)

    connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end

    # Start an early connect
    spawn(connect)
    Process.sleep(100)

    # Start others
    spawn(connect)
    spawn(connect)

    # This one should block and wait for the first Connect
    assert {:ok, pid} = Connect.lookup_or_start_connection(tenant.external_id)

    # The spawned callers are released at about the same time as this process, so allow
    # more than the default 100ms for their results to arrive
    assert_receive {:ok, ^pid}, 1000
    assert_receive {:ok, ^pid}, 1000
    assert_receive {:ok, ^pid}, 1000
  end

  test "more than 5 seconds passed error out", %{tenant: tenant} do
    parent = self()

    # Slow down Connect starting beyond the 5 seconds callers are willing to wait
    expect(Database, :check_tenant_connection, fn t ->
      Process.sleep(5500)
      call_original(Database, :check_tenant_connection, [t])
    end)

    connect = fn -> send(parent, Connect.lookup_or_start_connection(tenant.external_id)) end

    # Start an early connect
    spawn(connect)
    Process.sleep(100)

    # Start others
    spawn(connect)
    spawn(connect)

    assert {:error, :tenant_database_unavailable} =
             Connect.lookup_or_start_connection(tenant.external_id)

    # Only one will succeed the others timed out waiting
    assert_receive {:error, :tenant_database_unavailable}, 1000
    assert_receive {:error, :tenant_database_unavailable}, 1000
    assert_receive {:ok, _pid}, 7000
  end
end
93+
3894
describe "region rebalancing" do
3995
test "rebalancing needed process stops", %{tenant: tenant} do
4096
external_id = tenant.external_id
@@ -346,14 +402,6 @@ defmodule Realtime.Tenants.ConnectTest do
346402
assert log =~ "ReplicationMaxWalSendersReached"
347403
end
348404

349-
test "syn with no connection", %{tenant: tenant} do
350-
external_id = tenant.external_id
351-
expect(:syn, :lookup, 2, fn Connect, ^external_id -> {nil, %{conn: nil}} end)
352-
353-
assert {:error, :tenant_database_unavailable} = Connect.lookup_or_start_connection(external_id)
354-
assert {:error, :initializing} = Connect.get_status(external_id)
355-
end
356-
357405
test "handle rpc errors gracefully" do
358406
expect(Realtime.Nodes, :get_node_for_tenant, fn _ -> {:ok, :potato@nohost, "us-east-1"} end)
359407

test/test_helper.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,11 @@ Mimic.copy(Realtime.RateCounter)
5656
Mimic.copy(Realtime.Tenants.Authorization)
5757
Mimic.copy(Realtime.Tenants.Cache)
5858
Mimic.copy(Realtime.Tenants.Connect)
59+
Mimic.copy(Realtime.Database)
5960
Mimic.copy(Realtime.Tenants.Migrations)
6061
Mimic.copy(Realtime.Tenants.Rebalancer)
62+
Mimic.copy(Realtime.Tenants.ReplicationConnection)
6163
Mimic.copy(RealtimeWeb.ChannelsAuthorization)
6264
Mimic.copy(RealtimeWeb.Endpoint)
6365
Mimic.copy(RealtimeWeb.JwtVerification)
64-
Mimic.copy(Realtime.Tenants.ReplicationConnection)
6566
Mimic.copy(RealtimeWeb.TenantBroadcaster)

0 commit comments

Comments
 (0)