Skip to content

Commit 6dd64ee

Browse files
ZerpetGsantomaggio
andauthored
RPC high level constructs (#59)
* Set Go toolchain * Initial version of RPC server The happy path works. The server can connect to a queue and responds messages if the request's replyTo property is set. * Bump Ginkgo Gomega and use floating 4.1 image in test * Add VS Code gitignores * Add correlationID extractor and post-processor RPC Tests were refactored to extract common setup code into a before each node. * RPC server test for custom functionality A test that verifies that custom correction ID and custom post processor are called by the server. * Add code docs for RPC server functions The test for RpcServer.Close() was returning too soon. The simplest solution is an ugly sleep of 1 second. * Add functions to handle consumer credits manually RPC server construct provides an utility to pause the message processing. This relies on the ability to "drain" credits, and issue new credits at a later point to unpause. This commit exposes that functionality internally. * Implement pause/unpause in RPC server The RPC server now can be constructred from the AMQP connection. Tests were refactored to use this contructor, instead of manually setting up publishers and consumers. Because setting the reply-to address in the post-processor forces the users of custom post-processors to remember/add a line to set the reply-to address always. By setting the reply-to prior to the post-processor, the user has the option to modify the reply-to before sending, if desired. By setting the reply-to prior to post-processor, we free up some cognitive load on the users who wish to use a custom post-processor. * Add E2E test for RPC constructs * Refactor pointer-to functions: one generic function to rule them all * The automatic reconnection was racy. It could try to open a connection at the same time that `Close()` was called on `AmqpConnection`. This required introducing an internal field to determine when the connection is "closed", and a specific error to signal in the reconnection that the connection is "closed" after calling `Close()`. * Add examples and fix typo Signed-off-by: Gabriele Santomaggio <[email protected]> --------- Signed-off-by: Gabriele Santomaggio <[email protected]> Co-authored-by: Gabriele Santomaggio <[email protected]>
1 parent 8e67f54 commit 6dd64ee

23 files changed

+1880
-75
lines changed

.ci/ubuntu/gha-setup.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ set -o xtrace
77
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
88
readonly script_dir
99
echo "[INFO] script_dir: '$script_dir'"
10-
readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine
10+
readonly rabbitmq_image=rabbitmq:4.1-management-alpine
1111

1212

1313
readonly docker_name_prefix='rabbitmq-amqp-go-client'

.github/workflows/build-test.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ jobs:
99
strategy:
1010
fail-fast: true
1111
matrix:
12-
go: [ '1.22']
12+
go:
13+
- stable
14+
- oldstable
1315
steps:
1416
- uses: actions/checkout@v4
1517
- uses: actions/setup-go@v5

.gitignore

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,15 @@ go.work.sum
2727
coverage.txt
2828
.DS_Store
2929
.ci/ubuntu/log/rabbitmq.log
30+
31+
# Visual Studio Code
32+
.vscode/*
33+
!.vscode/settings.json
34+
!.vscode/tasks.json
35+
!.vscode/launch.json
36+
!.vscode/extensions.json
37+
!.vscode/*.code-snippets
38+
!*.code-workspace
39+
40+
# Built Visual Studio Code Extensions
41+
*.vsix

docs/examples/README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@
99
- [Video](video) - From the YouTube tutorial [AMQP 1.0 with Golang](https://youtu.be/iR1JUFh3udI)
1010
- [TLS](tls) - An example of how to use TLS with the AMQP 1.0 client.
1111
- [Advanced Settings](advanced_settings) - An example of how to use the advanced connection settings of the AMQP 1.0 client.
12-
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
12+
- [Broadcast](broadcast) - An example of how to use fanout to broadcast messages to multiple auto-deleted queues.
13+
- [RPC Echo](rpc_echo_server) - An example of how to implement RPC with the AMQP 1.0 client.
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"os"
8+
"os/signal"
9+
10+
"github.com/Azure/go-amqp"
11+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
12+
)
13+
14+
type echoRpcServer struct {
15+
conn *rabbitmqamqp.AmqpConnection
16+
server rabbitmqamqp.RpcServer
17+
}
18+
19+
func (s *echoRpcServer) stop(ctx context.Context) {
20+
s.server.Close(ctx)
21+
s.conn.Close(ctx)
22+
}
23+
24+
func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
25+
_, err := conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
26+
Name: rpcServerQueueName,
27+
})
28+
if err != nil {
29+
panic(err)
30+
}
31+
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
32+
RequestQueue: rpcServerQueueName,
33+
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
34+
return request, nil
35+
},
36+
})
37+
if err != nil {
38+
panic(err)
39+
}
40+
return &echoRpcServer{
41+
conn: conn,
42+
server: srv,
43+
}
44+
}
45+
46+
const rpcServerQueueName = "rpc-queue"
47+
48+
func main() {
49+
// Dial rabbit for RPC server connection
50+
srvConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
51+
if err != nil {
52+
panic(err)
53+
}
54+
55+
srv := newEchoRpcServer(srvConn)
56+
57+
// Dial rabbit for RPC client connection
58+
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
59+
if err != nil {
60+
panic(err)
61+
}
62+
63+
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
64+
RequestQueueName: rpcServerQueueName,
65+
})
66+
if err != nil {
67+
panic(err)
68+
}
69+
70+
// Set up a channel to listen for OS signals
71+
sigs := make(chan os.Signal, 1)
72+
signal.Notify(sigs, os.Interrupt) // Listen for Ctrl+C
73+
74+
// Goroutine to handle graceful shutdown
75+
go func() {
76+
<-sigs // Wait for Ctrl+C
77+
fmt.Println("\nReceived Ctrl+C, gracefully shutting down...")
78+
srv.stop(context.TODO())
79+
_ = clientConn.Close(context.TODO())
80+
_ = srvConn.Close(context.TODO())
81+
os.Exit(0)
82+
}()
83+
84+
reader := bufio.NewReader(os.Stdin)
85+
fmt.Println("Type a message and press Enter to send (Ctrl+C to quit):")
86+
87+
for {
88+
fmt.Print("Enter message: ")
89+
input, _ := reader.ReadString('\n')
90+
// Remove newline character from input
91+
message := input[:len(input)-1]
92+
93+
if message == "" {
94+
continue
95+
}
96+
97+
resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
98+
if err != nil {
99+
fmt.Printf("Error calling RPC: %v\n", err)
100+
continue
101+
}
102+
m, ok := <-resp
103+
if !ok {
104+
fmt.Println("timed out waiting for response")
105+
continue
106+
}
107+
fmt.Printf("response: %s\n", m.GetData())
108+
}
109+
}

go.mod

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,25 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client
22

33
go 1.23.0
44

5+
toolchain go1.24.5
6+
57
require (
68
github.com/Azure/go-amqp v1.4.0
79
github.com/golang-jwt/jwt/v5 v5.2.2
810
github.com/google/uuid v1.6.0
9-
github.com/onsi/ginkgo/v2 v2.22.1
10-
github.com/onsi/gomega v1.36.2
11+
github.com/onsi/ginkgo/v2 v2.23.4
12+
github.com/onsi/gomega v1.38.0
1113
)
1214

1315
require (
1416
github.com/go-logr/logr v1.4.2 // indirect
1517
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
16-
github.com/google/go-cmp v0.6.0 // indirect
17-
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
18-
golang.org/x/net v0.38.0 // indirect
19-
golang.org/x/sys v0.31.0 // indirect
20-
golang.org/x/text v0.23.0 // indirect
21-
golang.org/x/tools v0.28.0 // indirect
18+
github.com/google/go-cmp v0.7.0 // indirect
19+
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
20+
go.uber.org/automaxprocs v1.6.0 // indirect
21+
golang.org/x/net v0.41.0 // indirect
22+
golang.org/x/sys v0.33.0 // indirect
23+
golang.org/x/text v0.26.0 // indirect
24+
golang.org/x/tools v0.33.0 // indirect
2225
gopkg.in/yaml.v3 v3.0.1 // indirect
2326
)

go.sum

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -10,31 +10,40 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
1010
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
1111
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
1212
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
13-
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
14-
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
15-
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
16-
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
13+
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
14+
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
15+
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8=
16+
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
1717
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
1818
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
19-
github.com/onsi/ginkgo/v2 v2.22.1 h1:QW7tbJAUDyVDVOM5dFa7qaybo+CRfR7bemlQUN6Z8aM=
20-
github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPlnj4a1xM=
21-
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
22-
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
19+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
20+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
21+
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
22+
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
23+
github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus=
24+
github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8=
25+
github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY=
26+
github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o=
2327
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
2428
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
29+
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
30+
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
2531
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
2632
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
27-
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
28-
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
29-
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
30-
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
31-
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
32-
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
33-
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
34-
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
35-
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
36-
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
37-
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
33+
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
34+
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
35+
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
36+
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
37+
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
38+
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
39+
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
40+
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
41+
golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
42+
golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI=
43+
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
44+
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
3845
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
46+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
47+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
3948
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
4049
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

pkg/rabbitmqamqp/address_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ var _ = Describe("address builder test ", func() {
1515
Expect(err.Error()).To(Equal(expectedErr))
1616
},
1717
Entry("when both exchange and queue are set",
18-
stringPtr("my_exchange"), nil, stringPtr("my_queue"),
18+
ptr("my_exchange"), nil, ptr("my_queue"),
1919
"exchange and queue cannot be set together"),
2020
Entry("when neither exchange nor queue is set",
2121
nil, nil, nil,
@@ -32,13 +32,13 @@ var _ = Describe("address builder test ", func() {
3232
Expect(address).To(Equal(expected))
3333
},
3434
Entry("with exchange and key",
35-
stringPtr("my_exchange"), stringPtr("my_key"),
35+
ptr("my_exchange"), ptr("my_key"),
3636
"/exchanges/my_exchange/my_key"),
3737
Entry("with exchange only",
38-
stringPtr("my_exchange"), nil,
38+
ptr("my_exchange"), nil,
3939
"/exchanges/my_exchange"),
4040
Entry("with special characters",
41-
stringPtr("my_ exchange/()"), stringPtr("my_key "),
41+
ptr("my_ exchange/()"), ptr("my_key "),
4242
"/exchanges/my_%20exchange%2F%28%29/my_key%20"),
4343
)
4444
})

0 commit comments

Comments
 (0)