Skip to content

Commit d4a63d5

Browse files
committed
refactor(events): replace time.Sleep with ReadySignal for deterministic test synchronization
1 parent b893e3e commit d4a63d5

File tree

7 files changed

+79
-44
lines changed

7 files changed

+79
-44
lines changed

internal/events/eventstest/contract_suite.go

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@ import (
1515

1616
// PublisherSubscriberFactory creates a matched Publisher and Subscriber
1717
// pair for testing. The factory is called once per test case.
18-
type PublisherSubscriberFactory func(t *testing.T) (events.Publisher, events.Subscriber)
18+
//
19+
// The returned ready channel is closed (or receives a value) when the
20+
// subscriber is connected and ready to receive notifications. For
21+
// memory.Bus, return a pre-closed channel. For PostgreSQL, use
22+
// SubscriberConfig.ReadySignal.
23+
type PublisherSubscriberFactory func(t *testing.T) (events.Publisher, events.Subscriber, <-chan struct{})
1924

2025
// ContractSuite verifies that any Publisher+Subscriber implementation
2126
// satisfies the events library's behavioral contract. Run it against
@@ -25,18 +30,28 @@ type ContractSuite struct {
2530
Factory PublisherSubscriberFactory
2631
}
2732

28-
func (s *ContractSuite) TestSinglePublishSubscribe() {
29-
pub, sub := s.Factory(s.T())
30-
ch := sub.Subscribe(events.ChannelInputReceived)
33+
// startListening starts sub.Listen and waits for the subscriber to be ready.
34+
func (s *ContractSuite) startListening(sub events.Subscriber, ready <-chan struct{}) {
3135
ctx := s.T().Context()
3236
go sub.Listen(ctx) //nolint:errcheck
33-
time.Sleep(200 * time.Millisecond)
37+
select {
38+
case <-ready:
39+
case <-time.After(5 * time.Second):
40+
s.Fail("subscriber did not become ready")
41+
}
42+
}
43+
44+
func (s *ContractSuite) TestSinglePublishSubscribe() {
45+
pub, sub, ready := s.Factory(s.T())
46+
ch := sub.Subscribe(events.ChannelInputReceived)
47+
s.startListening(sub, ready)
3448

3549
expected := events.Notification{
3650
Channel: events.ChannelInputReceived,
3751
ApplicationID: 42,
3852
EpochIndex: 7,
3953
}
54+
ctx := s.T().Context()
4055
pub.Publish(ctx, expected)
4156

4257
select {
@@ -49,12 +64,11 @@ func (s *ContractSuite) TestSinglePublishSubscribe() {
4964
}
5065

5166
func (s *ContractSuite) TestChannelIsolation() {
52-
pub, sub := s.Factory(s.T())
67+
pub, sub, ready := s.Factory(s.T())
5368
ch := sub.Subscribe(events.ChannelInputReceived)
54-
ctx := s.T().Context()
55-
go sub.Listen(ctx) //nolint:errcheck
56-
time.Sleep(200 * time.Millisecond)
69+
s.startListening(sub, ready)
5770

71+
ctx := s.T().Context()
5872
// Publish on a different channel.
5973
pub.Publish(ctx, events.Notification{
6074
Channel: events.ChannelClaimComputed,
@@ -75,12 +89,11 @@ func (s *ContractSuite) TestChannelIsolation() {
7589
}
7690

7791
func (s *ContractSuite) TestBufferOverflowDropsWithoutBlocking() {
78-
pub, sub := s.Factory(s.T())
92+
pub, sub, ready := s.Factory(s.T())
7993
ch := sub.Subscribe(events.ChannelInputReceived)
80-
ctx := s.T().Context()
81-
go sub.Listen(ctx) //nolint:errcheck
82-
time.Sleep(200 * time.Millisecond)
94+
s.startListening(sub, ready)
8395

96+
ctx := s.T().Context()
8497
// Publish more than buffer size (64) without reading.
8598
for i := range 100 {
8699
pub.Publish(ctx, events.Notification{
@@ -99,13 +112,12 @@ func (s *ContractSuite) TestBufferOverflowDropsWithoutBlocking() {
99112
}
100113

101114
func (s *ContractSuite) TestMultipleSubscriptions() {
102-
pub, sub := s.Factory(s.T())
115+
pub, sub, ready := s.Factory(s.T())
103116
ch1 := sub.Subscribe(events.ChannelInputReceived)
104117
ch2 := sub.Subscribe(events.ChannelClaimComputed)
105-
ctx := s.T().Context()
106-
go sub.Listen(ctx) //nolint:errcheck
107-
time.Sleep(200 * time.Millisecond)
118+
s.startListening(sub, ready)
108119

120+
ctx := s.T().Context()
109121
pub.Publish(ctx, events.Notification{
110122
Channel: events.ChannelInputReceived,
111123
ApplicationID: 1,
@@ -130,13 +142,12 @@ func (s *ContractSuite) TestMultipleSubscriptions() {
130142
}
131143

132144
func (s *ContractSuite) TestSubscribeWithFilterByAppID() {
133-
pub, sub := s.Factory(s.T())
145+
pub, sub, ready := s.Factory(s.T())
134146
filter := events.SubscriptionFilter{ApplicationIDs: []int64{42}}
135147
ch := sub.SubscribeWithFilter(filter, events.ChannelInputReceived)
136-
ctx := s.T().Context()
137-
go sub.Listen(ctx) //nolint:errcheck
138-
time.Sleep(200 * time.Millisecond)
148+
s.startListening(sub, ready)
139149

150+
ctx := s.T().Context()
140151
// Publish for a non-matching app.
141152
pub.Publish(ctx, events.Notification{
142153
Channel: events.ChannelInputReceived,

internal/events/eventstest/property_suite.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ type PropertySuite struct {
2424
suite.Suite
2525
Factory PublisherSubscriberFactory
2626

27-
// SettleTime is the time to wait after subscribe+listen for connection
28-
// readiness, and after publish for delivery. For memory.Bus this is 0;
29-
// for PG LISTEN/NOTIFY this should be ~200ms.
27+
// SettleTime is the time to wait after publish for async delivery.
28+
// For memory.Bus this is 0; for PG LISTEN/NOTIFY this should be ~200ms.
29+
// Connection readiness uses the factory's ready channel instead.
3030
SettleTime time.Duration
3131
}
3232

@@ -37,12 +37,15 @@ func (s *PropertySuite) settle() {
3737
}
3838
}
3939

40-
// startListening starts sub.Listen in a background goroutine and waits
41-
// for the connection to be established. Must be called after Subscribe.
42-
func (s *PropertySuite) startListening(sub events.Subscriber) {
40+
// startListening starts sub.Listen and waits for the subscriber to be ready.
41+
func (s *PropertySuite) startListening(sub events.Subscriber, ready <-chan struct{}) {
4342
ctx := s.T().Context()
4443
go sub.Listen(ctx) //nolint:errcheck
45-
s.settle()
44+
select {
45+
case <-ready:
46+
case <-time.After(5 * time.Second):
47+
s.Fail("subscriber did not become ready")
48+
}
4649
}
4750

4851
// TestP1NoWorkLossUnderEventLoss verifies that for any sequence of published
@@ -53,9 +56,9 @@ func (s *PropertySuite) TestP1NoWorkLossUnderEventLoss() {
5356
n := rand.IntN(20) + 1 //nolint:gosec
5457
db := make(map[int64]bool)
5558

56-
pub, sub := s.Factory(s.T())
59+
pub, sub, ready := s.Factory(s.T())
5760
ch := sub.Subscribe(events.ChannelInputReceived)
58-
s.startListening(sub)
61+
s.startListening(sub, ready)
5962

6063
ctx := s.T().Context()
6164
for i := range n {
@@ -106,9 +109,9 @@ func (s *PropertySuite) TestP5ChannelIsolation() {
106109
allChannels := events.AllChannels()
107110

108111
for _, subChannel := range allChannels {
109-
pub, sub := s.Factory(s.T())
112+
pub, sub, ready := s.Factory(s.T())
110113
ch := sub.Subscribe(subChannel)
111-
s.startListening(sub)
114+
s.startListening(sub, ready)
112115

113116
ctx := s.T().Context()
114117
for _, pubChannel := range allChannels {
@@ -142,14 +145,14 @@ func (s *PropertySuite) TestP6PipelineEventualDelivery() {
142145
const syncInterval = 3
143146

144147
for range 10 {
145-
pub, sub := s.Factory(s.T())
148+
pub, sub, ready := s.Factory(s.T())
146149
ctx := s.T().Context()
147150

148151
// Set up a 3-stage pipeline: input -> processed -> computed.
149152
stage1Ch := sub.Subscribe(events.ChannelInputReceived)
150153
stage2Ch := sub.Subscribe(events.ChannelInputsProcessed)
151154
stage3Ch := sub.Subscribe(events.ChannelClaimComputed)
152-
s.startListening(sub)
155+
s.startListening(sub, ready)
153156

154157
// Simulated DB: each stage writes its output here.
155158
stage1Done := false

internal/events/memory/bus_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,9 +202,11 @@ func TestBusDefaultBufferSize(t *testing.T) {
202202

203203
func TestMemoryBusContract(t *testing.T) {
204204
suite.Run(t, &eventstest.ContractSuite{
205-
Factory: func(_ *testing.T) (events.Publisher, events.Subscriber) {
205+
Factory: func(_ *testing.T) (events.Publisher, events.Subscriber, <-chan struct{}) {
206206
bus := NewBus(64)
207-
return bus, bus
207+
ready := make(chan struct{})
208+
close(ready) // memory bus is immediately ready
209+
return bus, bus, ready
208210
},
209211
})
210212
}

internal/events/postgres/property_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,18 @@ func TestPropertySuitePostgres(t *testing.T) {
2626
connStr := getTestConnString(t)
2727

2828
suite.Run(t, &eventstest.PropertySuite{
29-
Factory: func(t *testing.T) (events.Publisher, events.Subscriber) {
29+
Factory: func(t *testing.T) (events.Publisher, events.Subscriber, <-chan struct{}) {
3030
ctx := context.Background()
3131
pool, err := pgxpool.New(ctx, connStr)
3232
require.NoError(t, err)
3333
t.Cleanup(pool.Close)
3434

35+
ready := make(chan struct{}, 1)
3536
pub := NewPublisher(pool, slog.Default())
36-
sub := NewSubscriber(connStr, slog.Default(), nil)
37+
sub := NewSubscriber(connStr, slog.Default(), &SubscriberConfig{ReadySignal: ready})
3738
t.Cleanup(func() { _ = sub.Close() })
3839

39-
return pub, sub
40+
return pub, sub, ready
4041
},
4142
SettleTime: 200 * time.Millisecond,
4243
})

internal/events/postgres/subscriber.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ type SubscriberConfig struct {
4646
// Default: 64. Increase for systems with many applications where a single
4747
// block could generate notifications for all of them.
4848
BufferSize int
49+
50+
// ReadySignal, if non-nil, receives a value each time the subscriber
51+
// successfully connects and issues LISTEN commands. This enables tests
52+
// to wait for readiness without time.Sleep.
53+
ReadySignal chan<- struct{}
4954
}
5055

5156
// subscription holds the delivery channel, subscribed channels, and optional
@@ -64,6 +69,7 @@ type Subscriber struct {
6469
logger *slog.Logger
6570
bufferSize int
6671
heartbeatTimeout time.Duration
72+
readySignal chan<- struct{}
6773

6874
mu sync.Mutex
6975
channels []events.Channel
@@ -74,19 +80,22 @@ type Subscriber struct {
7480
func NewSubscriber(connString string, logger *slog.Logger, cfg *SubscriberConfig) *Subscriber {
7581
heartbeat := defaultHeartbeatTimeout
7682
bufSize := defaultBufferSize
83+
var readySig chan<- struct{}
7784
if cfg != nil {
7885
if cfg.HeartbeatTimeout > 0 {
7986
heartbeat = cfg.HeartbeatTimeout
8087
}
8188
if cfg.BufferSize > 0 {
8289
bufSize = cfg.BufferSize
8390
}
91+
readySig = cfg.ReadySignal
8492
}
8593
return &Subscriber{
8694
connString: connString,
8795
logger: logger,
8896
bufferSize: bufSize,
8997
heartbeatTimeout: heartbeat,
98+
readySignal: readySig,
9099
}
91100
}
92101

@@ -197,6 +206,12 @@ func (s *Subscriber) listenLoop(
197206
}
198207

199208
s.logger.Info("Event listener connected", "channels", channels)
209+
if s.readySignal != nil {
210+
select {
211+
case s.readySignal <- struct{}{}:
212+
default:
213+
}
214+
}
200215

201216
for {
202217
waitCtx, cancel := context.WithTimeout(ctx, s.heartbeatTimeout)

internal/events/postgres/subscriber_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -361,17 +361,18 @@ func TestPostgresContract(t *testing.T) {
361361
connStr := getTestConnString(t)
362362

363363
suite.Run(t, &eventstest.ContractSuite{
364-
Factory: func(_ *testing.T) (events.Publisher, events.Subscriber) {
364+
Factory: func(_ *testing.T) (events.Publisher, events.Subscriber, <-chan struct{}) {
365365
ctx := context.Background()
366366
pool, err := pgxpool.New(ctx, connStr)
367367
require.NoError(t, err)
368368
t.Cleanup(pool.Close)
369369

370+
ready := make(chan struct{}, 1)
370371
pub := NewPublisher(pool, slog.Default())
371-
sub := NewSubscriber(connStr, slog.Default(), nil)
372+
sub := NewSubscriber(connStr, slog.Default(), &SubscriberConfig{ReadySignal: ready})
372373
t.Cleanup(func() { _ = sub.Close() })
373374

374-
return pub, sub
375+
return pub, sub, ready
375376
},
376377
})
377378
}

internal/events/property_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@ import (
2525
// in internal/events/postgres/property_test.go.
2626
func TestPropertySuiteMemory(t *testing.T) {
2727
suite.Run(t, &eventstest.PropertySuite{
28-
Factory: func(t *testing.T) (events.Publisher, events.Subscriber) {
28+
Factory: func(t *testing.T) (events.Publisher, events.Subscriber, <-chan struct{}) {
2929
bus := memory.NewBus(64)
3030
t.Cleanup(func() { _ = bus.Close() })
31-
return bus, bus
31+
ready := make(chan struct{})
32+
close(ready) // memory bus is immediately ready
33+
return bus, bus, ready
3234
},
3335
})
3436
}

0 commit comments

Comments
 (0)