Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ config :phoenix, :plug_init_mode, :runtime
# Disable caching to ensure the rendered spec is refreshed
config :open_api_spex, :cache_adapter, OpenApiSpex.Plug.NoneCache

config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
# Disabled but can print to stdout with:
# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []}
config :opentelemetry, traces_exporter: :none

config :mix_test_watch, clear: true
1 change: 1 addition & 0 deletions lib/realtime_web/channels/presence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ defmodule RealtimeWeb.Presence do
use Phoenix.Presence,
otp_app: :realtime,
pubsub_server: Realtime.PubSub,
dispatcher: RealtimeWeb.RealtimeChannel.MessageDispatcher,
pool_size: 10
end
17 changes: 0 additions & 17 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule RealtimeWeb.RealtimeChannel do
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
alias Realtime.Tenants.Connect

alias RealtimeWeb.Channels.Payloads.Join
Expand Down Expand Up @@ -259,27 +258,11 @@ defmodule RealtimeWeb.RealtimeChannel do
{:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})}
end

def handle_info(
%{event: "presence_diff"},
%{assigns: %{policies: %Policies{presence: %PresencePolicies{read: false}}}} = socket
) do
Logger.warning("Presence message ignored")
{:noreply, socket}
end

def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do
Logger.warning("Broadcast message ignored")
{:noreply, socket}
end

def handle_info(%{event: "presence_diff", payload: payload} = msg, socket) do
%{presence_rate_counter: presence_rate_counter} = socket.assigns
GenCounter.add(presence_rate_counter.id)
maybe_log_info(socket, msg)
push(socket, "presence_diff", payload)
{:noreply, socket}
end

def handle_info(%{event: type, payload: payload} = msg, socket) do
count(socket)
maybe_log_info(socket, msg)
Expand Down
70 changes: 40 additions & 30 deletions lib/realtime_web/channels/realtime_channel/message_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,63 +5,67 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do

require Logger

def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new())

def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id, replayed_message_ids) do
{:rc_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}, replayed_message_ids}
end

def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id, replayed_message_ids) do
{:rc_fastlane, fastlane_pid, serializer, topic, replayed_message_ids}
def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) do
{:rc_fastlane, fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids}
end

@doc """
This dispatch function caches encoded messages if fastlane is used
It also sends an :update_rate_counter to the subscriber and it can conditionally log
"""
@spec dispatch(list, pid, Phoenix.Socket.Broadcast.t()) :: :ok
def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{} = msg) do
def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{event: event} = msg) do
# fastlane_pid is the actual socket transport pid
# This reduce caches the serialization and bypasses the channel process going straight to the
# transport process

message_id = message_id(msg.payload)

# Credo doesn't like that we don't use the result aggregation
_ =
Enum.reduce(subscribers, %{}, fn
{pid, _}, cache when pid == from ->
cache
{_cache, count} =
Enum.reduce(subscribers, {%{}, 0}, fn
{pid, _}, {cache, count} when pid == from ->
{cache, count}

{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, replayed_message_ids}}, cache ->
{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}},
{cache, count} ->
if already_replayed?(message_id, replayed_message_ids) do
# skip already replayed message
cache
{cache, count}
else
send(pid, :update_rate_counter)
do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
end
if event != "presence_diff", do: send(pid, :update_rate_counter)

{pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}, replayed_message_ids}}, cache ->
if already_replayed?(message_id, replayed_message_ids) do
# skip already replayed message
cache
else
send(pid, :update_rate_counter)
log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
Logger.info(log, external_id: tenant_id, project: tenant_id)
maybe_log(log_level, join_topic, msg, tenant_id)

do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
{cache, count + 1}
end

{pid, _}, cache ->
{pid, _}, {cache, count} ->
send(pid, msg)
cache
{cache, count}
end)

tenant_id = tenant_id(subscribers)
increment_presence_counter(tenant_id, event, count)

:ok
end

defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do
tenant_id
|> Realtime.Tenants.presence_events_per_second_key()
|> Realtime.GenCounter.add(count)
end

defp increment_presence_counter(_tenant_id, _event, _count), do: :ok

defp maybe_log(:info, join_topic, msg, tenant_id) do
log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
Logger.info(log, external_id: tenant_id, project: tenant_id)
end

defp maybe_log(_level, _join_topic, _msg, _tenant_id), do: :ok

defp message_id(%{"meta" => %{"id" => id}}), do: id
defp message_id(_), do: nil

Expand All @@ -82,4 +86,10 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do
Map.put(cache, serializer, encoded_msg)
end
end

defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]) do
tenant_id
end

