Skip to content

Commit 0958e3a

Browse files
authored
Merge pull request #191 from taki-mekhalfa/feat/add_queue_message_expiration_option_for_the_consumer
feat: add queue message expiration option for consumer
2 parents a3ac13e + 9db6805 commit 0958e3a

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

consumer_options.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package rabbitmq
22

33
import (
4+
"time"
5+
46
amqp "github.com/rabbitmq/amqp091-go"
57
"github.com/wagslane/go-rabbitmq/internal/logger"
68
)
@@ -329,3 +331,16 @@ func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) {
329331

330332
options.QueueOptions.Args["x-queue-type"] = "quorum"
331333
}
334+
335+
// WithConsumerOptionsQueueMessageExpiration sets the message expiration (TTL) for all messages in the queue.
336+
// This option defines how long a message can remain in the queue before it is discarded if not consumed.
337+
// The TTL is specified as a time.Duration and will be converted to milliseconds for RabbitMQ.
338+
// See https://www.rabbitmq.com/docs/ttl#per-queue-message-ttl
339+
func WithConsumerOptionsQueueMessageExpiration(ttl time.Duration) func(*ConsumerOptions) {
340+
return func(options *ConsumerOptions) {
341+
if options.QueueOptions.Args == nil {
342+
options.QueueOptions.Args = Table{}
343+
}
344+
options.QueueOptions.Args["x-message-ttl"] = ttl.Milliseconds()
345+
}
346+
}

0 commit comments

Comments
 (0)