Skip to content

Commit 9c9e2f2

Browse files
committed
Add examples and fix typo
CorrectionID -> CorrelationID Sometimes AI doesnt get it right
1 parent 2c855ec commit 9c9e2f2

File tree

6 files changed

+439
-7
lines changed

6 files changed

+439
-7
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"os"
8+
"os/signal"
9+
10+
"github.com/Azure/go-amqp"
11+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
12+
)
13+
14+
type echoRpcServer struct {
15+
conn *rabbitmqamqp.AmqpConnection
16+
server rabbitmqamqp.RpcServer
17+
}
18+
19+
func (s *echoRpcServer) stop(ctx context.Context) {
20+
s.server.Close(ctx)
21+
s.conn.Close(ctx)
22+
}
23+
24+
func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
25+
conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
26+
Name: rpcServerQueueName,
27+
})
28+
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
29+
RequestQueue: rpcServerQueueName,
30+
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
31+
return request, nil
32+
},
33+
})
34+
if err != nil {
35+
panic(err)
36+
}
37+
return &echoRpcServer{
38+
conn: conn,
39+
server: srv,
40+
}
41+
}
42+
43+
const rpcServerQueueName = "rpc-queue"
44+
45+
func main() {
46+
// Dial rabbit for RPC server connection
47+
srvConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
48+
if err != nil {
49+
panic(err)
50+
}
51+
52+
srv := newEchoRpcServer(srvConn)
53+
54+
// Dial rabbit for RPC client connection
55+
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
56+
if err != nil {
57+
panic(err)
58+
}
59+
60+
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
61+
RequestQueueName: rpcServerQueueName,
62+
})
63+
if err != nil {
64+
panic(err)
65+
}
66+
67+
// Set up a channel to listen for OS signals
68+
sigs := make(chan os.Signal, 1)
69+
signal.Notify(sigs, os.Interrupt) // Listen for Ctrl+C
70+
71+
// Goroutine to handle graceful shutdown
72+
go func() {
73+
<-sigs // Wait for Ctrl+C
74+
fmt.Println("\nReceived Ctrl+C, gracefully shutting down...")
75+
srv.stop(context.TODO())
76+
_ = clientConn.Close(context.TODO())
77+
_ = srvConn.Close(context.TODO())
78+
os.Exit(0)
79+
}()
80+
81+
reader := bufio.NewReader(os.Stdin)
82+
fmt.Println("Type a message and press Enter to send (Ctrl+C to quit):")
83+
84+
for {
85+
fmt.Print("Enter message: ")
86+
input, _ := reader.ReadString('\n')
87+
// Remove newline character from input
88+
message := input[:len(input)-1]
89+
90+
if message == "" {
91+
continue
92+
}
93+
94+
resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
95+
if err != nil {
96+
fmt.Printf("Error calling RPC: %v\n", err)
97+
continue
98+
}
99+
m, ok := <-resp
100+
if !ok {
101+
fmt.Println("timed out waiting for response")
102+
continue
103+
}
104+
fmt.Printf("response: %s\n", m.GetData())
105+
}
106+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package rabbitmqamqp_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/Azure/go-amqp"
8+
"github.com/google/uuid"
9+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
10+
)
11+
12+
const (
13+
rpcServerQueueNameCustom = "rpc-queue-custom"
14+
correlationIDHeader = "x-correlation-id"
15+
)
16+
17+
type customCorrelationIDSupplier struct{}
18+
19+
func (s *customCorrelationIDSupplier) Get() any {
20+
return uuid.New().String()
21+
}
22+
23+
func Example_customCorrelationId() {
24+
// Dial rabbit for RPC server connection
25+
srvConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
26+
if err != nil {
27+
panic(err)
28+
}
29+
defer srvConn.Close(context.Background())
30+
31+
_, err = srvConn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
32+
Name: rpcServerQueueNameCustom,
33+
})
34+
if err != nil {
35+
panic(err)
36+
}
37+
38+
server, err := srvConn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
39+
RequestQueue: rpcServerQueueNameCustom,
40+
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
41+
fmt.Printf("Received: %s\n", request.GetData())
42+
return request, nil
43+
},
44+
CorrelationIdExtractor: func(message *amqp.Message) any {
45+
if message.ApplicationProperties == nil {
46+
panic("application properties are missing")
47+
}
48+
return message.ApplicationProperties[correlationIDHeader]
49+
},
50+
ReplyPostProcessor: func(reply *amqp.Message, correlationID any) *amqp.Message {
51+
if reply.ApplicationProperties == nil {
52+
reply.ApplicationProperties = make(map[string]interface{})
53+
}
54+
reply.ApplicationProperties[correlationIDHeader] = correlationID
55+
return reply
56+
},
57+
})
58+
if err != nil {
59+
panic(err)
60+
}
61+
defer server.Close(context.Background())
62+
63+
// Dial rabbit for RPC client connection
64+
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
65+
if err != nil {
66+
panic(err)
67+
}
68+
defer clientConn.Close(context.Background())
69+
70+
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
71+
RequestQueueName: rpcServerQueueNameCustom,
72+
CorrelationIdSupplier: &customCorrelationIDSupplier{},
73+
CorrelationIdExtractor: func(message *amqp.Message) any {
74+
if message.ApplicationProperties == nil {
75+
panic("application properties are missing")
76+
}
77+
return message.ApplicationProperties[correlationIDHeader]
78+
},
79+
RequestPostProcessor: func(request *amqp.Message, correlationID any) *amqp.Message {
80+
if request.ApplicationProperties == nil {
81+
request.ApplicationProperties = make(map[string]interface{})
82+
}
83+
request.ApplicationProperties[correlationIDHeader] = correlationID
84+
return request
85+
},
86+
})
87+
if err != nil {
88+
panic(err)
89+
}
90+
defer rpcClient.Close(context.Background())
91+
92+
message := "hello world"
93+
resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
94+
if err != nil {
95+
fmt.Printf("Error calling RPC: %v\n", err)
96+
return
97+
}
98+
99+
m, ok := <-resp
100+
if !ok {
101+
fmt.Println("timed out waiting for response")
102+
return
103+
}
104+
fmt.Printf("Response: %s\n", m.GetData())
105+
// Output:
106+
// Received: hello world
107+
// Response: hello world
108+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package rabbitmqamqp_test
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"fmt"
7+
"os"
8+
"os/signal"
9+
10+
"github.com/Azure/go-amqp"
11+
"github.com/rabbitmq/rabbitmq-amqp-go-client/pkg/rabbitmqamqp"
12+
)
13+
14+
type echoRpcServer struct {
15+
conn *rabbitmqamqp.AmqpConnection
16+
server rabbitmqamqp.RpcServer
17+
}
18+
19+
func (s *echoRpcServer) stop(ctx context.Context) {
20+
s.server.Close(ctx)
21+
s.conn.Close(ctx)
22+
}
23+
24+
func newEchoRpcServer(conn *rabbitmqamqp.AmqpConnection) *echoRpcServer {
25+
conn.Management().DeclareQueue(context.TODO(), &rabbitmqamqp.QuorumQueueSpecification{
26+
Name: rpcServerQueueName,
27+
})
28+
srv, err := conn.NewRpcServer(context.TODO(), rabbitmqamqp.RpcServerOptions{
29+
RequestQueue: rpcServerQueueName,
30+
Handler: func(ctx context.Context, request *amqp.Message) (*amqp.Message, error) {
31+
fmt.Printf("echo: %s\n", request.GetData())
32+
return request, nil
33+
},
34+
})
35+
if err != nil {
36+
panic(err)
37+
}
38+
return &echoRpcServer{
39+
conn: conn,
40+
server: srv,
41+
}
42+
}
43+
44+
const rpcServerQueueName = "rpc-queue"
45+
46+
func Example() {
47+
// Dial rabbit for RPC server connection
48+
srvConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
49+
if err != nil {
50+
panic(err)
51+
}
52+
53+
srv := newEchoRpcServer(srvConn)
54+
55+
// Dial rabbit for RPC client connection
56+
clientConn, err := rabbitmqamqp.Dial(context.TODO(), "amqp://localhost:5672", nil)
57+
if err != nil {
58+
panic(err)
59+
}
60+
61+
rpcClient, err := clientConn.NewRpcClient(context.TODO(), &rabbitmqamqp.RpcClientOptions{
62+
RequestQueueName: rpcServerQueueName,
63+
})
64+
if err != nil {
65+
panic(err)
66+
}
67+
68+
// Set up a channel to listen for OS signals
69+
sigs := make(chan os.Signal, 1)
70+
signal.Notify(sigs, os.Interrupt) // Listen for Ctrl+C
71+
72+
// Goroutine to handle graceful shutdown
73+
go func() {
74+
<-sigs // Wait for Ctrl+C
75+
fmt.Println("\nReceived Ctrl+C, gracefully shutting down...")
76+
srv.stop(context.TODO())
77+
_ = clientConn.Close(context.TODO())
78+
_ = srvConn.Close(context.TODO())
79+
os.Exit(0)
80+
}()
81+
82+
reader := bufio.NewReader(os.Stdin)
83+
fmt.Println("Type a message and press Enter to send (Ctrl+C to quit):")
84+
85+
for {
86+
fmt.Print("Enter message: ")
87+
input, _ := reader.ReadString('\n')
88+
// Remove newline character from input
89+
message := input[:len(input)-1]
90+
91+
if message == "" {
92+
continue
93+
}
94+
95+
resp, err := rpcClient.Publish(context.TODO(), amqp.NewMessage([]byte(message)))
96+
if err != nil {
97+
fmt.Printf("Error calling RPC: %v\n", err)
98+
continue
99+
}
100+
m, ok := <-resp
101+
if !ok {
102+
fmt.Println("timed out waiting for response")
103+
continue
104+
}
105+
fmt.Printf("response: %s\n", m.GetData())
106+
}
107+
}

