diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8e47620..56f6c9e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -10,7 +10,6 @@ jobs: runs-on: ubuntu-latest env: GOPROXY: "https://proxy.golang.org,direct" - ENABLE_DOCKER_INTEGRATION_TESTS: TRUE steps: - name: Set up Go diff --git a/Makefile b/Makefile index 94b7cce..cf9522d 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,12 @@ all: test vet staticcheck + +# Run all tests, including integration tests test: + go test -v -tags=integration ./... + +# Run unit tests only +unit-test: go test -v ./... vet: diff --git a/examples/publisher_confirm/main.go b/examples/publisher_confirm/main.go index f5aecaf..92895da 100644 --- a/examples/publisher_confirm/main.go +++ b/examples/publisher_confirm/main.go @@ -9,7 +9,7 @@ import ( "syscall" "time" - rabbitmq "github.com/wagslane/go-rabbitmq" + "github.com/wagslane/go-rabbitmq" ) func main() { @@ -38,6 +38,10 @@ func main() { log.Printf("message returned from server: %s", string(r.Body)) }) + publisher.NotifyPublishWithReturn(func(p rabbitmq.Confirmation, r rabbitmq.Return) { + log.Printf("message confirmed and returned from server: %s, deliveryTag=%d, ACK=%v", string(r.Body), p.DeliveryTag, p.Ack) + }) + // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) done := make(chan bool, 1) diff --git a/integration_test.go b/integration_test.go index 5df151e..0604cb1 100644 --- a/integration_test.go +++ b/integration_test.go @@ -1,22 +1,17 @@ +//go:build integration + package rabbitmq import ( "context" "fmt" - "os" "os/exec" "strings" "testing" "time" ) -const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS` - func prepareDockerTest(t *testing.T) (connStr string) { - if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" { - t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag) - return - } ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -97,7 +92,7 @@ func TestSimplePubSub(t *testing.T) { } defer consumer.CloseWithContext(context.Background()) - // Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full + // Set up a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full // it does not block. consumed := make(chan Delivery) defer close(consumed) @@ -154,3 +149,186 @@ func TestSimplePubSub(t *testing.T) { } } } + +// TestNotifyPublishWithReturn tests the NotifyPublishWithReturn functionality by publishing both +// routable and unroutable messages and verifying the handler receives the correct confirmations and returns. +func TestNotifyPublishWithReturn(t *testing.T) { + connStr := prepareDockerTest(t) + conn := waitForHealthyAmqp(t, connStr) + defer conn.Close() + + // Define a struct to hold the results of the publish operation + type publishResult struct { + confirmation Confirmation + returnInfo Return + hasReturn bool + } + + // Helper function to set up publisher and result channel + setupPublisher := func(t *testing.T) (*Publisher, chan publishResult) { + publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf))) + if err != nil { + t.Fatal("error creating publisher", err) + } + results := make(chan publishResult, 10) + + // Set up the handler + publisher.NotifyPublishWithReturn(func(p Confirmation, r Return) { + hasReturn := r.ReplyCode != 0 + t.Logf("NotifyPublishWithReturn called: ack=%v, hasReturn=%v, replyCode=%d", + p.Ack, hasReturn, r.ReplyCode) + + results <- publishResult{ + confirmation: p, + returnInfo: r, + hasReturn: hasReturn, + } + }) + + return publisher, results + } + + t.Run("UnroutableMandatoryMessage_ShouldBeAckedAndReturned", func(t *testing.T) { + publisher, results := setupPublisher(t) + defer publisher.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Publish to non-existent queue with mandatory=true (should return) + t.Logf("publishing to non-existent queue (should be returned)") + err := publisher.PublishWithContext( + ctx, + []byte("test message 1"), + []string{"non-existent-queue"}, + WithPublishOptionsMandatory, + ) + if err != nil { + t.Fatal("failed to publish to non-existent queue", err) + } + + // Wait for the return + confirmation + select { + case result := <-results: + if !result.hasReturn { + t.Fatal("expected a return for unroutable message, but got none") + } + if result.returnInfo.ReplyCode == 0 { + t.Fatal("expected non-zero reply code for returned message") + } + if !result.confirmation.Ack { + t.Fatal("expected message to be acked, but it was nacked") + } + t.Logf("correctly received return: replyCode=%d, replyText=%s", + result.returnInfo.ReplyCode, result.returnInfo.ReplyText) + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for return notification") + } + }) + + t.Run("RoutableMandatoryMessage_ShouldBeAckedWithoutReturn", func(t *testing.T) { + publisher, results := setupPublisher(t) + defer publisher.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Create a queue and publish to it (should succeed, no return) + consumerQueue := "test_queue_" + fmt.Sprintf("%d", time.Now().UnixNano()) + consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf))) + if err != nil { + t.Fatal("error creating consumer", err) + } + defer consumer.CloseWithContext(context.Background()) + + // Start consumer to ensure queue exists + consumed := make(chan Delivery, 1) + go func() { + err = consumer.Run(func(d Delivery) Action { + select { + case consumed <- d: + default: + } + return Ack + }) + if err != nil { + t.Log("consumer run failed", err) + } + }() + + // Wait for the consumer to be ready + time.Sleep(100 * time.Millisecond) + + t.Logf("publishing to existing queue (should succeed)") + err = publisher.PublishWithContext( + ctx, + []byte("test message 2"), + []string{consumerQueue}, + WithPublishOptionsMandatory, + ) + if err != nil { + t.Fatal("failed to publish to existing queue", err) + } + + // Wait for the confirmation (no return expected) + select { + case result := <-results: + if result.hasReturn { + t.Fatalf("unexpected return for routable message: replyCode=%d, replyText=%s", + result.returnInfo.ReplyCode, result.returnInfo.ReplyText) + } + if !result.confirmation.Ack { + t.Fatal("expected message to be acked, but it was nacked") + } + t.Logf("correctly received confirmation without return: ack=%v", result.confirmation.Ack) + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for confirmation notification") + } + + // Verify the message was actually consumed + select { + case d := <-consumed: + t.Logf("message successfully consumed: '%s'", string(d.Body)) + if string(d.Body) != "test message 2" { + t.Fatalf("expected message 'test message 2', got '%s'", string(d.Body)) + } + case <-time.After(time.Second * 3): + t.Fatal("timeout waiting for message consumption") + } + }) + + t.Run("UnroutableNonMandatoryMessage_ShouldBeAckedWithoutReturn", func(t *testing.T) { + publisher, results := setupPublisher(t) + defer publisher.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // Publish without mandatory flag to non-existent queue (should succeed, no return) + t.Logf("publishing to non-existent queue without mandatory (should succeed)") + err := publisher.PublishWithContext( + ctx, + []byte("test message 3"), + []string{"another-non-existent-queue"}, + // No WithPublishOptionsMandatory - defaults to false + ) + if err != nil { + t.Fatal("failed to publish without mandatory", err) + } + + // Wait for the confirmation (no return expected) + select { + case result := <-results: + if result.hasReturn { + t.Fatalf("unexpected return for non-mandatory message: replyCode=%d, replyText=%s", + result.returnInfo.ReplyCode, result.returnInfo.ReplyText) + } + if !result.confirmation.Ack { + t.Fatal("expected non-mandatory message to be acked, but it was nacked") + } + t.Logf("correctly received confirmation without return for non-mandatory: ack=%v", result.confirmation.Ack) + case <-time.After(time.Second * 5): + t.Fatal("timeout waiting for non-mandatory confirmation notification") + } + }) +} diff --git a/publish.go b/publish.go index 06f9cb0..11994c6 100644 --- a/publish.go +++ b/publish.go @@ -53,9 +53,10 @@ type Publisher struct { disablePublishDueToBlocked bool disablePublishDueToBlockedMu *sync.RWMutex - handlerMu *sync.Mutex - notifyReturnHandler func(r Return) - notifyPublishHandler func(p Confirmation) + handlerMu *sync.Mutex + notifyReturnHandler func(r Return) + notifyPublishHandler func(p Confirmation) + notifyPublishWithReturnHandler func(p Confirmation, r Return) options PublisherOptions } @@ -121,6 +122,7 @@ func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publishe } publisher.startReturnHandler() publisher.startPublishHandler() + publisher.startPublishWithReturnHandler() } }() @@ -320,6 +322,40 @@ func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { } } +// NotifyPublishWithReturn registers a listener for publish confirmation events with optional return information, +// delivered in the correct order according to RabbitMQ protocol. Must set ConfirmPublishings option. +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind. +// +// This method is particularly useful when publishing with mandatory or immediate flags, as it allows you to +// handle both the return (if unroutable) and the associated confirmation in a single handler with proper ordering. +// +// Handler receives: +// - Confirmation: Always present for every published message (ack/nack) +// - Return: Only present if the message was unroutable with mandatory/immediate flags (empty Return{} otherwise) +// +// For successfully routed messages: +// - Only confirmation is received, Return will be empty +// +// For unrouted mandatory or immediate messages: +// - Return is received first (as per RabbitMQ protocol), followed immediately by confirmation +// - Both are delivered to the handler together in the correct pairing +// +// See github.com/rabbitmq/amqp091-go documentation: +// https://pkg.go.dev/github.com/rabbitmq/amqp091-go#Channel.Confirm +// "Unroutable mandatory or immediate messages are acknowledged immediately after any Channel.NotifyReturn +// listeners have been notified." +func (publisher *Publisher) NotifyPublishWithReturn(handler func(p Confirmation, r Return)) { + publisher.handlerMu.Lock() + shouldStart := publisher.notifyPublishWithReturnHandler == nil + publisher.notifyPublishWithReturnHandler = handler + publisher.handlerMu.Unlock() + + if shouldStart { + publisher.startPublishWithReturnHandler() + } +} + func (publisher *Publisher) startReturnHandler() { publisher.handlerMu.Lock() if publisher.notifyReturnHandler == nil { @@ -343,7 +379,11 @@ func (publisher *Publisher) startPublishHandler() { return } publisher.handlerMu.Unlock() - publisher.chanManager.ConfirmSafe(false) + err := publisher.chanManager.ConfirmSafe(false) + if err != nil { + publisher.options.Logger.Errorf("failed to enable confirm mode for publish handler: %v", err) + return + } go func() { confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) @@ -355,3 +395,66 @@ func (publisher *Publisher) startPublishHandler() { } }() } + +func (publisher *Publisher) startPublishWithReturnHandler() { + publisher.handlerMu.Lock() + if publisher.notifyPublishWithReturnHandler == nil { + publisher.handlerMu.Unlock() + return + } + publisher.handlerMu.Unlock() + + // Enable confirm mode for this channel + err := publisher.chanManager.ConfirmSafe(false) + if err != nil { + publisher.options.Logger.Errorf("failed to enable confirm mode for return publish in order handler: %v", err) + return + } + + go func() { + returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return)) + confirmations := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation)) + + for { + select { + case ret, ok := <-returns: + if !ok { + // returns channel closed, likely due to publisher being closed + publisher.options.Logger.Warnf("returns channel closed, stopping return handler") + returns = nil + continue + } + + // According to AMQP 0.9.1 protocol, when a message is returned, + // the return is immediately followed by a confirmation. + // We must consume the next confirmation to pair with this return. + select { + case conf, ok := <-confirmations: + if !ok { + publisher.options.Logger.Warnf("confirmations channel closed while waiting for confirmation after return") + return + } + + // Call handler with both return and confirmation + go publisher.notifyPublishWithReturnHandler(Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), + }, Return{ret}) + } + + case conf, ok := <-confirmations: + if !ok { + publisher.options.Logger.Warnf("confirmations channel closed") + // confirmations channel closed, likely due to publisher being closed + return + } + + // This is a confirmation without a return (successful delivery) + go publisher.notifyPublishWithReturnHandler(Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), + }, Return{}) + } + } + }() +}