diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex
index ec1136885..53ef78a40 100644
--- a/lib/realtime/tenants/connect.ex
+++ b/lib/realtime/tenants/connect.ex
@@ -37,7 +37,7 @@ defmodule Realtime.Tenants.Connect do
           replication_connection_pid: pid(),
           replication_connection_reference: reference(),
           backoff: Backoff.t(),
-          replication_connection_attempts: non_neg_integer(),
+          replication_recovery_started_at: non_neg_integer() | nil,
           check_connected_user_interval: non_neg_integer(),
           connected_users_bucket: list(non_neg_integer()),
           check_connect_region_interval: non_neg_integer(),
@@ -50,7 +50,7 @@ defmodule Realtime.Tenants.Connect do
             replication_connection_pid: nil,
             replication_connection_reference: nil,
             backoff: nil,
-            replication_connection_attempts: 0,
+            replication_recovery_started_at: nil,
             check_connected_user_interval: nil,
             connected_users_bucket: [1],
             check_connect_region_interval: nil,
@@ -127,6 +127,9 @@ defmodule Realtime.Tenants.Connect do
       {pid, %{conn: nil}} ->
         wait_for_connection(pid, tenant_id)
 
+      {_, %{conn: conn, replication_conn: nil}} ->
+        {:ok, conn}
+
       {_, %{conn: conn}} ->
         {:ok, conn}
 
@@ -246,7 +249,7 @@ defmodule Realtime.Tenants.Connect do
       tenant_id: tenant_id,
       check_connected_user_interval: check_connected_user_interval,
       check_connect_region_interval: check_connect_region_interval,
-      backoff: Backoff.new(min: :timer.seconds(1), max: :timer.seconds(15), type: :rand_exp)
+      backoff: Backoff.new(backoff_min: :timer.seconds(1), backoff_max: :timer.seconds(15), backoff_type: :rand_exp)
     }
 
     opts = Keyword.put(opts, :name, {:via, :syn, name})
@@ -388,47 +391,58 @@ defmodule Realtime.Tenants.Connect do
         %{replication_connection_reference: replication_connection_reference, tenant_id: tenant_id} = state
       ) do
     %{backoff: backoff} = state
-    log_warning("ReplicationConnectionDown", "Replication connection has been terminated")
+    log_warning("ReplicationConnectionDown", "Replication connection has been terminated, recovery window opened")
     update_syn_replication_conn(tenant_id, nil)
 
     {timeout, backoff} = Backoff.backoff(backoff)
     Process.send_after(self(), :recover_replication_connection, timeout)
-    state = %{state | replication_connection_pid: nil, replication_connection_reference: nil, backoff: backoff}
+
+    recovery_started_at = state.replication_recovery_started_at || System.monotonic_time(:millisecond)
+
+    state = %{
+      state
+      | replication_connection_pid: nil,
+        replication_connection_reference: nil,
+        backoff: backoff,
+        replication_recovery_started_at: recovery_started_at
+    }
+
     {:noreply, state}
   end
 
   @replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'"
-  @max_replication_connection_attempts 60
-  def handle_info(
-        :recover_replication_connection,
-        %{replication_connection_attempts: @max_replication_connection_attempts} = state
-      ) do
-    Logger.warning("Max replication connection attempts reached, terminating connection")
-    {:stop, :shutdown, state}
+  @max_replication_recovery_ms :timer.hours(2)
+  def handle_info(:recover_replication_connection, %{replication_recovery_started_at: nil} = state) do
+    {:noreply, state}
   end
 
   def handle_info(:recover_replication_connection, state) do
-    %{backoff: backoff, db_conn_pid: db_conn_pid, replication_connection_attempts: replication_connection_attempts} =
-      state
+    %{backoff: backoff, db_conn_pid: db_conn_pid, replication_recovery_started_at: started_at} = state
+    elapsed = System.monotonic_time(:millisecond) - started_at
 