pkg/rabbitmqamqp/rpc_client.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ type RpcClient interface {
3939
Publish(context.Context, *amqp.Message) (<-chan *amqp.Message, error)
4040
}
4141

42-
// CorrectionIdSupplier is an interface for providing correlation IDs for RPC requests.
42+
// CorrelationIdSupplier is an interface for providing correlation IDs for RPC requests.
4343
// Implementations should generate unique identifiers for each request.
4444
// The returned value from `Get()` should be an AMQP type, or a type that can be
4545
// encoded into an AMQP message property (e.g., string, int, []byte, etc.).
46-
type CorrectionIdSupplier interface {
46+
type CorrelationIdSupplier interface {
4747
Get() any
4848
}
4949

@@ -61,7 +61,7 @@ func (c *randomUuidCorrelationIdSupplier) Get() any {
6161
return s
6262
}
6363

64-
func newRandomUuidCorrelationIdSupplier() CorrectionIdSupplier {
64+
func newRandomUuidCorrelationIdSupplier() CorrelationIdSupplier {
6565
u, err := uuid.NewRandom()
6666
if err != nil {
6767
panic(err)
@@ -96,17 +96,29 @@ var DefaultRpcRequestTimeout = 30 * time.Second
9696
// RpcClientOptions is a struct that contains the options for the RPC client.
9797
// It is used to configure the RPC client.
9898
type RpcClientOptions struct {
99-
// The name of the queue to send requests to.
99+
// The name of the queue to send requests to. This queue must exist.
100+
//
101+
// Mandatory.
100102
RequestQueueName string
101103
// The name of the queue to receive replies from.
104+
//
105+
// Optional. If not set, a dedicated reply-to queue will be created for each request.
102106
ReplyToQueueName string
103107
// Generator of correlation IDs for requests. Each correlationID generated must be unique.
104-
CorrelationIdSupplier CorrectionIdSupplier
108+
//
109+
// Optional. If not set, a random UUID will be used as prefix and an auto-incrementing counter as suffix.
110+
CorrelationIdSupplier CorrelationIdSupplier
105111
// Function to extract correlation IDs from replies.
112+
//
113+
// Optional. If not set, the `CorrelationID` message property will be used.
106114
CorrelationIdExtractor CorrelationIdExtractor
107115
// Function to modify requests before they are sent.
116+
//
117+
// Optional. If not set, the default `RequestPostProcessor` assigns the correlation ID to the `MessageID` property.
108118
RequestPostProcessor RequestPostProcessor
109119
// The timeout for requests.
120+
//
121+
// Optional. If not set, a default timeout of 30 seconds will be used.
110122
RequestTimeout time.Duration
111123
}
112124

@@ -124,7 +136,7 @@ type amqpRpcClient struct {
124136
publisher *Publisher
125137
consumer *Consumer
126138
requestPostProcessor RequestPostProcessor
127-
correlationIdSupplier CorrectionIdSupplier
139+
correlationIdSupplier CorrelationIdSupplier
128140
correlationIdExtractor CorrelationIdExtractor
129141
requestTimeout time.Duration
130142
mu sync.Mutex

0 commit comments

Comments
 (0)