diff --git a/config/dev.exs b/config/dev.exs index a438f8ea4..0eff300d8 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -97,6 +97,8 @@ config :phoenix, :plug_init_mode, :runtime # Disable caching to ensure the rendered spec is refreshed config :open_api_spex, :cache_adapter, OpenApiSpex.Plug.NoneCache -config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []} +# Disabled but can print to stdout with: +# config :opentelemetry, traces_exporter: {:otel_exporter_stdout, []} +config :opentelemetry, traces_exporter: :none config :mix_test_watch, clear: true diff --git a/lib/realtime_web/channels/presence.ex b/lib/realtime_web/channels/presence.ex index f4d378b92..9e173febe 100644 --- a/lib/realtime_web/channels/presence.ex +++ b/lib/realtime_web/channels/presence.ex @@ -8,5 +8,6 @@ defmodule RealtimeWeb.Presence do use Phoenix.Presence, otp_app: :realtime, pubsub_server: Realtime.PubSub, + dispatcher: RealtimeWeb.RealtimeChannel.MessageDispatcher, pool_size: 10 end diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 91a417c21..104d9a077 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -18,7 +18,6 @@ defmodule RealtimeWeb.RealtimeChannel do alias Realtime.Tenants.Authorization alias Realtime.Tenants.Authorization.Policies alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies - alias Realtime.Tenants.Authorization.Policies.PresencePolicies alias Realtime.Tenants.Connect alias RealtimeWeb.Channels.Payloads.Join @@ -259,27 +258,11 @@ defmodule RealtimeWeb.RealtimeChannel do {:noreply, assign(socket, %{pg_sub_ref: pg_sub_ref})} end - def handle_info( - %{event: "presence_diff"}, - %{assigns: %{policies: %Policies{presence: %PresencePolicies{read: false}}}} = socket - ) do - Logger.warning("Presence message ignored") - {:noreply, socket} - end - def handle_info(_msg, %{assigns: %{policies: %Policies{broadcast: %BroadcastPolicies{read: false}}}} = socket) do Logger.warning("Broadcast message ignored") {:noreply, socket} end - def handle_info(%{event: "presence_diff", payload: payload} = msg, socket) do - %{presence_rate_counter: presence_rate_counter} = socket.assigns - GenCounter.add(presence_rate_counter.id) - maybe_log_info(socket, msg) - push(socket, "presence_diff", payload) - {:noreply, socket} - end - def handle_info(%{event: type, payload: payload} = msg, socket) do count(socket) maybe_log_info(socket, msg) diff --git a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex index 32e1528f3..6604eb2bd 100644 --- a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex +++ b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex @@ -5,14 +5,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do require Logger - def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) - - def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id, replayed_message_ids) do - {:rc_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}, replayed_message_ids} - end - - def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id, replayed_message_ids) do - {:rc_fastlane, fastlane_pid, serializer, topic, replayed_message_ids} + def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new()) do + {:rc_fastlane, fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids} end @doc """ @@ -20,48 +14,58 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do It also sends an :update_rate_counter to the subscriber and it can conditionally log """ @spec dispatch(list, pid, Phoenix.Socket.Broadcast.t()) :: :ok - def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{} = msg) do + def dispatch(subscribers, from, %Phoenix.Socket.Broadcast{event: event} = msg) do # fastlane_pid is the actual socket transport pid # This reduce caches the serialization and bypasses the channel process going straight to the # transport process message_id = message_id(msg.payload) - # Credo doesn't like that we don't use the result aggregation - _ = - Enum.reduce(subscribers, %{}, fn - {pid, _}, cache when pid == from -> - cache + {_cache, count} = + Enum.reduce(subscribers, {%{}, 0}, fn + {pid, _}, {cache, count} when pid == from -> + {cache, count} - {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, replayed_message_ids}}, cache -> + {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, log_level, tenant_id, replayed_message_ids}}, + {cache, count} -> if already_replayed?(message_id, replayed_message_ids) do # skip already replayed message - cache + {cache, count} else - send(pid, :update_rate_counter) - do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) - end + if event != "presence_diff", do: send(pid, :update_rate_counter) - {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}, replayed_message_ids}}, cache -> - if already_replayed?(message_id, replayed_message_ids) do - # skip already replayed message - cache - else - send(pid, :update_rate_counter) - log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}" - Logger.info(log, external_id: tenant_id, project: tenant_id) + maybe_log(log_level, join_topic, msg, tenant_id) - do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) + cache = do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) + {cache, count + 1} end - {pid, _}, cache -> + {pid, _}, {cache, count} -> send(pid, msg) - cache + {cache, count} end) + tenant_id = tenant_id(subscribers) + increment_presence_counter(tenant_id, event, count) + :ok end + defp increment_presence_counter(tenant_id, "presence_diff", count) when is_binary(tenant_id) do + tenant_id + |> Realtime.Tenants.presence_events_per_second_key() + |> Realtime.GenCounter.add(count) + end + + defp increment_presence_counter(_tenant_id, _event, _count), do: :ok + + defp maybe_log(:info, join_topic, msg, tenant_id) do + log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}" + Logger.info(log, external_id: tenant_id, project: tenant_id) + end + + defp maybe_log(_level, _join_topic, _msg, _tenant_id), do: :ok + defp message_id(%{"meta" => %{"id" => id}}), do: id defp message_id(_), do: nil @@ -82,4 +86,10 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcher do Map.put(cache, serializer, encoded_msg) end end + + defp tenant_id([{_pid, {:rc_fastlane, _, _, _, _, tenant_id, _}} | _]) do + tenant_id + end + + defp tenant_id(_), do: nil end diff --git a/mix.exs b/mix.exs index 72ae7f630..d0e42bf11 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.52.1", + version: "2.53.0", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, @@ -53,7 +53,7 @@ defmodule Realtime.MixProject do # Type `mix help deps` for examples and options. defp deps do [ - {:phoenix, "~> 1.7.0"}, + {:phoenix, override: true, github: "supabase/phoenix", branch: "feat/presence-custom-dispatcher-1.7.19"}, {:phoenix_ecto, "~> 4.4.0"}, {:ecto_sql, "~> 3.11"}, {:ecto_psql_extras, "~> 0.8"}, diff --git a/mix.lock b/mix.lock index c5fce6022..ba6f47328 100644 --- a/mix.lock +++ b/mix.lock @@ -66,7 +66,7 @@ "opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"}, "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"}, "otel_http": {:hex, :otel_http, "0.2.0", "b17385986c7f1b862f5d577f72614ecaa29de40392b7618869999326b9a61d8a", [:rebar3], [], "hexpm", "f2beadf922c8cfeb0965488dd736c95cc6ea8b9efce89466b3904d317d7cc717"}, - "phoenix": {:hex, :phoenix, "1.7.19", "36617efe5afbd821099a8b994ff4618a340a5bfb25531a1802c4d4c634017a57", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "ba4dc14458278773f905f8ae6c2ec743d52c3a35b6b353733f64f02dfe096cd6"}, + "phoenix": {:git, "https://github.com/supabase/phoenix.git", "7b884cc0cc1a49ad2bc272acda2e622b3e11c139", [branch: "feat/presence-custom-dispatcher-1.7.19"]}, "phoenix_ecto": {:hex, :phoenix_ecto, "4.4.3", "86e9878f833829c3f66da03d75254c155d91d72a201eb56ae83482328dc7ca93", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "d36c401206f3011fefd63d04e8ef626ec8791975d9d107f9a0817d426f61ac07"}, "phoenix_html": {:hex, :phoenix_html, "3.3.4", "42a09fc443bbc1da37e372a5c8e6755d046f22b9b11343bf885067357da21cb3", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "0249d3abec3714aff3415e7ee3d9786cb325be3151e6c4b3021502c585bf53fb"}, "phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.6", "7b1f0327f54c9eb69845fd09a77accf922f488c549a7e7b8618775eb603a62c7", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "1681ab813ec26ca6915beb3414aa138f298e17721dc6a2bde9e6eb8a62360ff6"}, diff --git a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs index 44ce83b99..53be2e51f 100644 --- a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs +++ b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs @@ -16,12 +16,24 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do describe "fastlane_metadata/5" do test "info level" do assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :info, "tenant_id") == - {:rc_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}, MapSet.new()} + {:rc_fastlane, self(), Serializer, "realtime:topic", :info, "tenant_id", MapSet.new()} end test "non-info level" do assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :warning, "tenant_id") == - {:rc_fastlane, self(), Serializer, "realtime:topic", MapSet.new()} + {:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new()} + end + + test "replayed message ids" do + assert MessageDispatcher.fastlane_metadata( + self(), + Serializer, + "realtime:topic", + :warning, + "tenant_id", + MapSet.new([1]) + ) == + {:rc_fastlane, self(), Serializer, "realtime:topic", :warning, "tenant_id", MapSet.new([1])} end end @@ -50,8 +62,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do from_pid = :erlang.list_to_pid(~c'<0.2.1>') subscribers = [ - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}}, - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}} + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}} ] msg = %Broadcast{topic: "some:other:topic", event: "event", payload: %{data: "test"}} @@ -74,6 +86,48 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do refute_receive _any end + test "dispatches 'presence_diff' messages to fastlane subscribers" do + parent = self() + + subscriber_pid = + spawn(fn -> + loop = fn loop -> + receive do + msg -> + send(parent, {:subscriber, msg}) + loop.(loop) + end + end + + loop.(loop) + end) + + from_pid = :erlang.list_to_pid(~c'<0.2.1>') + + subscribers = [ + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant456", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant456", MapSet.new()}} + ] + + msg = %Broadcast{topic: "some:other:topic", event: "presence_diff", payload: %{data: "test"}} + + log = + capture_log(fn -> + assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok + end) + + assert log =~ "Received message on realtime:topic with payload: #{inspect(msg, pretty: true)}" + + assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}} + assert_receive {:encoded, %Broadcast{event: "presence_diff", payload: %{data: "test"}, topic: "realtime:topic"}} + + assert Agent.get(TestSerializer, & &1) == 1 + + assert Realtime.GenCounter.get(Realtime.Tenants.presence_events_per_second_key("tenant456")) == 2 + + refute_receive _any + end + test "does not dispatch messages to fastlane subscribers if they already replayed it" do parent = self() @@ -95,8 +149,9 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do subscribers = [ {subscriber_pid, - {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, replaeyd_message_ids}}, - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", replaeyd_message_ids}} + {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", replaeyd_message_ids}}, + {subscriber_pid, + {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", replaeyd_message_ids}} ] msg = %Broadcast{ @@ -131,8 +186,8 @@ defmodule RealtimeWeb.RealtimeChannel.MessageDispatcherTest do from_pid = :erlang.list_to_pid(~c'<0.2.1>') subscribers = [ - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}}, - {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}} + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :info, "tenant123", MapSet.new()}}, + {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", :warning, "tenant123", MapSet.new()}} ] msg = %Broadcast{topic: "some:other:topic", event: "event", payload: "not a map"} diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index ae6c1734a..8022d6ebd 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -239,23 +239,14 @@ defmodule RealtimeWeb.RealtimeChannelTest do end describe "presence" do - test "events are counted", %{tenant: tenant} do + test "presence state event is counted", %{tenant: tenant} do jwt = Generators.generate_jwt_token(tenant) {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt)) assert {:ok, _, %Socket{} = socket} = subscribe_and_join(socket, "realtime:test", %{}) - presence_diff = %Socket.Broadcast{event: "presence_diff", payload: %{joins: %{}, leaves: %{}}} - send(socket.channel_pid, presence_diff) - assert_receive %Socket.Message{topic: "realtime:test", event: "presence_state", payload: %{}} - assert_receive %Socket.Message{ - topic: "realtime:test", - event: "presence_diff", - payload: %{joins: %{}, leaves: %{}} - } - tenant_id = tenant.external_id # Wait for RateCounter to tick @@ -264,8 +255,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do assert {:ok, %RateCounter{id: {:channel, :presence_events, ^tenant_id}, bucket: bucket}} = RateCounter.get(socket.assigns.presence_rate_counter) - # presence_state + presence_diff - assert 2 in bucket + # presence_state + assert Enum.sum(bucket) == 1 end end