From a544bf1702feb7d16cabb5f30ca5547f4b0d6408 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 23 Oct 2025 22:17:41 -0400 Subject: [PATCH 1/5] Optimize dockey extraction by avoiding bson.Marshal. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This code gets called repeatedly, so it’s worth optimizing. This also optimizes the field-names uniqueness check. --- dockey/agg_test.go | 20 -------------------- dockey/raw.go | 25 ++++++++++++++----------- dockey/raw_test.go | 11 ----------- mslices/slices.go | 22 ++++++++++++++++++++++ 4 files changed, 36 insertions(+), 42 deletions(-) delete mode 100644 dockey/agg_test.go diff --git a/dockey/agg_test.go b/dockey/agg_test.go deleted file mode 100644 index 5a4d3ec5..00000000 --- a/dockey/agg_test.go +++ /dev/null @@ -1,20 +0,0 @@ -package dockey - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestAggPanic(t *testing.T) { - assert.Panics( - t, - func() { - ExtractTrueDocKeyAgg( - []string{"foo", "bar", "foo"}, - "$$ROOT", - ) - }, - "duplicate field name should cause panic", - ) -} diff --git a/dockey/raw.go b/dockey/raw.go index 0a6d0f46..208ae7d0 100644 --- a/dockey/raw.go +++ b/dockey/raw.go @@ -4,13 +4,14 @@ import ( "fmt" "strings" + "github.com/10gen/migration-verifier/mslices" "github.com/pkg/errors" - "github.com/samber/lo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" ) -// This extracts the document key from a document gets its field names. +// ExtractTrueDocKeyFromDoc extracts the document key from a document +// given its field names. // // NB: This avoids the problem documented in SERVER-109340; as a result, // the returned key may not always match the change stream’s `documentKey` @@ -21,7 +22,8 @@ func ExtractTrueDocKeyFromDoc( ) (bson.Raw, error) { assertFieldNameUniqueness(fieldNames) - var dk bson.D + docBuilder := bsoncore.NewDocumentBuilder() + for _, field := range fieldNames { var val bson.RawValue @@ -38,19 +40,20 @@ func ExtractTrueDocKeyFromDoc( return nil, errors.Wrapf(err, "extracting doc key field %#q from doc %+v", field, doc) } - dk = append(dk, bson.E{field, val}) - } - - docKey, err := bson.Marshal(dk) - if err != nil { - return nil, errors.Wrapf(err, "marshaling doc key %v from doc %v", dk, docKey) + docBuilder.AppendValue( + field, + bsoncore.Value{ + Type: val.Type, + Data: val.Value, + }, + ) } - return docKey, nil + return bson.Raw(docBuilder.Build()), nil } func assertFieldNameUniqueness(fieldNames []string) { - if len(lo.Uniq(fieldNames)) != len(fieldNames) { + if mslices.FindFirstDupe(fieldNames).IsSome() { panic(fmt.Sprintf("Duplicate field names: %v", fieldNames)) } } diff --git a/dockey/raw_test.go b/dockey/raw_test.go index 3e5a9652..8f5bbe4f 100644 --- a/dockey/raw_test.go +++ b/dockey/raw_test.go @@ -46,15 +46,4 @@ func TestExtractTrueDocKeyFromDoc(t *testing.T) { ) } } - - assert.Panics( - t, - func() { - _, _ = ExtractTrueDocKeyFromDoc( - []string{"foo", "bar", "foo"}, - bson.Raw{0}, - ) - }, - "duplicate field name should cause panic", - ) } diff --git a/mslices/slices.go b/mslices/slices.go index 31698847..26a2be81 100644 --- a/mslices/slices.go +++ b/mslices/slices.go @@ -3,6 +3,8 @@ package mslices import ( "reflect" "slices" + + "github.com/10gen/migration-verifier/option" ) // This package complements the Go standard library’s package of the @@ -43,3 +45,23 @@ func Compact[T any, S ~[]T](slc S) S { func isZero[T any](val T) bool { return reflect.ValueOf(&val).Elem().IsZero() } + +// 
FindFirstDupe returns the first item in the slice that has at +// least 1 duplicate, or none if all slice members are unique. +func FindFirstDupe[T comparable](items []T) option.Option[T] { + for i := range items { + j := 1 + i + + if j == len(items) { + break + } + + for ; j < len(items); j++ { + if items[i] == items[j] { + return option.Some(items[i]) + } + } + } + + return option.None[T]() +} From aae909951624f1182eb20ec1b0f1104f54e562e9 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 23 Oct 2025 22:22:06 -0400 Subject: [PATCH 2/5] Avoid BSON unmarshal in extractTimestampFromResumeToken. Unmarshaling is slow due to reflection. This function gets called frequently enough to justify the optimization. --- internal/verifier/change_stream.go | 17 +++++----- mbson/raw_value.go | 50 ++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 7 deletions(-) create mode 100644 mbson/raw_value.go diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 6a8e2d42..76c0c815 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -10,6 +10,7 @@ import ( "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/msync" "github.com/10gen/migration-verifier/option" mapset "github.com/deckarep/golang-set/v2" @@ -857,17 +858,19 @@ func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Contex } func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp, error) { - tokenStruct := struct { - Data string `bson:"_data"` - }{} - // Change stream token is always a V1 keystring in the _data field - err := bson.Unmarshal(resumeToken, &tokenStruct) + tokenDataRV, err := resumeToken.LookupErr("_data") + + if err != nil { + return primitive.Timestamp{}, errors.Wrapf(err, "extracting %#q from resume token (%v)", "_data", resumeToken) + } + + tokenData, err := mbson.CastRawValue[string](tokenDataRV) if err != nil { - return primitive.Timestamp{}, errors.Wrapf(err, "failed to extract %#q from resume token (%v)", "_data", resumeToken) + return primitive.Timestamp{}, errors.Wrapf(err, "parsing resume token (%v)", resumeToken) } - resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, tokenStruct.Data) + resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, tokenData) if err != nil { return primitive.Timestamp{}, err } diff --git a/mbson/raw_value.go b/mbson/raw_value.go new file mode 100644 index 00000000..668f36e8 --- /dev/null +++ b/mbson/raw_value.go @@ -0,0 +1,50 @@ +package mbson + +import ( + "fmt" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/bson/bsontype" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type bsonType interface { + bson.Raw | primitive.Timestamp | string +} + +type cannotCastErr struct { + gotBSONType bsontype.Type + toGoType any +} + +func (ce cannotCastErr) Error() string { + return fmt.Sprintf("cannot cast BSON %s to %T", ce.gotBSONType, ce.toGoType) +} + +// CastRawValue is a “one-stop-shop” interface around bson.RawValue’s various +// casting interfaces. Unlike those functions, though, this returns an error +// if the target type doesn’t match the value. +// +// Augment bsonType if you find a type here that’s missing. 
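+//
+// For example (an illustrative, hypothetical caller):
+//
+//	s, err := CastRawValue[string](someRawValue)
+//	// If someRawValue holds a BSON string, s is its Go value and err
+//	// is nil; otherwise err is a cannotCastErr.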
+func CastRawValue[T bsonType](in bson.RawValue) (T, error) { + retPtr := new(T) + + switch any(*retPtr).(type) { + case bson.Raw: + if doc, isDoc := in.DocumentOK(); isDoc { + return any(doc).(T), nil + } + case primitive.Timestamp: + if t, i, ok := in.TimestampOK(); ok { + return any(primitive.Timestamp{t, i}).(T), nil + } + case string: + if str, ok := in.StringValueOK(); ok { + return any(str).(T), nil + } + default: + panic(fmt.Sprintf("Unrecognized Go type: %T (maybe augment bsonType?)", *retPtr)) + } + + return *retPtr, cannotCastErr{in.Type, *retPtr} +} From 570d9749e06fc054df90acdb2034f3664a5ce985 Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 23 Oct 2025 22:56:24 -0400 Subject: [PATCH 3/5] Persist doc IDs as RawValue rather than any. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This makes the document IDs easier to reason about and sets up further optimizations. It also adds a few “shortcut” utilities to marshal/unmarshal BSON without reflection. --- dockey/agg.go | 2 - internal/verifier/change_stream.go | 4 +- internal/verifier/change_stream_test.go | 3 +- internal/verifier/migration_verifier.go | 34 +++++----- internal/verifier/migration_verifier_test.go | 48 +++++++++----- internal/verifier/recheck.go | 23 +++---- internal/verifier/recheck_test.go | 18 +++++- internal/verifier/result.go | 3 +- mbson/raw_value.go | 67 ++++++++++++++++++-- mbson/raw_value_test.go | 27 ++++++++ 10 files changed, 168 insertions(+), 61 deletions(-) create mode 100644 mbson/raw_value_test.go diff --git a/dockey/agg.go b/dockey/agg.go index 020c01e3..6394a9a1 100644 --- a/dockey/agg.go +++ b/dockey/agg.go @@ -16,8 +16,6 @@ import ( // the returned key may not always match the change stream’s `documentKey` // (because the server misreports its own sharding logic). 
func ExtractTrueDocKeyAgg(fieldNames []string, docExpr string) bson.D { - assertFieldNameUniqueness(fieldNames) - return bson.D{ {"$arrayToObject", mslices.Of(lo.Map( fieldNames, diff --git a/internal/verifier/change_stream.go b/internal/verifier/change_stream.go index 76c0c815..abd3c03d 100644 --- a/internal/verifier/change_stream.go +++ b/internal/verifier/change_stream.go @@ -44,7 +44,7 @@ var supportedEventOpTypes = mapset.NewSet( type ParsedEvent struct { OpType string `bson:"operationType"` Ns *Namespace `bson:"ns,omitempty"` - DocID any `bson:"_docID,omitempty"` + DocID bson.RawValue `bson:"_docID,omitempty"` FullDocument bson.Raw `bson:"fullDocument,omitempty"` FullDocLen option.Option[types.ByteCount] `bson:"_fullDocLen"` ClusterTime *primitive.Timestamp `bson:"clusterTime,omitEmpty"` @@ -187,7 +187,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch dbNames := make([]string, len(batch.events)) collNames := make([]string, len(batch.events)) - docIDs := make([]any, len(batch.events)) + docIDs := make([]bson.RawValue, len(batch.events)) dataSizes := make([]int, len(batch.events)) latestTimestamp := primitive.Timestamp{} diff --git a/internal/verifier/change_stream_test.go b/internal/verifier/change_stream_test.go index 673cdac1..e5a92055 100644 --- a/internal/verifier/change_stream_test.go +++ b/internal/verifier/change_stream_test.go @@ -11,6 +11,7 @@ import ( "github.com/10gen/migration-verifier/internal/testutil" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/mslices" "github.com/10gen/migration-verifier/mstrings" "github.com/pkg/errors" @@ -131,7 +132,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() { suite.Require().Equal("insert", parsed.OpType) suite.Require().Equal( - "abc", + mbson.ToRawValue("abc"), parsed.DocID, "event should reference expected document", ) diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go index 92ebcb6c..ae4a7a5b 100644 --- a/internal/verifier/migration_verifier.go +++ b/internal/verifier/migration_verifier.go @@ -455,16 +455,20 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A) return append(predicates, verifier.globalFilter) } -func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDoc, dstClientDoc bson.Raw, namespace string, id any, fieldPrefix string) (results []VerificationResult) { +func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDoc, dstClientDoc bson.Raw, namespace string, id bson.RawValue, fieldPrefix string) (results []VerificationResult) { + results = make( + []VerificationResult, + 0, + len(mismatch.missingFieldOnSrc)+len(mismatch.missingFieldOnDst)+len(mismatch.fieldContentsDiffer), + ) for _, field := range mismatch.missingFieldOnSrc { result := VerificationResult{ Field: fieldPrefix + field, Details: Missing, Cluster: ClusterSource, - NameSpace: namespace} - if id != nil { - result.ID = id + NameSpace: namespace, + ID: id, } results = append(results, result) @@ -475,9 +479,8 @@ func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDo Field: fieldPrefix + field, Details: Missing, Cluster: ClusterTarget, - NameSpace: namespace} - if id != nil { - result.ID = id + NameSpace: namespace, + ID: id, } results = append(results, result) @@ -502,9 +505,8 @@ func 
mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDo Field: fieldPrefix + field, Details: details, Cluster: ClusterTarget, - NameSpace: namespace} - if id != nil { - result.ID = id + NameSpace: namespace, + ID: id, } results = append(results, result) @@ -564,11 +566,11 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int, Int("mismatchesCount", len(problems)). Msg("Discrepancies found. Will recheck in the next generation.") - var dataSizes []int + dataSizes := make([]int, 0, len(problems)) // This stores all IDs for the next generation to check. // Its length should equal len(mismatches) + len(missingIds). - var idsToRecheck []any + idsToRecheck := make([]bson.RawValue, 0, len(problems)) for _, mismatch := range problems { idsToRecheck = append(idsToRecheck, mismatch.ID) @@ -818,7 +820,7 @@ func (verifier *Verifier) compareCollectionSpecifications( Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Options, dstSpec.Options), }) } else { - results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, "spec", "Options.")...) + results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, mbson.ToRawValue("spec"), "Options.")...) } } @@ -1005,7 +1007,7 @@ func (verifier *Verifier) verifyIndexes( if !theyMatch { results = append(results, VerificationResult{ - ID: indexName, + ID: mbson.ToRawValue(indexName), Field: "index", NameSpace: FullName(dstColl), Cluster: ClusterTarget, @@ -1014,7 +1016,7 @@ func (verifier *Verifier) verifyIndexes( } } else { results = append(results, VerificationResult{ - ID: indexName, + ID: mbson.ToRawValue(indexName), Field: "index", Details: Missing, Cluster: ClusterSource, @@ -1027,7 +1029,7 @@ func (verifier *Verifier) verifyIndexes( for indexName := range srcMap { if !srcMapUsed[indexName] { results = append(results, VerificationResult{ - ID: indexName, + ID: mbson.ToRawValue(indexName), Field: "index", Details: Missing, Cluster: ClusterTarget, diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go index 354f5ad0..86b5d883 100644 --- a/internal/verifier/migration_verifier_test.go +++ b/internal/verifier/migration_verifier_test.go @@ -25,6 +25,7 @@ import ( "github.com/10gen/migration-verifier/internal/testutil" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" + "github.com/10gen/migration-verifier/mbson" "github.com/10gen/migration-verifier/mslices" "github.com/cespare/permute/v2" "github.com/rs/zerolog" @@ -543,7 +544,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { suite.Assert().Regexp(regexp.MustCompile("^"+Mismatch), results[0].Details, "mismatch expected") suite.Assert().EqualValues( any(id), - results[0].ID.(bson.RawValue).AsInt64(), + results[0].ID.AsInt64(), "mismatch recorded as expeceted", ) @@ -562,7 +563,7 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() { suite.Assert().Regexp(regexp.MustCompile("^"+Mismatch), results[0].Details, "mismatch expeceted") suite.Assert().EqualValues( any(id), - results[0].ID.(bson.RawValue).AsInt64(), + results[0].ID.AsInt64(), "mismatch recorded as expeceted", ) } @@ -684,7 +685,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() events: []ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll2"}, - DocID: "heyhey", + DocID: 
mbson.ToRawValue("heyhey"), ClusterTime: &primitive.Timestamp{ T: uint32(time.Now().Unix()), }, @@ -700,7 +701,7 @@ func (suite *IntegrationTestSuite) TestGetPersistedNamespaceStatistics_Recheck() events: []ParsedEvent{{ OpType: "insert", Ns: &Namespace{DB: "mydb", Coll: "coll1"}, - DocID: "hoohoo", + DocID: mbson.ToRawValue("hoohoo"), ClusterTime: &primitive.Timestamp{ T: uint32(time.Now().Unix()), }, @@ -935,14 +936,29 @@ func (suite *IntegrationTestSuite) TestGetNamespaceStatistics_Gen0() { func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() { ctx := suite.Context() verifier := suite.BuildVerifier() - err := verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar", []any{42}, []int{100}) + err := verifier.InsertFailedCompareRecheckDocs( + ctx, + "foo.bar", + mslices.Of(mbson.ToRawValue(42)), + []int{100}, + ) suite.Require().NoError(err) - err = verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar", []any{43, 44}, []int{100, 100}) + err = verifier.InsertFailedCompareRecheckDocs( + ctx, + "foo.bar", + mslices.Of(mbson.ToRawValue(43), mbson.ToRawValue(44)), + []int{100, 100}, + ) suite.Require().NoError(err) - err = verifier.InsertFailedCompareRecheckDocs(ctx, "foo.bar2", []any{42}, []int{100}) + err = verifier.InsertFailedCompareRecheckDocs( + ctx, + "foo.bar2", + mslices.Of(mbson.ToRawValue(42)), + []int{100}, + ) suite.Require().NoError(err) event := ParsedEvent{ - DocID: int32(55), + DocID: mbson.ToRawValue(int32(55)), OpType: "delete", Ns: &Namespace{ DB: "foo", @@ -1056,7 +1072,7 @@ func TestVerifierCompareDocs(t *testing.T) { compareFn: func(t *testing.T, mismatchResults []VerificationResult) { if assert.Equal(t, 1, len(mismatchResults)) { var res int - require.Nil(t, mismatchResults[0].ID.(bson.RawValue).Unmarshal(&res)) + require.Nil(t, mismatchResults[0].ID.Unmarshal(&res)) assert.Equal(t, id, res) assert.Regexp(t, regexp.MustCompile("^"+Mismatch), mismatchResults[0].Details) } @@ -1074,7 +1090,7 @@ func TestVerifierCompareDocs(t *testing.T) { compareFn: func(t *testing.T, mismatchResults []VerificationResult) { if assert.Equal(t, 1, len(mismatchResults)) { var res int - require.Nil(t, mismatchResults[0].ID.(bson.RawValue).Unmarshal(&res)) + require.Nil(t, mismatchResults[0].ID.Unmarshal(&res)) assert.Equal(t, id, res) assert.Regexp(t, regexp.MustCompile("^"+Mismatch), mismatchResults[0].Details) } @@ -1555,7 +1571,7 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { failures := suite.getFailuresForTask(verifier, task.PrimaryKey) if suite.Equal(1, len(failures)) { - suite.Equal(srcIndexNames[1], failures[0].ID) + suite.Equal(mbson.ToRawValue(srcIndexNames[1]), failures[0].ID) suite.Equal(Missing, failures[0].Details) suite.Equal(ClusterTarget, failures[0].Cluster) suite.Equal("testDb.testColl1", failures[0].NameSpace) @@ -1588,7 +1604,7 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { suite.T().Logf("failures: %+v", failures) if suite.Equal(1, len(failures)) { - suite.Equal(dstIndexNames[1], failures[0].ID) + suite.Equal(mbson.ToRawValue(dstIndexNames[1]), failures[0].ID) suite.Equal(Missing, failures[0].Details) suite.Equal(ClusterSource, failures[0].Cluster) suite.Equal("testDb.testColl2", failures[0].NameSpace) @@ -1620,13 +1636,13 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { failures = suite.getFailuresForTask(verifier, task.PrimaryKey) if suite.Equal(2, len(failures)) { sort.Slice(failures, func(i, j int) bool { - return failures[i].ID.(string) < failures[j].ID.(string) + return 
failures[i].ID.StringValue() < failures[j].ID.StringValue() }) - suite.Equal(dstIndexNames[1], failures[0].ID) + suite.Equal(mbson.ToRawValue(dstIndexNames[1]), failures[0].ID) suite.Equal(Missing, failures[0].Details) suite.Equal(ClusterSource, failures[0].Cluster) suite.Equal("testDb.testColl3", failures[0].NameSpace) - suite.Equal(srcIndexNames[0], failures[1].ID) + suite.Equal(mbson.ToRawValue(srcIndexNames[0]), failures[1].ID) suite.Equal(Missing, failures[1].Details) suite.Equal(ClusterTarget, failures[1].Cluster) suite.Equal("testDb.testColl3", failures[1].NameSpace) @@ -1658,7 +1674,7 @@ func (suite *IntegrationTestSuite) TestVerifierCompareIndexes() { suite.Equal(verificationTaskMetadataMismatch, task.Status) failures = suite.getFailuresForTask(verifier, task.PrimaryKey) if suite.Equal(1, len(failures)) { - suite.Equal("wrong", failures[0].ID) + suite.Equal(mbson.ToRawValue("wrong"), failures[0].ID) suite.Regexp(regexp.MustCompile("^"+Mismatch), failures[0].Details) suite.Equal(ClusterTarget, failures[0].Cluster) suite.Equal("testDb.testColl4", failures[0].NameSpace) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index be9589f4..f425c013 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -9,7 +9,6 @@ import ( "github.com/10gen/migration-verifier/internal/retry" "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" - "github.com/10gen/migration-verifier/mbson" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsontype" @@ -50,7 +49,7 @@ type RecheckDoc struct { // InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check. func (verifier *Verifier) InsertFailedCompareRecheckDocs( ctx context.Context, - namespace string, documentIDs []any, dataSizes []int) error { + namespace string, documentIDs []bson.RawValue, dataSizes []int) error { dbName, collName := SplitNamespace(namespace) dbNames := make([]string, len(documentIDs)) @@ -77,7 +76,7 @@ func (verifier *Verifier) insertRecheckDocs( ctx context.Context, dbNames []string, collNames []string, - documentIDs []any, + documentIDs []bson.RawValue, dataSizes []int, ) error { verifier.mux.RLock() @@ -191,7 +190,7 @@ func (verifier *Verifier) insertRecheckDocs( func deduplicateRechecks( dbNames, collNames []string, - documentIDs []any, + documentIDs []bson.RawValue, dataSizes []int, ) ([]string, []string, []bson.RawValue, []int) { dedupeMap := map[string]map[string]map[string]int{} @@ -200,15 +199,13 @@ func deduplicateRechecks( for i, dbName := range dbNames { collName := collNames[i] - docID := documentIDs[i] + docIDRaw := documentIDs[i] dataSize := dataSizes[i] - docIDRaw := mbson.MustConvertToRawValue(docID) - - docIDStr := string(append( - []byte{byte(docIDRaw.Type)}, - docIDRaw.Value..., - )) + docIDBuf := make([]byte, 1+len(docIDRaw.Value)) + docIDBuf[0] = byte(docIDRaw.Type) + copy(docIDBuf[1:], docIDRaw.Value) + docIDStr := string(docIDBuf) if _, ok := dedupeMap[dbName]; !ok { dedupeMap[dbName] = map[string]map[string]int{ @@ -251,8 +248,8 @@ func deduplicateRechecks( rawDocIDs = append( rawDocIDs, bson.RawValue{ - Type: []bsontype.Type(docIDStr)[0], - Value: []byte(docIDStr)[1:], + Type: bsontype.Type(docIDStr[0]), + Value: []byte(docIDStr[1:]), }, ) dataSizes = append(dataSizes, dataSize) diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index 7fd08104..e8344bd4 100644 --- a/internal/verifier/recheck_test.go +++ 
b/internal/verifier/recheck_test.go
@@ -8,6 +8,7 @@ import (
 
 	"github.com/10gen/migration-verifier/internal/testutil"
 	"github.com/10gen/migration-verifier/internal/types"
+	"github.com/10gen/migration-verifier/mbson"
 	"github.com/10gen/migration-verifier/mslices"
 	"github.com/rs/zerolog"
 	"github.com/samber/lo"
@@ -24,7 +25,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
 		verifier.InsertFailedCompareRecheckDocs(
 			ctx,
 			"the.namespace",
-			[]any{"theDocID"},
+			[]bson.RawValue{mbson.ToRawValue("theDocID")},
 			[]int{1234},
 		),
 		"insert failed-comparison recheck",
@@ -48,7 +49,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
 
 	event := ParsedEvent{
 		OpType: "insert",
-		DocID:  "theDocID",
+		DocID:  mbson.ToRawValue("theDocID"),
 		Ns: &Namespace{
 			DB:   "the",
 			Coll: "namespace",
@@ -488,5 +489,16 @@ func insertRecheckDocs(
 		collNames[i] = collName
 	}
 
-	return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes)
+	rawIDs := lo.Map(
+		documentIDs,
+		func(idAny any, _ int) bson.RawValue {
+			btype, buf := lo.Must2(bson.MarshalValue(idAny))
+			return bson.RawValue{
+				Type:  btype,
+				Value: buf,
+			}
+		},
+	)
+
+	return verifier.insertRecheckDocs(ctx, dbNames, collNames, rawIDs, dataSizes)
 }
diff --git a/internal/verifier/result.go b/internal/verifier/result.go
index bd75c368..13a4a572 100644
--- a/internal/verifier/result.go
+++ b/internal/verifier/result.go
@@ -2,6 +2,7 @@ package verifier
 
 import (
 	"github.com/10gen/migration-verifier/option"
+	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 )
 
@@ -21,7 +22,7 @@ type VerificationResult struct {
 	// VerificationResult instances might share the same ID. That’s OK,
 	// though; it’ll just make the recheck include all docs with that ID,
 	// regardless of which ones actually need the recheck.
-	ID any
+	ID bson.RawValue `bson:",omitempty"`
 
 	Field   string
 	Details string
diff --git a/mbson/raw_value.go b/mbson/raw_value.go
index 668f36e8..0db5fc3a 100644
--- a/mbson/raw_value.go
+++ b/mbson/raw_value.go
@@ -1,17 +1,23 @@
 package mbson
 
 import (
+	"encoding/binary"
 	"fmt"
+	"math"
 
 	"go.mongodb.org/mongo-driver/bson"
 	"go.mongodb.org/mongo-driver/bson/bsontype"
 	"go.mongodb.org/mongo-driver/bson/primitive"
 )
 
-type bsonType interface {
+type bsonCastRecipient interface {
 	bson.Raw | primitive.Timestamp | string
 }
 
+type bsonSourceTypes interface {
+	string | int | int32 | int64
+}
+
 type cannotCastErr struct {
 	gotBSONType bsontype.Type
 	toGoType    any
@@ -26,10 +32,8 @@ func (ce cannotCastErr) Error() string {
 // if the target type doesn’t match the value.
 //
-// Augment bsonType if you find a type here that’s missing.
-func CastRawValue[T bsonType](in bson.RawValue) (T, error) {
-	retPtr := new(T)
-
-	switch any(*retPtr).(type) {
+// Augment bsonCastRecipient if you find a type here that’s missing.
+func CastRawValue[T bsonCastRecipient](in bson.RawValue) (T, error) {
+	switch any(*new(T)).(type) {
 	case bson.Raw:
 		if doc, isDoc := in.DocumentOK(); isDoc {
 			return any(doc).(T), nil
@@ -43,8 +47,57 @@ func CastRawValue[T bsonType](in bson.RawValue) (T, error) {
 			return any(str).(T), nil
 		}
 	default:
-		panic(fmt.Sprintf("Unrecognized Go type: %T (maybe augment bsonType?)", *retPtr))
+		panic(fmt.Sprintf("Unrecognized Go type: %T (maybe augment bsonCastRecipient?)", *new(T)))
 	}
 
-	return *retPtr, cannotCastErr{in.Type, *retPtr}
+	return *new(T), cannotCastErr{in.Type, any(*new(T))}
 }
+
+// ToRawValue is a bit like bson.MarshalValue, but:
+// - It’s faster since it avoids reflection.
+// - It always succeeds since it only accepts certain known types.
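+//
+// A quick illustration (values shown are hypothetical):
+//
+//	ToRawValue("abc")    // BSON string; Value is "\x04\x00\x00\x00abc\x00"
+//	ToRawValue(int32(7)) // BSON int32
+//	ToRawValue(1 << 40)  // exceeds int32 range, so BSON int64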
+func ToRawValue[T bsonSourceTypes](in T) bson.RawValue { + switch typedIn := any(in).(type) { + case int: + if typedIn < math.MinInt32 || typedIn > math.MaxInt32 { + return i64ToRawValue(int64(typedIn)) + } + + return i32ToRawValue(typedIn) + case int32: + return i32ToRawValue(typedIn) + case int64: + return i64ToRawValue(typedIn) + case string: + strLen := len(typedIn) + buf := make([]byte, 5+strLen) + + binary.LittleEndian.PutUint32(buf, 1+uint32(strLen)) + copy(buf[4:], []byte(typedIn)) + + return bson.RawValue{ + Type: bson.TypeString, + Value: buf, + } + } + + panic(fmt.Sprintf("Unrecognized Go type: %T (maybe add marshal instructions?)", in)) +} + +type i32Ish interface { + int | int32 +} + +func i32ToRawValue[T i32Ish](in T) bson.RawValue { + return bson.RawValue{ + Type: bson.TypeInt32, + Value: binary.LittleEndian.AppendUint32(nil, uint32(in)), + } +} + +func i64ToRawValue(in int64) bson.RawValue { + return bson.RawValue{ + Type: bson.TypeInt64, + Value: binary.LittleEndian.AppendUint64(nil, uint64(in)), + } } diff --git a/mbson/raw_value_test.go b/mbson/raw_value_test.go new file mode 100644 index 00000000..01a3c653 --- /dev/null +++ b/mbson/raw_value_test.go @@ -0,0 +1,27 @@ +package mbson + +import ( + "math" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestToRawValue_Int(t *testing.T) { + ints := []int{ + 0, + -1, + math.MaxInt32 - 1, + math.MaxInt32, + math.MaxInt32 + 1, + math.MaxInt64, + } + + for _, cur := range ints { + viaMarshal := MustConvertToRawValue(cur) + + viaUs := ToRawValue(cur) + + assert.Equal(t, viaMarshal, viaUs, "%d", cur) + } +} From 49a29a24b54af843959ea97752284d4484d08f4b Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 23 Oct 2025 23:05:23 -0400 Subject: [PATCH 4/5] Fix typo when batching rechecks. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This fixes a typo that caused more recheck batches than was intended. It’s merely a small performance fix; it does not impact correctness. --- internal/verifier/recheck.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index f425c013..2fb2a043 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -118,7 +118,7 @@ func (verifier *Verifier) insertRecheckDocs( mongo.NewInsertOneModel().SetDocument(recheckDoc), ) curBatchSize += len(recheckRaw) - if curBatchSize > recheckBatchByteLimit || len(recheckRaw) >= recheckBatchCountLimit { + if curBatchSize > recheckBatchByteLimit || len(curRechecks) >= recheckBatchCountLimit { recheckBatches = append(recheckBatches, curRechecks) curRechecks = nil curBatchSize = 0 From 899476fbe7e6394ae9c52899d238f9c378e765ac Mon Sep 17 00:00:00 2001 From: Felipe Gasper Date: Thu, 23 Oct 2025 23:08:19 -0400 Subject: [PATCH 5/5] Optimize insertion of rechecks. This minimizes GC overhead, especially from BSON marshaling. 
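Specifically:

- RecheckDoc and RecheckPrimaryKey now implement bson.Marshaler via
  bsoncore’s DocumentBuilder, so the driver no longer reflects over
  these structs on every insert.
- Rechecks are marshaled once up front and inserted via InsertMany
  rather than wrapped in BulkWrite models.
- Each batch is sent to the errgroup workers as soon as it fills, and
  batch slices are preallocated to the batch-count limit.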
--- internal/verifier/recheck.go | 112 ++++++++++++++++++------------ internal/verifier/recheck_test.go | 20 +++--- 2 files changed, 78 insertions(+), 54 deletions(-) diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go index 2fb2a043..7ed17c09 100644 --- a/internal/verifier/recheck.go +++ b/internal/verifier/recheck.go @@ -10,10 +10,12 @@ import ( "github.com/10gen/migration-verifier/internal/types" "github.com/10gen/migration-verifier/internal/util" "github.com/pkg/errors" + "github.com/samber/lo" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/bsontype" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/x/bsonx/bsoncore" ) const ( @@ -31,9 +33,22 @@ const ( // sorting by _id will guarantee that all rechecks for a given // namespace appear consecutively. type RecheckPrimaryKey struct { - SrcDatabaseName string `bson:"db"` - SrcCollectionName string `bson:"coll"` - DocumentID any `bson:"docID"` + SrcDatabaseName string `bson:"db"` + SrcCollectionName string `bson:"coll"` + DocumentID bson.RawValue `bson:"docID"` +} + +var _ bson.Marshaler = &RecheckPrimaryKey{} + +func (rk *RecheckPrimaryKey) MarshalBSON() ([]byte, error) { + return bsoncore.NewDocumentBuilder(). + AppendString("db", rk.SrcDatabaseName). + AppendString("coll", rk.SrcCollectionName). + AppendValue("docID", bsoncore.Value{ + Type: rk.DocumentID.Type, + Data: rk.DocumentID.Value, + }). + Build(), nil } // RecheckDoc stores the necessary information to know which documents must be rechecked. @@ -46,6 +61,15 @@ type RecheckDoc struct { DataSize int `bson:"dataSize"` } +var _ bson.Marshaler = &RecheckDoc{} + +func (rd *RecheckDoc) MarshalBSON() ([]byte, error) { + return bsoncore.NewDocumentBuilder(). + AppendDocument("_id", lo.Must(bson.Marshal(rd.PrimaryKey))). + AppendInt64("dataSize", int64(rd.DataSize)). + Build(), nil +} + // InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check. 
func (verifier *Verifier) InsertFailedCompareRecheckDocs( ctx context.Context, @@ -95,50 +119,16 @@ func (verifier *Verifier) insertRecheckDocs( genCollection := verifier.getRecheckQueueCollection(generation) - var recheckBatches [][]mongo.WriteModel - var curRechecks []mongo.WriteModel - curBatchSize := 0 - for i, dbName := range dbNames { - recheckDoc := RecheckDoc{ - PrimaryKey: RecheckPrimaryKey{ - SrcDatabaseName: dbName, - SrcCollectionName: collNames[i], - DocumentID: rawDocIDs[i], - }, - DataSize: dataSizes[i], - } - - recheckRaw, err := bson.Marshal(recheckDoc) - if err != nil { - return errors.Wrapf(err, "marshaling recheck for %#q", dbName+"."+collNames[i]) - } - - curRechecks = append( - curRechecks, - mongo.NewInsertOneModel().SetDocument(recheckDoc), - ) - curBatchSize += len(recheckRaw) - if curBatchSize > recheckBatchByteLimit || len(curRechecks) >= recheckBatchCountLimit { - recheckBatches = append(recheckBatches, curRechecks) - curRechecks = nil - curBatchSize = 0 - } - } - - if len(curRechecks) > 0 { - recheckBatches = append(recheckBatches, curRechecks) - } - - for _, models := range recheckBatches { + sendRechecks := func(rechecks []bson.Raw) { eg.Go(func() error { retryer := retry.New() err := retryer.WithCallback( func(retryCtx context.Context, _ *retry.FuncInfo) error { - _, err := genCollection.BulkWrite( + _, err := genCollection.InsertMany( retryCtx, - models, - options.BulkWrite().SetOrdered(false), + lo.ToAnySlice(rechecks), + options.InsertMany().SetOrdered(false), ) // We expect duplicate-key errors from the above because: @@ -157,20 +147,54 @@ func (verifier *Verifier) insertRecheckDocs( // and document sizes probably remain stable(-ish) across updates. err = util.TolerateSimpleDuplicateKeyInBulk( verifier.logger, - len(models), + len(rechecks), err, ) return err }, "persisting %d recheck(s)", - len(models), + len(rechecks), ).Run(groupCtx, verifier.logger) - return errors.Wrapf(err, "batch of %d rechecks", len(models)) + return errors.Wrapf(err, "batch of %d rechecks", len(rechecks)) }) } + curRechecks := make([]bson.Raw, 0, recheckBatchCountLimit) + curBatchBytes := 0 + for i, dbName := range dbNames { + recheckDoc := RecheckDoc{ + PrimaryKey: RecheckPrimaryKey{ + SrcDatabaseName: dbName, + SrcCollectionName: collNames[i], + DocumentID: rawDocIDs[i], + }, + DataSize: dataSizes[i], + } + + recheckRaw, err := bson.Marshal(recheckDoc) + if err != nil { + return errors.Wrapf(err, "marshaling recheck for %#q", dbName+"."+collNames[i]) + } + + curRechecks = append( + curRechecks, + bson.Raw(recheckRaw), + ) + + curBatchBytes += len(recheckRaw) + if curBatchBytes > recheckBatchByteLimit || len(curRechecks) >= recheckBatchCountLimit { + sendRechecks(curRechecks) + curRechecks = make([]bson.Raw, 0, recheckBatchCountLimit) + curBatchBytes = 0 + } + } + + if len(curRechecks) > 0 { + sendRechecks(curRechecks) + } + if err := eg.Wait(); err != nil { return errors.Wrapf( err, diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go index e8344bd4..e0c4e4f1 100644 --- a/internal/verifier/recheck_test.go +++ b/internal/verifier/recheck_test.go @@ -39,7 +39,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: "the", SrcCollectionName: "namespace", - DocumentID: "theDocID", + DocumentID: mbson.ToRawValue("theDocID"), }, }, }, @@ -74,7 +74,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() { PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: "the", 
SrcCollectionName: "namespace", - DocumentID: "theDocID", + DocumentID: mbson.ToRawValue("theDocID"), }, }, }, @@ -282,13 +282,13 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() { PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: "testDB", SrcCollectionName: "testColl", - DocumentID: id1, + DocumentID: mbson.ToRawValue(id1), }, } d2 := d1 - d2.PrimaryKey.DocumentID = id2 + d2.PrimaryKey.DocumentID = mbson.ToRawValue(id2) d3 := d1 - d3.PrimaryKey.DocumentID = id3 + d3.PrimaryKey.DocumentID = mbson.ToRawValue(id3) results := suite.fetchRecheckDocs(ctx, verifier) suite.ElementsMatch([]any{d1, d2, d3}, results) @@ -342,13 +342,13 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() { PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: "testDB", SrcCollectionName: "testColl", - DocumentID: id1, + DocumentID: mbson.ToRawValue(id1), }, } d2 := d1 - d2.PrimaryKey.DocumentID = id2 + d2.PrimaryKey.DocumentID = mbson.ToRawValue(id2) d3 := d1 - d3.PrimaryKey.DocumentID = id3 + d3.PrimaryKey.DocumentID = mbson.ToRawValue(id3) results := suite.fetchRecheckDocs(ctx, verifier) suite.ElementsMatch([]any{d1, d2, d3}, results) @@ -451,11 +451,11 @@ func (suite *IntegrationTestSuite) TestGenerationalClear() { PrimaryKey: RecheckPrimaryKey{ SrcDatabaseName: "testDB", SrcCollectionName: "testColl", - DocumentID: id1, + DocumentID: mbson.ToRawValue(id1), }, } d2 := d1 - d2.PrimaryKey.DocumentID = id2 + d2.PrimaryKey.DocumentID = mbson.ToRawValue(id2) results := suite.fetchRecheckDocs(ctx, verifier) suite.Assert().ElementsMatch([]any{d1, d2}, results)