Skip to content

RPC high level constructs #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ set -o xtrace
script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
readonly rabbitmq_image=rabbitmq:4.1.0-beta.4-management-alpine
readonly rabbitmq_image=rabbitmq:4.1-management-alpine


readonly docker_name_prefix='rabbitmq-amqp-go-client'
Expand Down
12 changes: 12 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,15 @@ go.work.sum
coverage.txt
.DS_Store
.ci/ubuntu/log/rabbitmq.log

# Visual Studio Code
.vscode/*
!.vscode/settings.json
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json
!.vscode/*.code-snippets
!*.code-workspace

# Built Visual Studio Code Extensions
*.vsix
19 changes: 11 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,25 @@ module github.com/rabbitmq/rabbitmq-amqp-go-client

go 1.23.0

toolchain go1.24.5

require (
github.com/Azure/go-amqp v1.4.0
github.com/golang-jwt/jwt/v5 v5.2.2
github.com/google/uuid v1.6.0
github.com/onsi/ginkgo/v2 v2.22.1
github.com/onsi/gomega v1.36.2
github.com/onsi/ginkgo/v2 v2.23.4
github.com/onsi/gomega v1.38.0
)

require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/text v0.23.0 // indirect
golang.org/x/tools v0.28.0 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/tools v0.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
47 changes: 28 additions & 19 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,31 +10,40 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8=
github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad h1:a6HEuzUHeKH6hwfN/ZoQgRgVIWFJljSWa/zetS2WTvg=
github.com/google/pprof v0.0.0-20241210010833-40e02aabc2ad/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J0b1vyeLSOYI8bm5wbJM/8yDe8=
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/onsi/ginkgo/v2 v2.22.1 h1:QW7tbJAUDyVDVOM5dFa7qaybo+CRfR7bemlQUN6Z8aM=
github.com/onsi/ginkgo/v2 v2.22.1/go.mod h1:S6aTpoRsSq2cZOd+pssHAlKW/Q/jZt6cPrPlnj4a1xM=
github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8=
github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/onsi/ginkgo/v2 v2.23.4 h1:ktYTpKJAVZnDT4VjxSbiBenUjmlL/5QkBEocaWXiQus=
github.com/onsi/ginkgo/v2 v2.23.4/go.mod h1:Bt66ApGPBFzHyR+JO10Zbt0Gsp4uWxu5mIOTusL46e8=
github.com/onsi/gomega v1.38.0 h1:c/WX+w8SLAinvuKKQFh77WEucCnPk4j2OTUr7lt7BeY=
github.com/onsi/gomega v1.38.0/go.mod h1:OcXcwId0b9QsE7Y49u+BTrL4IdKOBOKnD6VQNTJEB6o=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc=
golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
145 changes: 143 additions & 2 deletions pkg/rabbitmqamqp/amqp_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"crypto/tls"
"fmt"
"github.com/Azure/go-amqp"
"github.com/google/uuid"
"math/rand"
"sync"
"sync/atomic"
"time"

"github.com/Azure/go-amqp"
"github.com/google/uuid"
)

type AmqpAddress struct {
Expand Down Expand Up @@ -168,6 +169,146 @@ func (a *AmqpConnection) NewConsumer(ctx context.Context, queueName string, opti
return newConsumer(ctx, a, destinationAdd, options)
}

// NewRpcServer creates a new RPC server that processes requests from the
// specified queue. The requestQueue in options is mandatory, while other
// fields are optional and will use defaults if not provided.
func (a *AmqpConnection) NewRpcServer(ctx context.Context, options RpcServerOptions) (RpcServer, error) {
if err := options.validate(); err != nil {
return nil, fmt.Errorf("rpc server options validation: %w", err)
}

// Create consumer for receiving requests
// consumer, err := a.NewConsumer(ctx, options.RequestQueue, nil)
consumer, err := a.NewConsumer(ctx, options.RequestQueue, &ConsumerOptions{InitialCredits: -1})
if err != nil {
return nil, fmt.Errorf("failed to create consumer: %w", err)
}
consumer.issueCredits(1)

// Create publisher for sending replies
publisher, err := a.NewPublisher(ctx, nil, nil)
if err != nil {
consumer.Close(ctx) // cleanup consumer on failure
return nil, fmt.Errorf("failed to create publisher: %w", err)
}

// Set defaults for optional fields
handler := options.Handler
if handler == nil {
handler = noOpHandler
}

correlationIdExtractor := options.CorrelationIdExtractor
if correlationIdExtractor == nil {
correlationIdExtractor = defaultCorrelationIdExtractor
}

replyPostProcessor := options.ReplyPostProcessor
if replyPostProcessor == nil {
replyPostProcessor = defaultReplyPostProcessor
}

server := &amqpRpcServer{
requestHandler: handler,
requestQueue: options.RequestQueue,
publisher: publisher,
consumer: consumer,
correlationIdExtractor: correlationIdExtractor,
replyPostProcessor: replyPostProcessor,
}
go server.handle()

return server, nil
}

// NewRpcClient creates a new RPC client that sends requests to the specified queue
// and receives replies on a dynamically created reply queue.
func (a *AmqpConnection) NewRpcClient(ctx context.Context, options *RpcClientOptions) (RpcClient, error) {
if options == nil {
return nil, fmt.Errorf("options cannot be nil")
}
if options.RequestQueueName == "" {
return nil, fmt.Errorf("requestQueueName is mandatory")
}

// Create publisher for sending requests
requestQueue := &QueueAddress{
Queue: options.RequestQueueName,
}
publisher, err := a.NewPublisher(ctx, requestQueue, nil)
if err != nil {
return nil, fmt.Errorf("failed to create publisher: %w", err)
}

replyQueueName := options.ReplyToQueueName
if len(replyQueueName) == 0 {
replyQueueName = generateNameWithDefaultPrefix()
}

// Declare reply queue as exclusive, auto-delete classic queue
q, err := a.management.DeclareQueue(ctx, &ClassicQueueSpecification{
Name: replyQueueName,
IsExclusive: true,
IsAutoDelete: true,
})
if err != nil {
return nil, fmt.Errorf("failed to declare reply queue: %w", err)
}

// Set defaults for optional fields
correlationIdSupplier := options.CorrelationIdSupplier
if correlationIdSupplier == nil {
correlationIdSupplier = newRandomUuidCorrelationIdSupplier()
}

requestPostProcessor := options.RequestPostProcessor
if requestPostProcessor == nil {
requestPostProcessor = func(request *amqp.Message, correlationID any) *amqp.Message {
if request.Properties == nil {
request.Properties = &amqp.MessageProperties{}
}
request.Properties.MessageID = correlationID
return request
}
}

requestTimeout := options.RequestTimeout
if requestTimeout == 0 {
requestTimeout = DefaultRpcRequestTimeout
}

correlationIdExtractor := options.CorrelationIdExtractor
if correlationIdExtractor == nil {
correlationIdExtractor = defaultReplyCorrelationIdExtractor
}

client := &amqpRpcClient{
requestQueue: requestQueue,
replyToQueue: &QueueAddress{Queue: replyQueueName},
publisher: publisher,
requestPostProcessor: requestPostProcessor,
correlationIdSupplier: correlationIdSupplier,
correlationIdExtractor: correlationIdExtractor,
requestTimeout: requestTimeout,
pendingRequests: make(map[any]*outstandingRequest),
done: make(chan struct{}),
}

// Create consumer for receiving replies
consumer, err := a.NewConsumer(ctx, q.Name(), nil)
if err != nil {
_ = publisher.Close(ctx) // cleanup publisher on failure
return nil, fmt.Errorf("failed to create consumer: %w", err)
}

client.consumer = consumer

go client.messageReceivedHandler()
go client.requestTimeoutTask()

return client, nil
}

// Dial connect to the AMQP 1.0 server using the provided connectionSettings
// Returns a pointer to the new AmqpConnection if successful else an error.
func Dial(ctx context.Context, address string, connOptions *AmqpConnOptions) (*AmqpConnection, error) {
Expand Down
50 changes: 49 additions & 1 deletion pkg/rabbitmqamqp/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package rabbitmqamqp
import (
"context"
"fmt"
"sync/atomic"

"github.com/Azure/go-amqp"
"github.com/google/uuid"
"sync/atomic"
)

type DeliveryContext struct {
Expand Down Expand Up @@ -65,6 +66,14 @@ func (dc *DeliveryContext) RequeueWithAnnotations(ctx context.Context, annotatio
})
}

type consumerState byte

const (
consumerStateRunning consumerState = iota
consumerStatePausing
consumerStatePaused
)

type Consumer struct {
receiver atomic.Pointer[amqp.Receiver]
connection *AmqpConnection
Expand All @@ -79,6 +88,8 @@ type Consumer struct {
For the AMQP queues it is just ignored.
*/
currentOffset int64

