Skip to content

Commit 8af63f5

Browse files
authored
REP-6832 Always read latest-possible version of documents (#177)
This strengthens the prevention of stale reads: document reads now incorporate the latest-seen cluster time and use majority read concern. Stale reads are particularly relevant when reading from secondaries, but because of [SERVER-53813](https://jira.mongodb.org/browse/SERVER-53813) they can also affect reads from primaries. This also removes the `lag` value from a trace-level log in order to simplify things a bit; that data point is redundant with the lag that the logs already report periodically.
1 parent b83d295 commit 8af63f5

File tree

8 files changed

+169
-121
lines changed

8 files changed

+169
-121
lines changed

internal/verifier/change_reader.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ const (
3434
type changeReader interface {
3535
getWhichCluster() whichCluster
3636
getReadChannel() <-chan changeEventBatch
37-
getStartTimestamp() option.Option[bson.Timestamp]
37+
getStartTimestamp() bson.Timestamp
38+
getLastSeenClusterTime() option.Option[bson.Timestamp]
3839
getEventsPerSecond() option.Option[float64]
3940
getLag() option.Option[time.Duration]
4041
getBufferSaturation() float64
@@ -48,9 +49,8 @@ type changeReader interface {
4849
type ChangeReaderCommon struct {
4950
readerType whichCluster
5051

51-
lastChangeEventTime *bson.Timestamp
52-
logger *logger.Logger
53-
namespaces []string
52+
logger *logger.Logger
53+
namespaces []string
5454

5555
metaDB *mongo.Database
5656
watcherClient *mongo.Client
@@ -62,6 +62,8 @@ type ChangeReaderCommon struct {
6262
changeEventBatchChan chan changeEventBatch
6363
writesOffTs *util.Eventual[bson.Timestamp]
6464

65+
lastChangeEventTime *msync.TypedAtomic[option.Option[bson.Timestamp]]
66+
6567
startAtTs *bson.Timestamp
6668

6769
lag *msync.TypedAtomic[option.Option[time.Duration]]
@@ -70,12 +72,32 @@ type ChangeReaderCommon struct {
7072
onDDLEvent ddlEventHandling
7173
}
7274

75+
func newChangeReaderCommon(clusterName whichCluster) ChangeReaderCommon {
76+
return ChangeReaderCommon{
77+
readerType: clusterName,
78+
changeEventBatchChan: make(chan changeEventBatch, batchChanBufferSize),
79+
writesOffTs: util.NewEventual[bson.Timestamp](),
80+
lag: msync.NewTypedAtomic(option.None[time.Duration]()),
81+
lastChangeEventTime: msync.NewTypedAtomic(option.None[bson.Timestamp]()),
82+
batchSizeHistory: history.New[int](time.Minute),
83+
onDDLEvent: lo.Ternary(
84+
clusterName == dst,
85+
onDDLEventAllow,
86+
"",
87+
),
88+
}
89+
}
90+
7391
// getWhichCluster returns rc.readerType, i.e., the cluster designation
// (e.g. src or dst) that this reader was given.
func (rc *ChangeReaderCommon) getWhichCluster() whichCluster {
7492
return rc.readerType
7593
}
7694

77-
func (rc *ChangeReaderCommon) getStartTimestamp() option.Option[bson.Timestamp] {
78-
return option.FromPointer(rc.startAtTs)
95+
func (rc *ChangeReaderCommon) getStartTimestamp() bson.Timestamp {
96+
if rc.startAtTs == nil {
97+
panic("no start timestamp yet?!?")
98+
}
99+
100+
return *rc.startAtTs
79101
}
80102

81103
func (rc *ChangeReaderCommon) setWritesOff(ts bson.Timestamp) {
@@ -90,6 +112,10 @@ func (rc *ChangeReaderCommon) getReadChannel() <-chan changeEventBatch {
90112
return rc.changeEventBatchChan
91113
}
92114

115+
// getLastSeenClusterTime returns the value currently held in
// rc.lastChangeEventTime. The returned option is empty if no change event
// time has been stored yet.
func (rc *ChangeReaderCommon) getLastSeenClusterTime() option.Option[bson.Timestamp] {
116+
return rc.lastChangeEventTime.Load()
117+
}
118+
93119
// getBufferSaturation returns the reader’s internal buffer’s saturation level
94120
// as a fraction. If saturation rises, that means we’re reading events faster
95121
// than we can persist them.

internal/verifier/change_stream.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,28 @@ type ChangeStreamReader struct {
4747

4848
var _ changeReader = &ChangeStreamReader{}
4949

50+
func (v *Verifier) newChangeStreamReader(
51+
namespaces []string,
52+
cluster whichCluster,
53+
client *mongo.Client,
54+
clusterInfo util.ClusterInfo,
55+
) *ChangeStreamReader {
56+
common := newChangeReaderCommon(cluster)
57+
common.namespaces = namespaces
58+
common.readerType = cluster
59+
common.watcherClient = client
60+
common.clusterInfo = clusterInfo
61+
62+
common.logger = v.logger
63+
common.metaDB = v.metaClient.Database(v.metaDBName)
64+
65+
common.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken
66+
67+
csr := &ChangeStreamReader{ChangeReaderCommon: common}
68+
69+
return csr
70+
}
71+
5072
// GetChangeStreamFilter returns an aggregation pipeline that filters
5173
// namespaces as per configuration.
5274
//
@@ -193,11 +215,9 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
193215
return errors.Errorf("Change event lacks a namespace: %+v", changeEvents[eventsRead])
194216
}
195217

196-
if changeEvents[eventsRead].ClusterTime != nil &&
197-
(csr.lastChangeEventTime == nil ||
198-
csr.lastChangeEventTime.Before(*changeEvents[eventsRead].ClusterTime)) {
199-
200-
csr.lastChangeEventTime = changeEvents[eventsRead].ClusterTime
218+
eventTime := changeEvents[eventsRead].ClusterTime
219+
if eventTime != nil && csr.lastChangeEventTime.Load().OrZero().Before(*eventTime) {
220+
csr.lastChangeEventTime.Store(option.Some(*eventTime))
201221
latestEvent = option.Some(changeEvents[eventsRead])
202222
}
203223

@@ -230,9 +250,6 @@ func (csr *ChangeStreamReader) readAndHandleOneChangeEventBatch(
230250
events: changeEvents,
231251

232252
resumeToken: cs.ResumeToken(),
233-
234-
// NB: We know by now that OperationTime is non-nil.
235-
clusterTime: *sess.OperationTime(),
236253
}:
237254
}
238255

@@ -314,19 +331,19 @@ func (csr *ChangeStreamReader) iterateChangeStream(
314331

315332
if gotwritesOffTimestamp {
316333
csr.running = false
317-
if csr.lastChangeEventTime != nil {
318-
csr.startAtTs = csr.lastChangeEventTime
334+
if ts, has := csr.lastChangeEventTime.Load().Get(); has {
335+
csr.startAtTs = &ts
319336
}
320337

321338
break
322339
}
323340
}
324341

325342
infoLog := csr.logger.Info()
326-
if csr.lastChangeEventTime == nil {
327-
infoLog = infoLog.Str("lastEventTime", "none")
343+
if ts, has := csr.lastChangeEventTime.Load().Get(); has {
344+
infoLog = infoLog.Any("lastEventTime", ts)
328345
} else {
329-
infoLog = infoLog.Any("lastEventTime", *csr.lastChangeEventTime)
346+
infoLog = infoLog.Str("lastEventTime", "none")
330347
}
331348

332349
infoLog.

internal/verifier/change_stream_test.go

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -441,9 +441,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamResumability() {
441441

442442
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier2)
443443

444-
startAtTs, hasStartAtTs := verifier2.srcChangeReader.getStartTimestamp().Get()
445-
446-
suite.Require().True(hasStartAtTs)
444+
startAtTs := verifier2.srcChangeReader.getStartTimestamp()
447445

448446
suite.Assert().False(
449447
startAtTs.After(newTime),
@@ -631,14 +629,13 @@ func (suite *IntegrationTestSuite) TestStartAtTimeNoChanges() {
631629

632630
eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
633631

634-
startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get()
635-
suite.Require().True(hasStartAtTs, "startAtTs should be set")
632+
startAtTs := verifier.srcChangeReader.getStartTimestamp()
636633

637634
verifier.srcChangeReader.setWritesOff(insertTs)
638635

639636
suite.Require().NoError(eg.Wait())
640637

641-
startAtTs2 := verifier.srcChangeReader.getStartTimestamp().MustGet()
638+
startAtTs2 := verifier.srcChangeReader.getStartTimestamp()
642639

643640
suite.Require().False(
644641
startAtTs2.Before(startAtTs),
@@ -663,8 +660,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
663660
suite.Require().NotNil(origSessionTime)
664661
eg := suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
665662

666-
startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get()
667-
suite.Require().True(hasStartAtTs, "startAtTs should be set")
663+
startAtTs := verifier.srcChangeReader.getStartTimestamp()
668664

669665
// srcStartAtTs derives from the change stream’s resume token, which can
670666
// postdate our session time but should not precede it.
@@ -697,8 +693,7 @@ func (suite *IntegrationTestSuite) TestStartAtTimeWithChanges() {
697693

698694
suite.Require().NoError(eg.Wait())
699695

700-
startAtTs, hasStartAtTs = verifier.srcChangeReader.getStartTimestamp().Get()
701-
suite.Require().True(hasStartAtTs, "startAtTs should be set")
696+
startAtTs = verifier.srcChangeReader.getStartTimestamp()
702697

703698
suite.Assert().Equal(
704699
*postEventsSessionTime,
@@ -720,8 +715,7 @@ func (suite *IntegrationTestSuite) TestNoStartAtTime() {
720715
suite.Require().NotNil(origStartTs)
721716
suite.startSrcChangeStreamReaderAndHandler(ctx, verifier)
722717

723-
startAtTs, hasStartAtTs := verifier.srcChangeReader.getStartTimestamp().Get()
724-
suite.Require().True(hasStartAtTs, "startAtTs should be set")
718+
startAtTs := verifier.srcChangeReader.getStartTimestamp()
725719

726720
suite.Require().NotNil(startAtTs)
727721
suite.Require().LessOrEqual(origStartTs.Compare(startAtTs), 0)

internal/verifier/check.go

Lines changed: 48 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,9 @@ import (
66
"time"
77

88
"github.com/10gen/migration-verifier/contextplus"
9-
"github.com/10gen/migration-verifier/history"
109
"github.com/10gen/migration-verifier/internal/logger"
1110
"github.com/10gen/migration-verifier/internal/retry"
12-
"github.com/10gen/migration-verifier/internal/util"
1311
"github.com/10gen/migration-verifier/mslices"
14-
"github.com/10gen/migration-verifier/msync"
15-
"github.com/10gen/migration-verifier/option"
1612
mapset "github.com/deckarep/golang-set/v2"
1713
"github.com/goaux/timer"
1814
"github.com/pkg/errors"
@@ -255,28 +251,10 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
255251
verifier.phase = Idle
256252
}()
257253

258-
changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx)
259-
for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) {
260-
if changeReader.isRunning() {
261-
verifier.logger.Debug().Msgf("Check: %s already running.", changeReader)
262-
} else {
263-
verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader)
264-
265-
err = changeReader.start(groupCtx, changeReaderGroup)
266-
if err != nil {
267-
return errors.Wrapf(err, "failed to start %s", changeReader)
268-
}
269-
changeReaderGroup.Go(func() error {
270-
return verifier.RunChangeEventPersistor(groupCtx, changeReader)
271-
})
272-
}
254+
if err := verifier.startChangeHandling(ctx); err != nil {
255+
return err
273256
}
274257

275-
changeHandlingErr := verifier.changeHandlingErr
276-
go func() {
277-
changeHandlingErr.Set(changeReaderGroup.Wait())
278-
}()
279-
280258
// Log the verification status when initially booting up so it's easy to see the current state
281259
verificationStatus, err := verifier.GetVerificationStatus(ctx)
282260
if err != nil {
@@ -409,6 +387,38 @@ func (verifier *Verifier) CheckDriver(ctx context.Context, filter bson.D, testCh
409387
}
410388
}
411389

390+
// startChangeHandling starts the goroutines that read changes
391+
// from the source & destination and that persist those changes
392+
// to the metadata.
393+
//
394+
// As part of this, it sets the change readers’ start timestamps.
395+
// (It blocks until those are set.)
396+
func (verifier *Verifier) startChangeHandling(ctx context.Context) error {
397+
changeReaderGroup, groupCtx := contextplus.ErrGroup(ctx)
398+
for _, changeReader := range mslices.Of(verifier.srcChangeReader, verifier.dstChangeReader) {
399+
if changeReader.isRunning() {
400+
verifier.logger.Debug().Msgf("Check: %s already running.", changeReader)
401+
} else {
402+
verifier.logger.Debug().Msgf("%s not running; starting change reader", changeReader)
403+
404+
err := changeReader.start(groupCtx, changeReaderGroup)
405+
if err != nil {
406+
return errors.Wrapf(err, "failed to start %s", changeReader)
407+
}
408+
changeReaderGroup.Go(func() error {
409+
return verifier.RunChangeEventPersistor(groupCtx, changeReader)
410+
})
411+
}
412+
}
413+
414+
changeHandlingErr := verifier.changeHandlingErr
415+
go func() {
416+
changeHandlingErr.Set(changeReaderGroup.Wait())
417+
}()
418+
419+
return nil
420+
}
421+
412422
func (verifier *Verifier) setupAllNamespaceList(ctx context.Context) error {
413423
// We want to check all user collections on both source and dest.
414424
srcNamespaces, err := ListAllUserNamespaces(ctx, verifier.logger, verifier.srcClient, verifier.metaDBName)
@@ -600,36 +610,18 @@ func (verifier *Verifier) work(ctx context.Context, workerNum int) error {
600610
}
601611
}
602612

603-
func (verifier *Verifier) initializeChangeReaders() {
604-
srcReader := &ChangeStreamReader{
605-
ChangeReaderCommon: ChangeReaderCommon{
606-
readerType: src,
607-
namespaces: verifier.srcNamespaces,
608-
watcherClient: verifier.srcClient,
609-
clusterInfo: *verifier.srcClusterInfo,
610-
},
611-
}
612-
verifier.srcChangeReader = srcReader
613-
614-
dstReader := &ChangeStreamReader{
615-
ChangeReaderCommon: ChangeReaderCommon{
616-
readerType: dst,
617-
namespaces: verifier.dstNamespaces,
618-
watcherClient: verifier.dstClient,
619-
clusterInfo: *verifier.dstClusterInfo,
620-
onDDLEvent: onDDLEventAllow,
621-
},
622-
}
623-
verifier.dstChangeReader = dstReader
624-
625-
// Common elements in both readers:
626-
for _, csr := range mslices.Of(srcReader, dstReader) {
627-
csr.logger = verifier.logger
628-
csr.metaDB = verifier.metaClient.Database(verifier.metaDBName)
629-
csr.changeEventBatchChan = make(chan changeEventBatch, batchChanBufferSize)
630-
csr.writesOffTs = util.NewEventual[bson.Timestamp]()
631-
csr.lag = msync.NewTypedAtomic(option.None[time.Duration]())
632-
csr.batchSizeHistory = history.New[int](time.Minute)
633-
csr.resumeTokenTSExtractor = extractTSFromChangeStreamResumeToken
634-
}
613+
func (v *Verifier) initializeChangeReaders() {
614+
v.srcChangeReader = v.newChangeStreamReader(
615+
v.srcNamespaces,
616+
src,
617+
v.srcClient,
618+
*v.srcClusterInfo,
619+
)
620+
621+
v.dstChangeReader = v.newChangeStreamReader(
622+
v.dstNamespaces,
623+
dst,
624+
v.dstClient,
625+
*v.dstClusterInfo,
626+
)
635627
}

0 commit comments

Comments
 (0)