-    with %{num_rows: 0} <- Postgrex.query!(db_conn_pid, @replication_connection_query, []),
-         {:ok, state} <- start_replication_connection(state) do
-      {:noreply, %{state | backoff: Backoff.reset(backoff), replication_connection_attempts: 0}}
-    else
-      {:error, error} ->
-        {timeout, backoff} = Backoff.backoff(backoff)
-
-        log_error(
-          "ReplicationConnectionRecoveryFailed",
-          "Replication connection recovery failed, next retry in #{timeout}ms : #{inspect(error)}"
-        )
-
-        Process.send_after(self(), :recover_replication_connection, timeout)
-        {:noreply, %{state | backoff: backoff, replication_connection_attempts: replication_connection_attempts + 1}}
+    if elapsed > @max_replication_recovery_ms do
+      log_warning(
+        "ReplicationRecoveryWindowExceeded",
+        "Replication recovery window exceeded after #{elapsed}ms, terminating connection"
+      )
 
-      _ ->
-        {timeout, backoff} = Backoff.backoff(backoff)
-        Process.send_after(self(), :recover_replication_connection, timeout)
-        {:noreply, %{state | backoff: backoff, replication_connection_attempts: replication_connection_attempts + 1}}
+      {:stop, :shutdown, state}
+    else
+      with {:query, {:ok, %{num_rows: 0}}} <- {:query, Postgrex.query(db_conn_pid, @replication_connection_query, [])},
+           {:start, {:ok, state}} <- {:start, start_replication_connection(state)} do
+        {:noreply, %{state | backoff: Backoff.reset(backoff), replication_recovery_started_at: nil}}
+      else
+        {:query, {:ok, %{num_rows: _}}} ->
+          Logger.info("Waiting for old walsender to exit")
+          {:noreply, schedule_replication_retry(state)}
+
+        {:query, {:error, error}} ->
+          log_error("ReplicationConnectionRecoveryFailed", "DB check failed during recovery: #{inspect(error)}")
+          {:noreply, schedule_replication_retry(state)}
+
+        {:start, {:error, error}} ->
+          log_error("ReplicationConnectionRecoveryFailed", "Replication connection recovery failed: #{inspect(error)}")
+          {:noreply, schedule_replication_retry(state)}
+      end
     end
   end
 
@@ -489,6 +503,12 @@ defmodule Realtime.Tenants.Connect do
 
   defp rebalance_check_interval_in_ms(), do: Application.fetch_env!(:realtime, :rebalance_check_interval_in_ms)
 
+  defp schedule_replication_retry(%{backoff: backoff} = state) do
+    {timeout, backoff} = Backoff.backoff(backoff)
+    Process.send_after(self(), :recover_replication_connection, timeout)
+    %{state | backoff: backoff}
+  end
+
   defp update_syn_replication_conn(tenant_id, pid) do
     :syn.update_registry(__MODULE__, tenant_id, fn _pid, meta -> %{meta | replication_conn: pid} end)
   end
diff --git a/lib/realtime/tenants/replication_connection.ex b/lib/realtime/tenants/replication_connection.ex
index ad6dbdae5..d5c04b848 100644
--- a/lib/realtime/tenants/replication_connection.ex
+++ b/lib/realtime/tenants/replication_connection.ex
@@ -195,8 +195,9 @@ defmodule Realtime.Tenants.ReplicationConnection do
   end
 
   @impl true
-  def handle_result([%Postgrex.Result{num_rows: 1}], %__MODULE__{step: :check_replication_slot}) do
-    {:disconnect, {:shutdown, "Temporary Replication slot already exists and in use"}}
+  def handle_result([%Postgrex.Result{num_rows: 1}], %__MODULE__{step: :check_replication_slot} = _state) do
+    Logger.info("Replication slot already exists and in use, deferring connection")
+    {:disconnect, {:shutdown, :replication_slot_in_use}}
   end
 
   def handle_result([%Postgrex.Result{num_rows: 0}], %__MODULE__{step: :check_replication_slot} = state) do
