2 changes: 0 additions & 2 deletions dockey/agg.go
@@ -16,8 +16,6 @@ import (
// the returned key may not always match the change stream’s `documentKey`
// (because the server misreports its own sharding logic).
func ExtractTrueDocKeyAgg(fieldNames []string, docExpr string) bson.D {
assertFieldNameUniqueness(fieldNames)

return bson.D{
{"$arrayToObject", mslices.Of(lo.Map(
fieldNames,
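Note on the aggregation variant: `$arrayToObject` builds a document from an array of `{k, v}` pairs, so the truncated `lo.Map` call above presumably emits one such pair per key field. A minimal sketch of the expression shape, with invented field names and an assumed `docExpr` of "$$ROOT" (the actual mapping body is elided from this diff, so treat the `v` paths as assumptions):

package main

import (
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
)

func main() {
	// Roughly what ExtractTrueDocKeyAgg might produce for
	// fieldNames = ["a", "b"]; the "v" paths are guesses.
	expr := bson.D{
		{"$arrayToObject", bson.A{bson.A{
			bson.D{{"k", "a"}, {"v", "$$ROOT.a"}},
			bson.D{{"k", "b"}, {"v", "$$ROOT.b"}},
		}}},
	}
	fmt.Println(expr)
}

Evaluated by the server, such an expression yields {a: <value>, b: <value>}, i.e. the document key in field-name order.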
20 changes: 0 additions & 20 deletions dockey/agg_test.go

This file was deleted.

25 changes: 14 additions & 11 deletions dockey/raw.go
@@ -4,13 +4,14 @@ import (
"fmt"
"strings"

"github.com/10gen/migration-verifier/mslices"
"github.com/pkg/errors"
"github.com/samber/lo"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)

// This extracts the document key from a document gets its field names.
// ExtractTrueDocKeyFromDoc extracts the document key from a document
// given its field names.
//
// NB: This avoids the problem documented in SERVER-109340; as a result,
// the returned key may not always match the change stream’s `documentKey`
@@ -21,7 +22,8 @@ func ExtractTrueDocKeyFromDoc(
) (bson.Raw, error) {
assertFieldNameUniqueness(fieldNames)

var dk bson.D
docBuilder := bsoncore.NewDocumentBuilder()

for _, field := range fieldNames {
var val bson.RawValue

@@ -38,19 +40,20 @@
return nil, errors.Wrapf(err, "extracting doc key field %#q from doc %+v", field, doc)
}

dk = append(dk, bson.E{field, val})
}

docKey, err := bson.Marshal(dk)
if err != nil {
return nil, errors.Wrapf(err, "marshaling doc key %v from doc %v", dk, docKey)
docBuilder.AppendValue(
field,
bsoncore.Value{
Type: val.Type,
Data: val.Value,
},
)
}

return docKey, nil
return bson.Raw(docBuilder.Build()), nil
}

func assertFieldNameUniqueness(fieldNames []string) {
if len(lo.Uniq(fieldNames)) != len(fieldNames) {
if mslices.FindFirstDupe(fieldNames).IsSome() {
panic(fmt.Sprintf("Duplicate field names: %v", fieldNames))
}
}
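The switch from accumulating a `bson.D` and calling `bson.Marshal` to `bsoncore.DocumentBuilder` copies each field's raw bytes straight into the output document, skipping a decode/encode round trip (and dropping the now-unreachable marshal error path). A self-contained sketch of the pattern, with invented document contents:

package main

import (
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/x/bsonx/bsoncore"
)

func main() {
	src, _ := bson.Marshal(bson.D{{"_id", "abc"}, {"x", 1}})

	// Pull the raw _id bytes out of the source document.
	val, err := bson.Raw(src).LookupErr("_id")
	if err != nil {
		panic(err)
	}

	// Append them to a new document without re-encoding.
	builder := bsoncore.NewDocumentBuilder()
	builder.AppendValue("_id", bsoncore.Value{Type: val.Type, Data: val.Value})

	fmt.Println(bson.Raw(builder.Build())) // {"_id": "abc"}
}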
11 changes: 0 additions & 11 deletions dockey/raw_test.go
@@ -46,15 +46,4 @@ func TestExtractTrueDocKeyFromDoc(t *testing.T) {
)
}
}

assert.Panics(
t,
func() {
_, _ = ExtractTrueDocKeyFromDoc(
[]string{"foo", "bar", "foo"},
bson.Raw{0},
)
},
"duplicate field name should cause panic",
)
}
21 changes: 12 additions & 9 deletions internal/verifier/change_stream.go
@@ -10,6 +10,7 @@ import (
"github.com/10gen/migration-verifier/internal/retry"
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mbson"
"github.com/10gen/migration-verifier/msync"
"github.com/10gen/migration-verifier/option"
mapset "github.com/deckarep/golang-set/v2"
@@ -43,7 +44,7 @@ var supportedEventOpTypes = mapset.NewSet(
type ParsedEvent struct {
OpType string `bson:"operationType"`
Ns *Namespace `bson:"ns,omitempty"`
DocID any `bson:"_docID,omitempty"`
DocID bson.RawValue `bson:"_docID,omitempty"`
FullDocument bson.Raw `bson:"fullDocument,omitempty"`
FullDocLen option.Option[types.ByteCount] `bson:"_fullDocLen"`
ClusterTime *primitive.Timestamp `bson:"clusterTime,omitEmpty"`
@@ -186,7 +187,7 @@ func (verifier *Verifier) HandleChangeStreamEvents(ctx context.Context, batch ch

dbNames := make([]string, len(batch.events))
collNames := make([]string, len(batch.events))
docIDs := make([]any, len(batch.events))
docIDs := make([]bson.RawValue, len(batch.events))
dataSizes := make([]int, len(batch.events))

latestTimestamp := primitive.Timestamp{}
@@ -857,17 +858,19 @@ func (csr *ChangeStreamReader) persistChangeStreamResumeToken(ctx context.Contex
}

func extractTimestampFromResumeToken(resumeToken bson.Raw) (primitive.Timestamp, error) {
tokenStruct := struct {
Data string `bson:"_data"`
}{}

// Change stream token is always a V1 keystring in the _data field
err := bson.Unmarshal(resumeToken, &tokenStruct)
tokenDataRV, err := resumeToken.LookupErr("_data")

if err != nil {
return primitive.Timestamp{}, errors.Wrapf(err, "extracting %#q from resume token (%v)", "_data", resumeToken)
}

tokenData, err := mbson.CastRawValue[string](tokenDataRV)
if err != nil {
return primitive.Timestamp{}, errors.Wrapf(err, "failed to extract %#q from resume token (%v)", "_data", resumeToken)
return primitive.Timestamp{}, errors.Wrapf(err, "parsing resume token (%v)", resumeToken)
}

resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, tokenStruct.Data)
resumeTokenBson, err := keystring.KeystringToBson(keystring.V1, tokenData)
if err != nil {
return primitive.Timestamp{}, err
}
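The rewrite reads `_data` straight off the raw resume token with `LookupErr` instead of unmarshaling the whole token into a throwaway struct. `mbson.CastRawValue` is internal to migration-verifier; if it simply delegates to the driver, a plausible generic shape would be (a sketch, not the real helper):

// Hypothetical stand-in for mbson.CastRawValue; the actual signature
// and error handling may differ.
func castRawValue[T any](rv bson.RawValue) (T, error) {
	var out T
	err := rv.Unmarshal(&out) // decodes the single BSON value into out
	return out, err
}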
3 changes: 2 additions & 1 deletion internal/verifier/change_stream_test.go
@@ -11,6 +11,7 @@ import (
"github.com/10gen/migration-verifier/internal/testutil"
"github.com/10gen/migration-verifier/internal/types"
"github.com/10gen/migration-verifier/internal/util"
"github.com/10gen/migration-verifier/mbson"
"github.com/10gen/migration-verifier/mslices"
"github.com/10gen/migration-verifier/mstrings"
"github.com/pkg/errors"
@@ -131,7 +132,7 @@ func (suite *IntegrationTestSuite) TestChangeStreamFilter_BsonSize() {
suite.Require().Equal("insert", parsed.OpType)

suite.Require().Equal(
"abc",
mbson.ToRawValue("abc"),
parsed.DocID,
"event should reference expected document",
)
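With `ParsedEvent.DocID` now a `bson.RawValue`, the assertion compares against `mbson.ToRawValue("abc")` rather than the bare string. That helper is also internal; one common way to write such a conversion using only the public driver API (a sketch under that assumption, not the actual implementation):

// Hypothetical ToRawValue-style helper: wrap the value in a one-field
// document, marshal it, and look the field back up as a RawValue.
func toRawValue(v any) (bson.RawValue, error) {
	doc, err := bson.Marshal(bson.D{{"v", v}})
	if err != nil {
		return bson.RawValue{}, err
	}
	return bson.Raw(doc).LookupErr("v")
}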
34 changes: 18 additions & 16 deletions internal/verifier/migration_verifier.go
@@ -455,16 +455,20 @@ func (verifier *Verifier) maybeAppendGlobalFilterToPredicates(predicates bson.A)
return append(predicates, verifier.globalFilter)
}

func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDoc, dstClientDoc bson.Raw, namespace string, id any, fieldPrefix string) (results []VerificationResult) {
func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDoc, dstClientDoc bson.Raw, namespace string, id bson.RawValue, fieldPrefix string) (results []VerificationResult) {
results = make(
[]VerificationResult,
0,
len(mismatch.missingFieldOnSrc)+len(mismatch.missingFieldOnDst)+len(mismatch.fieldContentsDiffer),
)

for _, field := range mismatch.missingFieldOnSrc {
result := VerificationResult{
Field: fieldPrefix + field,
Details: Missing,
Cluster: ClusterSource,
NameSpace: namespace}
if id != nil {
result.ID = id
NameSpace: namespace,
ID: id,
}

results = append(results, result)
@@ -475,9 +479,8 @@ func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDo
Field: fieldPrefix + field,
Details: Missing,
Cluster: ClusterTarget,
NameSpace: namespace}
if id != nil {
result.ID = id
NameSpace: namespace,
ID: id,
}

results = append(results, result)
@@ -502,9 +505,8 @@ func mismatchResultsToVerificationResults(mismatch *MismatchDetails, srcClientDo
Field: fieldPrefix + field,
Details: details,
Cluster: ClusterTarget,
NameSpace: namespace}
if id != nil {
result.ID = id
NameSpace: namespace,
ID: id,
}

results = append(results, result)
@@ -564,11 +566,11 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
Int("mismatchesCount", len(problems)).
Msg("Discrepancies found. Will recheck in the next generation.")

var dataSizes []int
dataSizes := make([]int, 0, len(problems))

// This stores all IDs for the next generation to check.
// Its length should equal len(mismatches) + len(missingIds).
var idsToRecheck []any
idsToRecheck := make([]bson.RawValue, 0, len(problems))

for _, mismatch := range problems {
idsToRecheck = append(idsToRecheck, mismatch.ID)
@@ -818,7 +820,7 @@ func (verifier *Verifier) compareCollectionSpecifications(
Details: Mismatch + fmt.Sprintf(" : src: %v, dst: %v", srcSpec.Options, dstSpec.Options),
})
} else {
results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, "spec", "Options.")...)
results = append(results, mismatchResultsToVerificationResults(mismatchDetails, srcSpec.Options, dstSpec.Options, srcNs, mbson.ToRawValue("spec"), "Options.")...)
}
}

@@ -1005,7 +1007,7 @@ func (verifier *Verifier) verifyIndexes(

if !theyMatch {
results = append(results, VerificationResult{
ID: indexName,
ID: mbson.ToRawValue(indexName),
Field: "index",
NameSpace: FullName(dstColl),
Cluster: ClusterTarget,
@@ -1014,7 +1016,7 @@
}
} else {
results = append(results, VerificationResult{
ID: indexName,
ID: mbson.ToRawValue(indexName),
Field: "index",
Details: Missing,
Cluster: ClusterSource,
@@ -1027,7 +1029,7 @@
for indexName := range srcMap {
if !srcMapUsed[indexName] {
results = append(results, VerificationResult{
ID: indexName,
ID: mbson.ToRawValue(indexName),
Field: "index",
Details: Missing,
Cluster: ClusterTarget,
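Typing the `id` parameter as `bson.RawValue` rather than `any` is what lets the `if id != nil` guards collapse into plain struct literals: the zero `bson.RawValue` is a detectable first-class value instead of a nil hidden behind an interface. A small demonstration using only the driver's public API:

package main

import (
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
)

func main() {
	// The zero RawValue has no type byte and no payload.
	var id bson.RawValue
	fmt.Println(id.IsZero()) // true

	doc, _ := bson.Marshal(bson.D{{"v", "spec"}})
	id = bson.Raw(doc).Lookup("v")
	fmt.Println(id.IsZero(), id.StringValue()) // false spec
}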