Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions lib/realtime/api/feature_flag.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Realtime.Api.FeatureFlag do
@moduledoc """
Ecto schema for a global feature flag.

Flags have a name (unique) and a boolean enabled state. Per-tenant overrides
are stored separately on the `Realtime.Api.Tenant` schema as a JSONB map,
not as associations on this record.
"""

use Ecto.Schema
import Ecto.Changeset

@type t :: %__MODULE__{}

@primary_key {:id, :binary_id, autogenerate: true}
@foreign_key_type :binary_id
schema "feature_flags" do
field :name, :string
field :enabled, :boolean, default: false
timestamps()
end

def changeset(flag, attrs) do
flag
|> cast(attrs, [:name, :enabled])
|> validate_required([:name])
|> unique_constraint(:name)
end
end
4 changes: 3 additions & 1 deletion lib/realtime/api/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ defmodule Realtime.Api.Tenant do
field(:max_client_presence_events_per_window, :integer)
field(:client_presence_window_ms, :integer)
field(:presence_enabled, :boolean, default: false)
field(:feature_flags, :map, default: %{})

has_many(:extensions, Realtime.Api.Extensions,
foreign_key: :tenant_external_id,
Expand Down Expand Up @@ -82,7 +83,8 @@ defmodule Realtime.Api.Tenant do
:broadcast_adapter,
:max_client_presence_events_per_window,
:client_presence_window_ms,
:presence_enabled
:presence_enabled,
:feature_flags
])
|> validate_required([:external_id])
|> check_constraint(:jwt_secret,
Expand Down
1 change: 1 addition & 0 deletions lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ defmodule Realtime.Application do
id: Realtime.LogThrottle
),
Realtime.Tenants.Cache,
Realtime.FeatureFlags.Cache,
Realtime.RateCounter.DynamicSupervisor,
Realtime.Latency,
{Registry, keys: :duplicate, name: Realtime.Registry},
Expand Down
73 changes: 73 additions & 0 deletions lib/realtime/feature_flags.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
defmodule Realtime.FeatureFlags do
@moduledoc """
Manages feature flags with optional per-tenant overrides.

Each flag has a global enabled/disabled state. Tenants can override that state
via a JSONB map stored on the tenant record.

Use `enabled?/1` to check the global flag value only.
Use `enabled?/2` when the flag supports per-tenant overrides. Resolution order:
1. Tenant-specific override (if present)
2. Global flag value
3. `false` when the flag does not exist
"""

import Ecto.Query
alias Realtime.Api
alias Realtime.FeatureFlags.Cache
alias Realtime.Repo
alias Realtime.Api.FeatureFlag
alias Realtime.Tenants.Cache, as: TenantsCache

@spec list_flags() :: [FeatureFlag.t()]
def list_flags, do: Repo.all(from f in FeatureFlag, order_by: [asc: f.name])

@spec get_flag(String.t()) :: FeatureFlag.t() | nil
def get_flag(name) when is_binary(name), do: Repo.get_by(FeatureFlag, name: name)

@spec upsert_flag(map()) :: {:ok, FeatureFlag.t()} | {:error, Ecto.Changeset.t()}
def upsert_flag(attrs) do
%FeatureFlag{}
|> FeatureFlag.changeset(attrs)
|> Repo.insert(on_conflict: {:replace, [:enabled, :updated_at]}, conflict_target: :name, returning: true)
end

@spec delete_flag(FeatureFlag.t()) :: {:ok, FeatureFlag.t()} | {:error, Ecto.Changeset.t()}
def delete_flag(%FeatureFlag{} = flag), do: Repo.delete(flag)

@spec set_tenant_flag(String.t(), String.t(), boolean()) ::
{:ok, Realtime.Api.Tenant.t()} | {:error, :not_found | Ecto.Changeset.t()}
def set_tenant_flag(flag_name, tenant_id, enabled)
when is_binary(flag_name) and is_binary(tenant_id) and is_boolean(enabled) do
case Api.get_tenant_by_external_id(tenant_id, use_replica?: false) do
nil ->
{:error, :not_found}

tenant ->
updated_flags = Map.put(tenant.feature_flags, flag_name, enabled)
Api.update_tenant_by_external_id(tenant_id, %{feature_flags: updated_flags})
end
end

@spec enabled?(String.t()) :: boolean()
def enabled?(flag_name) when is_binary(flag_name) do
case Cache.get_flag(flag_name) do
nil -> false
%FeatureFlag{enabled: enabled} -> enabled
end
end

@spec enabled?(String.t(), String.t()) :: boolean()
def enabled?(flag_name, tenant_id) when is_binary(flag_name) and is_binary(tenant_id) do
case Cache.get_flag(flag_name) do
nil ->
false

%FeatureFlag{enabled: global_enabled} ->
case TenantsCache.get_tenant_by_external_id(tenant_id) do
nil -> global_enabled
%{feature_flags: flags} -> Map.get(flags, flag_name, global_enabled)
end
end
end
end
60 changes: 60 additions & 0 deletions lib/realtime/feature_flags/cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule Realtime.FeatureFlags.Cache do
@moduledoc """
In-process Cachex cache for `Realtime.Api.FeatureFlag` records.

Cache misses fall through to the database automatically via `Cachex.fetch/3`.
Nil results (flag not found) are intentionally not cached so that newly
created flags become visible without requiring an explicit invalidation.

Use `global_update_cache/1` after mutations to push the updated struct to all
cluster nodes. Use `global_invalidate_cache/1` after deletes.
"""

require Cachex.Spec
alias Realtime.Api.FeatureFlag
alias Realtime.GenRpc
alias Realtime.FeatureFlags

def child_spec(_) do
tenant_cache_expiration = Application.get_env(:realtime, :tenant_cache_expiration)

%{
id: __MODULE__,
start: {Cachex, :start_link, [__MODULE__, [expiration: Cachex.Spec.expiration(default: tenant_cache_expiration)]]}
}
end

@spec get_flag(String.t()) :: FeatureFlag.t() | nil
def get_flag(name) do
Comment thread
filipecabaco marked this conversation as resolved.
with {_, value} <-
Cachex.fetch(__MODULE__, cache_key(name), fn _key ->
with %FeatureFlag{} = flag <- FeatureFlags.get_flag(name),
do: {:commit, flag},
else: (_ -> {:ignore, nil})
end) do
value
end
end

@spec update_cache(FeatureFlag.t()) :: {:ok, boolean()} | {:error, boolean()}
def update_cache(%FeatureFlag{} = flag) do
Comment thread
filipecabaco marked this conversation as resolved.
Cachex.put(__MODULE__, cache_key(flag.name), flag)
end

@spec invalidate_cache(String.t()) :: {:ok, boolean()} | {:error, boolean()}
def invalidate_cache(name) when is_binary(name) do
Comment thread
filipecabaco marked this conversation as resolved.
Cachex.del(__MODULE__, cache_key(name))
end

@spec global_update_cache(FeatureFlag.t()) :: :ok
def global_update_cache(%FeatureFlag{} = flag) do
GenRpc.multicast(__MODULE__, :update_cache, [flag])
end

@spec global_invalidate_cache(FeatureFlag.t()) :: :ok
def global_invalidate_cache(%FeatureFlag{} = flag) do
GenRpc.multicast(__MODULE__, :invalidate_cache, [flag.name])
end

defp cache_key(name), do: {:get_flag, name}
end
Loading
Loading