@@ -68,8 +68,15 @@ groups() ->
68
68
local_to_local_delete_dest_queue ,
69
69
local_to_local_vhost_access ,
70
70
local_to_local_user_access ,
71
- local_to_local_credit_flow ,
72
- local_to_local_stream_credit_flow
71
+ local_to_local_credit_flow_on_confirm ,
72
+ local_to_local_credit_flow_on_publish ,
73
+ local_to_local_credit_flow_no_ack ,
74
+ local_to_local_quorum_credit_flow_on_confirm ,
75
+ local_to_local_quorum_credit_flow_on_publish ,
76
+ local_to_local_quorum_credit_flow_no_ack ,
77
+ local_to_local_stream_credit_flow_on_confirm ,
78
+ local_to_local_stream_credit_flow_on_publish ,
79
+ local_to_local_stream_credit_flow_no_ack
73
80
]}
74
81
].
75
82
@@ -930,7 +937,16 @@ local_to_local_user_access(Config) ->
930
937
none ]),
931
938
shovel_test_utils :await_no_shovel (Config , ? PARAM ).
932
939
933
- local_to_local_credit_flow (Config ) ->
940
+ local_to_local_credit_flow_on_confirm (Config ) ->
941
+ local_to_local_credit_flow (Config , <<" on-confirm" >>).
942
+
943
+ local_to_local_credit_flow_on_publish (Config ) ->
944
+ local_to_local_credit_flow (Config , <<" on-publish" >>).
945
+
946
+ local_to_local_credit_flow_no_ack (Config ) ->
947
+ local_to_local_credit_flow (Config , <<" no-ack" >>).
948
+
949
+ local_to_local_credit_flow (Config , AckMode ) ->
934
950
Src = ? config (srcq , Config ),
935
951
Dest = ? config (destq , Config ),
936
952
with_session (Config ,
@@ -939,13 +955,53 @@ local_to_local_credit_flow(Config) ->
939
955
[{<<" src-protocol" >>, <<" local" >>},
940
956
{<<" src-queue" >>, Src },
941
957
{<<" dest-protocol" >>, <<" local" >>},
942
- {<<" dest-queue" >>, Dest }
958
+ {<<" dest-queue" >>, Dest },
959
+ {<<" ack-mode" >>, AckMode }
960
+ ]),
961
+ publish_many (Sess , Src , Dest , <<" tag1" >>, 500 ),
962
+ expect_many (Sess , Dest , 500 )
963
+ end ).
964
+
965
+ local_to_local_quorum_credit_flow_on_confirm (Config ) ->
966
+ local_to_local_quorum_credit_flow (Config , <<" on-confirm" >>).
967
+
968
+ local_to_local_quorum_credit_flow_on_publish (Config ) ->
969
+ local_to_local_quorum_credit_flow (Config , <<" on-publish" >>).
970
+
971
+ local_to_local_quorum_credit_flow_no_ack (Config ) ->
972
+ local_to_local_quorum_credit_flow (Config , <<" no-ack" >>).
973
+
974
+ local_to_local_quorum_credit_flow (Config , AckMode ) ->
975
+ Src = ? config (srcq , Config ),
976
+ Dest = ? config (destq , Config ),
977
+ VHost = <<" /" >>,
978
+ declare_queue (Config , VHost , Src , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]),
979
+ declare_queue (Config , VHost , Dest , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}]),
980
+ with_session (Config ,
981
+ fun (Sess ) ->
982
+ shovel_test_utils :set_param (Config , ? PARAM ,
983
+ [{<<" src-protocol" >>, <<" local" >>},
984
+ {<<" src-queue" >>, Src },
985
+ {<<" src-predeclared" >>, true },
986
+ {<<" dest-protocol" >>, <<" local" >>},
987
+ {<<" dest-queue" >>, Dest },
988
+ {<<" dest-predeclared" >>, true },
989
+ {<<" ack-mode" >>, AckMode }
943
990
]),
944
991
publish_many (Sess , Src , Dest , <<" tag1" >>, 500 ),
945
992
expect_many (Sess , Dest , 500 )
946
993
end ).
947
994
948
- local_to_local_stream_credit_flow (Config ) ->
995
+ local_to_local_stream_credit_flow_on_confirm (Config ) ->
996
+ local_to_local_stream_credit_flow (Config , <<" on-confirm" >>).
997
+
998
+ local_to_local_stream_credit_flow_on_publish (Config ) ->
999
+ local_to_local_stream_credit_flow (Config , <<" on-publish" >>).
1000
+
1001
+ local_to_local_stream_credit_flow_no_ack (Config ) ->
1002
+ local_to_local_stream_credit_flow (Config , <<" no-ack" >>).
1003
+
1004
+ local_to_local_stream_credit_flow (Config , AckMode ) ->
949
1005
Src = ? config (srcq , Config ),
950
1006
Dest = ? config (destq , Config ),
951
1007
VHost = <<" /" >>,
@@ -959,7 +1015,8 @@ local_to_local_stream_credit_flow(Config) ->
959
1015
{<<" src-predeclared" >>, true },
960
1016
{<<" dest-protocol" >>, <<" local" >>},
961
1017
{<<" dest-queue" >>, Dest },
962
- {<<" dest-predeclared" >>, true }
1018
+ {<<" dest-predeclared" >>, true },
1019
+ {<<" ack-mode" >>, AckMode }
963
1020
]),
964
1021
965
1022
Receiver = subscribe (Sess , Dest ),
@@ -972,6 +1029,7 @@ local_to_local_stream_credit_flow(Config) ->
972
1029
amqp10_client :detach_link (Receiver )
973
1030
end ).
974
1031
1032
+
975
1033
% %----------------------------------------------------------------------------
976
1034
with_session (Config , Fun ) ->
977
1035
with_session (Config , <<" /" >>, Fun ).
0 commit comments