Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ jobs:
runs-on: ubuntu-latest
env:
GOPROXY: "https://proxy.golang.org,direct"
ENABLE_DOCKER_INTEGRATION_TESTS: TRUE
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer needed since //go:build integration is used in integration_test.go


steps:
- name: Set up Go
Expand Down
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
all: test vet staticcheck


# Run all tests, including integration tests
test:
go test -v -tags=integration ./...

# Run unit tests only
unit-test:
go test -v ./...

vet:
Expand Down
6 changes: 5 additions & 1 deletion examples/publisher_confirm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"syscall"
"time"

rabbitmq "github.com/wagslane/go-rabbitmq"
"github.com/wagslane/go-rabbitmq"
)

func main() {
Expand Down Expand Up @@ -38,6 +38,10 @@ func main() {
log.Printf("message returned from server: %s", string(r.Body))
})

publisher.NotifyPublishWithReturn(func(p rabbitmq.Confirmation, r rabbitmq.Return) {
log.Printf("message confirmed and returned from server: %s, deliveryTag=%d, ACK=%v", string(r.Body), p.DeliveryTag, p.Ack)
})

// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
Expand Down
194 changes: 186 additions & 8 deletions integration_test.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
//go:build integration

package rabbitmq

import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"testing"
"time"
)

const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`

func prepareDockerTest(t *testing.T) (connStr string) {
if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" {
t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag)
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down Expand Up @@ -97,7 +92,7 @@ func TestSimplePubSub(t *testing.T) {
}
defer consumer.CloseWithContext(context.Background())

// Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
// Set up a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
// it does not block.
consumed := make(chan Delivery)
defer close(consumed)
Expand Down Expand Up @@ -154,3 +149,186 @@ func TestSimplePubSub(t *testing.T) {
}
}
}

// TestNotifyPublishWithReturn tests the NotifyPublishWithReturn functionality by publishing both
// routable and unroutable messages and verifying the handler receives the correct confirmations and returns.
func TestNotifyPublishWithReturn(t *testing.T) {
connStr := prepareDockerTest(t)
conn := waitForHealthyAmqp(t, connStr)
defer conn.Close()

// Define a struct to hold the results of the publish operation
type publishResult struct {
confirmation Confirmation
returnInfo Return
hasReturn bool
}

// Helper function to set up publisher and result channel
setupPublisher := func(t *testing.T) (*Publisher, chan publishResult) {
publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
if err != nil {
t.Fatal("error creating publisher", err)
}
results := make(chan publishResult, 10)

// Set up the handler
publisher.NotifyPublishWithReturn(func(p Confirmation, r Return) {
hasReturn := r.ReplyCode != 0
t.Logf("NotifyPublishWithReturn called: ack=%v, hasReturn=%v, replyCode=%d",
p.Ack, hasReturn, r.ReplyCode)

results <- publishResult{
confirmation: p,
returnInfo: r,
hasReturn: hasReturn,
}
})

return publisher, results
}

t.Run("UnroutableMandatoryMessage_ShouldBeAckedAndReturned", func(t *testing.T) {
publisher, results := setupPublisher(t)
defer publisher.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Publish to non-existent queue with mandatory=true (should return)
t.Logf("publishing to non-existent queue (should be returned)")
err := publisher.PublishWithContext(
ctx,
[]byte("test message 1"),
[]string{"non-existent-queue"},
WithPublishOptionsMandatory,
)
if err != nil {
t.Fatal("failed to publish to non-existent queue", err)
}

// Wait for the return + confirmation
select {
case result := <-results:
if !result.hasReturn {
t.Fatal("expected a return for unroutable message, but got none")
}
if result.returnInfo.ReplyCode == 0 {
t.Fatal("expected non-zero reply code for returned message")
}
if !result.confirmation.Ack {
t.Fatal("expected message to be acked, but it was nacked")
}
t.Logf("correctly received return: replyCode=%d, replyText=%s",
result.returnInfo.ReplyCode, result.returnInfo.ReplyText)
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for return notification")
}
})

t.Run("RoutableMandatoryMessage_ShouldBeAckedWithoutReturn", func(t *testing.T) {
publisher, results := setupPublisher(t)
defer publisher.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Create a queue and publish to it (should succeed, no return)
consumerQueue := "test_queue_" + fmt.Sprintf("%d", time.Now().UnixNano())
consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf)))
if err != nil {
t.Fatal("error creating consumer", err)
}
defer consumer.CloseWithContext(context.Background())

// Start consumer to ensure queue exists
consumed := make(chan Delivery, 1)
go func() {
err = consumer.Run(func(d Delivery) Action {
select {
case consumed <- d:
default:
}
return Ack
})
if err != nil {
t.Log("consumer run failed", err)
}
}()

// Wait for the consumer to be ready
time.Sleep(100 * time.Millisecond)

t.Logf("publishing to existing queue (should succeed)")
err = publisher.PublishWithContext(
ctx,
[]byte("test message 2"),
[]string{consumerQueue},
WithPublishOptionsMandatory,
)
if err != nil {
t.Fatal("failed to publish to existing queue", err)
}

// Wait for the confirmation (no return expected)
select {
case result := <-results:
if result.hasReturn {
t.Fatalf("unexpected return for routable message: replyCode=%d, replyText=%s",
result.returnInfo.ReplyCode, result.returnInfo.ReplyText)
}
if !result.confirmation.Ack {
t.Fatal("expected message to be acked, but it was nacked")
}
t.Logf("correctly received confirmation without return: ack=%v", result.confirmation.Ack)
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for confirmation notification")
}

// Verify the message was actually consumed
select {
case d := <-consumed:
t.Logf("message successfully consumed: '%s'", string(d.Body))
if string(d.Body) != "test message 2" {
t.Fatalf("expected message 'test message 2', got '%s'", string(d.Body))
}
case <-time.After(time.Second * 3):
t.Fatal("timeout waiting for message consumption")
}
})

t.Run("UnroutableNonMandatoryMessage_ShouldBeAckedWithoutReturn", func(t *testing.T) {
publisher, results := setupPublisher(t)
defer publisher.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()

// Publish without mandatory flag to non-existent queue (should succeed, no return)
t.Logf("publishing to non-existent queue without mandatory (should succeed)")
err := publisher.PublishWithContext(
ctx,
[]byte("test message 3"),
[]string{"another-non-existent-queue"},
// No WithPublishOptionsMandatory - defaults to false
)
if err != nil {
t.Fatal("failed to publish without mandatory", err)
}

// Wait for the confirmation (no return expected)
select {
case result := <-results:
if result.hasReturn {
t.Fatalf("unexpected return for non-mandatory message: replyCode=%d, replyText=%s",
result.returnInfo.ReplyCode, result.returnInfo.ReplyText)
}
if !result.confirmation.Ack {
t.Fatal("expected non-mandatory message to be acked, but it was nacked")
}
t.Logf("correctly received confirmation without return for non-mandatory: ack=%v", result.confirmation.Ack)
case <-time.After(time.Second * 5):
t.Fatal("timeout waiting for non-mandatory confirmation notification")
}
})
}
Loading