Merged
84 changes: 52 additions & 32 deletions lib/realtime/tenants/connect.ex
@@ -37,7 +37,7 @@ defmodule Realtime.Tenants.Connect do
replication_connection_pid: pid(),
replication_connection_reference: reference(),
backoff: Backoff.t(),
-          replication_connection_attempts: non_neg_integer(),
+          replication_recovery_started_at: non_neg_integer() | nil,
check_connected_user_interval: non_neg_integer(),
connected_users_bucket: list(non_neg_integer()),
check_connect_region_interval: non_neg_integer(),
@@ -50,7 +50,7 @@ defmodule Realtime.Tenants.Connect do
replication_connection_pid: nil,
replication_connection_reference: nil,
backoff: nil,
-            replication_connection_attempts: 0,
+            replication_recovery_started_at: nil,
check_connected_user_interval: nil,
connected_users_bucket: [1],
check_connect_region_interval: nil,
@@ -127,6 +127,9 @@ defmodule Realtime.Tenants.Connect do
{pid, %{conn: nil}} ->
wait_for_connection(pid, tenant_id)

+      {_, %{conn: conn, replication_conn: nil}} ->
+        {:ok, conn}

{_, %{conn: conn}} ->
{:ok, conn}

@@ -246,7 +249,7 @@ defmodule Realtime.Tenants.Connect do
tenant_id: tenant_id,
check_connected_user_interval: check_connected_user_interval,
check_connect_region_interval: check_connect_region_interval,
-      backoff: Backoff.new(min: :timer.seconds(1), max: :timer.seconds(15), type: :rand_exp)
+      backoff: Backoff.new(backoff_min: :timer.seconds(1), backoff_max: :timer.seconds(15), backoff_type: :rand_exp)
}

opts = Keyword.put(opts, :name, {:via, :syn, name})
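
The `Backoff.new/1` options above are the only API surface visible in this diff; assuming the internal `Backoff` module also exposes `backoff/1` and `reset/1` as used later in this file, here is a minimal sketch of the retry-delay lifecycle. The option names and return shapes are inferred from this diff, not from the module's documentation:

```elixir
# Sketch of the backoff lifecycle as used in this file; the Backoff module and
# its option names are assumptions based solely on the calls visible in this diff.
backoff = Backoff.new(backoff_min: :timer.seconds(1), backoff_max: :timer.seconds(15), backoff_type: :rand_exp)

# On each failure: get the next delay plus an updated struct, then schedule a retry.
# Randomized exponential growth keeps delays between the min and the max.
{timeout, backoff} = Backoff.backoff(backoff)
Process.send_after(self(), :recover_replication_connection, timeout)

# On success: reset so the next incident starts back at the minimum delay.
backoff = Backoff.reset(backoff)
```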
@@ -388,47 +391,58 @@ defmodule Realtime.Tenants.Connect do
%{replication_connection_reference: replication_connection_reference, tenant_id: tenant_id} = state
) do
%{backoff: backoff} = state
log_warning("ReplicationConnectionDown", "Replication connection has been terminated")
log_warning("ReplicationConnectionDown", "Replication connection has been terminated, recovery window opened")
update_syn_replication_conn(tenant_id, nil)
{timeout, backoff} = Backoff.backoff(backoff)
Process.send_after(self(), :recover_replication_connection, timeout)
state = %{state | replication_connection_pid: nil, replication_connection_reference: nil, backoff: backoff}

recovery_started_at = state.replication_recovery_started_at || System.monotonic_time(:millisecond)

state = %{
state
| replication_connection_pid: nil,
replication_connection_reference: nil,
backoff: backoff,
replication_recovery_started_at: recovery_started_at
}

{:noreply, state}
end

@replication_connection_query "SELECT 1 from pg_stat_activity where application_name='realtime_replication_connection'"
-  @max_replication_connection_attempts 60
-  def handle_info(
-        :recover_replication_connection,
-        %{replication_connection_attempts: @max_replication_connection_attempts} = state
-      ) do
-    Logger.warning("Max replication connection attempts reached, terminating connection")
-    {:stop, :shutdown, state}
-  end
+  @max_replication_recovery_ms :timer.hours(2)

[Review comment] Why not 24h, if there's exponential backoff?

+  def handle_info(:recover_replication_connection, %{replication_recovery_started_at: nil} = state) do
+    {:noreply, state}
+  end

  def handle_info(:recover_replication_connection, state) do
-    %{backoff: backoff, db_conn_pid: db_conn_pid, replication_connection_attempts: replication_connection_attempts} =
-      state
+    %{backoff: backoff, db_conn_pid: db_conn_pid, replication_recovery_started_at: started_at} = state
+    elapsed = System.monotonic_time(:millisecond) - started_at

-    with %{num_rows: 0} <- Postgrex.query!(db_conn_pid, @replication_connection_query, []),
-         {:ok, state} <- start_replication_connection(state) do
-      {:noreply, %{state | backoff: Backoff.reset(backoff), replication_connection_attempts: 0}}
-    else
-      {:error, error} ->
-        {timeout, backoff} = Backoff.backoff(backoff)
-
-        log_error(
-          "ReplicationConnectionRecoveryFailed",
-          "Replication connection recovery failed, next retry in #{timeout}ms : #{inspect(error)}"
-        )
-
-        Process.send_after(self(), :recover_replication_connection, timeout)
-        {:noreply, %{state | backoff: backoff, replication_connection_attempts: replication_connection_attempts + 1}}
-
-      _ ->
-        {timeout, backoff} = Backoff.backoff(backoff)
-        Process.send_after(self(), :recover_replication_connection, timeout)
-        {:noreply, %{state | backoff: backoff, replication_connection_attempts: replication_connection_attempts + 1}}
-    end
+    if elapsed > @max_replication_recovery_ms do
+      log_warning(
+        "ReplicationRecoveryWindowExceeded",
+        "Replication recovery window exceeded after #{elapsed}ms, terminating connection"
+      )
+
+      {:stop, :shutdown, state}
+    else
+      with {:query, {:ok, %{num_rows: 0}}} <- {:query, Postgrex.query(db_conn_pid, @replication_connection_query, [])},
+           {:start, {:ok, state}} <- {:start, start_replication_connection(state)} do
+        {:noreply, %{state | backoff: Backoff.reset(backoff), replication_recovery_started_at: nil}}
+      else
+        {:query, {:ok, %{num_rows: _}}} ->
+          Logger.info("Waiting for old walsender to exit")
+          {:noreply, schedule_replication_retry(state)}

+        {:query, {:error, error}} ->
+          log_error("ReplicationConnectionRecoveryFailed", "DB check failed during recovery: #{inspect(error)}")
+          {:noreply, schedule_replication_retry(state)}

+        {:start, {:error, error}} ->
+          log_error("ReplicationConnectionRecoveryFailed", "Replication connection recovery failed: #{inspect(error)}")
+          {:noreply, schedule_replication_retry(state)}
+      end
+    end
  end
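
The recovery clause above tags each `with` step (`{:query, ...}`, `{:start, ...}`) so the `else` block can tell a failed `pg_stat_activity` check apart from a failed connection start; untagged, both steps would collapse into the same `{:error, _}` shape. A self-contained sketch of the idiom, with hypothetical helper functions standing in for the real steps:

```elixir
defmodule TaggedWith do
  # Hypothetical stand-ins for the DB check and the replication start.
  defp check_db, do: {:ok, %{num_rows: 0}}
  defp start_conn, do: {:error, :max_wal_senders_reached}

  def run do
    with {:query, {:ok, %{num_rows: 0}}} <- {:query, check_db()},
         {:start, {:ok, conn}} <- {:start, start_conn()} do
      {:ok, conn}
    else
      # The tag identifies the failing step, which a bare {:error, _} cannot.
      {:query, {:ok, _}} -> {:retry, :walsender_still_active}
      {:query, {:error, reason}} -> {:retry, {:db_check_failed, reason}}
      {:start, {:error, reason}} -> {:retry, {:start_failed, reason}}
    end
  end
end
```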

@@ -489,6 +503,12 @@ defmodule Realtime.Tenants.Connect do

defp rebalance_check_interval_in_ms(), do: Application.fetch_env!(:realtime, :rebalance_check_interval_in_ms)

+  defp schedule_replication_retry(%{backoff: backoff} = state) do
+    {timeout, backoff} = Backoff.backoff(backoff)
+    Process.send_after(self(), :recover_replication_connection, timeout)
+    %{state | backoff: backoff}
+  end

defp update_syn_replication_conn(tenant_id, pid) do
:syn.update_registry(__MODULE__, tenant_id, fn _pid, meta -> %{meta | replication_conn: pid} end)
end
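
`update_syn_replication_conn/2` is the hook that feeds the degraded `get_status/1` clause added above: clearing `replication_conn` in the `:syn` metadata advertises "database up, replication down" without unregistering the tenant. A rough sketch of the round trip, assuming the standard `:syn` 3.x registry API (`add_node_to_scopes/1`, `register/4`, `update_registry/3`, `lookup/2`); the `:demo_scope` name is made up:

```elixir
# Assumes syn 3.x; :demo_scope and "tenant_1" are illustrative only.
:syn.add_node_to_scopes([:demo_scope])
db_conn = self()

:syn.register(:demo_scope, "tenant_1", self(), %{conn: db_conn, replication_conn: self()})

# Replication died: clear only the metadata, keep the registration.
:syn.update_registry(:demo_scope, "tenant_1", fn _pid, meta -> %{meta | replication_conn: nil} end)

# Readers can now tell "degraded but usable" apart from "fully up".
case :syn.lookup(:demo_scope, "tenant_1") do
  {_pid, %{conn: conn, replication_conn: nil}} -> {:degraded, conn}
  {_pid, %{conn: conn}} -> {:ok, conn}
  :undefined -> {:error, :not_found}
end
```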
5 changes: 3 additions & 2 deletions lib/realtime/tenants/replication_connection.ex
@@ -195,8 +195,9 @@ defmodule Realtime.Tenants.ReplicationConnection do
end

@impl true
-  def handle_result([%Postgrex.Result{num_rows: 1}], %__MODULE__{step: :check_replication_slot}) do
-    {:disconnect, {:shutdown, "Temporary Replication slot already exists and in use"}}
+  def handle_result([%Postgrex.Result{num_rows: 1}], %__MODULE__{step: :check_replication_slot} = _state) do
+    Logger.info("Replication slot already exists and in use, deferring connection")
+    {:disconnect, {:shutdown, :replication_slot_in_use}}
end

def handle_result([%Postgrex.Result{num_rows: 0}], %__MODULE__{step: :check_replication_slot} = state) do
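Replacing the free-form shutdown string with the `:replication_slot_in_use` atom gives callers a stable term to match on. A hedged sketch of how a caller might branch on it; the retry message and delay are illustrative, not from the codebase:

```elixir
# Hypothetical caller: treat a busy replication slot as retriable, anything else as fatal.
case ReplicationConnection.start(tenant, self()) do
  {:ok, pid} ->
    {:ok, pid}

  {:error, {:shutdown, :replication_slot_in_use}} ->
    # The old walsender has not exited yet; retry shortly instead of crashing.
    Process.send_after(self(), :retry_replication_start, :timer.seconds(5))
    {:error, :retrying}

  {:error, reason} ->
    {:error, reason}
end
```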
26 changes: 13 additions & 13 deletions test/realtime/monitoring/latency_test.exs
@@ -34,28 +34,28 @@ defmodule Realtime.LatencyTest do

describe "ping/3" do
setup do
-      Node.stop()
+      for node <- Node.list(), do: Node.disconnect(node)
       :ok
     end

-    @tag skip: "Clustered tests creating flakiness, requires time to analyse"
-    test "emulate a healthy remote node" do
-      assert [{%Task{}, {:ok, %{response: {:ok, {:pong, "not_set"}}}}}] = Latency.ping()
+    test "returns pong from healthy remote node" do
+      {:ok, _node} = Clustered.start()
+      results = Latency.ping()
+      assert Enum.all?(results, fn {%Task{}, result} -> match?({:ok, %{response: {:ok, {:pong, _}}}}, result) end)
     end

-    @tag skip: "Clustered tests creating flakiness, requires time to analyse"
-    test "emulate a slow but healthy remote node" do
-      assert [{%Task{}, {:ok, %{response: {:ok, {:pong, "not_set"}}}}}] = Latency.ping(5_000, 10_000, 30_000)
+    test "returns pong from slow but healthy remote node" do
+      {:ok, _node} = Clustered.start()
+      results = Latency.ping(100, 10_000, 30_000)
+      assert Enum.all?(results, fn {%Task{}, result} -> match?({:ok, %{response: {:ok, {:pong, _}}}}, result) end)
     end

-    @tag skip: "Clustered tests creating flakiness, requires time to analyse"
-    test "emulate an unhealthy remote node" do
-      assert [{%Task{}, {:ok, %{response: {:badrpc, :timeout}}}}] = Latency.ping(5_000, 1_000)
+    test "returns error when remote node exceeds timer timeout" do
+      assert [{%Task{}, {:ok, %{response: {:error, :rpc_error, _}}}}] = Latency.ping(500, 100)
     end

-    @tag skip: "Clustered tests creating flakiness, requires time to analyse"
-    test "no response from our Task for a remote node at all" do
-      assert [{%Task{}, nil}] = Latency.ping(10_000, 5_000, 2_000)
+    test "returns nil when task does not yield before yield timeout" do
+      assert [{%Task{}, nil}] = Latency.ping(1_000, 500, 100)
end
end
end
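
The `{%Task{}, result}` tuples these assertions match on are exactly the shape `Task.yield_many/2` produces, where `nil` means a task never replied within the yield timeout; presumably `Latency.ping` is built on that pattern. A minimal sketch under that assumption (the payload map is made up):

```elixir
tasks =
  for node <- [node() | Node.list()] do
    Task.async(fn ->
      # Hypothetical payload; the real ping wraps an RPC and timing data.
      %{response: {:ok, {:pong, node}}}
    end)
  end

# Each entry is {task, {:ok, value}}, {task, {:exit, reason}}, or {task, nil}
# when the task has not replied within the timeout.
results = Task.yield_many(tasks, 100)
```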
111 changes: 111 additions & 0 deletions test/realtime/tenants/connect_test.exs
@@ -689,6 +689,117 @@ defmodule Realtime.Tenants.ConnectTest do
end
end

describe "backoff configuration" do
test "backoff is configured with correct min/max/type values", %{tenant: tenant} do
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
pid = Connect.whereis(tenant.external_id)
state = :sys.get_state(pid)
assert state.backoff.min == :timer.seconds(1)
assert state.backoff.max == :timer.seconds(15)
assert state.backoff.type == :rand_exp
end
end

describe "replication recovery" do
test "recovery reschedules without stopping when pg_stat_activity shows existing walsender", %{tenant: tenant} do
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert Connect.ready?(tenant.external_id)

pid = Connect.whereis(tenant.external_id)

# The real replication connection is still active, so the pg_stat_activity check returns num_rows: 1 without any stubbing
send(pid, :recover_replication_connection)
Process.sleep(100)

assert Process.alive?(pid)
end

test "recovery stops when elapsed time exceeds 2-hour window", %{tenant: tenant} do
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert Connect.ready?(tenant.external_id)

pid = Connect.whereis(tenant.external_id)
ref = Process.monitor(pid)

past_ts = System.monotonic_time(:millisecond) - :timer.hours(3)
:sys.replace_state(pid, fn state -> %{state | replication_recovery_started_at: past_ts} end)

log =
capture_log(fn ->
send(pid, :recover_replication_connection)
assert_receive {:DOWN, ^ref, :process, ^pid, _reason}, 1000
end)

assert log =~ "Replication recovery window exceeded"
end

test "recovery preserves replication_recovery_started_at across multiple crashes", %{tenant: tenant} do
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert Connect.ready?(tenant.external_id)

pid = Connect.whereis(tenant.external_id)
original_ts = System.monotonic_time(:millisecond) - 1000

ref = make_ref()

:sys.replace_state(pid, fn state ->
%{
state
| replication_connection_reference: ref,
replication_connection_pid: self(),
replication_recovery_started_at: original_ts
}
end)

send(pid, {:DOWN, ref, :process, self(), :simulated_crash})
Process.sleep(100)

state = :sys.get_state(pid)
assert state.replication_recovery_started_at == original_ts

Connect.shutdown(tenant.external_id)
end

test "recovery resets replication_recovery_started_at on successful reconnection", %{tenant: tenant} do
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert Connect.ready?(tenant.external_id)

pid = Connect.whereis(tenant.external_id)

replication_pid = ReplicationConnection.whereis(tenant.external_id)
Process.monitor(replication_pid)
Process.exit(replication_pid, :kill)
assert_receive {:DOWN, _, :process, ^replication_pid, _}, 1000

Process.sleep(100)
assert {:error, :not_connected} = Connect.replication_status(tenant.external_id)

assert {:ok, _} = assert_replication_status(tenant.external_id)

state = :sys.get_state(pid)
assert state.replication_recovery_started_at == nil
assert Process.alive?(pid)

Connect.shutdown(tenant.external_id)
end
end

describe "get_status/1 degraded state" do
test "returns {:ok, conn} when replication_conn is nil in syn", %{tenant: tenant} do
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
assert Connect.ready?(tenant.external_id)

tenant_id = tenant.external_id

:syn.update_registry(Connect, tenant_id, fn _pid, meta -> %{meta | replication_conn: nil} end)

assert {:ok, conn} = Connect.get_status(tenant_id)
assert is_pid(conn)

Connect.shutdown(tenant_id)
end
end

describe "registers into local registry" do
test "successfully registers a process", %{tenant: %{external_id: external_id}} do
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(external_id)
2 changes: 1 addition & 1 deletion test/realtime/tenants/replication_connection_test.exs
@@ -505,7 +505,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do

Postgrex.query!(db_conn, "SELECT pg_create_logical_replication_slot($1, 'test_decoding')", [name])

assert {:error, {:shutdown, "Temporary Replication slot already exists and in use"}} =
assert {:error, {:shutdown, :replication_slot_in_use}} =
ReplicationConnection.start(tenant, self())

Postgrex.query!(db_conn, "SELECT pg_drop_replication_slot($1)", [name])