Skip to content

Commit 35b8893

Browse files
authored
Add multi uris support (#17)
* Multi uris configuration * remove the Interfaces * Add outcome delivery * refactor * refactor state --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 1a6679a commit 35b8893

22 files changed

+387
-271
lines changed

examples/getting_started/main.go

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,28 @@ import (
99
)
1010

1111
func main() {
12-
1312
exchangeName := "getting-started-exchange"
1413
queueName := "getting-started-queue"
1514
routingKey := "routing-key"
1615

17-
fmt.Printf("Getting started with AMQP Go AMQP 1.0 Client\n")
16+
rabbitmq_amqp.Info("Getting started with AMQP Go AMQP 1.0 Client")
1817

19-
/// Create a channel to receive status change notifications
20-
chStatusChanged := make(chan *rabbitmq_amqp.StatusChanged, 1)
21-
go func(ch chan *rabbitmq_amqp.StatusChanged) {
18+
/// Create a channel to receive state change notifications
19+
stateChanged := make(chan *rabbitmq_amqp.StateChanged, 1)
20+
go func(ch chan *rabbitmq_amqp.StateChanged) {
2221
for statusChanged := range ch {
23-
fmt.Printf("%s\n", statusChanged)
22+
rabbitmq_amqp.Info("[Connection]", "Status changed", statusChanged)
2423
}
25-
}(chStatusChanged)
24+
}(stateChanged)
2625

2726
// Open a connection to the AMQP 1.0 server
28-
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), "amqp://", nil)
27+
amqpConnection, err := rabbitmq_amqp.Dial(context.Background(), []string{"amqp://"}, nil)
2928
if err != nil {
30-
fmt.Printf("Error opening connection: %v\n", err)
29+
rabbitmq_amqp.Error("Error opening connection", err)
3130
return
3231
}
3332
// Register the channel to receive status change notifications
34-
amqpConnection.NotifyStatusChange(chStatusChanged)
33+
amqpConnection.NotifyStatusChange(stateChanged)
3534

3635
fmt.Printf("AMQP Connection opened.\n")
3736
// Create the management interface for the connection
@@ -41,7 +40,7 @@ func main() {
4140
Name: exchangeName,
4241
})
4342
if err != nil {
44-
fmt.Printf("Error declaring exchange: %v\n", err)
43+
rabbitmq_amqp.Error("Error declaring exchange", err)
4544
return
4645
}
4746

@@ -52,7 +51,7 @@ func main() {
5251
})
5352

5453
if err != nil {
55-
fmt.Printf("Error declaring queue: %v\n", err)
54+
rabbitmq_amqp.Error("Error declaring queue", err)
5655
return
5756
}
5857

@@ -64,24 +63,39 @@ func main() {
6463
})
6564

6665
if err != nil {
67-
fmt.Printf("Error binding: %v\n", err)
66+
rabbitmq_amqp.Error("Error binding", err)
6867
return
6968
}
7069

7170
addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
7271

7372
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
7473
if err != nil {
75-
fmt.Printf("Error creating publisher: %v\n", err)
74+
rabbitmq_amqp.Error("Error creating publisher", err)
7675
return
7776
}
7877

7978
// Publish a message to the exchange
80-
err = publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
79+
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
8180
if err != nil {
82-
fmt.Printf("Error publishing message: %v\n", err)
81+
rabbitmq_amqp.Error("Error publishing message", err)
8382
return
8483
}
84+
switch publishResult.Outcome {
85+
case &amqp.StateAccepted{}:
86+
rabbitmq_amqp.Info("Message accepted")
87+
case &amqp.StateReleased{}:
88+
rabbitmq_amqp.Warn("Message was not routed")
89+
case &amqp.StateRejected{}:
90+
rabbitmq_amqp.Warn("Message rejected")
91+
stateType := publishResult.Outcome.(*amqp.StateRejected)
92+
if stateType.Error != nil {
93+
rabbitmq_amqp.Warn("Message rejected with error: %v", stateType.Error)
94+
}
95+
default:
96+
// these status are not supported
97+
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
98+
}
8599

86100
println("press any key to close the connection")
87101