defp tenant_id(_), do: nil
end
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.52.1",
version: "2.53.0",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down Expand Up @@ -53,7 +53,7 @@ defmodule Realtime.MixProject do
# Type `mix help deps` for examples and options.
defp deps do
[
{:phoenix, "~> 1.7.0"},
{:phoenix, override: true, github: "supabase/phoenix", branch: "feat/presence-custom-dispatcher-1.7.19"},
{:phoenix_ecto, "~> 4.4.0"},
{:ecto_sql, "~> 3.11"},
{:ecto_psql_extras, "~> 0.8"},
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"},
"opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"},
"otel_http": {:hex, :otel_http, "0.2.0", "b17385986c7f1b862f5d577f72614ecaa29de40392b7618869999326b9a61d8a", [:rebar3], [], "hexpm", "f2beadf922c8cfeb0965488dd736c95cc6ea8b9efce89466b3904d317d7cc717"},
"phoenix": {:hex, :phoenix, "1.7.19", "36617efe5afbd821099a8b994ff4618a340a5bfb25531a1802c4d4c634017a57", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "ba4dc14458278773f905f8ae6c2ec743d52c3a35b6b353733f64f02dfe096cd6"},
"phoenix": {:git, "https://github.com/supabase/phoenix.git", "7b884cc0cc1a49ad2bc272acda2e622b3e11c139", [branch: "feat/presence-custom-dispatcher-1.7.19"]},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.4.3", "86e9878f833829c3f66da03d75254c155d91d72a201eb56ae83482328dc7ca93", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d36c401206f3011fefd63d04e8ef626ec8791975d9d107f9a0817d426f61ac07"},
"phoenix_html": {:hex, :phoenix_html, "3.3.4", "42a09fc443bbc1da37e372a5c8e6755d046f22b9b11343bf885067357da21cb3", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0249d3abec3714aff3415e7ee3d9786cb325be3151e6c4b3021502c585bf53fb"},
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.6", "7b1f0327f54c9eb69845fd09a77accf922f488c549a7e7b8618775eb603a62c7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "1681ab813ec26ca6915beb3414aa138f298e17721dc6a2bde9e6eb8a62360ff6"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,24 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
describe "fastlane_metadata/5" do
test "info level" do
assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :info, "tenant_id") ==
{:rc_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}, MapSet.new()}
{:rc_fastlane, self(), Serializer, "realtime:topic", :info, "tenant_id", MapSet.new()}
end

test "non-info level" do
assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :warning, "tenant_id") ==
{:rc_fastlane, self(), Serializer, "realtime:topic", MapSet.new()}
{:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new()}
end

test "replayed message ids" do
assert MessageDispatcher.fastlane_metadata(
self(),
Serializer,
"realtime:topic",
:warning,
"tenant_id",
MapSet.new([1])
) ==
{:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new([1])}
end
end

Expand Down Expand Up @@ -50,8 +62,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
from_pid = :erlang.list_to_pid(~c'<0.2.1>')

subscribers = [
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}},
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}}
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}},
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}}
]

msg = %Broadcast{topic: "some:other:topic", event: "event", payload: %{data: "test"}}
Expand All @@ -74,6 +86,48 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
refute_receive _any
end

test "dispatches 'presence_diff' messages to fastlane subscribers" do
parent = self()

subscriber_pid =
spawn(fn ->
loop = fn loop ->
receive do
msg ->
send(parent, {:subscriber, msg})
loop.(loop)
end
end

loop.(loop)
end)

from_pid = :erlang.list_to_pid(~c'<0.2.1>')

subscribers = [
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant456", MapSet.new()}},
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant456", MapSet.new()}}
]

msg = %Broadcast{topic: "some:other:topic", event: "presence_diff", payload: %{data: "test"}}

log =
capture_log(fn ->
assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok
end)

assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}"

assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}}
assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}}

assert Agent.get(TestSerializer, & &1) == 1

assert Realtime.GenCounter.get(Realtime.Tenants.presence_events_per_second_key("tenant456")) == 2

refute_receive _any
end

test "does not dispatch messages to fastlane subscribers if they already replayed it" do
parent = self()

Expand All @@ -95,8 +149,9 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do

subscribers = [
{subscriber_pid,
{:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, replaeyd_message_ids}},
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", replaeyd_message_ids}}
{:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", replaeyd_message_ids}},
{subscriber_pid,
{:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", replaeyd_message_ids}}
]

msg = %Broadcast{
Expand Down Expand Up @@ -131,8 +186,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do
from_pid = :erlang.list_to_pid(~c'<0.2.1>')

subscribers = [
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}},
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}}
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}},
{subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}}
]

msg = %Broadcast{topic: "some:other:topic", event: "event", payload: "not a map"}
Expand Down
15 changes: 3 additions & 12 deletions test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -239,23 +239,14 @@ defmodule RealtimeWeb.RealtimeChannelTest do
end

describe "presence" do
test "events are counted", %{tenant: tenant} do
test "presence state event is counted", %{tenant: tenant} do
jwt = Generators.generate_jwt_token(tenant)
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))

assert {:ok, _, %Socket{} = socket} = subscribe_and_join(socket, "realtime:test", %{})

presence_diff = %Socket.Broadcast{event: "presence_diff", payload: %{joins: %{}, leaves: %{}}}
send(socket.channel_pid, presence_diff)

assert_receive %Socket.Message{topic: "realtime:test", event: "presence_state", payload: %{}}

assert_receive %Socket.Message{
topic: "realtime:test",
event: "presence_diff",
payload: %{joins: %{}, leaves: %{}}
}

tenant_id = tenant.external_id

# Wait for RateCounter to tick
Expand All @@ -264,8 +255,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
assert {:ok, %RateCounter{id: {:channel, :presence_events, ^tenant_id}, bucket: bucket}} =
RateCounter.get(socket.assigns.presence_rate_counter)

# presence_state + presence_diff
assert 2 in bucket
# presence_state
assert Enum.sum(bucket) == 1
end
end

Expand Down
Loading