Elegant RabbitMQ client package with Go idioms and configurable options.
- Simple API Design: Clean and intuitive interfaces with options pattern configuration
- Connection Management: Automatic connection lifecycle handling with reconnection support
- Message Acknowledgment: Complete message acknowledgment with Ack/Reject operations
- Concurrent Processing: Built-in rate limiting with configurable concurrent goroutines
- Flexible Options: Chainable With methods allowing fine-grained publish and consume configuration
- Message Persistence: Message-level delivery mode settings supporting both transient and persistent messages
go get github.com/go-xlan/go-rabbitmq
This complete example demonstrates concurrent publishing and consuming with message acknowledgment:
package main

import (
	"context"
	"math/rand/v2"
	"sync"
	"time"

	"github.com/go-xlan/go-rabbitmq/rabbitmq"
	"github.com/google/uuid"
	"github.com/pkg/errors"
	amqp "github.com/rabbitmq/amqp091-go"
	"github.com/yyle88/must"
	"github.com/yyle88/neatjson/neatjsonm"
	"github.com/yyle88/neatjson/neatjsons"
	"github.com/yyle88/rese"
	"github.com/yyle88/zaplog"
)

type Message struct {
	Value string
}

func main() {
	var cfg = &rabbitmq.Config{
		Protocol:  "amqp",
		Host:      "127.0.0.1",
		Port:      5672,
		Username:  "guest",
		Password:  "guest",
		QueueName: "go-rabbitmq-demo1x",
	}

	// Run the publisher and the consumer concurrently and wait for both.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		publishFunction(cfg)
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		consumeFunction(cfg)
	}()
	wg.Wait()
}

func publishFunction(cfg *rabbitmq.Config) {
	ctx := context.Background()
	pub := rese.P1(rabbitmq.NewPublishClient(cfg))
	defer rese.F0(pub.Close)

	// Publish ten messages, alternating persistent and default delivery modes.
	for idx := 0; idx < 10; idx++ {
		msg := &Message{Value: uuid.New().String()}
		data := neatjsonm.B(msg)
		if idx%2 == 0 {
			must.Done(pub.PublishMessage(ctx, data, pub.NewPublishOptions().WithDeliveryMode(amqp.Persistent)))
		} else {
			must.Done(pub.PublishMessage(ctx, data, pub.NewPublishOptions()))
		}
		time.Sleep(time.Millisecond * 10)
	}
	time.Sleep(time.Millisecond * 100)
}

func consumeFunction(cfg *rabbitmq.Config) {
	ctx := context.Background()
	ctx, cancelCauseFunc := context.WithCancelCause(ctx)

	must.Done(rabbitmq.ConsumeMessage(ctx, cfg, func(name string, data []byte) error {
		zaplog.SUG.Infoln(name)
		zaplog.SUG.Infoln(neatjsons.SxB(data))
		// Return nil about 10% of the time; otherwise return an error.
		if rand.IntN(100) < 10 {
			return nil
		}
		return errors.Errorf("xxx")
	}, 10000))

	// Let the consumer run briefly, then cancel the context to stop it.
	time.Sleep(time.Millisecond * 500)
	cancelCauseFunc(errors.New("done"))
	time.Sleep(time.Millisecond * 200)
}
Configure queue with custom options:
opts := rabbitmq.NewQueueOptions("my-queue").
	WithDurable(true).
	WithAutoDelete(false).
	WithExclusive(false)
Configure consume behavior:
opts := rabbitmq.NewConsumeOptions("my-queue").
	WithAutoAck(false).
	WithConsumerTag("my-consumer").
	WithExclusive(false)
Publish with custom options:
opts := pub.NewPublishOptions().
	WithDeliveryMode(amqp.Persistent).
	WithExchange("my-exchange").
	WithMandatory(true)
err := pub.PublishMessage(ctx, data, opts)
Use TLS for secure connection:
cfg := &rabbitmq.Config{
	Protocol:  "amqps",
	Host:      "secure.rabbitmq.host",
	Port:      5671,
	Username:  "user",
	Password:  "pass",
	QueueName: "secure-queue",
}
Config fields:
- Protocol: "amqp" (plain) / "amqps" (TLS-secured)
- Host: RabbitMQ hostname / IP address
- Port: RabbitMQ port (5672 standard, 5671 TLS)
- Username: Authentication username
- Password: Authentication password
- QueueName: Default queue name to use
Queue options:
- Durable: Queue survives restarts (default: true)
- AutoDelete: Delete queue when no consumers (default: false)
- Exclusive: Queue accessible to this connection during its lifetime (default: false)
- NoWait: Don't wait to confirm queue declaration (default: false)
- Args: Extra arguments to pass to queue declaration
Consume options:
- AutoAck: Auto-acknowledge messages (default: false)
- ConsumerTag: Consumer tag name
- Exclusive: Exclusive consumer, no concurrent consumers (default: false)
- NoLocal: Don't get messages published on same connection (default: false)
- NoWait: Don't wait to confirm consume operation (default: false)
- Args: Extra arguments to pass to consume operation
Publish options:
- DeliveryMode: 0 (transient) / 2 (persistent)
- Exchange: Exchange name to publish message to
- RoutingKey: Routing key to route message
- Mandatory: Return message if unroutable
- Immediate: Return message if no immediate consumers
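The connection fields listed above map naturally onto an AMQP URI. As a rough illustration of that mapping, the sketch below assembles such a URI using only the standard library. The `Config` struct here is a stand-in that copies the field names from this README, and `BuildURL` is a hypothetical helper; neither is claimed to match how the package builds its connection string internally.

```go
package main

import (
	"fmt"
	"net"
	"net/url"
	"strconv"
)

// Config is a stand-in mirroring the fields documented in this README.
// It is NOT the package's own type.
type Config struct {
	Protocol  string
	Host      string
	Port      int
	Username  string
	Password  string
	QueueName string
}

// BuildURL is a hypothetical helper that turns the connection fields into
// an AMQP URI. url.UserPassword percent-encodes reserved characters in
// the credentials, so a password containing '@' stays unambiguous.
func BuildURL(cfg Config) string {
	u := url.URL{
		Scheme: cfg.Protocol,
		User:   url.UserPassword(cfg.Username, cfg.Password),
		Host:   net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)),
	}
	return u.String()
}

func main() {
	plain := Config{Protocol: "amqp", Host: "127.0.0.1", Port: 5672, Username: "guest", Password: "guest"}
	fmt.Println(BuildURL(plain)) // amqp://guest:guest@127.0.0.1:5672

	secure := Config{Protocol: "amqps", Host: "secure.rabbitmq.host", Port: 5671, Username: "user", Password: "pass"}
	fmt.Println(BuildURL(secure)) // amqps://user:pass@secure.rabbitmq.host:5671
}
```

Switching between plain and TLS connections in this sketch only requires changing `Protocol` and `Port`, which matches the two configurations shown earlier.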
The package uses structured exception types to provide context about different conditions:
// Exception types
- MqCannotInit // Failed to initialize connection
- MqDisconnected // Lost connection to RabbitMQ
- ContextIsDone // Context cancelled
Automatic reconnection happens in the background, except when the first connection fails.
- Close clients: Use defer pub.Close() to ensure resource cleanup
- Handle context cancellation: Pass appropriate context to enable smooth shutdown
- Configure concurrent processing: Set appropriate maxRoutine based on workload
- Use persistent messages with care: Persistence prevents message loss but has performance cost
- Message acknowledgment: Return nil to ack, return error to reject and requeue
- Test configuration: Test in development environment before production
MIT License. See LICENSE.
Contributions are welcome! Report bugs, suggest features, and contribute code:
- Found a mistake? Open an issue on GitHub with reproduction steps
- Have a feature idea? Create an issue to discuss the suggestion
- Documentation confusing? Report it so we can improve
- Need new features? Share the use cases to help us understand requirements
- Performance issue? Help us optimize through reporting slow operations
- Configuration problem? Ask questions about complex setups
- Follow project progress? Watch the repo to get new releases and features
- Success stories? Share how this package improved the workflow
- Feedback? We welcome suggestions and comments
For new code contributions, follow this process:
- Fork: Fork the repo on GitHub (using the webpage UI).
- Clone: Clone the forked project (git clone https://github.com/yourname/repo-name.git).
- Navigate: Navigate to the cloned project (cd repo-name)
- Branch: Create a feature branch (git checkout -b feature/xxx).
- Code: Implement the changes with comprehensive tests
- Testing: (Golang project) Ensure tests pass (go test ./...) and follow Go code style conventions
- Documentation: Update documentation to support client-facing changes and use significant commit messages
- Stage: Stage changes (git add .)
- Commit: Commit changes (git commit -m "Add feature xxx") ensuring backward compatible code
- Push: Push to the branch (git push origin feature/xxx).
- PR: Open a merge request on GitHub (on the GitHub webpage) with detailed description.
Please ensure tests pass and include relevant documentation updates.
You are welcome to contribute to this project by submitting merge requests and reporting issues.
Project Support:
- Give GitHub stars if this project helps you
- Share with teammates and (golang) programming friends
- Write tech blogs about development tools and workflows - we provide content writing support
- Join the ecosystem - committed to supporting open source and the (golang) development scene
Have Fun Coding with this package!