
Commit 704509c

Implement super stream ha producer (#401)
* Implement a super stream HA producer and HA consumer. Closes: #400

Signed-off-by: Gabriele Santomaggio <[email protected]>
Co-authored-by: Alberto Moretti <[email protected]>
1 parent e02e667 commit 704509c

16 files changed: +870 -123 lines

.gitignore

Lines changed: 1 addition & 0 deletions

@@ -29,3 +29,4 @@ dist/
 perfTest/perfTest
 go.dev/
 local_ex/
+compose/ha_tls/data*

.golangci.yml

Lines changed: 4 additions & 2 deletions

@@ -23,6 +23,9 @@ linters:
     - unconvert
     - unused
     - whitespace
+  exclusions:
+    paths:
+      - local_ex
   settings:
     gosec:
       excludes:
@@ -33,7 +36,6 @@ linters:
       - name: unexported-return
        disabled: true
      - name: unused-parameter
-
 formatters:
   enable:
     - gofmt
@@ -43,4 +45,4 @@ formatters:
     simplify: false
     rewrite-rules:
       - pattern: interface{}
-        replacement: any
+        replacement: any

compose/ha_tls/docker-compose.yml

Lines changed: 3 additions & 0 deletions

@@ -17,6 +17,7 @@ services:
     volumes:
       - ./conf/:/etc/rabbitmq/
       - "./tls-gen/basic/result/:/certs"
+      - ./data0/:/var/lib/rabbitmq/
   rabbit_node1:
     environment:
       - RABBITMQ_ERLANG_COOKIE='secret_cookie'
@@ -34,6 +35,7 @@ services:
     volumes:
       - ./conf/:/etc/rabbitmq/
       - "./tls-gen/basic/result/:/certs"
+      - ./data1/:/var/lib/rabbitmq/
   rabbit_node2:
     environment:
       - RABBITMQ_ERLANG_COOKIE='secret_cookie'
@@ -51,6 +53,7 @@ services:
     volumes:
       - ./conf/:/etc/rabbitmq/
       - "./tls-gen/basic/result/:/certs"
+      - ./data2/:/var/lib/rabbitmq/
   haproxy:
     image: haproxy-rabbitmq-cluster
     # container_name: haproxy

examples/README.md

Lines changed: 1 addition & 0 deletions

@@ -16,3 +16,4 @@ Stream examples
 - [Single Active Consumer](./single_active_consumer) - Single Active Consumer example
 - [Reliable](./reliable) - Reliable Producer and Reliable Consumer example
 - [Super Stream](./super_stream) - Super Stream example with Single Active Consumer
+- [Reliable Super Stream started](./reliable_super_stream_getting_started) - Reliable Super Stream getting started example

Lines changed: 117 additions & 0 deletions

@@ -0,0 +1,117 @@
+package main
+
+import (
+    "errors"
+    "fmt"
+
+    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
+    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
+    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
+    "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
+)
+
+func main() {
+    fmt.Printf("Getting started with Super Stream client for RabbitMQ\n")
+
+    // Create the environment. You can set the log level to DEBUG for more information
+    // stream.SetLevelInfo(logs.DEBUG)
+    // the environment is the connection to the broker(s)
+    env, err := stream.NewEnvironment(stream.NewEnvironmentOptions().
+        SetHost("localhost").
+        SetPort(5552))
+    if err != nil {
+        fmt.Printf("Error creating environment: %v\n", err)
+        return
+    }
+
+    // Create a super stream
+    streamName := "my-super-stream"
+    // It is highly recommended to define the stream retention policy
+    err = env.DeclareSuperStream(streamName, stream.NewPartitionsOptions(3).
+        SetMaxLengthBytes(stream.ByteCapacity{}.GB(2)))
+
+    // ignore the error if the stream already exists
+    if err != nil && !errors.Is(err, stream.StreamAlreadyExists) {
+        fmt.Printf("Error declaring stream: %v\n", err)
+        return
+    }
+
+    // declare the reliable consumer using the package ha
+    consumer, err := ha.NewReliableSuperStreamConsumer(env, streamName,
+        // handler where the messages will be processed
+        func(_ stream.ConsumerContext, message *amqp.Message) {
+            fmt.Printf("Message received: %s\n", message.GetData())
+        },
+        // start from the beginning of the stream
+        stream.NewSuperStreamConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()),
+    )
+
+    if err != nil {
+        fmt.Printf("Error creating consumer: %v\n", err)
+        return
+    }
+
+    // Create the reliable producer using the package ha
+    producer, err := ha.NewReliableSuperStreamProducer(env, streamName,
+        // we leave the default options
+        stream.NewSuperStreamProducerOptions(stream.NewHashRoutingStrategy(func(message message.StreamMessage) string {
+            return message.GetMessageProperties().MessageID.(string)
+        })),
+        // handler for the confirmation of the messages
+        func(messageConfirm []*stream.PartitionPublishConfirm) {
+            for _, msgs := range messageConfirm {
+                for _, msg := range msgs.ConfirmationStatus {
+                    if msg.IsConfirmed() {
+                        fmt.Printf("message %s confirmed\n", msg.GetMessage().GetData())
+                    } else {
+                        fmt.Printf("message %s failed\n", msg.GetMessage().GetData())
+                    }
+                }
+            }
+        })
+
+    if err != nil {
+        fmt.Printf("Error creating producer: %v\n", err)
+        return
+    }
+
+    // Send the messages
+    for i := range 10 {
+        msg := amqp.NewMessage([]byte(fmt.Sprintf("Hello stream:%d", i)))
+        msg.Properties = &amqp.MessageProperties{
+            MessageID: fmt.Sprintf("msg-%d", i),
+        }
+        err = producer.Send(msg)
+        if err != nil {
+            fmt.Printf("Error sending message: %v\n", err)
+            return
+        }
+    }
+
+    // press any key to exit
+    fmt.Printf("Press any key to close the producer, consumer and environment\n")
+    _, _ = fmt.Scanln()
+
+    // Close the producer
+    err = producer.Close()
+    if err != nil {
+        fmt.Printf("Error closing producer: %v\n", err)
+    }
+
+    // Close the consumer
+    err = consumer.Close()
+    if err != nil {
+        fmt.Printf("Error closing consumer: %v\n", err)
+    }
+
+    err = env.DeleteSuperStream(streamName)
+    if err != nil {
+        fmt.Printf("Error deleting stream: %v\n", err)
+    }
+
+    // Close the environment
+    err = env.Close()
+    if err != nil {
+        fmt.Printf("Error closing environment: %s\n", err)
+    }
+}

pkg/ha/ha_consumer.go

Lines changed: 3 additions & 14 deletions

@@ -30,18 +30,7 @@ type ReliableConsumer struct {
 }

 func (c *ReliableConsumer) GetStatusAsString() string {
-    switch c.GetStatus() {
-    case StatusOpen:
-        return "Open"
-    case StatusClosed:
-        return "Closed"
-    case StatusStreamDoesNotExist:
-        return "StreamDoesNotExist"
-    case StatusReconnecting:
-        return "Reconnecting"
-    default:
-        return "Unknown"
-    }
+    return getStatusAsString(c)
 }

 func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
@@ -51,7 +40,7 @@ func (c *ReliableConsumer) handleNotifyClose(channelClose stream.ChannelClose) {
     c.setStatus(StatusReconnecting)
     logs.LogWarn("[Reliable] - %s closed unexpectedly %s.. Reconnecting..", c.getInfo(), event.Reason)
     c.bootstrap = false
-    err, reconnected := retry(1, c)
+    err, reconnected := retry(1, c, c.GetStreamName())
     if err != nil {
         logs.LogInfo(""+
             "[Reliable] - %s won't be reconnected. Error: %s", c.getInfo(), err)
@@ -115,7 +104,7 @@ func (c *ReliableConsumer) GetStreamName() string {
     return c.streamName
 }

-func (c *ReliableConsumer) getNewInstance() newEntityInstance {
+func (c *ReliableConsumer) getNewInstance(_ string) newEntityInstance {
     return c.newConsumer
 }

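Both ReliableConsumer (above) and ReliableProducer (below) now delegate their status-to-string conversion to a shared helper in the ha package. The helper itself is not part of this diff; the following is a minimal sketch of what it presumably looks like, reconstructed from the switch removed above and assuming it accepts the package's IReliable interface (or any value exposing GetStatus() int). Note also that retry now receives the stream name and getNewInstance takes a stream-name parameter, presumably so the same reconnection path can rebuild a producer or consumer for a specific partition of a super stream.

// Sketch only: reconstructed from the removed switch; the real helper lives
// elsewhere in the ha package and may differ in name, receiver, or signature.
func getStatusAsString(r IReliable) string {
    switch r.GetStatus() {
    case StatusOpen:
        return "Open"
    case StatusClosed:
        return "Closed"
    case StatusStreamDoesNotExist:
        return "StreamDoesNotExist"
    case StatusReconnecting:
        return "Reconnecting"
    default:
        return "Unknown"
    }
}
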
pkg/ha/ha_publisher.go

Lines changed: 7 additions & 54 deletions

@@ -1,7 +1,6 @@
 package ha

 import (
-    "errors"
     "fmt"
     "strings"
     "sync"
@@ -30,7 +29,7 @@ func (p *ReliableProducer) handleNotifyClose(channelClose stream.ChannelClose) {
     waitTime := randomWaitWithBackoff(1)
     logs.LogWarn("[Reliable] - %s closed unexpectedly.. Reconnecting in %d milliseconds waiting pending messages", p.getInfo(), waitTime)
     time.Sleep(time.Duration(waitTime) * time.Millisecond)
-    err, reconnected := retry(1, p)
+    err, reconnected := retry(1, p, p.GetStreamName())
     if err != nil {
         logs.LogInfo(
             "[Reliable] - %s won't be reconnected. Error: %s", p.getInfo(), err)
@@ -113,62 +112,27 @@ func (p *ReliableProducer) newProducer() error {
     return err
 }

-func (p *ReliableProducer) isReadyToSend() error {
-    if p.GetStatus() == StatusStreamDoesNotExist {
-        return stream.StreamDoesNotExist
-    }
-
-    if p.GetStatus() == StatusClosed {
-        return fmt.Errorf("%s is closed", p.getInfo())
-    }
-
-    if p.GetStatus() == StatusReconnecting {
-        logs.LogDebug("[Reliable] %s is reconnecting. The send is blocked", p.getInfo())
-        p.reconnectionSignal.L.Lock()
-        p.reconnectionSignal.Wait()
-        p.reconnectionSignal.L.Unlock()
-        logs.LogDebug("[Reliable] %s reconnected. The send is unlocked", p.getInfo())
-    }
-
-    return nil
-}
-
-func (p *ReliableProducer) checkWriteError(errW error) error {
-    if errW != nil {
-        switch {
-        case errors.Is(errW, stream.FrameTooLarge):
-            {
-                return stream.FrameTooLarge
-            }
-        default:
-            time.Sleep(500 * time.Millisecond)
-            logs.LogError("[Reliable] %s - error during send %s", p.getInfo(), errW.Error())
-        }
-    }
-    return nil
-}
-
 func (p *ReliableProducer) Send(message message.StreamMessage) error {
-    if err := p.isReadyToSend(); err != nil {
+    if err := isReadyToSend(p, p.reconnectionSignal); err != nil {
        return err
     }
     p.mutex.Lock()
     errW := p.producer.Send(message)
     p.mutex.Unlock()

-    return p.checkWriteError(errW)
+    return checkWriteError(p, errW)
 }

 func (p *ReliableProducer) BatchSend(batchMessages []message.StreamMessage) error {
-    if err := p.isReadyToSend(); err != nil {
+    if err := isReadyToSend(p, p.reconnectionSignal); err != nil {
        return err
     }

     p.mutex.Lock()
     errW := p.producer.BatchSend(batchMessages)
     p.mutex.Unlock()

-    return p.checkWriteError(errW)
+    return checkWriteError(p, errW)
 }

 func (p *ReliableProducer) IsOpen() bool {
@@ -184,18 +148,7 @@ func (p *ReliableProducer) GetStatus() int {
 }

 func (p *ReliableProducer) GetStatusAsString() string {
-    switch p.GetStatus() {
-    case StatusOpen:
-        return "Open"
-    case StatusClosed:
-        return "Closed"
-    case StatusStreamDoesNotExist:
-        return "StreamDoesNotExist"
-    case StatusReconnecting:
-        return "Reconnecting"
-    default:
-        return "Unknown"
-    }
+    return getStatusAsString(p)
 }

 // IReliable interface
@@ -214,7 +167,7 @@ func (p *ReliableProducer) getEnv() *stream.Environment {
     return p.env
 }

-func (p *ReliableProducer) getNewInstance() newEntityInstance {
+func (p *ReliableProducer) getNewInstance(_ string) newEntityInstance {
     return p.newProducer
 }

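Send and BatchSend now call package-level isReadyToSend and checkWriteError helpers instead of producer-only methods, so the new super stream producer can reuse the same send gating and error handling. The shared implementations are not shown in this diff; below is a minimal sketch reconstructed from the methods deleted above, assuming the helpers take an IReliable-style value exposing GetStatus()/getInfo() and the *sync.Cond reconnection signal the producer already uses.

// Sketch only: reconstructed from the deleted methods; names, parameter types,
// and the IReliable methods used here are assumptions based on the call sites.
func isReadyToSend(r IReliable, reconnectionSignal *sync.Cond) error {
    if r.GetStatus() == StatusStreamDoesNotExist {
        return stream.StreamDoesNotExist
    }
    if r.GetStatus() == StatusClosed {
        return fmt.Errorf("%s is closed", r.getInfo())
    }
    if r.GetStatus() == StatusReconnecting {
        // Block the send until the reconnection goroutine signals completion.
        logs.LogDebug("[Reliable] %s is reconnecting. The send is blocked", r.getInfo())
        reconnectionSignal.L.Lock()
        reconnectionSignal.Wait()
        reconnectionSignal.L.Unlock()
        logs.LogDebug("[Reliable] %s reconnected. The send is unlocked", r.getInfo())
    }
    return nil
}

func checkWriteError(r IReliable, errW error) error {
    if errW == nil {
        return nil
    }
    if errors.Is(errW, stream.FrameTooLarge) {
        return stream.FrameTooLarge
    }
    // Any other write error: back off briefly and log, but do not fail the caller,
    // matching the behavior of the removed method.
    time.Sleep(500 * time.Millisecond)
    logs.LogError("[Reliable] %s - error during send %s", r.getInfo(), errW.Error())
    return nil
}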