@@ -132,5 +146,5 @@ func main() {
132146
// Wait for the status change to be printed
133147
time.Sleep(500 * time.Millisecond)
134148

135-
close(chStatusChanged)
149+
close(stateChangeds)
136150
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client
33
go 1.22.0
44

55
require (
6-
github.com/Azure/go-amqp v1.2.0
6+
github.com/Azure/go-amqp v1.4.0-beta.1
77
github.com/google/uuid v1.6.0
88
github.com/onsi/ginkgo/v2 v2.20.2
99
github.com/onsi/gomega v1.34.2

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719 h1:rL7yrEV9yputQV7T+Y9eJVmTVkK4B0aHlBc8TUITC5A=
2-
github.com/Azure/go-amqp v1.1.1-0.20240913224415-f631e6909719/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
3-
github.com/Azure/go-amqp v1.2.0 h1:NNyfN3/cRszWzMvjmm64yaPZDHX/2DJkowv8Ub9y01I=
4-
github.com/Azure/go-amqp v1.2.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
1+
github.com/Azure/go-amqp v1.4.0-beta.1 h1:BjZM/308FpfsQjX0gXtYK8Vx+WgQ1eng3oVQDEeXMmA=
2+
github.com/Azure/go-amqp v1.4.0-beta.1/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
53
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
64
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
75
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=

rabbitmq_amqp/address_helper.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package rabbitmq_amqp
33
import (
44
"errors"
55
"fmt"
6-
"net/url"
76
"strings"
87
)
98

@@ -77,16 +76,16 @@ func encodePathSegments(input string) string {
7776
return encoded.String()
7877
}
7978

80-
// Decode takes a percent-encoded string and returns its decoded representation.
81-
func decode(input string) (string, error) {
82-
// Use url.QueryUnescape which properly decodes percent-encoded strings
83-
decoded, err := url.QueryUnescape(input)
84-
if err != nil {
85-
return "", err
86-
}
87-
88-
return decoded, nil
89-
}
79+
//// Decode takes a percent-encoded string and returns its decoded representation.
80+
//func decode(input string) (string, error) {
81+
// // Use url.QueryUnescape which properly decodes percent-encoded strings
82+
// decoded, err := url.QueryUnescape(input)
83+
// if err != nil {
84+
// return "", err
85+
// }
86+
//
87+
// return decoded, nil
88+
//}
9089

9190
// isUnreserved checks if a character is an unreserved character in percent encoding
9291
// Unreserved characters are: A-Z, a-z, 0-9, -, ., _, ~
@@ -111,5 +110,8 @@ func bindingPathWithExchangeQueueKey(toQueue bool, sourceName, destinationName,
111110
}
112111
format := "/%s/src=%s;%s=%s;key=%s;args="
113112
return fmt.Sprintf(format, bindings, sourceNameEncoded, destinationType, destinationNameEncoded, keyEncoded)
113+
}
114114

115+
func validateAddress(address string) bool {
116+
return strings.HasPrefix(address, fmt.Sprintf("/%s/", exchanges)) || strings.HasPrefix(address, fmt.Sprintf("/%s/", queues))
115117
}

rabbitmq_amqp/amqp_binding_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import (
77
)
88

99
var _ = Describe("AMQP Bindings test ", func() {
10-
var connection IConnection
11-
var management IManagement
10+
var connection *AmqpConnection
11+
var management *AmqpManagement
1212
BeforeEach(func() {
13-
conn, err := Dial(context.TODO(), "amqp://", nil)
13+
conn, err := Dial(context.TODO(), []string{"amqp://"}, nil)
1414
Expect(err).To(BeNil())
1515
connection = conn
1616
management = connection.Management()

rabbitmq_amqp/amqp_connection.go

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,50 @@ import (
1717

1818
type AmqpConnection struct {
1919
Connection *amqp.Conn
20-
management IManagement
20+
management *AmqpManagement
2121
lifeCycle *LifeCycle
2222
session *amqp.Session
2323
}
2424

25-
func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (IPublisher, error) {
26-
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName))
25+
func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, linkName string) (*Publisher, error) {
26+
if !validateAddress(destinationAdd) {
27+
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
28+
}
29+
30+
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce))
2731
if err != nil {
2832
return nil, err
2933
}
3034
return newPublisher(sender), nil
3135
}
3236

33-
// Management returns the management interface for the connection.
34-
// See IManagement interface.
35-
func (a *AmqpConnection) Management() IManagement {
36-
return a.management
37-
}
38-
39-
// Dial creates a new AmqpConnection
40-
// with a new AmqpManagement and a new LifeCycle.
41-
// Returns a pointer to the new AmqpConnection
42-
func Dial(ctx context.Context, addr string, connOptions *amqp.ConnOptions) (IConnection, error) {
37+
// Dial connect to the AMQP 1.0 server using the provided connectionSettings
38+
// Returns a pointer to the new AmqpConnection if successful else an error.
39+
// addresses is a list of addresses to connect to. It picks one randomly.
40+
// It is enough that one of the addresses is reachable.
41+
func Dial(ctx context.Context, addresses []string, connOptions *amqp.ConnOptions) (*AmqpConnection, error) {
4342
conn := &AmqpConnection{
4443
management: NewAmqpManagement(),
4544
lifeCycle: NewLifeCycle(),
4645
}
47-
err := conn.open(ctx, addr, connOptions)
48-
if err != nil {
49-
return nil, err
46+
tmp := make([]string, len(addresses))
47+
copy(tmp, addresses)
48+
49+
// random pick and extract one address to use for connection
50+
for len(tmp) > 0 {
51+
idx := random(len(tmp))
52+
addr := tmp[idx]
53+
// remove the index from the tmp list
54+
tmp = append(tmp[:idx], tmp[idx+1:]...)
55+
err := conn.open(ctx, addr, connOptions)
56+
if err != nil {
57+
Error("Failed to open connection", ExtractWithoutPassword(addr), err)
58+
continue
59+
}
60+
Debug("Connected to", ExtractWithoutPassword(addr))
61+
return conn, nil
5062
}
51-
return conn, nil
63+
return nil, fmt.Errorf("no address to connect to")
5264
}
5365

5466
// Open opens a connection to the AMQP 1.0 server.
@@ -84,30 +96,42 @@ func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amq
8496
if err != nil {
8597
return err
8698
}
87-
err = a.Management().Open(ctx, a)
99+
err = a.management.Open(ctx, a)
88100
if err != nil {
89101
// TODO close connection?
90102
return err
91103
}
92104

93-
a.lifeCycle.SetStatus(Open)
105+
a.lifeCycle.SetState(&StateOpen{})
94106
return nil
95107
}
96108

97109
func (a *AmqpConnection) Close(ctx context.Context) error {
98-
err := a.Management().Close(ctx)
110+
err := a.management.Close(ctx)
99111
if err != nil {
100112
return err
101113
}
102114
err = a.Connection.Close()
103-
a.lifeCycle.SetStatus(Closed)
115+
a.lifeCycle.SetState(&StateClosed{})
104116
return err
105117
}
106118

107-
func (a *AmqpConnection) NotifyStatusChange(channel chan *StatusChanged) {
119+
// NotifyStatusChange registers a channel to receive getState change notifications
120+
// from the connection.
121+
func (a *AmqpConnection) NotifyStatusChange(channel chan *StateChanged) {
108122
a.lifeCycle.chStatusChanged = channel
109123
}
110124

111-
func (a *AmqpConnection) Status() int {
112-
return a.lifeCycle.Status()
125+
func (a *AmqpConnection) State() LifeCycleState {
126+
return a.lifeCycle.State()
127+
}
128+
129+
// *** management section ***
130+
131+
// Management returns the management interface for the connection.
132+
// The management interface is used to declare and delete exchanges, queues, and bindings.
133+
func (a *AmqpConnection) Management() *AmqpManagement {
134+
return a.management
113135
}
136+
137+
//*** end management section ***

rabbitmq_amqp/amqp_connection_test.go

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
var _ = Describe("AMQP Connection Test", func() {
1212
It("AMQP SASLTypeAnonymous Connection should succeed", func() {
1313

14-
connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{
14+
connection, err := Dial(context.Background(), []string{"amqp://"}, &amqp.ConnOptions{
1515
SASLType: amqp.SASLTypeAnonymous()})
1616
Expect(err).To(BeNil())
1717
err = connection.Close(context.Background())
@@ -20,43 +20,54 @@ var _ = Describe("AMQP Connection Test", func() {
2020

2121
It("AMQP SASLTypePlain Connection should succeed", func() {
2222

23-
connection, err := Dial(context.Background(), "amqp://", &amqp.ConnOptions{
23+
connection, err := Dial(context.Background(), []string{"amqp://"}, &amqp.ConnOptions{
2424
SASLType: amqp.SASLTypePlain("guest", "guest")})
2525

2626
Expect(err).To(BeNil())
2727
err = connection.Close(context.Background())
2828
Expect(err).To(BeNil())
2929
})
3030

31+
It("AMQP Connection connect to the one correct uri and fails the others", func() {
32+
conn, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://"}, nil)
33+
Expect(err).To(BeNil())
34+
Expect(conn.Close(context.Background()))
35+
})
36+
3137
It("AMQP Connection should fail due of wrong Port", func() {
32-
_, err := Dial(context.Background(), "amqp://localhost:1234", nil)
38+
_, err := Dial(context.Background(), []string{"amqp://localhost:1234"}, nil)
3339
Expect(err).NotTo(BeNil())
3440
})
3541

3642
It("AMQP Connection should fail due of wrong Host", func() {
37-
_, err := Dial(context.Background(), "amqp://wrong_host:5672", nil)
43+
_, err := Dial(context.Background(), []string{"amqp://wrong_host:5672"}, nil)
44+
Expect(err).NotTo(BeNil())
45+
})
46+
47+
It("AMQP Connection should fails with all the wrong uris", func() {
48+
_, err := Dial(context.Background(), []string{"amqp://localhost:1234", "amqp://nohost:555", "amqp://nono"}, nil)
3849
Expect(err).NotTo(BeNil())
3950
})
4051

4152
It("AMQP Connection should fail due to context cancellation", func() {
4253
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
4354
cancel()
44-
_, err := Dial(ctx, "amqp://", nil)
55+
_, err := Dial(ctx, []string{"amqp://"}, nil)
4556
Expect(err).NotTo(BeNil())
4657
})
4758

4859
It("AMQP Connection should receive events", func() {
49-
ch := make(chan *StatusChanged, 1)
50-
connection, err := Dial(context.Background(), "amqp://", nil)
60+
ch := make(chan *StateChanged, 1)
61+
connection, err := Dial(context.Background(), []string{"amqp://"}, nil)
5162
Expect(err).To(BeNil())
5263
connection.NotifyStatusChange(ch)
5364
err = connection.Close(context.Background())
5465
Expect(err).To(BeNil())
5566

5667
recv := <-ch
5768
Expect(recv).NotTo(BeNil())
58-
Expect(recv.From).To(Equal(Open))
59-
Expect(recv.To).To(Equal(Closed))
69+
Expect(recv.From).To(Equal(&StateOpen{}))
70+
Expect(recv.To).To(Equal(&StateClosed{}))
6071
})
6172

6273
//It("AMQP TLS Connection should success with SASLTypeAnonymous ", func() {

rabbitmq_amqp/amqp_exchange.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ type AmqpExchangeInfo struct {
99
name string
1010
}
1111

12-
func newAmqpExchangeInfo(name string) IExchangeInfo {
12+
func newAmqpExchangeInfo(name string) *AmqpExchangeInfo {
1313
return &AmqpExchangeInfo{name: name}
1414
}
1515

@@ -33,7 +33,7 @@ func newAmqpExchange(management *AmqpManagement, name string) *AmqpExchange {
3333
}
3434
}
3535

36-
func (e *AmqpExchange) Declare(ctx context.Context) (IExchangeInfo, error) {
36+
func (e *AmqpExchange) Declare(ctx context.Context) (*AmqpExchangeInfo, error) {
3737
path, err := ExchangeAddress(&e.name, nil)
3838
if err != nil {
3939
return nil, err

0 commit comments

Comments
 (0)