diff --git a/test/realtime/monitoring/latency_test.exs b/test/realtime/monitoring/latency_test.exs
index f0763bc03..379e5f212 100644
--- a/test/realtime/monitoring/latency_test.exs
+++ b/test/realtime/monitoring/latency_test.exs
@@ -34,28 +34,28 @@ defmodule Realtime.LatencyTest do
 
   describe "ping/3" do
     setup do
-      Node.stop()
+      for node <- Node.list(), do: Node.disconnect(node)
       :ok
     end
 
-    @tag skip: "Clustered tests creating flakiness, requires time to analyse"
-    test "emulate a healthy remote node" do
-      assert [{%Task{}, {:ok, %{response: {:ok, {:pong, "not_set"}}}}}] = Latency.ping()
+    test "returns pong from healthy remote node" do
+      {:ok, _node} = Clustered.start()
+      results = Latency.ping()
+      assert Enum.all?(results, fn {%Task{}, result} -> match?({:ok, %{response: {:ok, {:pong, _}}}}, result) end)
     end
 
-    @tag skip: "Clustered tests creating flakiness, requires time to analyse"
-    test "emulate a slow but healthy remote node" do
-      assert [{%Task{}, {:ok, %{response: {:ok, {:pong, "not_set"}}}}}] = Latency.ping(5_000, 10_000, 30_000)
+    test "returns pong from slow but healthy remote node" do
+      {:ok, _node} = Clustered.start()
+      results = Latency.ping(100, 10_000, 30_000)
+      assert Enum.all?(results, fn {%Task{}, result} -> match?({:ok, %{response: {:ok, {:pong, _}}}}, result) end)
     end
 
-    @tag skip: "Clustered tests creating flakiness, requires time to analyse"
-    test "emulate an unhealthy remote node" do
-      assert [{%Task{}, {:ok, %{response: {:badrpc, :timeout}}}}] = Latency.ping(5_000, 1_000)
+    test "returns error when remote node exceeds timer timeout" do
+      assert [{%Task{}, {:ok, %{response: {:error, :rpc_error, _}}}}] = Latency.ping(500, 100)
     end
 
-    @tag skip: "Clustered tests creating flakiness, requires time to analyse"
-    test "no response from our Task for a remote node at all" do
-      assert [{%Task{}, nil}] = Latency.ping(10_000, 5_000, 2_000)
+    test "returns nil when task does not yield before yield timeout" do
+      assert [{%Task{}, nil}] = Latency.ping(1_000, 500, 100)
     end
   end
 end
diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs
index 66c04634d..8fd9bff1c 100644
--- a/test/realtime/tenants/connect_test.exs
+++ b/test/realtime/tenants/connect_test.exs
@@ -689,6 +689,117 @@ defmodule Realtime.Tenants.ConnectTest do
     end
   end
 
