diff --git a/CHANGELOG.md b/CHANGELOG.md index 5394bfd6..629e788b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,11 @@ This new version of Phoenix.PubSub provides a simpler, more extensible, and more performant Phoenix.PubSub API. For users of Phoenix.PubSub, the API is the same, although frameworks and other adapters will have to migrate accordingly (which often means less code). -## 2.1.4 (2024-09-27) +## 2.1.4 (unreleased) ### Enhancements - Add `:permdown_on_shutdown` option. + - Add `Phoenix.PubSub.subscribe_once/3`. ## 2.1.3 (2023-06-14) diff --git a/lib/phoenix/pubsub.ex b/lib/phoenix/pubsub.ex index 9f4430a0..7319bcf9 100644 --- a/lib/phoenix/pubsub.ex +++ b/lib/phoenix/pubsub.ex @@ -191,6 +191,8 @@ defmodule Phoenix.PubSub do `Phoenix.PubSub.unsubscribe/2`, all duplicate subscriptions will be dropped. + See `subscribe_once/3` to avoid duplicate subscriptions. + ## Options * `:metadata` - provides metadata to be attached to this @@ -208,6 +210,22 @@ defmodule Phoenix.PubSub do end end + @doc """ + Subscribes the caller to the PubSub adapter's topic like `subscribe/3` unless + the calling process is already subscribed, in which case it returns `{:error, + :already_subscribed}`. + """ + @spec subscribe_once(t, topic, keyword) :: :ok | {:error, term} + def subscribe_once(pubsub, topic, opts \\ []) + when is_atom(pubsub) and is_binary(topic) and is_list(opts) do + subscriptions = Registry.lookup(pubsub, topic) + if Enum.any?(subscriptions, fn {pid, _} -> pid == self() end) do + {:error, :already_subscribed} + else + subscribe(pubsub, topic, opts) + end + end + @doc """ Unsubscribes the caller from the PubSub adapter's topic. """ diff --git a/test/shared/pubsub_test.exs b/test/shared/pubsub_test.exs index fe1c0091..18d85855 100644 --- a/test/shared/pubsub_test.exs +++ b/test/shared/pubsub_test.exs @@ -175,6 +175,27 @@ defmodule Phoenix.PubSubTest do assert_receive {:custom, nil, :none, :direct} assert_receive {:custom, :special, :none, :direct} end + + @tag pool_size: size + test "pool #{size}: subscribe_once/2 prevents duplicate subscriptions", + config do + # Subscribe twice + PubSub.subscribe(config.pubsub, config.topic) + PubSub.subscribe(config.pubsub, config.topic) + :ok = PubSub.broadcast(config.pubsub, config.topic, :ping) + {:messages, messages} = Process.info(self(), :messages) + # duplicate message + assert messages == [:ping, :ping] + PubSub.unsubscribe(config.pubsub, config.topic) + + # Duplicate calls do nothing + :ok = PubSub.subscribe_once(config.pubsub, config.topic) + {:error, :already_subscribed} = PubSub.subscribe_once(config.pubsub, config.topic) + :ok = PubSub.broadcast(config.pubsub, config.topic, :pong) + {:messages, messages} = Process.info(self(), :messages) + # no duplicate message + assert messages == [:ping, :ping, :pong] + end end @tag pool_size: 4