Skip to content

Commit cb00641

Browse files
authored
First consumer version (#19)
* First consumer version --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 023979a commit cb00641

File tree

7 files changed

+385
-31
lines changed

7 files changed

+385
-31
lines changed

examples/getting_started/main.go

Lines changed: 74 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"github.com/Azure/go-amqp"
78
"github.com/rabbitmq/rabbitmq-amqp-go-client/rabbitmq_amqp"
@@ -67,48 +68,97 @@ func main() {
6768
return
6869
}
6970

70-
addr, err := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
71-
72-
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
71+
// Create a consumer to receive messages from the queue
72+
// you need to build the address of the queue, but you can use the helper function
73+
addrQueue, _ := rabbitmq_amqp.QueueAddress(&queueName)
74+
consumer, err := amqpConnection.Consumer(context.Background(), addrQueue, "getting-started-consumer")
7375
if err != nil {
74-
rabbitmq_amqp.Error("Error creating publisher", err)
76+
rabbitmq_amqp.Error("Error creating consumer", err)
7577
return
7678
}
7779

78-
// Publish a message to the exchange
79-
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!")))
80+
consumerContext, cancel := context.WithCancel(context.Background())
81+
82+
// Consume messages from the queue
83+
go func(ctx context.Context) {
84+
for {
85+
deliveryContext, err := consumer.Receive(ctx)
86+
if errors.Is(err, context.Canceled) {
87+
// The consumer was closed correctly
88+
rabbitmq_amqp.Info("[Consumer]", "consumer closed. Context", err)
89+
return
90+
}
91+
if err != nil {
92+
// An error occurred receiving the message
93+
rabbitmq_amqp.Error("[Consumer]", "Error receiving message", err)
94+
return
95+
}
96+
97+
rabbitmq_amqp.Info("[Consumer]", "Received message",
98+
fmt.Sprintf("%s", deliveryContext.Message().Data))
99+
100+
err = deliveryContext.Accept(context.Background())
101+
if err != nil {
102+
rabbitmq_amqp.Error("Error accepting message", err)
103+
return
104+
}
105+
}
106+
}(consumerContext)
107+
108+
addr, _ := rabbitmq_amqp.ExchangeAddress(&exchangeName, &routingKey)
109+
publisher, err := amqpConnection.Publisher(context.Background(), addr, "getting-started-publisher")
80110
if err != nil {
81-
rabbitmq_amqp.Error("Error publishing message", err)
111+
rabbitmq_amqp.Error("Error creating publisher", err)
82112
return
83113
}
84-
switch publishResult.Outcome {
85-
case &amqp.StateAccepted{}:
86-
rabbitmq_amqp.Info("Message accepted")
87-
case &amqp.StateReleased{}:
88-
rabbitmq_amqp.Warn("Message was not routed")
89-
case &amqp.StateRejected{}:
90-
rabbitmq_amqp.Warn("Message rejected")
91-
stateType := publishResult.Outcome.(*amqp.StateRejected)
92-
if stateType.Error != nil {
93-
rabbitmq_amqp.Warn("Message rejected with error: %v", stateType.Error)
114+
115+
for i := 0; i < 10; i++ {
116+
117+
// Publish a message to the exchange
118+
publishResult, err := publisher.Publish(context.Background(), amqp.NewMessage([]byte("Hello, World!"+fmt.Sprintf("%d", i))))
119+
if err != nil {
120+
rabbitmq_amqp.Error("Error publishing message", err)
121+
return
122+
}
123+
switch publishResult.Outcome.(type) {
124+
case *amqp.StateAccepted:
125+
rabbitmq_amqp.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
126+
break
127+
case *amqp.StateReleased:
128+
rabbitmq_amqp.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
129+
break
130+
case *amqp.StateRejected:
131+
rabbitmq_amqp.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
132+
stateType := publishResult.Outcome.(*amqp.StateRejected)
133+
if stateType.Error != nil {
134+
rabbitmq_amqp.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
135+
}
136+
break
137+
default:
138+
// these status are not supported. Leave it for AMQP 1.0 compatibility
139+
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
140+
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
94141
}
95-
default:
96-
// these status are not supported
97-
rabbitmq_amqp.Warn("Message state: %v", publishResult.Outcome)
98142
}
99143

100144
println("press any key to close the connection")
101145

102146
var input string
103147
_, _ = fmt.Scanln(&input)
104148

149+
cancel()
150+
//Close the consumer
151+
err = consumer.Close(context.Background())
152+
if err != nil {
153+
rabbitmq_amqp.Error("[Consumer]", err)
154+
}
105155
// Close the publisher
106156
err = publisher.Close(context.Background())
107157
if err != nil {
108158
return
109159
}
110-
// Unbind the queue from the exchange
111160

161+
// Unbind the queue from the exchange
112162
err = management.Unbind(context.TODO(), bindingPath)
113163

114164
if err != nil {
@@ -143,8 +193,7 @@ func main() {
143193
}
144194

145195
fmt.Printf("AMQP Connection closed.\n")
146-
// Wait for the status change to be printed
147-
time.Sleep(500 * time.Millisecond)
148-
149-
close(stateChangeds)
196+
// not necessary. It waits for the status change to be printed
197+
time.Sleep(100 * time.Millisecond)
198+
close(stateChanged)
150199
}

rabbitmq_amqp/amqp_connection.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,24 @@ func (a *AmqpConnection) Publisher(ctx context.Context, destinationAdd string, l
2626
if !validateAddress(destinationAdd) {
2727
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
2828
}
29-
3029
sender, err := a.session.NewSender(ctx, destinationAdd, createSenderLinkOptions(destinationAdd, linkName, AtLeastOnce))
3130
if err != nil {
3231
return nil, err
3332
}
3433
return newPublisher(sender), nil
3534
}
3635

36+
func (a *AmqpConnection) Consumer(ctx context.Context, destinationAdd string, linkName string) (*Consumer, error) {
37+
if !validateAddress(destinationAdd) {
38+
return nil, fmt.Errorf("invalid destination address, the address should start with /%s/ or/%s/ ", exchanges, queues)
39+
}
40+
receiver, err := a.session.NewReceiver(ctx, destinationAdd, createReceiverLinkOptions(destinationAdd, linkName, AtLeastOnce))
41+
if err != nil {
42+
return nil, err
43+
}
44+
return newConsumer(receiver), nil
45+
}
46+
3747
// Dial connect to the AMQP 1.0 server using the provided connectionSettings
3848
// Returns a pointer to the new AmqpConnection if successful else an error.
3949
// addresses is a list of addresses to connect to. It picks one randomly.
@@ -93,6 +103,7 @@ func (a *AmqpConnection) open(ctx context.Context, addr string, connOptions *amq
93103
}
94104
a.Connection = conn
95105
a.session, err = a.Connection.NewSession(ctx, nil)
106+
96107
if err != nil {
97108
return err
98109
}

rabbitmq_amqp/amqp_consumer.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package rabbitmq_amqp
2+
3+
import (
4+
"context"
5+
"github.com/Azure/go-amqp"
6+
)
7+
8+
type DeliveryContext struct {
9+
receiver *amqp.Receiver
10+
message *amqp.Message
11+
}
12+
13+
func (dc *DeliveryContext) Message() *amqp.Message {
14+
return dc.message
15+
}
16+
17+
func (dc *DeliveryContext) Accept(ctx context.Context) error {
18+
return dc.receiver.AcceptMessage(ctx, dc.message)
19+
}
20+
21+
func (dc *DeliveryContext) Discard(ctx context.Context, e *amqp.Error) error {
22+
return dc.receiver.RejectMessage(ctx, dc.message, e)
23+
}
24+
25+
func (dc *DeliveryContext) DiscardWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
26+
if err := validateMessageAnnotations(annotations); err != nil {
27+
return err
28+
}
29+
// copy the rabbitmq annotations to amqp annotations
30+
destination := make(amqp.Annotations)
31+
for key, value := range annotations {
32+
destination[key] = value
33+
34+
}
35+
36+
return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{
37+
DeliveryFailed: true,
38+
UndeliverableHere: true,
39+
Annotations: destination,
40+
})
41+
}
42+
43+
func (dc *DeliveryContext) Requeue(ctx context.Context) error {
44+
return dc.receiver.ReleaseMessage(ctx, dc.message)
45+
}
46+
47+
func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotations amqp.Annotations) error {
48+
if err := validateMessageAnnotations(annotations); err != nil {
49+
return err
50+
}
51+
// copy the rabbitmq annotations to amqp annotations
52+
destination := make(amqp.Annotations)
53+
for key, value := range annotations {
54+
destination[key] = value
55+
56+
}
57+
return dc.receiver.ModifyMessage(ctx, dc.message, &amqp.ModifyMessageOptions{
58+
DeliveryFailed: false,
59+
UndeliverableHere: false,
60+
Annotations: destination,
61+
})
62+
}
63+
64+
type Consumer struct {
65+
receiver *amqp.Receiver
66+
}
67+
68+
func newConsumer(receiver *amqp.Receiver) *Consumer {
69+
return &Consumer{receiver: receiver}
70+
}
71+
72+
func (c *Consumer) Receive(ctx context.Context) (*DeliveryContext, error) {
73+
msg, err := c.receiver.Receive(ctx, nil)
74+
if err != nil {
75+
return nil, err
76+
}
77+
return &DeliveryContext{receiver: c.receiver, message: msg}, nil
78+
}
79+
80+
func (c *Consumer) Close(ctx context.Context) error {
81+
return c.receiver.Close(ctx)
82+
}

0 commit comments

Comments
 (0)