Skip to content

Commit ef837fc

Browse files
committed
Add new method Publisher.NotifyPublishWithReturn for ordered return/confirmation handling
- Add a new method Publisher.NotifyPublishWithReturn that ensures proper pairing of returns and confirmations according to RabbitMQ protocol where returns are immediately followed by confirmations for unroutable messages - Add comprehensive integration tests covering routable/unroutable scenarios with mandatory and non-mandatory publishing This addresses the issue where NotifyPublish() and NotifyReturn() handlers run in separate goroutines and can go out of order, making it difficult to properly link returned messages with their confirmations for reliable unrouted message handling.
1 parent 0958e3a commit ef837fc

File tree

5 files changed

+305
-14
lines changed

5 files changed

+305
-14
lines changed

.github/workflows/tests.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ jobs:
1010
runs-on: ubuntu-latest
1111
env:
1212
GOPROXY: "https://proxy.golang.org,direct"
13-
ENABLE_DOCKER_INTEGRATION_TESTS: TRUE
1413

1514
steps:
1615
- name: Set up Go

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
all: test vet staticcheck
22

3+
4+
# Run all tests, including integration tests
35
test:
6+
go test -v -tags=integration ./...
7+
8+
# Run unit tests only
9+
unit-test:
410
go test -v ./...
511

612
vet:

examples/publisher_confirm/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
"syscall"
1010
"time"
1111

12-
rabbitmq "github.com/wagslane/go-rabbitmq"
12+
"github.com/wagslane/go-rabbitmq"
1313
)
1414

1515
func main() {
@@ -38,6 +38,11 @@ func main() {
3838
log.Printf("message returned from server: %s", string(r.Body))
3939
})
4040

41+
publisher.NotifyPublishWithReturn(func(p rabbitmq.Confirmation, r rabbitmq.Return) {
42+
log.Printf("message confirmed and returned from server: %s, deliveryTag=%d, ACK=%v", string(r.Body), p.DeliveryTag, p.Ack)
43+
44+
})
45+
4146
// block main thread - wait for shutdown signal
4247
sigs := make(chan os.Signal, 1)
4348
done := make(chan bool, 1)

integration_test.go

Lines changed: 186 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
1+
//go:build integration
2+
13
package rabbitmq
24

35
import (
46
"context"
57
"fmt"
6-
"os"
78
"os/exec"
89
"strings"
910
"testing"
1011
"time"
1112
)
1213

