diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 155dac05..e56f4df3 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -3,6 +3,7 @@ package stream import ( "bufio" "bytes" + "context" "crypto/tls" "errors" "fmt" @@ -693,7 +694,9 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) { streamMetadata.Leader.advHost = streamMetadata.Leader.Host // see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317 - _, err := net.LookupIP(streamMetadata.Leader.Host) + ctx, cancel := context.WithTimeout(context.Background(), c.socketCallTimeout) + defer cancel() + _, err := net.DefaultResolver.LookupIPAddr(ctx, streamMetadata.Leader.Host) if err != nil { var dnsError *net.DNSError if errors.As(err, &dnsError) { diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 9c6ca744..8a08c64f 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -11,7 +11,7 @@ import ( ) type Consumer struct { - ID uint8 + ID uint8 // also the SubscriptionId response *Response options *ConsumerOptions onClose func() @@ -131,6 +131,17 @@ func (consumer *Consumer) NotifyClose() ChannelClose { return ch } +func (consumer *Consumer) Credit(credits int16) error { + if consumer.options.CreditStrategy != ManualCreditStrategy { + return fmt.Errorf("credit can only be called when CreditStrategy is ManualCreditStrategy") + } + if credits <= 0 { + return fmt.Errorf("credits must be a positive number") + } + consumer.options.client.credit(consumer.ID, credits) + return nil +} + type ConsumerContext struct { Consumer *Consumer chunkInfo *chunkInfo @@ -220,6 +231,14 @@ func (s *SingleActiveConsumer) SetEnabled(enabled bool) *SingleActiveConsumer { return s } +type CreditStrategy int + +const ( + AutomaticCreditStrategy CreditStrategy = iota // Default, sends 1 credit per chunk + ManualCreditStrategy // User manages credits +) + +// ConsumerOptions for a consumer type ConsumerOptions struct { client *Client ConsumerName string @@ -232,14 +251,17 @@ type ConsumerOptions struct { ClientProvidedName string Filter *ConsumerFilter SingleActiveConsumer *SingleActiveConsumer + CreditStrategy CreditStrategy } +// NewConsumerOptions returns a new ConsumerOptions instance func NewConsumerOptions() *ConsumerOptions { return &ConsumerOptions{ Offset: OffsetSpecification{}.Last(), autocommit: false, autoCommitStrategy: NewAutoCommitStrategy(), - CRCCheck: false, + CRCCheck: true, + CreditStrategy: AutomaticCreditStrategy, initialCredits: 10, ClientProvidedName: "go-stream-consumer", Filter: nil, @@ -295,6 +317,11 @@ func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleAc return c } +func (c *ConsumerOptions) SetCreditStrategy(creditStrategy CreditStrategy) *ConsumerOptions { + c.CreditStrategy = creditStrategy + return c +} + func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool { return c.SingleActiveConsumer != nil && c.SingleActiveConsumer.Enabled } diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index d609e5f6..76f205ef 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -832,4 +832,38 @@ var _ = Describe("Streaming Consumers", func() { Expect(consumer.Close()).NotTo(HaveOccurred()) }) + It("Manual Credit Strategy", func() { + producer, err := env.NewProducer(streamName, nil) + Expect(err).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(30)) + Expect(err).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(30)) + Expect(err).NotTo(HaveOccurred()) + err = producer.BatchSend(CreateArrayMessagesForTesting(30)) + Expect(err).NotTo(HaveOccurred()) + + var messagesReceived int32 + consumer, err := env.NewConsumer(streamName, + func(_ ConsumerContext, _ *amqp.Message) { + atomic.AddInt32(&messagesReceived, 1) + }, NewConsumerOptions(). + SetOffset(OffsetSpecification{}.First()). + SetCreditStrategy(ManualCreditStrategy). + SetInitialCredits(1)) + Expect(err).NotTo(HaveOccurred()) + + Eventually(func() int32 { + return atomic.LoadInt32(&messagesReceived) + }, 5*time.Second).Should(Equal(int32(30))) + + Expect(consumer.Credit(2)).NotTo(HaveOccurred()) + + Eventually(func() int32 { + return atomic.LoadInt32(&messagesReceived) + }, 5*time.Second).Should(Equal(int32(90))) + + Expect(producer.Close()).NotTo(HaveOccurred()) + Expect(consumer.Close()).NotTo(HaveOccurred()) + }) + }) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index f7f26a3a..2a67560d 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -410,7 +410,9 @@ func (c *Client) handleDeliver(r *bufio.Reader) { if consumer.getStatus() == open { consumer.chunkForConsumer <- chunk // request a credit for the next chunk - c.credit(subscriptionId, 1) + if consumer.options.CreditStrategy == AutomaticCreditStrategy { + c.credit(subscriptionId, 1) + } } else { logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+ "Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())