+  describe "backoff configuration" do
+    test "backoff is configured with correct min/max/type values", %{tenant: tenant} do
+      assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
+      pid = Connect.whereis(tenant.external_id)
+      state = :sys.get_state(pid)
+      assert state.backoff.min == :timer.seconds(1)
+      assert state.backoff.max == :timer.seconds(15)
+      assert state.backoff.type == :rand_exp
+    end
+  end
+
+  describe "replication recovery" do
+    test "recovery reschedules without stopping when pg_stat_activity shows existing walsender", %{tenant: tenant} do
+      assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
+      assert Connect.ready?(tenant.external_id)
+
+      pid = Connect.whereis(tenant.external_id)
+
+      # The real replication connection is active, so pg_stat_activity returns num_rows: 1 naturally
+      send(pid, :recover_replication_connection)
+      Process.sleep(100)
+
+      assert Process.alive?(pid)
+    end
+
+    test "recovery stops when elapsed time exceeds 2-hour window", %{tenant: tenant} do
+      assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
+      assert Connect.ready?(tenant.external_id)
+
+      pid = Connect.whereis(tenant.external_id)
+      ref = Process.monitor(pid)
+
+      past_ts = System.monotonic_time(:millisecond) - :timer.hours(3)
+      :sys.replace_state(pid, fn state -> %{state | replication_recovery_started_at: past_ts} end)
+
+      log =
+        capture_log(fn ->
+          send(pid, :recover_replication_connection)
+          assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, 1000
+        end)
+
+      assert log =~ "Replication recovery window exceeded"
+    end
+
+    test "recovery preserves replication_recovery_started_at across multiple crashes", %{tenant: tenant} do
+      assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
+      assert Connect.ready?(tenant.external_id)
+
+      pid = Connect.whereis(tenant.external_id)
+      original_ts = System.monotonic_time(:millisecond) - 1000
+
+      ref = make_ref()
+
+      :sys.replace_state(pid, fn state ->
+        %{
+          state
+          | replication_connection_reference: ref,
+            replication_connection_pid: self(),
+            replication_recovery_started_at: original_ts
+        }
+      end)
+
+      send(pid, {:DOWN, ref, :process, self(), :simulated_crash})
+      Process.sleep(100)
+
+      state = :sys.get_state(pid)
+      assert state.replication_recovery_started_at == original_ts
+
+      Connect.shutdown(tenant.external_id)
+    end
+
+    test "recovery resets replication_recovery_started_at on successful reconnection", %{tenant: tenant} do
+      assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
+      assert Connect.ready?(tenant.external_id)
+
+      pid = Connect.whereis(tenant.external_id)
+
+      replication_pid = ReplicationConnection.whereis(tenant.external_id)
+      Process.monitor(replication_pid)
+      Process.exit(replication_pid, :kill)
+      assert_receive {:DOWN, _, :process, ^replication_pid, _}, 1000
+
+      Process.sleep(100)
+      assert {:error, :not_connected} = Connect.replication_status(tenant.external_id)
+
+      assert {:ok, _} = assert_replication_status(tenant.external_id)
+
+      state = :sys.get_state(pid)
+      assert state.replication_recovery_started_at == nil
+      assert Process.alive?(pid)
+
+      Connect.shutdown(tenant.external_id)
+    end
+  end
+
+  describe "get_status/1 degraded state" do
+    test "returns {:ok, conn} when replication_conn is nil in syn", %{tenant: tenant} do
+      assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
+      assert Connect.ready?(tenant.external_id)
+
+      tenant_id = tenant.external_id
+
+      :syn.update_registry(Connect, tenant_id, fn _pid, meta -> %{meta | replication_conn: nil} end)
+
+      assert {:ok, conn} = Connect.get_status(tenant_id)
+      assert is_pid(conn)
+
+      Connect.shutdown(tenant_id)
+    end
+  end
+
   describe "registers into local registry" do
     test "successfully registers a process", %{tenant: %{external_id: external_id}} do
       assert {:ok, _db_conn} = Connect.lookup_or_start_connection(external_id)
diff --git a/test/realtime/tenants/replication_connection_test.exs b/test/realtime/tenants/replication_connection_test.exs
index 4b3276265..ee92b1768 100644
--- a/test/realtime/tenants/replication_connection_test.exs
+++ b/test/realtime/tenants/replication_connection_test.exs
@@ -505,7 +505,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do
 
       Postgrex.query!(db_conn, "SELECT pg_create_logical_replication_slot($1, 'test_decoding')", [name])
 
-      assert {:error, {:shutdown, "Temporary Replication slot already exists and in use"}} =
+      assert {:error, {:shutdown, :replication_slot_in_use}} =
               ReplicationConnection.start(tenant, self())
 
      Postgrex.query!(db_conn, "SELECT pg_drop_replication_slot($1)", [name])