Skip to content

Commit 2464931

Browse files
authored
Implement Filters (#38)
* Closes: Implement properties-filter and application-properties-filter #25 * Refactor the interfaces to be more coherent --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 8ffd1e6 commit 2464931

28 files changed

+624
-190
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ format:
1212

1313
vet:
1414
go vet ./pkg/rabbitmqamqp
15+
go vet ./docs/examples/...
1516

1617
STATICCHECK ?= $(GOBIN)/staticcheck
1718
STATICCHECK_VERSION ?= latest
1819
$(STATICCHECK):
1920
go install honnef.co/go/tools/cmd/staticcheck@$(STATICCHECK_VERSION)
2021
check: $(STATICCHECK)
2122
$(STATICCHECK) ./pkg/rabbitmqamqp
23+
$(STATICCHECK) ./docs/examples/...
2224

2325

2426

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ Suitable for testing in pre-production environments.
1515

1616
## Documentation
1717

18-
- [Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries) (work in progress for this client)
18+
- [Client Guide](https://www.rabbitmq.com/client-libraries/amqp-client-libraries)
1919

2020

2121

docs/examples/getting_started/main.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func main() {
112112
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
113113
Exchange: exchangeName,
114114
Key: routingKey,
115-
}, "getting-started-publisher")
115+
}, nil)
116116
if err != nil {
117117
rmq.Error("Error creating publisher", err)
118118
return
@@ -129,17 +129,14 @@ func main() {
129129
switch publishResult.Outcome.(type) {
130130
case *rmq.StateAccepted:
131131
rmq.Info("[NewPublisher]", "Message accepted", publishResult.Message.Data[0])
132-
break
133132
case *rmq.StateReleased:
134133
rmq.Warn("[NewPublisher]", "Message was not routed", publishResult.Message.Data[0])
135-
break
136134
case *rmq.StateRejected:
137135
rmq.Warn("[NewPublisher]", "Message rejected", publishResult.Message.Data[0])
138136
stateType := publishResult.Outcome.(*rmq.StateRejected)
139137
if stateType.Error != nil {
140138
rmq.Warn("[NewPublisher]", "Message rejected with error: %v", stateType.Error)
141139
}
142-
break
143140
default:
144141
// these status are not supported. Leave it for AMQP 1.0 compatibility
145142
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes

docs/examples/publisher_msg_targets/publisher_msg_targets.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func main() {
3131
}
3232

3333
// create a publisher without a target
34-
publisher, err := amqpConnection.NewPublisher(context.TODO(), nil, "stream-publisher")
34+
publisher, err := amqpConnection.NewPublisher(context.TODO(), nil, nil)
3535
checkError(err)
3636

3737
// publish messages to the stream
@@ -55,7 +55,6 @@ func main() {
5555
switch publishResult.Outcome.(type) {
5656
case *amqp.StateAccepted:
5757
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
58-
break
5958
default:
6059
rmq.Warn("[Publisher]", "Message not accepted", publishResult.Message.Data[0])
6160
}

docs/examples/reliable/reliable.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func main() {
118118

119119
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.QueueAddress{
120120
Queue: queueName,
121-
}, "reliable-publisher")
121+
}, nil)
122122
if err != nil {
123123
rmq.Error("Error creating publisher", err)
124124
return
@@ -147,13 +147,10 @@ func main() {
147147
switch publishResult.Outcome.(type) {
148148
case *rmq.StateAccepted:
149149
atomic.AddInt32(&stateAccepted, 1)
150-
break
151150
case *rmq.StateReleased:
152151
atomic.AddInt32(&stateReleased, 1)
153-
break
154152
case *rmq.StateRejected:
155153
atomic.AddInt32(&stateRejected, 1)
156-
break
157154
default:
158155
// these status are not supported. Leave it for AMQP 1.0 compatibility
159156
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes

docs/examples/streams/streams.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func main() {
3535

3636
// create a stream publisher. In this case we use the QueueAddress to make the example
3737
// simple. So we use the default exchange here.
38-
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, "stream-publisher")
38+
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, nil)
3939
checkError(err)
4040

4141
// publish messages to the stream
@@ -47,17 +47,14 @@ func main() {
4747
switch publishResult.Outcome.(type) {
4848
case *rmq.StateAccepted:
4949
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
50-
break
5150
case *rmq.StateReleased:
5251
rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
53-
break
5452
case *rmq.StateRejected:
5553
rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
5654
stateType := publishResult.Outcome.(*rmq.StateRejected)
5755
if stateType.Error != nil {
5856
rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
5957
}
60-
break
6158
default:
6259
// these status are not supported. Leave it for AMQP 1.0 compatibility
6360
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes

docs/examples/streams_filtering/streams_filtering.go

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func checkError(err error) {
1717

1818
func main() {
1919

20+
// see also: https://www.rabbitmq.com/blog/2024/12/13/amqp-filter-expressions
2021
rmq.Info("Golang AMQP 1.0 Streams example with filtering")
2122
queueStream := "stream-go-queue-filtering-" + time.Now().String()
2223
env := rmq.NewEnvironment([]string{"amqp://"}, nil)
@@ -35,7 +36,7 @@ func main() {
3536

3637
// create a stream publisher. In this case we use the QueueAddress to make the example
3738
// simple. So we use the default exchange here.
38-
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, "stream-publisher")
39+
publisher, err := amqpConnection.NewPublisher(context.TODO(), &rmq.QueueAddress{Queue: queueStream}, nil)
3940
checkError(err)
4041

4142
filters := []string{"MyFilter1", "MyFilter2", "MyFilter3", "MyFilter4"}
@@ -50,17 +51,14 @@ func main() {
5051
switch publishResult.Outcome.(type) {
5152
case *rmq.StateAccepted:
5253
rmq.Info("[Publisher]", "Message accepted", publishResult.Message.Data[0])
53-
break
5454
case *rmq.StateReleased:
5555
rmq.Warn("[Publisher]", "Message was not routed", publishResult.Message.Data[0])
56-
break
5756
case *rmq.StateRejected:
5857
rmq.Warn("[Publisher]", "Message rejected", publishResult.Message.Data[0])
5958
stateType := publishResult.Outcome.(*rmq.StateRejected)
6059
if stateType.Error != nil {
6160
rmq.Warn("[Publisher]", "Message rejected with error: %v", stateType.Error)
6261
}
63-
break
6462
default:
6563
// these status are not supported. Leave it for AMQP 1.0 compatibility
6664
// see: https://www.rabbitmq.com/docs/next/amqp#outcomes
@@ -76,7 +74,25 @@ func main() {
7674

7775
// add a filter to the consumer, in this case we use only the filter values
7876
// MyFilter1 and MyFilter2. So all other messages won't be received
79-
Filters: []string{"MyFilter1", "MyFilter2"},
77+
StreamFilterOptions: &rmq.StreamFilterOptions{
78+
Values: []string{"MyFilter1", "MyFilter2"},
79+
// it is also possible to filter by application properties or message properties
80+
// you can create filters like:
81+
// msg.ApplicationProperties = map[string]interface{}{"key3": "value3"}
82+
// during the publish you can do something like:
83+
// msg.ApplicationProperties = map[string]interface{}{"key1": "value1"}
84+
// publisher.Publish(context.Background(), msg)
85+
//ApplicationProperties: nil,
86+
87+
// or here you can filter by message properties
88+
// like:
89+
// msg.Properties = &amqp.MessageProperties{Subject: "MySubject"}
90+
// during the publish you can do something like:
91+
// msg.Properties = &amqp.MessageProperties{Subject: "MySubject"}
92+
// publisher.Publish(context.Background(), msg)
93+
//Properties: nil,
94+
// see amqp_consumer_stream_test.go for more examples
95+
},
8096
})
8197
checkError(err)
8298

docs/examples/video/getting_started.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func main() {
6060
publisher, err := amqpConnection.NewPublisher(context.Background(), &rmq.ExchangeAddress{
6161
Exchange: exchangeName,
6262
Key: routingKey,
63-
}, "getting-started-publisher")
63+
}, nil)
6464
if err != nil {
6565
rmq.Error("Error creating publisher", err)
6666
return

pkg/rabbitmqamqp/address.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import (
66
"strings"
77
)
88

9-
// TargetAddress is an interface that represents an address that can be used to send messages to.
9+
// ITargetAddress is an interface that represents an address that can be used to send messages to.
1010
// It can be either a Queue or an Exchange with a routing key.
11-
type TargetAddress interface {
11+
type ITargetAddress interface {
1212
toAddress() (string, error)
1313
}
1414

pkg/rabbitmqamqp/address_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,3 @@ var _ = Describe("address builder test ", func() {
6060
})
6161
})
6262
})
63-
64-
// Helper function to create string pointers
65-
func stringPtr(s string) *string {
66-
return &s
67-
}

0 commit comments

Comments
 (0)