Skip to content

Commit 024150d

Browse files
committed
Implement manual credit flow
1 parent e6aad87 commit 024150d

File tree

4 files changed

+70
-4
lines changed

4 files changed

+70
-4
lines changed

pkg/stream/client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package stream
33
import (
44
"bufio"
55
"bytes"
6+
"context"
67
"crypto/tls"
78
"errors"
89
"fmt"
@@ -693,7 +694,9 @@ func (c *Client) BrokerLeader(stream string) (*Broker, error) {
693694
streamMetadata.Leader.advHost = streamMetadata.Leader.Host
694695

695696
// see: https://github.com/rabbitmq/rabbitmq-stream-go-client/pull/317
696-
_, err := net.LookupIP(streamMetadata.Leader.Host)
697+
ctx, cancel := context.WithTimeout(context.Background(), c.socketCallTimeout)
698+
defer cancel()
699+
_, err := net.DefaultResolver.LookupIPAddr(ctx, streamMetadata.Leader.Host)
697700
if err != nil {
698701
var dnsError *net.DNSError
699702
if errors.As(err, &dnsError) {

pkg/stream/consumer.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
)
1212

1313
type Consumer struct {
14-
ID uint8
14+
ID uint8 // also the SubscriptionId
1515
response *Response
1616
options *ConsumerOptions
1717
onClose func()
@@ -131,6 +131,17 @@ func (consumer *Consumer) NotifyClose() ChannelClose {
131131
return ch
132132
}
133133

134+
func (consumer *Consumer) Credit(credits int16) error {
135+
if consumer.options.CreditStrategy != ManualCreditStrategy {
136+
return fmt.Errorf("credit can only be called when CreditStrategy is ManualCreditStrategy")
137+
}
138+
if credits <= 0 {
139+
return fmt.Errorf("credits must be a positive number")
140+
}
141+
consumer.options.client.credit(consumer.ID, credits)
142+
return nil
143+
}
144+
134145
type ConsumerContext struct {
135146
Consumer *Consumer
136147
chunkInfo *chunkInfo
@@ -220,6 +231,14 @@ func (s *SingleActiveConsumer) SetEnabled(enabled bool) *SingleActiveConsumer {
220231
return s
221232
}
222233

234+
type CreditStrategy int
235+
236+
const (
237+
AutomaticCreditStrategy CreditStrategy = iota // Default, sends 1 credit per chunk
238+
ManualCreditStrategy // User manages credits
239+
)
240+
241+
// ConsumerOptions for a consumer
223242
type ConsumerOptions struct {
224243
client *Client
225244
ConsumerName string
@@ -232,14 +251,17 @@ type ConsumerOptions struct {
232251
ClientProvidedName string
233252
Filter *ConsumerFilter
234253
SingleActiveConsumer *SingleActiveConsumer
254+
CreditStrategy CreditStrategy
235255
}
236256

257+
// NewConsumerOptions returns a new ConsumerOptions instance
237258
func NewConsumerOptions() *ConsumerOptions {
238259
return &ConsumerOptions{
239260
Offset: OffsetSpecification{}.Last(),
240261
autocommit: false,
241262
autoCommitStrategy: NewAutoCommitStrategy(),
242-
CRCCheck: false,
263+
CRCCheck: true,
264+
CreditStrategy: AutomaticCreditStrategy,
243265
initialCredits: 10,
244266
ClientProvidedName: "go-stream-consumer",
245267
Filter: nil,
@@ -295,6 +317,11 @@ func (c *ConsumerOptions) SetSingleActiveConsumer(singleActiveConsumer *SingleAc
295317
return c
296318
}
297319

320+
func (c *ConsumerOptions) SetCreditStrategy(creditStrategy CreditStrategy) *ConsumerOptions {
321+
c.CreditStrategy = creditStrategy
322+
return c
323+
}
324+
298325
func (c *ConsumerOptions) IsSingleActiveConsumerEnabled() bool {
299326
return c.SingleActiveConsumer != nil && c.SingleActiveConsumer.Enabled
300327
}

pkg/stream/consumer_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,4 +832,38 @@ var _ = Describe("Streaming Consumers", func() {
832832
Expect(consumer.Close()).NotTo(HaveOccurred())
833833
})
834834

835+
It("Manual Credit Strategy", func() {
836+
producer, err := env.NewProducer(streamName, nil)
837+
Expect(err).NotTo(HaveOccurred())
838+
err = producer.BatchSend(CreateArrayMessagesForTesting(30))
839+
Expect(err).NotTo(HaveOccurred())
840+
err = producer.BatchSend(CreateArrayMessagesForTesting(30))
841+
Expect(err).NotTo(HaveOccurred())
842+
err = producer.BatchSend(CreateArrayMessagesForTesting(30))
843+
Expect(err).NotTo(HaveOccurred())
844+
845+
var messagesReceived int32
846+
consumer, err := env.NewConsumer(streamName,
847+
func(_ ConsumerContext, _ *amqp.Message) {
848+
atomic.AddInt32(&messagesReceived, 1)
849+
}, NewConsumerOptions().
850+
SetOffset(OffsetSpecification{}.First()).
851+
SetCreditStrategy(ManualCreditStrategy).
852+
SetInitialCredits(1))
853+
Expect(err).NotTo(HaveOccurred())
854+
855+
Eventually(func() int32 {
856+
return atomic.LoadInt32(&messagesReceived)
857+
}, 5*time.Second).Should(Equal(int32(30)))
858+
859+
Expect(consumer.Credit(2)).NotTo(HaveOccurred())
860+
861+
Eventually(func() int32 {
862+
return atomic.LoadInt32(&messagesReceived)
863+
}, 5*time.Second).Should(Equal(int32(90)))
864+
865+
Expect(producer.Close()).NotTo(HaveOccurred())
866+
Expect(consumer.Close()).NotTo(HaveOccurred())
867+
})
868+
835869
})

pkg/stream/server_frame.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,9 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
410410
if consumer.getStatus() == open {
411411
consumer.chunkForConsumer <- chunk
412412
// request a credit for the next chunk
413-
c.credit(subscriptionId, 1)
413+
if consumer.options.CreditStrategy == AutomaticCreditStrategy {
414+
c.credit(subscriptionId, 1)
415+
}
414416
} else {
415417
logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+
416418
"Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())

0 commit comments

Comments
 (0)