@@ -231,7 +231,7 @@ apply(#command_unregister_consumer{vhost = VirtualHost,
231231 of
232232 {value , Consumer } ->
233233 G1 = remove_from_group (Consumer , Group0 ),
234- handle_consumer_removal (G1 , Consumer );
234+ handle_consumer_removal (G1 , Consumer , Stream , ConsumerName );
235235 false ->
236236 {Group0 , []}
237237 end ,
@@ -247,19 +247,24 @@ apply(#command_activate_consumer{vhost = VirtualHost,
247247 stream = Stream ,
248248 consumer_name = ConsumerName },
249249 #? MODULE {groups = StreamGroups0 } = State0 ) ->
250+ rabbit_log :debug (" Activating consumer on ~tp , group ~p " ,
251+ [Stream , ConsumerName ]),
250252 {G , Eff } =
251253 case lookup_group (VirtualHost , Stream , ConsumerName , StreamGroups0 ) of
252254 undefined ->
253- rabbit_log :warning (" trying to activate consumer in group ~tp , but "
255+ rabbit_log :warning (" Trying to activate consumer in group ~tp , but "
254256 " the group does not longer exist" ,
255257 [{VirtualHost , Stream , ConsumerName }]),
256258 {undefined , []};
257259 Group ->
258260 # consumer {pid = Pid , subscription_id = SubId } =
259261 evaluate_active_consumer (Group ),
262+ rabbit_log :debug (" New active consumer on ~tp , group ~tp " ++
263+ " is ~tp from ~tp " ,
264+ [Stream , ConsumerName , SubId , Pid ]),
260265 Group1 =
261266 update_consumer_state_in_group (Group , Pid , SubId , true ),
262- {Group1 , [notify_consumer_effect (Pid , SubId , true )]}
267+ {Group1 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]}
263268 end ,
264269 StreamGroups1 =
265270 update_groups (VirtualHost , Stream , ConsumerName , G , StreamGroups0 ),
@@ -499,7 +504,8 @@ do_register_consumer(VirtualHost,
499504 Effects =
500505 case Active of
501506 true ->
502- [notify_consumer_effect (ConnectionPid , SubscriptionId , Active )];
507+ [notify_consumer_effect (ConnectionPid , SubscriptionId ,
508+ Stream , ConsumerName , Active )];
503509 _ ->
504510 []
505511 end ,
@@ -527,7 +533,8 @@ do_register_consumer(VirtualHost,
527533 active = true },
528534 G1 = add_to_group (Consumer0 , Group0 ),
529535 {G1 ,
530- [notify_consumer_effect (ConnectionPid , SubscriptionId , true )]};
536+ [notify_consumer_effect (ConnectionPid , SubscriptionId ,
537+ Stream , ConsumerName , true )]};
531538 _G ->
532539 % % whatever the current state is, the newcomer will be passive
533540 Consumer0 =
@@ -546,18 +553,28 @@ do_register_consumer(VirtualHost,
546553 % % the current active stays the same
547554 {G1 , []};
548555 _ ->
556+ rabbit_log :debug (" SAC consumer registration: " ++
557+ " active consumer change on stream ~tp , group ~tp . " ++
558+ " Notifying ~tp from ~tp it is no longer active." ,
559+ [Stream , ConsumerName , ActSubId , ActPid ]),
549560 % % there's a change, telling the active it's not longer active
550561 {update_consumer_state_in_group (G1 ,
551562 ActPid ,
552563 ActSubId ,
553564 false ),
554565 [notify_consumer_effect (ActPid ,
555566 ActSubId ,
567+ Stream ,
568+ ConsumerName ,
556569 false ,
557570 true )]}
558571 end ;
559572 false ->
560- % % no active consumer in the (non-empty) group, we are waiting for the reply of a former active
573+ rabbit_log :debug (" SAC consumer registration: no active consumer on stream ~tp , group ~tp . " ++
574+ " Likely waiting for a response from former active consumer." ,
575+ [Stream , ConsumerName ]),
576+ % % no active consumer in the (non-empty) group,
577+ % % we are waiting for the reply of a former active
561578 {G1 , []}
562579 end
563580 end ,
@@ -571,27 +588,27 @@ do_register_consumer(VirtualHost,
571588 lookup_consumer (ConnectionPid , SubscriptionId , Group1 ),
572589 {State #? MODULE {groups = StreamGroups1 }, {ok , Active }, Effects }.
573590
574- handle_consumer_removal (# group {consumers = []} = G , _ ) ->
591+ handle_consumer_removal (# group {consumers = []} = G , _ , _ , _ ) ->
575592 {G , []};
576593handle_consumer_removal (# group {partition_index = - 1 } = Group0 ,
577- Consumer ) ->
594+ Consumer , Stream , ConsumerName ) ->
578595 case Consumer of
579596 # consumer {active = true } ->
580597 % % this is the active consumer we remove, computing the new one
581598 Group1 = compute_active_consumer (Group0 ),
582599 case lookup_active_consumer (Group1 ) of
583600 {value , # consumer {pid = Pid , subscription_id = SubId }} ->
584601 % % creating the side effect to notify the new active consumer
585- {Group1 , [notify_consumer_effect (Pid , SubId , true )]};
602+ {Group1 , [notify_consumer_effect (Pid , SubId , Stream , ConsumerName , true )]};
586603 _ ->
587604 % % no active consumer found in the group, nothing to do
588605 {Group1 , []}
589606 end ;
590607 # consumer {active = false } ->
591- % % not the active consumer, nothing to do."),
608+ % % not the active consumer, nothing to do.
592609 {Group0 , []}
593610 end ;
594- handle_consumer_removal (Group0 , Consumer ) ->
611+ handle_consumer_removal (Group0 , Consumer , Stream , ConsumerName ) ->
595612 case lookup_active_consumer (Group0 ) of
596613 {value ,
597614 # consumer {pid = ActPid , subscription_id = ActSubId } =
@@ -601,40 +618,81 @@ handle_consumer_removal(Group0, Consumer) ->
601618 % % the current active stays the same
602619 {Group0 , []};
603620 _ ->
621+ rabbit_log :debug (" SAC consumer removal: " ++
622+ " active consumer change on stream ~tp , group ~tp . " ++
623+ " Notifying ~tp from ~tp it is no longer active." ,
624+ [Stream , ConsumerName , ActSubId , ActPid ]),
625+
604626 % % there's a change, telling the active it's not longer active
605627 {update_consumer_state_in_group (Group0 ,
606628 ActPid ,
607629 ActSubId ,
608630 false ),
609- [notify_consumer_effect (ActPid , ActSubId , false , true )]}
631+ [notify_consumer_effect (ActPid , ActSubId ,
632+ Stream , ConsumerName , false , true )]}
610633 end ;
611634 false ->
612635 case Consumer # consumer .active of
613636 true ->
614637 % % the active one is going away, picking a new one
615638 # consumer {pid = P , subscription_id = SID } =
616639 evaluate_active_consumer (Group0 ),
640+ rabbit_log :debug (" SAC consumer removal: " ++
641+ " active consumer change on stream ~tp , group ~tp . " ++
642+ " Notifying ~tp from ~tp it is the new active consumer." ,
643+ [Stream , ConsumerName , SID , P ]),
617644 {update_consumer_state_in_group (Group0 , P , SID , true ),
618- [notify_consumer_effect (P , SID , true )]};
645+ [notify_consumer_effect (P , SID ,
646+ Stream , ConsumerName , true )]};
619647 false ->
620- % % no active consumer in the (non-empty) group, we are waiting for the reply of a former active
648+ rabbit_log :debug (" SAC consumer removal: no active consumer on stream ~tp , group ~tp . " ++
649+ " Likely waiting for a response from former active consumer." ,
650+ [Stream , ConsumerName ]),
651+ % % no active consumer in the (non-empty) group,
652+ % % we are waiting for the reply of a former active
621653 {Group0 , []}
622654 end
623655 end .
624656
625- notify_consumer_effect (Pid , SubId , Active ) ->
626- notify_consumer_effect (Pid , SubId , Active , false ).
657+ message_type () ->
658+ case has_unblock_group_support () of
659+ true ->
660+ map ;
661+ false ->
662+ tuple
663+ end .
664+
665+ notify_consumer_effect (Pid , SubId , Stream , Name , Active ) ->
666+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , false ).
627667
628- notify_consumer_effect (Pid , SubId , Active , false = _SteppingDown ) ->
668+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , SteppingDown ) ->
669+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , SteppingDown , message_type ()).
670+
671+ notify_consumer_effect (Pid , SubId , _Stream , _Name , Active , false = _SteppingDown , tuple ) ->
629672 mod_call_effect (Pid ,
630673 {sac ,
631- {{subscription_id , SubId }, {active , Active },
674+ {{subscription_id , SubId },
675+ {active , Active },
632676 {extra , []}}});
633- notify_consumer_effect (Pid , SubId , Active , true = _SteppingDown ) ->
677+ notify_consumer_effect (Pid , SubId , _Stream , _Name , Active , true = _SteppingDown , tuple ) ->
634678 mod_call_effect (Pid ,
635679 {sac ,
636- {{subscription_id , SubId }, {active , Active },
637- {extra , [{stepping_down , true }]}}}).
680+ {{subscription_id , SubId },
681+ {active , Active },
682+ {extra , [{stepping_down , true }]}}});
683+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , false = _SteppingDown , map ) ->
684+ mod_call_effect (Pid ,
685+ {sac , #{subscription_id => SubId ,
686+ stream => Stream ,
687+ consumer_name => Name ,
688+ active => Active }});
689+ notify_consumer_effect (Pid , SubId , Stream , Name , Active , true = _SteppingDown , map ) ->
690+ mod_call_effect (Pid ,
691+ {sac , #{subscription_id => SubId ,
692+ stream => Stream ,
693+ consumer_name => Name ,
694+ active => Active ,
695+ stepping_down => true }}).
638696
639697maybe_create_group (VirtualHost ,
640698 Stream ,
@@ -743,3 +801,6 @@ mod_call_effect(Pid, Msg) ->
743801send_message (ConnectionPid , Msg ) ->
744802 ConnectionPid ! Msg ,
745803 ok .
804+
805+ has_unblock_group_support () ->
806+ rabbit_feature_flags :is_enabled (stream_sac_coordinator_unblock_group ).
0 commit comments