13-
const enableDockerIntegrationTestsFlag = `ENABLE_DOCKER_INTEGRATION_TESTS`
14-
1514
func prepareDockerTest(t *testing.T) (connStr string) {
16-
if v, ok := os.LookupEnv(enableDockerIntegrationTestsFlag); !ok || strings.ToUpper(v) != "TRUE" {
17-
t.Skipf("integration tests are only run if '%s' is TRUE", enableDockerIntegrationTestsFlag)
18-
return
19-
}
2015
ctx, cancel := context.WithCancel(context.Background())
2116
defer cancel()
2217

@@ -97,7 +92,7 @@ func TestSimplePubSub(t *testing.T) {
9792
}
9893
defer consumer.CloseWithContext(context.Background())
9994

100-
// Setup a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
95+
// Set up a consumer which pushes each of its consumed messages over the channel. If the channel is closed or full
10196
// it does not block.
10297
consumed := make(chan Delivery)
10398
defer close(consumed)
@@ -154,3 +149,186 @@ func TestSimplePubSub(t *testing.T) {
154149
}
155150
}
156151
}
152+
153+
// TestNotifyPublishWithReturn tests the NotifyPublishWithReturn functionality by publishing both
154+
// routable and unroutable messages and verifying the handler receives the correct confirmations and returns.
155+
func TestNotifyPublishWithReturn(t *testing.T) {
156+
connStr := prepareDockerTest(t)
157+
conn := waitForHealthyAmqp(t, connStr)
158+
defer conn.Close()
159+
160+
// Define a struct to hold the results of the publish operation
161+
type publishResult struct {
162+
confirmation Confirmation
163+
returnInfo Return
164+
hasReturn bool
165+
}
166+
167+
// Helper function to set up publisher and result channel
168+
setupPublisher := func(t *testing.T) (*Publisher, chan publishResult) {
169+
publisher, err := NewPublisher(conn, WithPublisherOptionsLogger(simpleLogF(t.Logf)))
170+
if err != nil {
171+
t.Fatal("error creating publisher", err)
172+
}
173+
results := make(chan publishResult, 10)
174+
175+
// Set up the handler
176+
publisher.NotifyPublishWithReturn(func(p Confirmation, r Return) {
177+
hasReturn := r.ReplyCode != 0
178+
t.Logf("NotifyPublishWithReturn called: ack=%v, hasReturn=%v, replyCode=%d",
179+
p.Ack, hasReturn, r.ReplyCode)
180+
181+
results <- publishResult{
182+
confirmation: p,
183+
returnInfo: r,
184+
hasReturn: hasReturn,
185+
}
186+
})
187+
188+
return publisher, results
189+
}
190+
191+
t.Run("UnroutableMandatoryMessage_ShouldBeAckedAndReturned", func(t *testing.T) {
192+
publisher, results := setupPublisher(t)
193+
defer publisher.Close()
194+
195+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
196+
defer cancel()
197+
198+
// Publish to non-existent queue with mandatory=true (should return)
199+
t.Logf("publishing to non-existent queue (should be returned)")
200+
err := publisher.PublishWithContext(
201+
ctx,
202+
[]byte("test message 1"),
203+
[]string{"non-existent-queue"},
204+
WithPublishOptionsMandatory,
205+
)
206+
if err != nil {
207+
t.Fatal("failed to publish to non-existent queue", err)
208+
}
209+
210+
// Wait for the return + confirmation
211+
select {
212+
case result := <-results:
213+
if !result.hasReturn {
214+
t.Fatal("expected a return for unroutable message, but got none")
215+
}
216+
if result.returnInfo.ReplyCode == 0 {
217+
t.Fatal("expected non-zero reply code for returned message")
218+
}
219+
if !result.confirmation.Ack {
220+
t.Fatal("expected message to be acked, but it was nacked")
221+
}
222+
t.Logf("correctly received return: replyCode=%d, replyText=%s",
223+
result.returnInfo.ReplyCode, result.returnInfo.ReplyText)
224+
case <-time.After(time.Second * 5):
225+
t.Fatal("timeout waiting for return notification")
226+
}
227+
})
228+
229+
t.Run("RoutableMandatoryMessage_ShouldBeAckedWithoutReturn", func(t *testing.T) {
230+
publisher, results := setupPublisher(t)
231+
defer publisher.Close()
232+
233+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
234+
defer cancel()
235+
236+
// Create a queue and publish to it (should succeed, no return)
237+
consumerQueue := "test_queue_" + fmt.Sprintf("%d", time.Now().UnixNano())
238+
consumer, err := NewConsumer(conn, consumerQueue, WithConsumerOptionsLogger(simpleLogF(t.Logf)))
239+
if err != nil {
240+
t.Fatal("error creating consumer", err)
241+
}
242+
defer consumer.CloseWithContext(context.Background())
243+
244+
// Start consumer to ensure queue exists
245+
consumed := make(chan Delivery, 1)
246+
go func() {
247+
err = consumer.Run(func(d Delivery) Action {
248+
select {
249+
case consumed <- d:
250+
default:
251+
}
252+
return Ack
253+
})
254+
if err != nil {
255+
t.Log("consumer run failed", err)
256+
}
257+
}()
258+
259+
// Wait for the consumer to be ready
260+
time.Sleep(100 * time.Millisecond)
261+
262+
t.Logf("publishing to existing queue (should succeed)")
263+
err = publisher.PublishWithContext(
264+
ctx,
265+
[]byte("test message 2"),
266+
[]string{consumerQueue},
267+
WithPublishOptionsMandatory,
268+
)
269+
if err != nil {
270+
t.Fatal("failed to publish to existing queue", err)
271+
}
272+
273+
// Wait for the confirmation (no return expected)
274+
select {
275+
case result := <-results:
276+
if result.hasReturn {
277+
t.Fatalf("unexpected return for routable message: replyCode=%d, replyText=%s",
278+
result.returnInfo.ReplyCode, result.returnInfo.ReplyText)
279+
}
280+
if !result.confirmation.Ack {
281+
t.Fatal("expected message to be acked, but it was nacked")
282+
}
283+
t.Logf("correctly received confirmation without return: ack=%v", result.confirmation.Ack)
284+
case <-time.After(time.Second * 5):
285+
t.Fatal("timeout waiting for confirmation notification")
286+
}
287+
288+
// Verify the message was actually consumed
289+
select {
290+
case d := <-consumed:
291+
t.Logf("message successfully consumed: '%s'", string(d.Body))
292+
if string(d.Body) != "test message 2" {
293+
t.Fatalf("expected message 'test message 2', got '%s'", string(d.Body))
294+
}
295+
case <-time.After(time.Second * 3):
296+
t.Fatal("timeout waiting for message consumption")
297+
}
298+
})
299+
300+
t.Run("UnroutableNonMandatoryMessage_ShouldBeAckedWithoutReturn", func(t *testing.T) {
301+
publisher, results := setupPublisher(t)
302+
defer publisher.Close()
303+
304+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
305+
defer cancel()
306+
307+
// Publish without mandatory flag to non-existent queue (should succeed, no return)
308+
t.Logf("publishing to non-existent queue without mandatory (should succeed)")
309+
err := publisher.PublishWithContext(
310+
ctx,
311+
[]byte("test message 3"),
312+
[]string{"another-non-existent-queue"},
313+
// No WithPublishOptionsMandatory - defaults to false
314+
)
315+
if err != nil {
316+
t.Fatal("failed to publish without mandatory", err)
317+
}
318+
319+
// Wait for the confirmation (no return expected)
320+
select {
321+
case result := <-results:
322+
if result.hasReturn {
323+
t.Fatalf("unexpected return for non-mandatory message: replyCode=%d, replyText=%s",
324+
result.returnInfo.ReplyCode, result.returnInfo.ReplyText)
325+
}
326+
if !result.confirmation.Ack {
327+
t.Fatal("expected non-mandatory message to be acked, but it was nacked")
328+
}
329+
t.Logf("correctly received confirmation without return for non-mandatory: ack=%v", result.confirmation.Ack)
330+
case <-time.After(time.Second * 5):
331+
t.Fatal("timeout waiting for non-mandatory confirmation notification")
332+
}
333+
})
334+
}

0 commit comments

Comments
 (0)