state consumerState
}

func (c *Consumer) Id() string {
Expand Down Expand Up @@ -148,3 +159,40 @@ func (c *Consumer) Close(ctx context.Context) error {
c.connection.entitiesTracker.removeConsumer(c)
return c.receiver.Load().Close(ctx)
}

// pause drains the credits of the receiver and stops issuing new credits.
func (c *Consumer) pause(ctx context.Context) error {
if c.state == consumerStatePaused || c.state == consumerStatePausing {
return nil
}
c.state = consumerStatePausing
err := c.receiver.Load().DrainCredit(ctx, nil)
if err != nil {
c.state = consumerStateRunning
return fmt.Errorf("error draining credits: %w", err)
}
c.state = consumerStatePaused
return nil
}

// unpause requests new credits using the initial credits value of the options.
func (c *Consumer) unpause(credits uint32) error {
if c.state == consumerStateRunning {
return nil
}
err := c.receiver.Load().IssueCredit(credits)
if err != nil {
return fmt.Errorf("error issuing credits: %w", err)
}
c.state = consumerStateRunning
return nil
}

func (c *Consumer) isPausedOrPausing() bool {
return c.state != consumerStateRunning
}

// issueCredits issues more credits on the receiver.
func (c *Consumer) issueCredits(credits uint32) error {
return c.receiver.Load().IssueCredit(credits)
}
1 change: 1 addition & 0 deletions pkg/rabbitmqamqp/amqp_consumer_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ var _ = Describe("Consumer stream test", func() {
/*
Test the consumer should filter messages based on properties
*/
// TODO: defer cleanup to delete the stream queue
qName := generateName("consumer should filter messages based on properties")
qName += time.Now().String()
connection, err := Dial(context.Background(), "amqp://", nil)
Expand Down
Loading
Loading