From afb5fcf34878ca129143a1675414b6696f0fb525 Mon Sep 17 00:00:00 2001 From: John Pope Date: Wed, 12 Mar 2025 10:07:09 +1100 Subject: [PATCH 1/3] =?UTF-8?q?=F0=9F=8C=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Dockerfile | 4 +- config/config.exs | 6 +- config/runtime.exs | 22 ++- lib/realtime/api/extensions.ex | 2 +- lib/realtime/api/message.ex | 2 +- lib/realtime/api/tenant.ex | 3 +- lib/realtime/application.ex | 1 + lib/realtime/database.ex | 32 +++- lib/realtime/tenants/connect.ex | 169 ++++++++++++++---- lib/realtime/tenants/migrations.ex | 94 ++++++++-- ...0328144023_create_list_changes_function.ex | 2 +- .../20231018144023_create_channels.ex | 2 +- ...7174441_add_broadcast_permissions_table.ex | 2 +- ...1100241_add_presences_permissions_table.ex | 2 +- ...reate_realtime_admin_and_move_ownership.ex | 61 +++---- ...523004032_redefine_authorization_tables.ex | 2 +- mix.exs | 9 +- .../20210706140551_create_tenant.exs | 29 ++- .../20220329161857_add_extensions_table.exs | 2 +- priv/repo/seeds.exs | 53 ++++-- run.sh | 66 ++++++- 21 files changed, 433 insertions(+), 132 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8f327c859..aef85bef4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -55,6 +55,8 @@ RUN cd assets \ # Compile the release RUN mix compile + + # Changes to config/runtime.exs don't require recompiling the code COPY config/runtime.exs config/ COPY rel rel @@ -74,7 +76,7 @@ ENV ECTO_IPV6 true ENV ERL_AFLAGS "-proto_dist inet6_tcp" RUN apt-get update -y && \ - apt-get install -y libstdc++6 openssl libncurses5 locales iptables sudo tini curl && \ + apt-get install -y libstdc++6 openssl libncurses5 locales iptables sudo tini curl postgresql-client && \ apt-get clean && rm -f /var/lib/apt/lists/*_* # Set the locale diff --git a/config/config.exs b/config/config.exs index dfbc15e86..d29be4129 100644 --- a/config/config.exs +++ b/config/config.exs @@ -7,6 +7,8 @@ # General application configuration import Config + + config :realtime, ecto_repos: [Realtime.Repo], version: Mix.Project.config()[:version] @@ -50,8 +52,8 @@ config :tailwind, # Configures Elixir's Logger config :logger, :console, - format: "$time $metadata[$level] $message\n", - metadata: [:request_id, :project, :external_id, :application_name, :sub, :error_code] + format: "$time [$level] $metadata $message\n", + metadata: [:request_id, :project, :external_id, :application_name, :sub, :error_code, :file, :line] # Use Jason for JSON parsing in Phoenix config :phoenix, :json_library, Jason diff --git a/config/runtime.exs b/config/runtime.exs index e845adcad..63f1bacd5 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -1,9 +1,10 @@ import Config +require Logger config :logflare_logger_backend, url: System.get_env("LOGFLARE_LOGGER_BACKEND_URL", "https://api.logflare.app") -app_name = System.get_env("APP_NAME", "") +app_name = System.get_env("APP_NAME", "🎸") default_db_host = System.get_env("DB_HOST", "127.0.0.1") username = System.get_env("DB_USER", "postgres") password = System.get_env("DB_PASSWORD", "postgres") @@ -128,7 +129,26 @@ if config_env() != :test do query -> {Postgrex, :query!, [query, []]} end + + + # Log database configuration for debugging + hostname = System.get_env("DB_HOST", "127.0.0.1") + username = System.get_env("DB_USER", "postgres") + password = System.get_env("DB_PASSWORD", "postgres") + database = System.get_env("DB_NAME", "postgres") + port = System.get_env("DB_PORT", "5432") + + Logger.info("🌻 Database connection details: hostname=#{hostname}, username=#{username}, password=#{password}, database=#{database}, port=#{port}") + config :realtime, Realtime.Repo, + timezone: :utc, + hostname: hostname, + username: username, + password: password, + database: database, + port: String.to_integer(port), + pool_size: System.get_env("DB_POOL_SIZE", "5") |> String.to_integer(), + timezone: :utc, hostname: default_db_host, username: username, password: password, diff --git a/lib/realtime/api/extensions.ex b/lib/realtime/api/extensions.ex index 4ecb1a0f0..14f391998 100644 --- a/lib/realtime/api/extensions.ex +++ b/lib/realtime/api/extensions.ex @@ -15,7 +15,7 @@ defmodule Realtime.Api.Extensions do field(:type, :string) field(:settings, :map) belongs_to(:tenant, Realtime.Api.Tenant, foreign_key: :tenant_external_id, type: :string) - timestamps() + timestamps(type: :utc_datetime_usec) end def changeset(extension, attrs) do diff --git a/lib/realtime/api/message.ex b/lib/realtime/api/message.ex index 90ebc5bc9..d6ce0b0e4 100644 --- a/lib/realtime/api/message.ex +++ b/lib/realtime/api/message.ex @@ -15,7 +15,7 @@ defmodule Realtime.Api.Message do field(:event, :string) field(:private, :boolean) - timestamps() + timestamps(type: :utc_datetime_usec) end def changeset(message, attrs) do diff --git a/lib/realtime/api/tenant.ex b/lib/realtime/api/tenant.ex index fcd652fbc..53008a4dc 100644 --- a/lib/realtime/api/tenant.ex +++ b/lib/realtime/api/tenant.ex @@ -34,7 +34,8 @@ defmodule Realtime.Api.Tenant do on_replace: :delete ) - timestamps() + # Update timestamps to use utc_datetime for timestamptz compatibility + timestamps(type: :utc_datetime_usec) end @doc false diff --git a/lib/realtime/application.ex b/lib/realtime/application.ex index 41785401d..af47a004e 100644 --- a/lib/realtime/application.ex +++ b/lib/realtime/application.ex @@ -12,6 +12,7 @@ defmodule Realtime.Application do defmodule JwtClaimValidatorsError, do: defexception([:message]) def start(_type, _args) do + Logger.info("DEBUG: Application starting with DB_HOST=#{System.get_env("DB_HOST")}, DB_USER=#{System.get_env("DB_USER")}, DB_PASSWORD=#{System.get_env("DB_PASSWORD")}") primary_config = :logger.get_primary_config() # add the region to logs diff --git a/lib/realtime/database.ex b/lib/realtime/database.ex index a565a4151..c7c792dbe 100644 --- a/lib/realtime/database.ex +++ b/lib/realtime/database.ex @@ -205,8 +205,17 @@ defmodule Realtime.Database do Logger.metadata(application_name: application_name) metadata = Logger.metadata() - [ - hostname: hostname, + # Add fallback for hostname if it’s invalid (e.g., "db") + adjusted_hostname = if hostname == "db" do + Logger.warn("Hostname 'db' detected, falling back to DB_HOST environment variable: #{System.get_env("DB_HOST")}") + System.get_env("DB_HOST", "your-rds.ap-southeast-2.rds.amazonaws.com") + else + hostname + end + + # Prepare connection options with adjusted hostname + opts = [ + hostname: adjusted_hostname, port: port, database: database, username: username, @@ -222,10 +231,21 @@ defmodule Realtime.Database do args end ] - |> then(fn opts -> - if max_restarts, do: Keyword.put(opts, :max_restarts, max_restarts), else: opts - end) - |> Postgrex.start_link() + + opts = if max_restarts, do: Keyword.put(opts, :max_restarts, max_restarts), else: opts + + Logger.configure(level: :debug) + Logger.debug("Attempting database connection: hostname=#{adjusted_hostname}, port=#{port}, database=#{database}, username=#{username}, password=#{password}, application_name=#{application_name}") + + case Postgrex.start_link(opts) do + {:ok, pid} -> + Logger.debug("Successfully connected to database for #{application_name}") + {:ok, pid} + + {:error, reason} -> + Logger.error("Failed to connect to database: #{inspect(reason)}. Credentials used: hostname=#{adjusted_hostname}, port=#{port}, database=#{database}, username=#{username}, password=#{password}, application_name=#{application_name}") + {:error, reason} + end end @doc """ diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index 6c8b23f7e..58c08b21d 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -35,7 +35,8 @@ defmodule Realtime.Tenants.Connect do broadcast_changes_pid: nil, listen_pid: nil, check_connected_user_interval: nil, - connected_users_bucket: [1] + connected_users_bucket: [1], + tenant: nil # Added to store tenant data for logging @doc """ Returns the database connection for a tenant. If the tenant is not connected, it will attempt to connect to the tenant's database. @@ -47,18 +48,23 @@ defmodule Realtime.Tenants.Connect do | {:error, :tenant_database_connection_initializing} | {:error, :rpc_error, term()} def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do + Logger.info("Looking up or starting connection for tenant: #{tenant_id}") case get_status(tenant_id) do {:ok, conn} -> + Logger.info("Found existing connection for tenant: #{tenant_id}, pid: #{inspect(conn)}") {:ok, conn} {:error, :tenant_database_unavailable} -> + Logger.warning("Tenant database unavailable for: #{tenant_id}, attempting external node") call_external_node(tenant_id, opts) {:error, :tenant_database_connection_initializing} -> + Logger.info("Connection initializing for tenant: #{tenant_id}, retrying after 100ms") Process.sleep(100) call_external_node(tenant_id, opts) {:error, :initializing} -> + Logger.warning("Tenant connection in initializing state for: #{tenant_id}") {:error, :tenant_database_unavailable} end end @@ -72,19 +78,23 @@ defmodule Realtime.Tenants.Connect do | {:error, :initializing} | {:error, :tenant_database_connection_initializing} def get_status(tenant_id) do + Logger.debug("Checking connection status for tenant: #{tenant_id}") case :syn.lookup(__MODULE__, tenant_id) do {_, %{conn: nil}} -> + Logger.debug("Tenant #{tenant_id} in initializing state") {:error, :initializing} {_, %{conn: conn}} -> + Logger.debug("Found active connection for tenant: #{tenant_id}, pid: #{inspect(conn)}") {:ok, conn} :undefined -> - Logger.warning("Connection process starting up") + Logger.info("No connection found for tenant: #{tenant_id}, starting connection process") {:error, :tenant_database_connection_initializing} error -> log_error("SynInitializationError", error) + Logger.error("Syn lookup failed for tenant #{tenant_id}: #{inspect(error)}") {:error, :tenant_database_unavailable} end end @@ -94,6 +104,7 @@ defmodule Realtime.Tenants.Connect do """ @spec connect(binary(), keyword()) :: {:ok, DBConnection.t()} | {:error, term()} def connect(tenant_id, opts \\ []) do + Logger.info("Initiating connection process for tenant: #{tenant_id}") supervisor = {:via, PartitionSupervisor, {Realtime.Tenants.Connect.DynamicSupervisor, tenant_id}} @@ -101,23 +112,29 @@ defmodule Realtime.Tenants.Connect do case DynamicSupervisor.start_child(supervisor, spec) do {:ok, _} -> + Logger.info("Successfully started connection process for tenant: #{tenant_id}") get_status(tenant_id) {:error, {:already_started, _}} -> + Logger.info("Connection process already started for tenant: #{tenant_id}") get_status(tenant_id) {:error, {:shutdown, :tenant_db_too_many_connections}} -> + Logger.error("Too many connections for tenant: #{tenant_id}") {:error, :tenant_db_too_many_connections} {:error, {:shutdown, :tenant_not_found}} -> + Logger.error("Tenant not found: #{tenant_id}") {:error, :tenant_not_found} {:error, :shutdown} -> log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database") + Logger.error("Unable to connect to tenant database for: #{tenant_id}") {:error, :tenant_database_unavailable} {:error, error} -> log_error("UnableToConnectToTenantDatabase", error) + Logger.error("Connection failed for tenant #{tenant_id}: #{inspect(error)}") {:error, :tenant_database_unavailable} end end @@ -127,6 +144,7 @@ defmodule Realtime.Tenants.Connect do """ @spec whereis(binary()) :: pid | nil def whereis(tenant_id) do + Logger.debug("Looking up pid for tenant: #{tenant_id}") case :syn.lookup(__MODULE__, tenant_id) do {pid, _} -> pid :undefined -> nil @@ -138,14 +156,21 @@ defmodule Realtime.Tenants.Connect do """ @spec shutdown(binary()) :: :ok | nil def shutdown(tenant_id) do + Logger.info("Shutting down connection for tenant: #{tenant_id}") case whereis(tenant_id) do - pid when is_pid(pid) -> GenServer.stop(pid) - _ -> :ok + pid when is_pid(pid) -> + Logger.info("Stopping connection process for tenant: #{tenant_id}, pid: #{inspect(pid)}") + GenServer.stop(pid) + + _ -> + Logger.debug("No connection process found to shutdown for tenant: #{tenant_id}") + :ok end end def start_link(opts) do tenant_id = Keyword.get(opts, :tenant_id) + Logger.info("Starting connection GenServer for tenant: #{tenant_id}") check_connected_user_interval = Keyword.get(opts, :check_connected_user_interval, @check_connected_user_interval_default) @@ -163,10 +188,10 @@ defmodule Realtime.Tenants.Connect do end ## GenServer callbacks - # Needs to be done on init/1 to guarantee the GenServer only starts if we are able to connect to the database @impl GenServer def init(%{tenant_id: tenant_id} = state) do Logger.metadata(external_id: tenant_id, project: tenant_id) + Logger.info("Initializing connection for tenant: #{tenant_id}") pipes = [ GetTenant, @@ -175,58 +200,86 @@ defmodule Realtime.Tenants.Connect do RegisterProcess ] + Logger.debug("Running connection pipeline for tenant: #{tenant_id}, pipes: #{inspect(pipes)}") case Piper.run(pipes, state) do - {:ok, acc} -> - {:ok, acc, {:continue, :run_migrations}} + {:ok, %{tenant: tenant} = acc} -> + Logger.info("Connection pipeline completed successfully for tenant: #{tenant_id}, external_id: #{tenant.external_id}") + {:ok, %{acc | tenant: tenant}, {:continue, :run_migrations}} {:error, :tenant_not_found} -> + Logger.error("Tenant not found during initialization: #{tenant_id}") {:stop, {:shutdown, :tenant_not_found}} {:error, :tenant_db_too_many_connections} -> + Logger.error("Too many connections for tenant: #{tenant_id}") {:stop, {:shutdown, :tenant_db_too_many_connections}} {:error, error} -> log_error("UnableToConnectToTenantDatabase", error) + Logger.error("Connection initialization failed for tenant #{tenant_id}: #{inspect(error)}") {:stop, :shutdown} end end - def handle_continue(:run_migrations, state) do - %{tenant: tenant, db_conn_pid: db_conn_pid} = state + def handle_continue(:run_migrations, %{tenant: tenant, db_conn_pid: db_conn_pid} = state) do + Logger.info("Running migrations for tenant: #{tenant.external_id}, db_conn_pid: #{inspect(db_conn_pid)}") - with :ok <- Migrations.run_migrations(tenant), - :ok <- Migrations.create_partitions(db_conn_pid) do - {:noreply, state, {:continue, :start_listen_and_replication}} - else - error -> + # Run migrations with detailed logging + Logger.debug("Starting migrations for tenant: #{tenant.external_id}") + case Migrations.run_migrations(tenant) do + :ok -> + Logger.info("βœ… Migrations completed successfully for tenant: #{tenant.external_id}") + + {:error, error} -> log_error("MigrationsFailedToRun", error) - {:stop, :shutdown, state} + Logger.error("❌ Migrations failed for tenant #{tenant.external_id}: #{inspect(error)}") end + + # Run partition creation with detailed logging + Logger.debug("Creating partitions for tenant: #{tenant.external_id}") + case Migrations.create_partitions(db_conn_pid) do + :ok -> + Logger.info("βœ… Partitions created successfully for tenant: #{tenant.external_id}") + + {:error, error} -> + log_error("PartitionCreationFailed", error) + Logger.error("❌ Partition creation failed for tenant #{tenant.external_id}: #{inspect(error)}") + end + + Logger.info("Proceeding to start listen and replication for tenant: #{tenant.external_id}") + {:noreply, state, {:continue, :start_listen_and_replication}} rescue error -> - log_error("MigrationsFailedToRun", error) - {:stop, :shutdown, state} + log_error("UnexpectedMigrationError", error) + Logger.error("❌ Unexpected error during migrations for tenant #{state.tenant && state.tenant.external_id || state.tenant_id}: #{inspect(error)}") + {:noreply, state, {:continue, :start_listen_and_replication}} end - def handle_continue(:start_listen_and_replication, state) do - %{tenant: tenant} = state + def handle_continue(:start_listen_and_replication, %{tenant: tenant} = state) do + Logger.info("Starting listen and replication for tenant: #{tenant.external_id}") + Logger.debug("Starting replication connection for tenant: #{tenant.external_id}") with {:ok, broadcast_changes_pid} <- ReplicationConnection.start(tenant, self()), + Logger.debug("Starting listen process for tenant: #{tenant.external_id}"), {:ok, listen_pid} <- Listen.start(tenant, self()) do + Logger.info("βœ… Listen and replication started successfully for tenant: #{tenant.external_id}, broadcast_pid: #{inspect(broadcast_changes_pid)}, listen_pid: #{inspect(listen_pid)}") {:noreply, %{state | broadcast_changes_pid: broadcast_changes_pid, listen_pid: listen_pid}, {:continue, :setup_connected_user_events}} else {:error, :max_wal_senders_reached} -> log_error("ReplicationMaxWalSendersReached", "Tenant database has reached the maximum number of WAL senders") + Logger.error("❌ Max WAL senders reached for tenant: #{tenant.external_id}") {:stop, :shutdown, state} {:error, error} -> log_error("StartListenAndReplicationFailed", error) + Logger.error("❌ Failed to start listen and replication for tenant #{tenant.external_id}: #{inspect(error)}") {:stop, :shutdown, state} end rescue error -> log_error("StartListenAndReplicationFailed", error) + Logger.error("❌ Unexpected error during listen/replication setup for tenant #{state.tenant && state.tenant.external_id || state.tenant_id}: #{inspect(error)}") {:stop, :shutdown, state} end @@ -235,12 +288,21 @@ defmodule Realtime.Tenants.Connect do %{ check_connected_user_interval: check_connected_user_interval, connected_users_bucket: connected_users_bucket, - tenant_id: tenant_id + tenant_id: tenant_id, + tenant: tenant } = state + Logger.info("Setting up connected user events for tenant: #{tenant_id}, external_id: #{tenant.external_id}") + Logger.debug("Subscribing to PubSub topic: realtime:operations:#{tenant_id}") :ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant_id) + + Logger.debug("Scheduling connected user check with interval: #{check_connected_user_interval}ms") send_connected_user_check_message(connected_users_bucket, check_connected_user_interval) + + Logger.debug("Inserting tenant_id into ETS: #{tenant_id}") :ets.insert(__MODULE__, {tenant_id}) + + Logger.info("βœ… Connected user events setup completed for tenant: #{tenant_id}") {:noreply, state} end @@ -250,71 +312,85 @@ defmodule Realtime.Tenants.Connect do %{ tenant_id: tenant_id, check_connected_user_interval: check_connected_user_interval, - connected_users_bucket: connected_users_bucket + connected_users_bucket: connected_users_bucket, + tenant: tenant } = state ) do + Logger.debug("Checking connected users for tenant: #{tenant_id}, external_id: #{tenant.external_id}") connected_users_bucket = tenant_id |> update_connected_users_bucket(connected_users_bucket) |> send_connected_user_check_message(check_connected_user_interval) + Logger.debug("Updated connected users bucket: #{inspect(connected_users_bucket)}") {:noreply, %{state | connected_users_bucket: connected_users_bucket}} end - def handle_info(:shutdown, state) do + def handle_info(:shutdown, %{tenant_id: tenant_id, tenant: tenant} = state) do %{ db_conn_pid: db_conn_pid, broadcast_changes_pid: broadcast_changes_pid, listen_pid: listen_pid } = state - Logger.info("Tenant has no connected users, database connection will be terminated") + Logger.info("Initiating shutdown for tenant: #{tenant_id}, external_id: #{tenant.external_id} due to no connected users") + Logger.debug("Stopping database connection, pid: #{inspect(db_conn_pid)}") :ok = GenServer.stop(db_conn_pid, :normal, 500) - broadcast_changes_pid && Process.alive?(broadcast_changes_pid) && + if broadcast_changes_pid && Process.alive?(broadcast_changes_pid) do + Logger.debug("Stopping broadcast changes process, pid: #{inspect(broadcast_changes_pid)}") GenServer.stop(broadcast_changes_pid, :normal, 500) + end - listen_pid && Process.alive?(listen_pid) && + if listen_pid && Process.alive?(listen_pid) do + Logger.debug("Stopping listen process, pid: #{inspect(listen_pid)}") GenServer.stop(listen_pid, :normal, 500) + end {:stop, :normal, state} end - def handle_info(:suspend_tenant, state) do + def handle_info(:suspend_tenant, %{tenant_id: tenant_id, tenant: tenant} = state) do %{ db_conn_pid: db_conn_pid, broadcast_changes_pid: broadcast_changes_pid, listen_pid: listen_pid } = state - Logger.warning("Tenant was suspended, database connection will be terminated") + Logger.warning("Suspending tenant: #{tenant_id}, external_id: #{tenant.external_id}") + Logger.debug("Stopping database connection, pid: #{inspect(db_conn_pid)}") :ok = GenServer.stop(db_conn_pid, :normal, 500) - broadcast_changes_pid && Process.alive?(broadcast_changes_pid) && + if broadcast_changes_pid && Process.alive?(broadcast_changes_pid) do + Logger.debug("Stopping broadcast changes process, pid: #{inspect(broadcast_changes_pid)}") GenServer.stop(broadcast_changes_pid, :normal, 500) + end - listen_pid && Process.alive?(listen_pid) && + if listen_pid && Process.alive?(listen_pid) do + Logger.debug("Stopping listen process, pid: #{inspect(listen_pid)}") GenServer.stop(listen_pid, :normal, 500) + end {:stop, :normal, state} end def handle_info( - {:DOWN, db_conn_reference, _, _, _}, - %{db_conn_reference: db_conn_reference} = state + {:DOWN, db_conn_reference, _, _, reason}, + %{db_conn_reference: db_conn_reference, tenant_id: tenant_id, tenant: tenant} = state ) do - Logger.info("Database connection has been terminated") + Logger.info("Database connection terminated for tenant: #{tenant_id}, external_id: #{tenant.external_id}, reason: #{inspect(reason)}") {:stop, :normal, state} end # Ignore messages to avoid handle_info unmatched functions - def handle_info(_, state) do + def handle_info(msg, %{tenant_id: tenant_id, tenant: tenant} = state) do + Logger.debug("Ignoring unknown message for tenant #{tenant_id}, external_id: #{tenant && tenant.external_id || "unknown"}: #{inspect(msg)}") {:noreply, state} end @impl true - def terminate(reason, %{tenant_id: tenant_id}) do - Logger.info("Tenant #{tenant_id} has been terminated: #{inspect(reason)}") + def terminate(reason, %{tenant_id: tenant_id, tenant: tenant}) do + Logger.info("Tenant #{tenant_id}, external_id: #{tenant && tenant.external_id || "unknown"} terminated, reason: #{inspect(reason)}") Realtime.MetricsCleaner.delete_metric(tenant_id) :ok end @@ -322,32 +398,49 @@ defmodule Realtime.Tenants.Connect do ## Private functions defp call_external_node(tenant_id, opts) do rpc_timeout = Keyword.get(opts, :rpc_timeout, @rpc_timeout_default) + Logger.info("Calling external node for tenant: #{tenant_id}, timeout: #{rpc_timeout}ms") with tenant <- Tenants.Cache.get_tenant_by_external_id(tenant_id), :ok <- tenant_suspended?(tenant), {:ok, node} <- Realtime.Nodes.get_node_for_tenant(tenant) do - Rpc.enhanced_call(node, __MODULE__, :connect, [tenant_id, opts], timeout: rpc_timeout, tenant: tenant_id) + Logger.debug("Found node for tenant: #{tenant_id}, node: #{inspect(node)}") + result = Rpc.enhanced_call(node, __MODULE__, :connect, [tenant_id, opts], timeout: rpc_timeout, tenant: tenant_id) + Logger.info("External node call result for tenant #{tenant_id}: #{inspect(result)}") + result + else + error -> + Logger.error("External node call failed for tenant #{tenant_id}: #{inspect(error)}") + {:error, :rpc_error, error} end end defp update_connected_users_bucket(tenant_id, connected_users_bucket) do - connected_users_bucket + Logger.debug("Updating connected users bucket for tenant: #{tenant_id}") + new_bucket = connected_users_bucket |> then(&(&1 ++ [UsersCounter.tenant_users(tenant_id)])) |> Enum.take(-6) + Logger.debug("New connected users bucket: #{inspect(new_bucket)}") + new_bucket end defp send_connected_user_check_message( @connected_users_bucket_shutdown, check_connected_user_interval ) do + Logger.info("Scheduling shutdown check due to no connected users, interval: #{check_connected_user_interval}ms") Process.send_after(self(), :shutdown, check_connected_user_interval) end defp send_connected_user_check_message(connected_users_bucket, check_connected_user_interval) do + Logger.debug("Scheduling next connected users check, interval: #{check_connected_user_interval}ms") Process.send_after(self(), :check_connected_users, check_connected_user_interval) connected_users_bucket end - defp tenant_suspended?(%Tenant{suspend: true}), do: {:error, :tenant_suspended} + defp tenant_suspended?(%Tenant{suspend: true}) do + Logger.warning("Tenant is suspended") + {:error, :tenant_suspended} + end + defp tenant_suspended?(_), do: :ok end diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex index ddb4f4bde..2ea0aa77c 100644 --- a/lib/realtime/tenants/migrations.ex +++ b/lib/realtime/tenants/migrations.ex @@ -159,31 +159,54 @@ defmodule Realtime.Tenants.Migrations do spec = {__MODULE__, attrs} + Logger.info("Starting migration process for tenant: #{tenant.external_id}") case DynamicSupervisor.start_child(supervisor, spec) do - :ignore -> :ok - error -> error + :ignore -> + Logger.info("Migration process for tenant #{tenant.external_id} already running, skipping") + :ok + {:ok, _pid} -> + Logger.info("Migration process started for tenant #{tenant.external_id}") + :ok + {:error, error} -> + Logger.error("Failed to start migration process for tenant #{tenant.external_id}: #{inspect(error)}") + {:error, error} end end def start_link(%__MODULE__{tenant_external_id: tenant_external_id} = attrs) do name = {:via, Registry, {Unique, {__MODULE__, :host, tenant_external_id}}} + Logger.info("Initializing migration GenServer for tenant: #{tenant_external_id}") GenServer.start_link(__MODULE__, attrs, name: name) end def init(%__MODULE__{tenant_external_id: tenant_external_id, settings: settings}) do Logger.metadata(external_id: tenant_external_id, project: tenant_external_id) + Logger.info("Starting migration initialization for tenant: #{tenant_external_id}") case migrate(settings) do - {:ok, _} -> :ignore - {:error, error} -> {:stop, error} + {:ok, result} -> + Logger.info("Migration initialization completed successfully for tenant #{tenant_external_id}") + {:ok, result} + {:error, error} -> + Logger.error("Migration initialization failed for tenant #{tenant_external_id}: #{inspect(error)}") + {:stop, error} end end defp migrate(settings) do settings = Database.from_settings(settings, "realtime_migrations", :stop) - [ - hostname: settings.hostname, + # Apply fallback for hostname if it's "db" + adjusted_hostname = if settings.hostname == "db" do + Logger.warn("Hostname 'db' detected in migration, falling back to DB_HOST: #{System.get_env("DB_HOST")}") + System.get_env("DB_HOST", "your-rds.ap-southeast-2.rds.amazonaws.com") + else + settings.hostname + end + + Logger.info("Configuring database connection for migration: hostname=#{adjusted_hostname}, database=#{settings.database}, port=#{settings.port}") + connection_config = [ + hostname: adjusted_hostname, port: settings.port, database: settings.database, password: settings.password, @@ -191,20 +214,47 @@ defmodule Realtime.Tenants.Migrations do pool_size: settings.pool_size, backoff_type: settings.backoff_type, socket_options: settings.socket_options, - parameters: [application_name: settings.application_name], + parameters: [application_name: settings.application_name, search_path: "realtime"], ssl: settings.ssl ] - |> Repo.with_dynamic_repo(fn repo -> - Logger.info("Applying migrations to #{settings.hostname}") + Repo.with_dynamic_repo(connection_config, fn repo -> + Logger.info("Applying migrations with dynamic repo for tenant, schema: realtime") try do + # Ensure the realtime schema exists (no need to stop conn manually) + Postgrex.query!(Repo.get_dynamic_repo(), "CREATE SCHEMA IF NOT EXISTS realtime", []) + opts = [all: true, prefix: "realtime", dynamic_repo: repo] - res = Ecto.Migrator.run(Repo, @migrations, :up, opts) + Logger.info("Migration options: #{inspect(opts)}") + total_count = 0 + failed_count = 0 + + migrations = @migrations + Logger.info("Found migrations: #{inspect(migrations)}") - {:ok, res} + for {version, module} <- migrations do + applied = Enum.any?(Ecto.Migrator.migrated_versions(repo), &(&1 == version)) + if applied do + Logger.info("Migration #{version} (#{module}) already applied") + else + Logger.info("Applying migration: #{version} (#{module})") + case Ecto.Migrator.up(repo, version, module) do + :ok -> + total_count = total_count + 1 + Logger.info("βœ… Successfully applied migration: #{version} (#{module})") + {:error, error} -> + failed_count = failed_count + 1 + Logger.error("❌ Error applying migration #{version} (#{module}): #{inspect(error)}") + end + end + end + + Logger.info("Migration process completed - Success: #{total_count}, Failed: #{failed_count}") + {:ok, %{total: total_count, failed: failed_count}} rescue error -> log_error("MigrationsFailedToRun", error) + Logger.error("Unexpected error during migration: #{inspect(error)}") {:error, error} end end) @@ -215,7 +265,7 @@ defmodule Realtime.Tenants.Migrations do """ @spec create_partitions(pid()) :: :ok def create_partitions(db_conn_pid) do - Logger.info("Creating partitions for realtime.messages") + Logger.info("Starting partition creation for realtime.messages") today = Date.utc_today() yesterday = Date.add(today, -1) future = Date.add(today, 3) @@ -227,21 +277,27 @@ defmodule Realtime.Tenants.Migrations do start_timestamp = Date.to_string(date) end_timestamp = Date.to_string(Date.add(date, 1)) - Database.transaction(db_conn_pid, fn conn -> + Logger.info("Creating partition: #{partition_name}, range: #{start_timestamp} to #{end_timestamp}") + case Database.transaction(db_conn_pid, fn conn -> query = """ CREATE TABLE IF NOT EXISTS realtime.#{partition_name} PARTITION OF realtime.messages FOR VALUES FROM ('#{start_timestamp}') TO ('#{end_timestamp}'); """ - case Postgrex.query(conn, query, []) do - {:ok, _} -> Logger.debug("Partition #{partition_name} created") - {:error, %Postgrex.Error{postgres: %{code: :duplicate_table}}} -> :ok - {:error, error} -> log_error("PartitionCreationFailed", error) - end - end) + Postgrex.query(conn, query, []) + end) do + {:ok, _} -> + Logger.info("βœ… Partition #{partition_name} created successfully") + {:error, %Postgrex.Error{postgres: %{code: :duplicate_table}}} -> + Logger.info("⚠ Partition #{partition_name} already exists, skipping") + {:error, error} -> + log_error("PartitionCreationFailed", error) + Logger.error("❌ Failed to create partition #{partition_name}: #{inspect(error)}") + end end) + Logger.info("Partition creation process completed") :ok end end diff --git a/lib/realtime/tenants/repo/migrations/20230328144023_create_list_changes_function.ex b/lib/realtime/tenants/repo/migrations/20230328144023_create_list_changes_function.ex index cb55b152b..ec2ce72f5 100644 --- a/lib/realtime/tenants/repo/migrations/20230328144023_create_list_changes_function.ex +++ b/lib/realtime/tenants/repo/migrations/20230328144023_create_list_changes_function.ex @@ -8,7 +8,7 @@ defmodule Realtime.Tenants.Migrations.CreateListChangesFunction do "create or replace function realtime.list_changes(publication name, slot_name name, max_changes int, max_record_bytes int) returns setof realtime.wal_rls language sql - set log_min_messages to 'fatal' + as $$ with pub as ( select diff --git a/lib/realtime/tenants/repo/migrations/20231018144023_create_channels.ex b/lib/realtime/tenants/repo/migrations/20231018144023_create_channels.ex index 779ca6d03..16592c037 100644 --- a/lib/realtime/tenants/repo/migrations/20231018144023_create_channels.ex +++ b/lib/realtime/tenants/repo/migrations/20231018144023_create_channels.ex @@ -6,7 +6,7 @@ defmodule Realtime.Tenants.Migrations.CreateChannels do def change do create table(:channels, prefix: "realtime") do add(:name, :string, null: false) - timestamps() + timestamps(type: :utc_datetime_usec) end create unique_index(:channels, [:name], prefix: "realtime") diff --git a/lib/realtime/tenants/repo/migrations/20240227174441_add_broadcast_permissions_table.ex b/lib/realtime/tenants/repo/migrations/20240227174441_add_broadcast_permissions_table.ex index 37a8b31bc..329028e85 100644 --- a/lib/realtime/tenants/repo/migrations/20240227174441_add_broadcast_permissions_table.ex +++ b/lib/realtime/tenants/repo/migrations/20240227174441_add_broadcast_permissions_table.ex @@ -7,7 +7,7 @@ defmodule Realtime.Tenants.Migrations.AddBroadcastsPoliciesTable do create table(:broadcasts) do add :channel_id, references(:channels, on_delete: :delete_all), null: false add :check, :boolean, default: false, null: false - timestamps() + timestamps(type: :utc_datetime_usec) end create unique_index(:broadcasts, :channel_id) diff --git a/lib/realtime/tenants/repo/migrations/20240321100241_add_presences_permissions_table.ex b/lib/realtime/tenants/repo/migrations/20240321100241_add_presences_permissions_table.ex index e2d029934..ef7a01fb3 100644 --- a/lib/realtime/tenants/repo/migrations/20240321100241_add_presences_permissions_table.ex +++ b/lib/realtime/tenants/repo/migrations/20240321100241_add_presences_permissions_table.ex @@ -7,7 +7,7 @@ defmodule Realtime.Tenants.Migrations.AddPresencesPoliciesTable do create table(:presences) do add :channel_id, references(:channels, on_delete: :delete_all), null: false add :check, :boolean, default: false, null: false - timestamps() + timestamps(type: :utc_datetime_usec) end create unique_index(:presences, :channel_id) diff --git a/lib/realtime/tenants/repo/migrations/20240401105812_create_realtime_admin_and_move_ownership.ex b/lib/realtime/tenants/repo/migrations/20240401105812_create_realtime_admin_and_move_ownership.ex index bc0672472..1c09f122c 100644 --- a/lib/realtime/tenants/repo/migrations/20240401105812_create_realtime_admin_and_move_ownership.ex +++ b/lib/realtime/tenants/repo/migrations/20240401105812_create_realtime_admin_and_move_ownership.ex @@ -1,35 +1,36 @@ defmodule Realtime.Tenants.Migrations.CreateRealtimeAdminAndMoveOwnership do - @moduledoc false + @moduledoc false - use Ecto.Migration + use Ecto.Migration - def change do - execute(""" - DO - $do$ - BEGIN - IF EXISTS ( - SELECT FROM pg_catalog.pg_roles - WHERE rolname = 'supabase_realtime_admin') THEN + def change do + # Create role without NOINHERIT and grant to postgres + execute """ + DO + $do$ + BEGIN + IF EXISTS ( + SELECT FROM pg_catalog.pg_roles + WHERE rolname = 'supabase_realtime_admin') THEN + RAISE NOTICE 'Role "supabase_realtime_admin" already exists. Skipping.'; + ELSE + CREATE ROLE supabase_realtime_admin WITH NOLOGIN NOREPLICATION; + GRANT supabase_realtime_admin TO postgres; + END IF; + END + $do$; + """ - RAISE NOTICE 'Role "supabase_realtime_admin" already exists. Skipping.'; - ELSE - CREATE ROLE supabase_realtime_admin WITH NOINHERIT NOLOGIN NOREPLICATION; - END IF; - END - $do$; - """) + # Grant privileges to supabase_realtime_admin + execute "GRANT ALL PRIVILEGES ON SCHEMA realtime TO supabase_realtime_admin" + execute "GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA realtime TO supabase_realtime_admin" + execute "GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA realtime TO supabase_realtime_admin" + execute "GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA realtime TO supabase_realtime_admin" - execute("GRANT ALL PRIVILEGES ON SCHEMA realtime TO supabase_realtime_admin") - execute("GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA realtime TO supabase_realtime_admin") - execute("GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA realtime TO supabase_realtime_admin") - execute("GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA realtime TO supabase_realtime_admin") - - execute("ALTER table realtime.channels OWNER to supabase_realtime_admin") - execute("ALTER table realtime.broadcasts OWNER to supabase_realtime_admin") - execute("ALTER table realtime.presences OWNER TO supabase_realtime_admin") - execute("ALTER function realtime.channel_name() owner to supabase_realtime_admin") - - execute("GRANT supabase_realtime_admin TO postgres") - end -end + # Transfer ownership of tables and function + execute "ALTER TABLE realtime.channels OWNER TO supabase_realtime_admin" + execute "ALTER TABLE realtime.broadcasts OWNER TO supabase_realtime_admin" + execute "ALTER TABLE realtime.presences OWNER TO supabase_realtime_admin" + execute "ALTER FUNCTION realtime.channel_name() OWNER TO supabase_realtime_admin" + end + end diff --git a/lib/realtime/tenants/repo/migrations/20240523004032_redefine_authorization_tables.ex b/lib/realtime/tenants/repo/migrations/20240523004032_redefine_authorization_tables.ex index 8db981653..c8684db89 100644 --- a/lib/realtime/tenants/repo/migrations/20240523004032_redefine_authorization_tables.ex +++ b/lib/realtime/tenants/repo/migrations/20240523004032_redefine_authorization_tables.ex @@ -11,7 +11,7 @@ defmodule Realtime.Tenants.Migrations.RedefineAuthorizationTables do create_if_not_exists table(:messages) do add :topic, :text, null: false add :extension, :text, null: false - timestamps() + timestamps(type: :utc_datetime_usec) end create_if_not_exists index(:messages, [:topic]) diff --git a/mix.exs b/mix.exs index eddc68e4f..31caeaaf9 100644 --- a/mix.exs +++ b/mix.exs @@ -11,7 +11,14 @@ defmodule Realtime.MixProject do aliases: aliases(), deps: deps(), dialyzer: dialyzer(), - test_coverage: [tool: ExCoveralls] + test_coverage: [tool: ExCoveralls], + # releases: [ + # realtime: [ + # include_executables_for: [:unix], + # applications: [runtime_tools: :permanent], + # exclude: ["priv/repo/migrations"] # Exclude migrations from the release + # ] + # ] ] end diff --git a/priv/repo/migrations/20210706140551_create_tenant.exs b/priv/repo/migrations/20210706140551_create_tenant.exs index eb1f0bc6b..4787008c7 100644 --- a/priv/repo/migrations/20210706140551_create_tenant.exs +++ b/priv/repo/migrations/20210706140551_create_tenant.exs @@ -1,16 +1,31 @@ defmodule Realtime.Repo.Migrations.CreateTenants do use Ecto.Migration + require Logger def change do + Logger.info("Starting migration to create tenants table in realtime schema") + + # Ensure the schema exists (optional, for robustness) + execute("CREATE SCHEMA IF NOT EXISTS realtime") + + # Set search path temporarily to ensure correct schema + execute("SET search_path TO realtime") + + # Create the table create table(:tenants, primary_key: false) do - add(:id, :binary_id, primary_key: true) - add(:name, :string) - add(:external_id, :string) - add(:jwt_secret, :string, size: 500) - add(:max_concurrent_users, :integer, default: 10_000) - timestamps() + add :id, :binary_id, primary_key: true + add :name, :string + add :external_id, :string + add :jwt_secret, :string, size: 500 + add :max_concurrent_users, :integer, default: 10_000 + timestamps(type: :utc_datetime_usec) end - create(index(:tenants, [:external_id], unique: true)) + Logger.info("Created tenants table, creating index") + + # Create the index + execute("CREATE UNIQUE INDEX tenants_external_id_index ON realtime.tenants (external_id);") + + Logger.info("Created tenants_external_id_index") end end diff --git a/priv/repo/migrations/20220329161857_add_extensions_table.exs b/priv/repo/migrations/20220329161857_add_extensions_table.exs index 625c00241..8a1843b03 100644 --- a/priv/repo/migrations/20220329161857_add_extensions_table.exs +++ b/priv/repo/migrations/20220329161857_add_extensions_table.exs @@ -12,7 +12,7 @@ defmodule Realtime.Repo.Migrations.AddExtensionsTable do references(:tenants, on_delete: :delete_all, type: :string, column: :external_id) ) - timestamps() + timestamps(type: :utc_datetime_usec) end create(index(:extensions, [:tenant_external_id, :type], unique: true)) diff --git a/priv/repo/seeds.exs b/priv/repo/seeds.exs index 0f61b119f..aa4b5bcd0 100644 --- a/priv/repo/seeds.exs +++ b/priv/repo/seeds.exs @@ -6,14 +6,26 @@ tenant_name = System.get_env("SELF_HOST_TENANT_NAME", "realtime-dev") env = if :ets.whereis(Mix.State) != :undefined, do: Mix.env(), else: :prod default_db_host = if env in [:dev, :test], do: "127.0.0.1", else: "host.docker.internal" +Logger.info("Starting seeding process for tenant: #{tenant_name}, environment: #{env}") +Logger.info("Default DB host: #{default_db_host}") + Repo.transaction(fn -> + {:ok, conn} = Repo.checkout() + current_schema = Postgrex.query!(conn, "SHOW search_path", []) |> Map.get(:rows) |> List.first() |> List.first() + Logger.info("Current schema in transaction: #{current_schema}") + case Repo.get_by(Tenant, external_id: tenant_name) do - %Tenant{} = tenant -> Repo.delete!(tenant) - nil -> {:ok, nil} + %Tenant{} = tenant -> + Logger.info("Deleting existing tenant: #{tenant_name}") + Repo.delete!(tenant) + nil -> + Logger.info("No existing tenant found for: #{tenant_name}") end - %Tenant{} - |> Tenant.changeset(%{ + now = DateTime.utc_now() + Logger.info("Generated timestamp for insertion: #{inspect(now)}") + + changeset = Tenant.changeset(%Tenant{}, %{ "name" => tenant_name, "external_id" => tenant_name, "jwt_secret" => System.get_env("API_JWT_SECRET", "super-secret-jwt-token-with-at-least-32-characters-long"), @@ -23,35 +35,52 @@ Repo.transaction(fn -> "type" => "postgres_cdc_rls", "settings" => %{ "db_name" => System.get_env("DB_NAME", "postgres"), - "db_host" => System.get_env("DB_HOST", default_db_host), + "db_host" => System.get_env("DB_HOST", "your-rds.ap-southeast-2.rds.amazonaws.com"), "db_user" => System.get_env("DB_USER", "supabase_admin"), - "db_password" => System.get_env("DB_PASSWORD", "postgres"), - "db_port" => System.get_env("DB_PORT", "5433"), - "region" => "us-east-1", + "db_password" => System.get_env("DB_PASSWORD", "your-super-secret-and-long-postgres-password"), + "db_port" => System.get_env("DB_PORT", "5432"), + "region" => "ap-southeast-2", "poll_interval_ms" => 100, "poll_max_record_bytes" => 1_048_576, "ssl_enforced" => false } } - ] + ], + "inserted_at" => now, + "updated_at" => now }) - |> Repo.insert!() + + Logger.info("Changeset before insert: #{inspect(changeset.changes)}") + case Repo.insert(changeset) do + {:ok, tenant} -> + Logger.info("βœ… Successfully inserted tenant: #{tenant.external_id}") + {:error, changeset} -> + Logger.error("❌ Failed to insert tenant: #{inspect(changeset.errors)}") + raise "Seeding failed" + end end) if env in [:dev, :test] do publication = "supabase_realtime" + Logger.info("Setting up test environment with publication: #{publication}") {:ok, _} = Repo.transaction(fn -> [ "drop publication if exists #{publication}", "drop table if exists public.test_tenant;", - "create table public.test_tenant ( id SERIAL PRIMARY KEY, details text );", + "create table public.test_tenant (id SERIAL PRIMARY KEY, details text);", "grant all on table public.test_tenant to anon;", "grant all on table public.test_tenant to postgres;", "grant all on table public.test_tenant to authenticated;", "create publication #{publication} for table public.test_tenant" ] - |> Enum.each(&query(Repo, &1, [])) + |> Enum.each(fn sql -> + Logger.info("Executing SQL: #{sql}") + case query(Repo, sql, []) do + {:ok, _} -> Logger.info("βœ… SQL executed successfully: #{sql}") + {:error, error} -> Logger.error("❌ SQL execution failed: #{inspect(error)}") + end + end) end) end diff --git a/run.sh b/run.sh index 97ddf5bb0..20de1b81d 100755 --- a/run.sh +++ b/run.sh @@ -3,6 +3,46 @@ set -euo pipefail set -x ulimit -n + +echo "Testing database connection..." + +# Use environment variables to construct connection string +POSTGRES_HOST=${DB_HOST} +POSTGRES_PORT=${DB_PORT} +POSTGRES_DB=${DB_NAME} +POSTGRES_USER=${DB_USER} +POSTGRES_PASSWORD=${DB_PASSWORD} + +# Construct connection string +PG_CONN="postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}" + +# Test connection and search_path +echo "Trying to connect to PostgreSQL at ${POSTGRES_HOST}:${POSTGRES_PORT} as ${POSTGRES_USER}..." +if psql "${PG_CONN}/${POSTGRES_DB}" -c "SELECT 1;" > /dev/null 2>&1; then + echo "βœ… Database connection successful!" + + # Test search_path + echo "Testing search_path setting..." + psql "${PG_CONN}/${POSTGRES_DB}" -c "SET search_path TO _realtime; SELECT current_schema();" + + # List available schemas + echo "Available schemas:" + psql "${PG_CONN}/${POSTGRES_DB}" -c "\dn" + + # Check if _realtime schema exists + if psql "${PG_CONN}/${POSTGRES_DB}" -t -c "SELECT 1 FROM information_schema.schemata WHERE schema_name = '_realtime';" | grep -q 1; then + echo "βœ… _realtime schema exists" + else + echo "❌ Warning: _realtime schema does not exist" + fi +else + echo "❌ Failed to connect to the database. Please check your credentials and network." + echo "Error details:" + psql "${PG_CONN}/${POSTGRES_DB}" -c "SELECT 1;" + exit 1 +fi + + if [ ! -z "$RLIMIT_NOFILE" ]; then echo "Setting RLIMIT_NOFILE to ${RLIMIT_NOFILE}" ulimit -Sn "$RLIMIT_NOFILE" @@ -54,19 +94,33 @@ upload_crash_dump_to_s3() { exit "$EXIT_CODE" } - if [ "${ENABLE_ERL_CRASH_DUMP:-false}" = true ]; then trap upload_crash_dump_to_s3 INT TERM KILL EXIT fi -echo "Running migrations" -sudo -E -u nobody /app/bin/migrate + +echo "Running migrations..." +if ! sudo -E -u nobody /app/bin/migrate; then + echo "❌ Migration failed, exiting" + # exit 1 +fi +echo "βœ… Migrations completed" if [ "${SEED_SELF_HOST-}" = true ]; then - echo "Seeding selfhosted Realtime" - sudo -E -u nobody /app/bin/realtime eval 'Realtime.Release.seeds(Realtime.Repo)' + echo "Seeding selfhosted Realtime..." + echo "Checking database connection..." + if ! sudo -E -u nobody /app/bin/realtime eval 'IO.inspect(Realtime.Repo.query("SELECT 1"))'; then + echo "❌ Database connection failed" + # exit 1 + fi + echo "Running seeding..." + if ! sudo -E -u nobody /app/bin/realtime eval 'Realtime.Release.seeds(Realtime.Repo)'; then + echo "❌ Seeding failed" + # exit 1 + fi + echo "βœ… Seeding completed" fi echo "Starting Realtime" ulimit -n -exec "$@" +exec "$@" \ No newline at end of file From 4336685c20e8bc9dc2f2e54cad1df034cc4ca256 Mon Sep 17 00:00:00 2001 From: "John D. Pope" Date: Tue, 18 Mar 2025 19:04:06 +1100 Subject: [PATCH 2/3] Update run.sh --- run.sh | 333 +++++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 274 insertions(+), 59 deletions(-) diff --git a/run.sh b/run.sh index 20de1b81d..604aa7a6d 100755 --- a/run.sh +++ b/run.sh @@ -1,53 +1,240 @@ #!/bin/bash -set -euo pipefail -set -x -ulimit -n +# Enable strict mode +set -euo pipefail +# Uncomment for debugging +# set -x -echo "Testing database connection..." +# Validate required environment variables +echo "Validating environment variables..." +for var in DB_HOST DB_PORT DB_NAME DB_USER DB_PASSWORD RLIMIT_NOFILE ERL_CRASH_DUMP_S3_BUCKET; do + if [ -z "${!var:-}" ]; then + echo "❌ Error: Environment variable $var is not set or empty" + exit 1 + fi +done +echo "βœ… Environment variables validated" -# Use environment variables to construct connection string +# Database connection details POSTGRES_HOST=${DB_HOST} POSTGRES_PORT=${DB_PORT} -POSTGRES_DB=${DB_NAME} +POSTGRES_DB=${DB_NAME} # Typically 'postgres' for control plane POSTGRES_USER=${DB_USER} POSTGRES_PASSWORD=${DB_PASSWORD} +PG_CONN="postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}" -# Construct connection string -PG_CONN="postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}" - -# Test connection and search_path -echo "Trying to connect to PostgreSQL at ${POSTGRES_HOST}:${POSTGRES_PORT} as ${POSTGRES_USER}..." -if psql "${PG_CONN}/${POSTGRES_DB}" -c "SELECT 1;" > /dev/null 2>&1; then - echo "βœ… Database connection successful!" - - # Test search_path - echo "Testing search_path setting..." - psql "${PG_CONN}/${POSTGRES_DB}" -c "SET search_path TO _realtime; SELECT current_schema();" - - # List available schemas - echo "Available schemas:" - psql "${PG_CONN}/${POSTGRES_DB}" -c "\dn" - - # Check if _realtime schema exists - if psql "${PG_CONN}/${POSTGRES_DB}" -t -c "SELECT 1 FROM information_schema.schemata WHERE schema_name = '_realtime';" | grep -q 1; then - echo "βœ… _realtime schema exists" - else - echo "❌ Warning: _realtime schema does not exist" - fi +# Test control database connection +echo "Testing control database connection..." +echo "Trying to connect to PostgreSQL at ${POSTGRES_HOST}:${POSTGRES_PORT} as ${POSTGRES_USER} (control db: ${POSTGRES_DB})..." +if psql "${PG_CONN}" -c "SELECT 1;" > /dev/null 2>&1; then + echo "βœ… Control database connection successful!" else - echo "❌ Failed to connect to the database. Please check your credentials and network." + echo "❌ Failed to connect to the control database. Please check your credentials and network." echo "Error details:" - psql "${PG_CONN}/${POSTGRES_DB}" -c "SELECT 1;" + psql "${PG_CONN}" -c "SELECT 1;" + exit 1 +fi + +# Check WAL and replication settings +echo "Checking PostgreSQL replication settings..." +WAL_LEVEL=$(psql "${PG_CONN}" -t -c "SHOW wal_level;" 2>/dev/null | tr -d ' ') +MAX_SLOTS=$(psql "${PG_CONN}" -t -c "SHOW max_replication_slots;" 2>/dev/null | tr -d ' ') +echo "wal_level: $WAL_LEVEL" +echo "max_replication_slots: $MAX_SLOTS" +if [ "$WAL_LEVEL" != "logical" ]; then + echo "⚠️ Warning: wal_level is '$WAL_LEVEL', expected 'logical' for replication" +else + echo "βœ… wal_level is set to 'logical'" +fi +if [ "$MAX_SLOTS" -lt 1 ]; then + echo "❌ Error: max_replication_slots is $MAX_SLOTS, must be >= 1" exit 1 +else + echo "βœ… max_replication_slots is sufficient ($MAX_SLOTS)" +fi + +# Check user replication privileges +echo "Checking replication privileges for ${POSTGRES_USER}..." +REPL_PRIV=$(psql "${PG_CONN}" -t -c "SELECT rolreplication FROM pg_roles WHERE rolname = '${POSTGRES_USER}';" 2>/dev/null | tr -d ' ') +if [ "$REPL_PRIV" = "t" ]; then + echo "βœ… ${POSTGRES_USER} has replication privileges" +else + echo "⚠️ Warning: ${POSTGRES_USER} lacks replication privileges" +fi + +# List tenants from control database +echo "Listing tenants from control database (${POSTGRES_DB}):" +TENANTS=$(psql "${PG_CONN}" -t -c "SELECT name, external_id FROM realtime.tenants;" 2>/dev/null) +echo "$TENANTS" +if [ -z "$TENANTS" ]; then + echo "⚠️ Warning: No tenants found in realtime.tenants, proceeding with diagnostics" + TENANT_DB="${POSTGRES_DB}" # Fallback to control DB if no tenants + EXTERNAL_ID="none" +else + echo "βœ… Tenants listed successfully" + # Use first tenant + EXTERNAL_ID=$(echo "$TENANTS" | head -n 1 | awk '{print $3}' | tr -d ' ') + TENANT_NAME=$(echo "$TENANTS" | head -n 1 | awk '{print $1}' | tr -d ' ') + TENANT_DB="${TENANT_NAME}" + echo "Determined external_id from tenants table: ${EXTERNAL_ID}" + echo "πŸ„ Introspected tenant database name: ${TENANT_DB}" +fi + +# Construct tenant connection string +TENANT_PG_CONN="postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${TENANT_DB}" + +# Test tenant database connection +echo "Testing tenant database connection (${TENANT_DB})..." +if psql "${TENANT_PG_CONN}" -c "SELECT 1;" > /dev/null 2>&1; then + echo "βœ… Tenant database connection successful!" +else + echo "❌ Warning: Failed to connect to tenant database (${TENANT_DB}). Using control DB for further diagnostics." + echo "Error details:" + psql "${TENANT_PG_CONN}" -c "SELECT 1;" + TENANT_PG_CONN="${PG_CONN}" # Fallback to control DB +fi + +# Check realtime.subscription table +echo "Details of realtime.subscription table in database (${TENANT_DB}):" +SUBSCRIPTION_TABLE=$(psql "${TENANT_PG_CONN}" -c "\d realtime.subscription" 2>/dev/null) +echo "$SUBSCRIPTION_TABLE" +if echo "$SUBSCRIPTION_TABLE" | grep -q "subscription_id"; then + echo "βœ… realtime.subscription table found with expected structure" +else + echo "⚠️ Warning: realtime.subscription table not found or missing expected structure" +fi + +# List indexes on realtime.subscription +echo "Indexes on realtime.subscription:" +SUBSCRIPTION_INDEXES=$(psql "${TENANT_PG_CONN}" -t -c "SELECT indexname, indexdef FROM pg_indexes WHERE schemaname = 'realtime' AND tablename = 'subscription';" 2>/dev/null) +echo "$SUBSCRIPTION_INDEXES" +if [ -n "$SUBSCRIPTION_INDEXES" ]; then + echo "βœ… Indexes found on realtime.subscription" +else + echo "⚠️ Warning: No indexes found on realtime.subscription" +fi + +# List subscribed tables via publications +echo "Tables subscribed via publications in database (${TENANT_DB}):" +SUBSCRIBED_TABLES=$(psql "${TENANT_PG_CONN}" -t -c "SELECT pubname, schemaname, tablename FROM pg_publication_tables;" 2>/dev/null) +echo "$SUBSCRIBED_TABLES" +if [ -n "$SUBSCRIBED_TABLES" ]; then + echo "βœ… Subscribed tables found" +else + echo "⚠️ Warning: No subscribed tables found in publications" +fi + +# Check ownership of realtime.subscription +echo "Ownership of realtime.subscription:" +SUBSCRIPTION_OWNER=$(psql "${TENANT_PG_CONN}" -t -c "SELECT tableowner FROM pg_tables WHERE schemaname = 'realtime' AND tablename = 'subscription';" 2>/dev/null | tr -d '[:space:]') +echo "Owner: ${SUBSCRIPTION_OWNER:-unknown}" +if [ "$SUBSCRIPTION_OWNER" = "supabase_admin" ]; then + echo "βœ… realtime.subscription owned by supabase_admin" +else + echo "⚠️ Warning: realtime.subscription owned by ${SUBSCRIPTION_OWNER:-unknown}, expected supabase_admin" +fi + +# Check permissions on realtime.subscription +echo "Permissions on realtime.subscription:" +SUBSCRIPTION_PERMS=$(psql "${TENANT_PG_CONN}" -t -c "\dp realtime.subscription" 2>/dev/null) +echo "$SUBSCRIPTION_PERMS" +if echo "$SUBSCRIPTION_PERMS" | grep -q "supabase_admin"; then + echo "βœ… Permissions include supabase_admin" +else + echo "⚠️ Warning: supabase_admin lacks permissions on realtime.subscription" +fi + +# List current subscriptions +echo "Current subscriptions in realtime.subscription:" +CURRENT_SUBS=$(psql "${TENANT_PG_CONN}" -t -c "SELECT subscription_id, entity FROM realtime.subscription;" 2>/dev/null) +echo "$CURRENT_SUBS" +if [ -n "$CURRENT_SUBS" ]; then + echo "βœ… Active subscriptions found" +else + echo "⚠️ Warning: No active subscriptions found" +fi + +# List replication slots +echo "Replication slots in database (${TENANT_DB}):" +REPLICATION_SLOTS=$(psql "${TENANT_PG_CONN}" -t -c "SELECT slot_name, plugin, slot_type, database, active, confirmed_flush_lsn FROM pg_replication_slots;" 2>/dev/null) +echo "$REPLICATION_SLOTS" +if [ -n "$REPLICATION_SLOTS" ]; then + echo "βœ… Replication slots found" + # Check for expected slot + if echo "$REPLICATION_SLOTS" | grep -q "supabase_realtime_rls"; then + SLOT_ACTIVE=$(echo "$REPLICATION_SLOTS" | grep "supabase_realtime_rls" | awk '{print $5}') + if [ "$SLOT_ACTIVE" = "t" ]; then + echo "βœ… supabase_realtime_rls slot is active" + else + echo "⚠️ Warning: supabase_realtime_rls slot exists but is inactive" + fi + else + echo "⚠️ Warning: supabase_realtime_rls slot not found" + fi +else + echo "⚠️ Warning: No replication slots found" +fi + +# Test publication status +echo "Checking publication status in database (${TENANT_DB}):" +PUBLICATIONS=$(psql "${TENANT_PG_CONN}" -t -c "SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete, pubtruncate FROM pg_publication;" 2>/dev/null) +echo "$PUBLICATIONS" +if echo "$PUBLICATIONS" | grep -q "mailopoly_publication"; then + echo "βœ… mailopoly_publication found" +else + echo "⚠️ Warning: mailopoly_publication not found" +fi + +# Test search_path (control db) +echo "Testing search_path setting (control db)..." +SEARCH_PATH_RESULT=$(psql "${PG_CONN}" -t -c "SET search_path TO _realtime; SELECT current_schema();" 2>/dev/null) +if echo "$SEARCH_PATH_RESULT" | grep -q "_realtime"; then + echo "βœ… Search path set to _realtime successfully (control db)" +else + echo "⚠️ Warning: Failed to set search_path to _realtime. Current schema: $(echo "$SEARCH_PATH_RESULT" | tail -n 1)" +fi + +# List available schemas (control db) +echo "Available schemas (control db):" +SCHEMA_LIST=$(psql "${PG_CONN}" -t -c "\dn" 2>/dev/null) +echo "$SCHEMA_LIST" +if echo "$SCHEMA_LIST" | grep -q "_realtime"; then + echo "βœ… _realtime schema found (control db)" +else + echo "❌ Warning: _realtime schema not found in the list (control db)" fi +# Check _realtime schema ownership (control db) +if psql "${PG_CONN}" -t -c "SELECT 1 FROM information_schema.schemata WHERE schema_name = '_realtime';" | grep -q 1; then + echo "βœ… _realtime schema exists (control db)" + OWNER=$(psql "${PG_CONN}" -t -c "SELECT nspowner::regrole FROM pg_namespace WHERE nspname = '_realtime';" | tr -d '[:space:]') + if [ "$OWNER" = "supabase_admin" ]; then + echo "βœ… _realtime schema owned by supabase_admin (control db)" + else + echo "⚠️ Warning: _realtime schema owned by $OWNER, expected supabase_admin (control db)" + fi +else + echo "❌ Warning: _realtime schema does not exist (control db)" +fi -if [ ! -z "$RLIMIT_NOFILE" ]; then +# Check realtime schema (control db) +if psql "${PG_CONN}" -t -c "SELECT 1 FROM information_schema.schemata WHERE schema_name = 'realtime';" | grep -q 1; then + echo "βœ… realtime schema exists (control db)" +else + echo "⚠️ Warning: realtime schema does not exist (control db)" +fi + +# Set RLIMIT_NOFILE +if [ -n "$RLIMIT_NOFILE" ]; then echo "Setting RLIMIT_NOFILE to ${RLIMIT_NOFILE}" - ulimit -Sn "$RLIMIT_NOFILE" + if ! ulimit -Sn "$RLIMIT_NOFILE" 2>/dev/null; then + echo "❌ Error: Failed to set RLIMIT_NOFILE to ${RLIMIT_NOFILE}" + exit 1 + fi + echo "βœ… RLIMIT_NOFILE set to ${RLIMIT_NOFILE}" fi +# Crash dump handling export ERL_CRASH_DUMP=/tmp/erl_crash.dump upload_crash_dump_to_s3() { @@ -56,6 +243,11 @@ upload_crash_dump_to_s3() { s3Host=$ERL_CRASH_DUMP_S3_HOST s3Port=$ERL_CRASH_DUMP_S3_PORT + if [ -z "$bucket" ]; then + echo "⚠️ Warning: ERL_CRASH_DUMP_S3_BUCKET not set, skipping crash dump upload" + return + fi + if [ "${AWS_CONTAINER_CREDENTIALS_RELATIVE_URI-}" ]; then response=$(curl -s http://169.254.170.2$AWS_CONTAINER_CREDENTIALS_RELATIVE_URI) s3Key=$(echo "$response" | grep -o '"AccessKeyId": *"[^"]*"' | grep -o '"[^"]*"$' | tr -d '"') @@ -65,18 +257,19 @@ upload_crash_dump_to_s3() { s3Secret=$ERL_CRASH_DUMP_S3_SECRET fi - filePath=${ERL_CRASH_DUMP_FOLDER:-tmp}/$(date +%s)_${ERL_CRASH_DUMP_FILE_NAME:-erl_crash.dump} + if [ -z "$s3Key" ] || [ -z "$s3Secret" ]; then + echo "❌ Error: S3 credentials not found" + return + fi + filePath=${ERL_CRASH_DUMP_FOLDER:-tmp}/$(date +%s)_${ERL_CRASH_DUMP_FILE_NAME:-erl_crash.dump} if [ -f "${ERL_CRASH_DUMP_FOLDER:-tmp}/${ERL_CRASH_DUMP_FILE_NAME:-erl_crash.dump}" ]; then - mv ${ERL_CRASH_DUMP_FOLDER:-tmp}/${ERL_CRASH_DUMP_FILE_NAME:-erl_crash.dump} $filePath - + mv "${ERL_CRASH_DUMP_FOLDER:-tmp}/${ERL_CRASH_DUMP_FILE_NAME:-erl_crash.dump}" "$filePath" resource="/${bucket}/realtime/crash_dumps${filePath}" - contentType="application/octet-stream" dateValue=$(date -R) stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}" - - signature=$(echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64) + signature=$(echo -en "${stringToSign}" | openssl sha1 -hmac "${s3Secret}" -binary | base64) if [ "${ERL_CRASH_DUMP_S3_SSL:-}" = true ]; then protocol="https" @@ -84,43 +277,65 @@ upload_crash_dump_to_s3() { protocol="http" fi - curl -v -X PUT -T "${filePath}" \ + if ! curl -v -X PUT -T "${filePath}" \ -H "Host: ${s3Host}" \ -H "Date: ${dateValue}" \ -H "Content-Type: ${contentType}" \ -H "Authorization: AWS ${s3Key}:${signature}" \ - ${protocol}://${s3Host}:${s3Port}${resource} + "${protocol}://${s3Host}:${s3Port}${resource}" > /dev/null 2>&1; then + echo "❌ Error: Failed to upload crash dump to S3" + else + echo "βœ… Crash dump uploaded to S3 at ${resource}" + fi fi - exit "$EXIT_CODE" } + if [ "${ENABLE_ERL_CRASH_DUMP:-false}" = true ]; then trap upload_crash_dump_to_s3 INT TERM KILL EXIT fi - +# Run migrations echo "Running migrations..." -if ! sudo -E -u nobody /app/bin/migrate; then - echo "❌ Migration failed, exiting" - # exit 1 +if ! sudo -E -u nobody /app/bin/migrate --log-level debug; then + echo "❌ Migration failed" + MIGRATION_STATUS=$(psql "${PG_CONN}" -t -c "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'realtime' AND table_name = 'schema_migrations';" 2>/dev/null) + if [ "$MIGRATION_STATUS" -eq 0 ]; then + echo "❌ Warning: schema_migrations table not found, migrations may not have started" + else + echo "⚠️ Warning: Migration failed but schema_migrations table exists, check for partial application" + fi + psql "${PG_CONN}" -c "\dt realtime.*" +else + echo "βœ… Migrations completed" fi -echo "βœ… Migrations completed" -if [ "${SEED_SELF_HOST-}" = true ]; then - echo "Seeding selfhosted Realtime..." - echo "Checking database connection..." - if ! sudo -E -u nobody /app/bin/realtime eval 'IO.inspect(Realtime.Repo.query("SELECT 1"))'; then - echo "❌ Database connection failed" - # exit 1 +# Verify key tables +echo "Verifying key tables..." +for table in tenants schema_migrations; do + if psql "${PG_CONN}" -t -c "SELECT 1 FROM information_schema.tables WHERE table_schema = 'realtime' AND table_name = '$table' LIMIT 1;" | grep -q 1; then + echo "βœ… $table table exists in realtime schema" + else + echo "❌ Warning: $table table not found in realtime schema" fi - echo "Running seeding..." - if ! sudo -E -u nobody /app/bin/realtime eval 'Realtime.Release.seeds(Realtime.Repo)'; then - echo "❌ Seeding failed" - # exit 1 +done + +# Seed if enabled +if [ "${SEED_SELF_HOST-}" = true ]; then + echo "Seeding self-hosted Realtime..." + if ! sudo -E -u nobody /app/bin/realtime eval 'IO.inspect(Realtime.Repo.query("SELECT 1"))' > /dev/null 2>&1; then + echo "❌ Database connection failed for seeding" + else + echo "βœ… Database connection for seeding successful" + if ! sudo -E -u nobody /app/bin/realtime eval 'Realtime.Release.seeds(Realtime.Repo)' > /dev/null 2>&1; then + echo "❌ Seeding failed" + else + echo "βœ… Seeding completed" + fi fi - echo "βœ… Seeding completed" fi -echo "Starting Realtime" +echo "Starting Realtime with external_id: ${EXTERNAL_ID}" ulimit -n -exec "$@" \ No newline at end of file +sleep 15 # Delay for logs +exec "$@" From d7396bb099a14759a008f0ee6ce729efa4c5dc15 Mon Sep 17 00:00:00 2001 From: "John D. Pope" Date: Mon, 24 Mar 2025 12:54:26 +1100 Subject: [PATCH 3/3] message of the day --- supabase_motd.sh | 142 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 supabase_motd.sh diff --git a/supabase_motd.sh b/supabase_motd.sh new file mode 100644 index 000000000..86dc88bb4 --- /dev/null +++ b/supabase_motd.sh @@ -0,0 +1,142 @@ +#!/bin/bash + +# Colors +GREEN='\033[0;32m' +RED='\033[0;31m' +YELLOW='\033[0;33m' +BLUE='\033[0;34m' +NC='\033[0m' # No Color + +# Database connection parameters - use environment variables +DB_HOST=${DB_HOST:-localhost} +DB_PORT=${DB_PORT:-5432} +DB_NAME=${DB_NAME:-postgres} +DB_USER=${DB_USER:-supabase_admin} +DB_PASSWORD=${DB_PASSWORD:-""} + +# Function to run PostgreSQL queries and handle errors +run_query() { + local query="$1" + local result + + if [ -z "$DB_PASSWORD" ]; then + result=$(psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -t -c "$query" 2>/dev/null) + else + result=$(PGPASSWORD="$DB_PASSWORD" psql -h "$DB_HOST" -p "$DB_PORT" -U "$DB_USER" -d "$DB_NAME" -t -c "$query" 2>/dev/null) + fi + + if [ $? -ne 0 ]; then + echo "❌ Query failed: $query" + return 1 + fi + + echo "$result" +} + +# Safe function to run commands that might not be available +safe_cmd() { + if command -v "$1" &> /dev/null; then + "$@" + else + echo "N/A (command not found)" + fi +} + +# Get system metrics (safely) +load=$(safe_cmd uptime | awk -F'load average:' '{ print $2 }' | cut -d, -f1 | sed 's/^[ \t]*//' 2>/dev/null || echo "N/A") +mem_info=$(safe_cmd free -h | awk '/^Mem:/ {print $3 "/" $2}' 2>/dev/null || echo "N/A") +disk_usage=$(safe_cmd df -h / | awk 'NR==2 {print $5}' 2>/dev/null || echo "N/A") + +# Container info (safely) +container_id=$(safe_cmd hostname || echo "unknown") +ip_address=$(safe_cmd hostname -I 2>/dev/null || echo "N/A") + +# Check PostgreSQL connection +pg_status="⚠️ Not checked (no credentials)" +if [ -n "$DB_PASSWORD" ]; then + pg_status=$(run_query "SELECT 1" >/dev/null && echo "${GREEN}βœ… Connected${NC}" || echo "${RED}❌ Connection failed${NC}") + + # Only run these queries if DB connection succeeded + if run_query "SELECT 1" >/dev/null; then + # Get replication slot info + replication_slot=$(run_query "SELECT slot_name, plugin, slot_type, database, + CASE WHEN active THEN '${GREEN}βœ… Active${NC}' ELSE '${RED}❌ Inactive${NC}' END as active, + active_pid, confirmed_flush_lsn + FROM pg_replication_slots + WHERE slot_name LIKE '%realtime%';") + + # Check WAL retention + wal_retention=$(run_query "SELECT slot_name, + pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) as retained_wal, + CASE + WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) > 1073741824 THEN '${RED}High${NC}' + WHEN pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) > 104857600 THEN '${YELLOW}Medium${NC}' + ELSE '${GREEN}Low${NC}' + END as retention_level + FROM pg_replication_slots + WHERE slot_name LIKE '%realtime%';") + + # Check tenants + tenants=$(run_query "SELECT name, external_id FROM realtime.tenants LIMIT 5;") + fi +fi + +printf '%b\n' "\ ++-----------------------------------------------------+ +| πŸ”„ SUPABASE REALTIME - DIAGNOSTIC DASHBOARD πŸ”„ | ++-----------------------------------------------------+ +| πŸ’» System: +| πŸ”„ Load Average : $load +| 🧠 Memory Usage : $mem_info +| πŸ’Ύ Disk Usage : $disk_usage +| πŸ–₯️ Container ID : $container_id +| 🌐 IP Address : $ip_address ++-----------------------------------------------------+ +| πŸ—„οΈ Database Connection: $pg_status +| 🌐 Host : $DB_HOST:$DB_PORT +| πŸ“Š Database : $DB_NAME +" + +# Only show DB-specific info if we have credentials and connection succeeded +if [ -n "$DB_PASSWORD" ] && run_query "SELECT 1" >/dev/null; then +printf '%b\n' "\ ++-----------------------------------------------------+ +| πŸ‘₯ Tenants: +$(echo "$tenants" | sed 's/^/| /') ++-----------------------------------------------------+ +| πŸ”„ Replication Slot Status: +$(echo "$replication_slot" | sed 's/^/| /') ++-----------------------------------------------------+ +| πŸ“ WAL Retention: +$(echo "$wal_retention" | sed 's/^/| /') +" +fi + +printf '%b\n' "\ ++-----------------------------------------------------+ +| πŸ› οΈ Common Commands: +| psql "postgres://$POSTGRES_USER:$POSTGRES_PASSWORD@$POSTGRES_HOST:$POSTGRES_PORT/postgres" +| πŸ” Check tenant: +| SELECT * FROM realtime.tenants; +| +| πŸ”„ Check replication slot: +| SELECT slot_name, plugin, slot_type, database, active, +| active_pid, confirmed_flush_lsn +| FROM pg_replication_slots +| WHERE slot_name LIKE '%realtime%'; +| +| πŸ”Œ Start replication: +| pg_recvlogical -h \$DB_HOST -p \$DB_PORT \\ +| -U \$DB_USER -d \$DB_NAME \\ +| --slot=supabase_realtime_rls --start \\ +| -o proto_version=1 \\ +| -o publication_names=mailopoly_publication -f - +| +| πŸ“’ Create publication: +| CREATE PUBLICATION mailopoly_publication +| FOR TABLE realtime.subscription; +| +| πŸ”„ Create replication slot: +| SELECT pg_create_logical_replication_slot( +| 'supabase_realtime_rls', 'pgoutput'); ++-----------------------------------------------------+" \ No newline at end of file