@@ -69,6 +69,7 @@ groups() ->
6969 metrics_cleanup_on_leadership_takeover ,
7070 metrics_cleanup_on_leader_crash ,
7171 consume_in_minority ,
72+ reject_after_leader_transfer ,
7273 shrink_all ,
7374 rebalance ,
7475 file_handle_reservations ,
@@ -907,6 +908,36 @@ consume_in_minority(Config) ->
907908 ok = rabbit_ct_broker_helpers :start_node (Config , Server2 ),
908909 ok .
909910
911+ reject_after_leader_transfer (Config ) ->
912+ [Server0 , Server1 , Server2 ] =
913+ rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
914+
915+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
916+ QQ = ? config (queue_name , Config ),
917+ RaName = binary_to_atom (<<" %2F_" , QQ /binary >>, utf8 ),
918+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
919+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
920+ publish (Ch , QQ ),
921+
922+ Ch2 = rabbit_ct_client_helpers :open_channel (Config , Server2 ),
923+ {# 'basic.get_ok' {delivery_tag = Tag }, # amqp_msg {}} =
924+ basic_get (Ch2 , QQ , false , 10 ),
925+
926+ ServerId1 = {RaName , Server1 },
927+ ct :pal (" transfer leadership ~p " ,
928+ [rabbit_ct_broker_helpers :rpc (Config , 0 , ra ,
929+ transfer_leadership , [ServerId1 , ServerId1 ])]),
930+ ok = amqp_channel :call (Ch2 , # 'basic.reject' {delivery_tag = Tag ,
931+ requeue = true }),
932+ wait_for_messages (Config , [[QQ , <<" 1" >>, <<" 1" >>, <<" 0" >>]]),
933+
934+ {# 'basic.get_ok' {delivery_tag = Tag2 }, # amqp_msg {}} =
935+ basic_get (Ch2 , QQ , false , 10 ),
936+
937+ ok = amqp_channel :call (Ch2 , # 'basic.reject' {delivery_tag = Tag2 ,
938+ requeue = true }),
939+ ok .
940+
910941shrink_all (Config ) ->
911942 [Server0 , Server1 , Server2 ] =
912943 rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
@@ -3340,3 +3371,14 @@ validate_queue(Ch, Queue, ExpectedMsgs) ->
33403371 end
33413372 end || M <- ExpectedMsgs ],
33423373 ok .
3374+
3375+ basic_get (_ , _ , _ , 0 ) ->
3376+ empty ;
3377+ basic_get (Ch , Q , NoAck , Attempt ) ->
3378+ case amqp_channel :call (Ch , # 'basic.get' {queue = Q , no_ack = NoAck }) of
3379+ {# 'basic.get_ok' {}, # amqp_msg {}} = R ->
3380+ R ;
3381+ _ ->
3382+ timer :sleep (100 ),
3383+ basic_get (Ch , Q , NoAck , Attempt - 1 )
3384+ end .
0 commit comments