@@ -13,6 +13,7 @@ import (
1313 "github.com/pkg/errors"
1414 "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
1515 "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
16+ test_helper "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper"
1617)
1718
1819var _ = Describe ("Streaming Consumers" , func () {
@@ -835,33 +836,32 @@ var _ = Describe("Streaming Consumers", func() {
835836 It ("Manual Credit Strategy" , func () {
836837 producer , err := env .NewProducer (streamName , nil )
837838 Expect (err ).NotTo (HaveOccurred ())
838- const batchSize = 32
839- err = producer .BatchSend (CreateArrayMessagesForTesting (batchSize ))
840- Expect (err ).NotTo (HaveOccurred ())
841- err = producer .BatchSend (CreateArrayMessagesForTesting (batchSize ))
842- Expect (err ).NotTo (HaveOccurred ())
839+
840+ const batchSize = 500
843841 err = producer .BatchSend (CreateArrayMessagesForTesting (batchSize ))
844842 Expect (err ).NotTo (HaveOccurred ())
845843
846- var messagesReceived int32
844+ msgCh := make ( chan * amqp. Message )
847845 consumer , err := env .NewConsumer (streamName ,
848- func (_ ConsumerContext , _ * amqp.Message ) {
849- atomic . AddInt32 ( & messagesReceived , 1 )
846+ func (_ ConsumerContext , msg * amqp.Message ) {
847+ msgCh <- msg
850848 }, NewConsumerOptions ().
851849 SetOffset (OffsetSpecification {}.First ()).
852850 SetCreditStrategy (ManualCreditStrategy ).
853851 SetInitialCredits (1 ))
854852 Expect (err ).NotTo (HaveOccurred ())
855853
856- Eventually (func () int32 {
857- return atomic .LoadInt32 (& messagesReceived )
858- }, 10 * time .Second ).Should (Equal (int32 (batchSize )))
859-
860- Expect (consumer .Credit (2 )).NotTo (HaveOccurred ())
854+ // Sad workaround to avoid asserting too soon when there's nothing in the channel
855+ <- time .After (time .Millisecond * 100 )
861856
862- Eventually (func () int32 {
863- return atomic .LoadInt32 (& messagesReceived )
864- }, 10 * time .Second ).Should (Equal (int32 (batchSize * 3 )))
857+ // Eventually, it should exhaust the credits
858+ Eventually (msgCh ).Within (3 * time .Second ).ShouldNot (Receive (), "expected no messages after exhausting credits" )
859+ // Give more credits to consume the entire batch of 500 messages
860+ Expect (consumer .Credit (20 )).To (Succeed ())
861+ // Eventually, it should receive the last message
862+ Eventually (msgCh ).Within (10 * time .Second ).Should (Receive (test_helper .HaveMatchingData ("test_499" )))
863+ // It should not receive any more messages, because the entire batch of 500 messages has been consumed
864+ Consistently (msgCh ).ShouldNot (Receive ())
865865
866866 Expect (producer .Close ()).NotTo (HaveOccurred ())
867867 Expect (consumer .Close ()).NotTo (HaveOccurred ())
0 commit comments