Skip to content

Commit ccbc8d1

Browse files
authored
Add ErrMaxReconnectAttemptsReached (#49)
* add error max ErrMaxReconnectAttemptsReached * closes #48 --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 25962ec commit ccbc8d1

File tree

6 files changed

+35
-13
lines changed

6 files changed

+35
-13
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
# RabbitMQ AMQP 1.0 Golang Client
22

33
This library is meant to be used with RabbitMQ 4.0.
4-
Suitable for testing in pre-production environments.
5-
64

75
## Getting Started
86

97
- [Getting Started](docs/examples/getting_started)
108
- [Examples](docs/examples)
9+
Inside the `docs/examples` directory you will find several examples to get you started.</br>
10+
Also advanced examples like how to use streams, how to handle reconnections, and how to use TLS.
1111
- Getting started Video tutorial: </br>
1212
[![Getting Started](https://img.youtube.com/vi/iR1JUFh3udI/0.jpg)](https://youtu.be/iR1JUFh3udI)
1313

docs/examples/reliable/reliable.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,19 @@ func main() {
1616
var stateAccepted int32
1717
var stateReleased int32
1818
var stateRejected int32
19+
var isRunning bool
1920

2021
var received int32
2122
var failed int32
2223

2324
startTime := time.Now()
25+
isRunning = true
2426
go func() {
25-
for {
27+
for isRunning {
2628
time.Sleep(5 * time.Second)
2729
total := stateAccepted + stateReleased + stateRejected
2830
messagesPerSecond := float64(total) / time.Since(startTime).Seconds()
2931
rmq.Info("[Stats]", "sent", total, "received", received, "failed", failed, "messagesPerSecond", messagesPerSecond)
30-
3132
}
3233
}()
3334

@@ -41,6 +42,16 @@ func main() {
4142
switch statusChanged.To.(type) {
4243
case *rmq.StateOpen:
4344
signalBlock.Broadcast()
45+
case *rmq.StateReconnecting:
46+
rmq.Info("[connection]", "Reconnecting to the AMQP 1.0 server")
47+
case *rmq.StateClosed:
48+
StateClosed := statusChanged.To.(*rmq.StateClosed)
49+
if errors.Is(StateClosed.GetError(), rmq.ErrMaxReconnectAttemptsReached) {
50+
rmq.Error("[connection]", "Max reconnect attempts reached. Closing connection", StateClosed.GetError())
51+
signalBlock.Broadcast()
52+
isRunning = false
53+
}
54+
4455
}
4556
}
4657
}(stateChanged)
@@ -87,13 +98,13 @@ func main() {
8798

8899
// Consume messages from the queue
89100
go func(ctx context.Context) {
90-
for {
101+
for isRunning {
91102
deliveryContext, err := consumer.Receive(ctx)
92103
if errors.Is(err, context.Canceled) {
93104
// The consumer was closed correctly
94105
return
95106
}
96-
if err != nil {
107+
if err != nil && isRunning {
97108
// An error occurred receiving the message
98109
// here the consumer could be disconnected from the server due to a network error
99110
signalBlock.L.Lock()
@@ -107,7 +118,7 @@ func main() {
107118

108119
atomic.AddInt32(&received, 1)
109120
err = deliveryContext.Accept(context.Background())
110-
if err != nil {
121+
if err != nil && isRunning {
111122
// same here the delivery could not be accepted due to a network error
112123
// we wait for 2_500 ms and try again
113124
time.Sleep(2500 * time.Millisecond)
@@ -124,12 +135,13 @@ func main() {
124135
return
125136
}
126137

127-
wg := &sync.WaitGroup{}
128138
for i := 0; i < 1; i++ {
129-
wg.Add(1)
130139
go func() {
131-
defer wg.Done()
132140
for i := 0; i < 500_000; i++ {
141+
if !isRunning {
142+
rmq.Info("[Publisher]", "Publisher is stopped simulation not running, queue", queueName)
143+
return
144+
}
133145
publishResult, err := publisher.Publish(context.Background(), rmq.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
134146
if err != nil {
135147
// here you need to deal with the error. You can store the message in a local in memory/persistent storage
@@ -160,7 +172,6 @@ func main() {
160172
}
161173
}()
162174
}
163-
wg.Wait()
164175

165176
println("press any key to close the connection")
166177

pkg/rabbitmqamqp/amqp_binding.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,8 @@ func (b *AMQPBinding) Bind(ctx context.Context) (string, error) {
5858
kv[destination] = b.destinationName
5959
kv["arguments"] = make(map[string]any)
6060
_, err := b.management.Request(ctx, kv, path, commandPost, []int{responseCode204})
61-
bindingPathWithExchangeQueueKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
62-
return bindingPathWithExchangeQueueKey, err
61+
bindingPathWithExchangeQueueAndKey := bindingPathWithExchangeQueueKey(b.toQueue, b.sourceName, b.destinationName, b.bindingKey)
62+
return bindingPathWithExchangeQueueAndKey, err
6363
}
6464

6565
// Unbind removes a binding between an exchange and a queue or exchange

pkg/rabbitmqamqp/amqp_connection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,10 @@ func (a *AmqpConnection) maybeReconnect() {
350350
Error("Reconnection attempt failed", "attempt", attempt, "error", err, "ID", a.Id())
351351
}
352352

353+
// If we reach here, all attempts failed
354+
Error("All reconnection attempts failed, closing connection", "ID", a.Id())
355+
a.lifeCycle.SetState(&StateClosed{error: ErrMaxReconnectAttemptsReached})
356+
353357
}
354358

355359
// restartEntities attempts to restart all publishers and consumers after a reconnection

pkg/rabbitmqamqp/amqp_connection_recovery.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
package rabbitmqamqp
22

33
import (
4+
"errors"
45
"sync"
56
"time"
67
)
78

9+
// ErrMaxReconnectAttemptsReached typed error when the MaxReconnectAttempts is reached
10+
var ErrMaxReconnectAttemptsReached = errors.New("max reconnect attempts reached, connection will not be recovered")
11+
812
type RecoveryConfiguration struct {
913
/*
1014
ActiveRecovery Define if the recovery is activated.

pkg/rabbitmqamqp/life_cycle.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ func (s StateChanged) String() string {
7777

7878
switch s.To.(type) {
7979
case *StateClosed:
80+
if s.To.(*StateClosed).error == nil {
81+
return fmt.Sprintf("From: %s, To: %s", statusToString(s.From), statusToString(s.To))
82+
}
8083
return fmt.Sprintf("From: %s, To: %s, Error: %s", statusToString(s.From), statusToString(s.To), s.To.(*StateClosed).error)
8184

8285
}

0 commit comments

Comments
 (0)