@@ -187,15 +187,15 @@ defmodule Registry do
187187 Note that the registry uses one ETS table plus two ETS tables per partition.
188188 """
189189
190- @ keys [ :unique , :duplicate ]
190+ @ keys [ :unique , :duplicate , { :duplicate , :key } , { :duplicate , :pid } ]
191191 @ all_info - 1
192192 @ key_info - 2
193193
194194 @ typedoc "The registry identifier"
195195 @ type registry :: atom
196196
197197 @ typedoc "The type of the registry"
198- @ type keys :: :unique | :duplicate
198+ @ type keys :: :unique | :duplicate | { :duplicate , :key } | { :duplicate , :pid }
199199
200200 @ typedoc "The type of keys allowed on registration"
201201 @ type key :: term
@@ -266,8 +266,8 @@ defmodule Registry do
266266 :undefined
267267 end
268268
269- { kind , _ , _ } ->
270- raise ArgumentError , ":via is not supported for #{ kind } registries"
269+ { { :duplicate , _ } , _ , _ } ->
270+ raise ArgumentError , ":via is not supported for duplicate registries"
271271 end
272272 end
273273
@@ -329,11 +329,24 @@ defmodule Registry do
329329 {Registry, keys: :unique, name: MyApp.Registry, partitions: System.schedulers_online()}
330330 ], strategy: :one_for_one)
331331
332+ For `:duplicate` registries with many different keys (e.g., many topics with
333+ few subscribers each), you can optimize key-based lookups by partitioning by key:
334+
335+ Registry.start_link(
336+ keys: {:duplicate, :key},
337+ name: MyApp.TopicRegistry,
338+ partitions: System.schedulers_online()
339+ )
340+
341+ This allows key-based lookups to check only a single partition instead of
342+ searching all partitions. Use the default `:pid` partitioning when you have
343+ fewer keys with many entries each (e.g., one topic with many subscribers).
344+
332345 ## Options
333346
334347 The registry requires the following keys:
335348
336- * `:keys` - chooses if keys are `:unique` or `:duplicate`
349+ * `:keys` - chooses if keys are `:unique`, `:duplicate`, `{:duplicate, :key}`, or `{ :duplicate, :pid} `
337350 * `:name` - the name of the registry and its tables
338351
339352 The following keys are optional:
@@ -345,16 +358,40 @@ defmodule Registry do
345358 crashes. Messages sent to listeners are of type `t:listener_message/0`.
346359 * `:meta` - a keyword list of metadata to be attached to the registry.
347360
361+ For `:duplicate` registries, you can specify the partitioning strategy
362+ directly in the `:keys` option:
363+
364+ * `:duplicate` or `{:duplicate, :pid}` - Use `:pid` partitioning (default)
365+ when you have keys with many entries (e.g., one topic with many subscribers).
366+ This is the traditional behavior and groups all entries from the same process together.
367+
368+ * `{:duplicate, :key}` - Use `:key` partitioning when entries are spread across
369+ many different keys (e.g., many topics with few subscribers each). This makes
370+ key-based lookups more efficient as they only need to check a single partition
371+ instead of all partitions.
372+
348373 """
349374 @ doc since: "1.5.0"
350375 @ spec start_link ( [ start_option ] ) :: { :ok , pid } | { :error , term }
351376 def start_link ( options ) do
352377 keys = Keyword . get ( options , :keys )
353378
354- if keys not in @ keys do
355- raise ArgumentError ,
356- "expected :keys to be given and be one of :unique or :duplicate, got: #{ inspect ( keys ) } "
357- end
379+ # Validate and normalize keys format
380+ kind =
381+ case keys do
382+ { :duplicate , partition_strategy } when partition_strategy in [ :key , :pid ] ->
383+ { :duplicate , partition_strategy }
384+
385+ :unique ->
386+ :unique
387+
388+ :duplicate ->
389+ { :duplicate , :pid }
390+
391+ _ ->
392+ raise ArgumentError ,
393+ "expected :keys to be given and be one of :unique, :duplicate, {:duplicate, :key}, or {:duplicate, :pid}, got: #{ inspect ( keys ) } "
394+ end
358395
359396 name =
360397 case Keyword . fetch ( options , :name ) do
@@ -397,11 +434,18 @@ defmodule Registry do
397434
398435 # The @info format must be kept in sync with Registry.Partition optimization.
399436 entries = [
400- { @ all_info , { keys , partitions , nil , nil , listeners } } ,
401- { @ key_info , { keys , partitions , nil } } | meta
437+ { @ all_info , { kind , partitions , nil , nil , listeners } } ,
438+ { @ key_info , { kind , partitions , nil } } | meta
402439 ]
403440
404- Registry.Supervisor . start_link ( keys , name , partitions , listeners , entries , compressed )
441+ Registry.Supervisor . start_link (
442+ kind ,
443+ name ,
444+ partitions ,
445+ listeners ,
446+ entries ,
447+ compressed
448+ )
405449 end
406450
407451 @ doc false
@@ -468,7 +512,8 @@ defmodule Registry do
468512 end
469513
470514 { kind , _ , _ } ->
471- raise ArgumentError , "Registry.update_value/3 is not supported for #{ kind } registries"
515+ raise ArgumentError ,
516+ "Registry.update_value/3 is not supported for #{ inspect ( kind ) } registries"
472517 end
473518 end
474519
@@ -508,12 +553,12 @@ defmodule Registry do
508553 |> List . wrap ( )
509554 |> apply_non_empty_to_mfa_or_fun ( mfa_or_fun )
510555
511- { :duplicate , 1 , key_ets } ->
556+ { { :duplicate , _ } , 1 , key_ets } ->
512557 key_ets
513558 |> safe_lookup_second ( key )
514559 |> apply_non_empty_to_mfa_or_fun ( mfa_or_fun )
515560
516- { :duplicate , partitions , _ } ->
561+ { { :duplicate , _ } , partitions , _ } ->
517562 if Keyword . get ( opts , :parallel , false ) do
518563 registry
519564 |> dispatch_parallel ( key , mfa_or_fun , partitions )
@@ -625,10 +670,14 @@ defmodule Registry do
625670 [ ]
626671 end
627672
628- { :duplicate , 1 , key_ets } ->
673+ { { :duplicate , _ } , 1 , key_ets } ->
629674 safe_lookup_second ( key_ets , key )
630675
631- { :duplicate , partitions , _key_ets } ->
676+ { { :duplicate , :key } , partitions , _key_ets } ->
677+ partition = hash ( key , partitions )
678+ safe_lookup_second ( key_ets! ( registry , partition ) , key )
679+
680+ { { :duplicate , :pid } , partitions , _key_ets } ->
632681 for partition <- 0 .. ( partitions - 1 ) ,
633682 pair <- safe_lookup_second ( key_ets! ( registry , partition ) , key ) ,
634683 do: pair
@@ -749,10 +798,10 @@ defmodule Registry do
749798 key_ets = key_ets || key_ets! ( registry , key , partitions )
750799 :ets . select ( key_ets , spec )
751800
752- { :duplicate , 1 , key_ets } ->
801+ { { :duplicate , _ } , 1 , key_ets } ->
753802 :ets . select ( key_ets , spec )
754803
755- { :duplicate , partitions , _key_ets } ->
804+ { { :duplicate , _ } , partitions , _key_ets } ->
756805 for partition <- 0 .. ( partitions - 1 ) ,
757806 pair <- :ets . select ( key_ets! ( registry , partition ) , spec ) ,
758807 do: pair
@@ -795,16 +844,35 @@ defmodule Registry do
795844 @ spec keys ( registry , pid ) :: [ key ]
796845 def keys ( registry , pid ) when is_atom ( registry ) and is_pid ( pid ) do
797846 { kind , partitions , _ , pid_ets , _ } = info! ( registry )
798- { _ , pid_ets } = pid_ets || pid_ets! ( registry , pid , partitions )
799847
800- keys =
801- try do
802- spec = [ { { pid , :"$1" , :"$2" , :_ } , [ ] , [ { { :"$1" , :"$2" } } ] } ]
803- :ets . select ( pid_ets , spec )
804- catch
805- :error , :badarg -> [ ]
848+ pid_etses =
849+ if pid_ets do
850+ { _ , pid_ets } = pid_ets
851+ [ pid_ets ]
852+ else
853+ case kind do
854+ { :duplicate , :key } ->
855+ for partition <- 0 .. ( partitions - 1 ) do
856+ { _ , pid_ets } = pid_ets! ( registry , partition )
857+ pid_ets
858+ end
859+
860+ _ ->
861+ { _ , pid_ets } = pid_ets! ( registry , pid , partitions )
862+ [ pid_ets ]
863+ end
806864 end
807865
866+ keys =
867+ Enum . flat_map ( pid_etses , fn pid_ets ->
868+ try do
869+ spec = [ { { pid , :"$1" , :"$2" , :_ } , [ ] , [ { { :"$1" , :"$2" } } ] } ]
870+ :ets . select ( pid_ets , spec )
871+ catch
872+ :error , :badarg -> [ ]
873+ end
874+ end )
875+
808876 # Handle the possibility of fake keys
809877 keys = gather_keys ( keys , [ ] , false )
810878
@@ -882,8 +950,17 @@ defmodule Registry do
882950 [ ]
883951 end
884952
885- { :duplicate , partitions , key_ets } ->
886- key_ets = key_ets || key_ets! ( registry , pid , partitions )
953+ { { :duplicate , _ } , 1 , key_ets } ->
954+ for { ^ pid , value } <- safe_lookup_second ( key_ets , key ) , do: value
955+
956+ { { :duplicate , :key } , partitions , _key_ets } ->
957+ partition = hash ( key , partitions )
958+ key_ets = key_ets! ( registry , partition )
959+ for { ^ pid , value } <- safe_lookup_second ( key_ets , key ) , do: value
960+
961+ { { :duplicate , :pid } , partitions , _key_ets } ->
962+ partition = hash ( pid , partitions )
963+ key_ets = key_ets! ( registry , partition )
887964 for { ^ pid , value } <- safe_lookup_second ( key_ets , key ) , do: value
888965 end
889966 end
@@ -1121,7 +1198,7 @@ defmodule Registry do
11211198 end
11221199 end
11231200
1124- defp register_key ( :duplicate , key_ets , _key , entry ) do
1201+ defp register_key ( { :duplicate , _ } , key_ets , _key , entry ) do
11251202 true = :ets . insert ( key_ets , entry )
11261203 :ok
11271204 end
@@ -1339,10 +1416,10 @@ defmodule Registry do
13391416 key_ets = key_ets || key_ets! ( registry , key , partitions )
13401417 :ets . select_count ( key_ets , spec )
13411418
1342- { :duplicate , 1 , key_ets } ->
1419+ { { :duplicate , _ } , 1 , key_ets } ->
13431420 :ets . select_count ( key_ets , spec )
13441421
1345- { :duplicate , partitions , _key_ets } ->
1422+ { { :duplicate , _ } , partitions , _key_ets } ->
13461423 Enum . sum_by ( 0 .. ( partitions - 1 ) , fn partition_index ->
13471424 :ets . select_count ( key_ets! ( registry , partition_index ) , spec )
13481425 end )
@@ -1512,7 +1589,12 @@ defmodule Registry do
15121589 { hash ( key , partitions ) , hash ( pid , partitions ) }
15131590 end
15141591
1515- defp partitions ( :duplicate , _key , pid , partitions ) do
1592+ defp partitions ( { :duplicate , :key } , key , _pid , partitions ) do
1593+ partition = hash ( key , partitions )
1594+ { partition , partition }
1595+ end
1596+
1597+ defp partitions ( { :duplicate , :pid } , _key , pid , partitions ) do
15161598 partition = hash ( pid , partitions )
15171599 { partition , partition }
15181600 end
@@ -1576,9 +1658,10 @@ defmodule Registry.Supervisor do
15761658 defp strategy_for_kind ( :unique ) , do: :one_for_all
15771659
15781660 # Duplicate registries have both key and pid partitions hashed
1579- # by pid. This means that, if a PID partition crashes, all of
1661+ # by key ({:duplicate, :key}) or pid ({:duplicate, :pid}).
1662+ # This means that, if a PID or key partition crashes, all of
15801663 # its associated entries are in its sibling table, so we crash one.
1581- defp strategy_for_kind ( :duplicate ) , do: :one_for_one
1664+ defp strategy_for_kind ( { :duplicate , _ } ) , do: :one_for_one
15821665end
15831666
15841667defmodule Registry.Partition do
@@ -1633,6 +1716,7 @@ defmodule Registry.Partition do
16331716
16341717 def init ( { kind , registry , i , partitions , key_partition , pid_partition , listeners , compressed } ) do
16351718 Process . flag ( :trap_exit , true )
1719+
16361720 key_ets = init_key_ets ( kind , key_partition , compressed )
16371721 pid_ets = init_pid_ets ( kind , pid_partition )
16381722
@@ -1659,7 +1743,7 @@ defmodule Registry.Partition do
16591743 :ets . new ( key_partition , compression_opt ( opts , compressed ) )
16601744 end
16611745
1662- defp init_key_ets ( :duplicate , key_partition , compressed ) do
1746+ defp init_key_ets ( { :duplicate , _ } , key_partition , compressed ) do
16631747 opts = [ :duplicate_bag , :public , read_concurrency: true , write_concurrency: true ]
16641748 :ets . new ( key_partition , compression_opt ( opts , compressed ) )
16651749 end
0 commit comments