Skip to content

Commit 75c670e

Browse files
authored
mongo: enable ShardedCluster + better code sharing for validation (#3349)
Change Streams API is documented to work out of the box for sharded clusters. This PR updates the shared validation code to: - check that the instance is either a replica set or a sharded cluster (previously, it ran `replSetGetStatus` to check this; we can instead use the more general `hello` command) - add explicit validation for required roles (`readAnyDatabase`, `clusterMonitor`) - improve separation of concerns by creating a `commands.go` file for all the db commands, while `validation.go` implements the validation logic -- will follow up on the ClickPipes side as well. Testing: - [x] tested replica set on Atlas - [x] tested replica set on EC2 - [x] tested sharded cluster on Atlas - [x] tested sharded cluster on EC2
1 parent ba6e4ea commit 75c670e

File tree

3 files changed

+178
-67
lines changed

3 files changed

+178
-67
lines changed

flow/connectors/mongo/validate.go

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,20 @@ package connmongo
22

33
import (
44
"context"
5-
"errors"
6-
"fmt"
75

86
"github.com/PeerDB-io/peerdb/flow/generated/protos"
97
shared_mongo "github.com/PeerDB-io/peerdb/flow/shared/mongo"
108
)
119

1210
func (c *MongoConnector) ValidateCheck(ctx context.Context) error {
13-
version, err := c.GetVersion(ctx)
14-
if err != nil {
11+
if err := shared_mongo.ValidateServerCompatibility(ctx, c.client); err != nil {
1512
return err
1613
}
17-
cmp, err := shared_mongo.CompareServerVersions(version, shared_mongo.MinSupportedVersion)
18-
if err != nil {
14+
15+
if err := shared_mongo.ValidateUserRoles(ctx, c.client); err != nil {
1916
return err
2017
}
21-
if cmp == -1 {
22-
return fmt.Errorf("require minimum mongo version %s", shared_mongo.MinSupportedVersion)
23-
}
18+
2419
return nil
2520
}
2621

@@ -29,21 +24,9 @@ func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F
2924
return nil
3025
}
3126

32-
if _, err := shared_mongo.GetReplSetGetStatus(ctx, c.client); err != nil {
33-
return err
34-
}
35-
36-
serverStatus, err := shared_mongo.GetServerStatus(ctx, c.client)
37-
if err != nil {
27+
if err := shared_mongo.ValidateOplogRetention(ctx, c.client); err != nil {
3828
return err
3929
}
40-
if serverStatus.StorageEngine.Name != "wiredTiger" {
41-
return errors.New("storage engine must be 'wiredTiger'")
42-
}
43-
if serverStatus.OplogTruncation.OplogMinRetentionHours == 0 ||
44-
serverStatus.OplogTruncation.OplogMinRetentionHours < shared_mongo.MinOplogRetentionHours {
45-
return errors.New("oplog retention must be set to >= 24 hours")
46-
}
4730

4831
return nil
4932
}

flow/shared/mongo/commands.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package mongo
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"go.mongodb.org/mongo-driver/v2/bson"
8+
"go.mongodb.org/mongo-driver/v2/mongo"
9+
)
10+
11+
// BuildInfo holds the subset of the 'buildInfo' command reply that is decoded
// by GetBuildInfo.
type BuildInfo struct {
12+
// Version is decoded from the reply's "version" field.
Version string `bson:"version"`
13+
}
14+
15+
// GetBuildInfo runs the 'buildInfo' command through runCommand and returns the
// reply decoded into a BuildInfo.
func GetBuildInfo(ctx context.Context, client *mongo.Client) (BuildInfo, error) {
16+
return runCommand[BuildInfo](ctx, client, "buildInfo")
17+
}
18+
19+
// ReplSetGetStatus holds the subset of the 'replSetGetStatus' command reply
// that is decoded by GetReplSetGetStatus.
type ReplSetGetStatus struct {
20+
// Set is decoded from the reply's "set" field.
Set string `bson:"set"`
21+
// MyState is decoded from the reply's "myState" field.
MyState int `bson:"myState"`
22+
}
23+
24+
// GetReplSetGetStatus runs the 'replSetGetStatus' command through runCommand
// and returns the reply decoded into a ReplSetGetStatus.
func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (ReplSetGetStatus, error) {
25+
return runCommand[ReplSetGetStatus](ctx, client, "replSetGetStatus")
26+
}
27+
28+
// OplogTruncation is the "oplogTruncation" sub-document of ServerStatus.
type OplogTruncation struct {
29+
// OplogMinRetentionHours is decoded from "oplogMinRetentionHours"; it stays 0
// when the field is absent from the reply.
OplogMinRetentionHours float64 `bson:"oplogMinRetentionHours"`
30+
}
31+
32+
// StorageEngine is the "storageEngine" sub-document of ServerStatus.
type StorageEngine struct {
33+
// Name is decoded from the sub-document's "name" field.
Name string `bson:"name"`
34+
}
35+
36+
// ServerStatus holds the subset of the 'serverStatus' command reply that is
// decoded by GetServerStatus.
type ServerStatus struct {
37+
// StorageEngine is decoded from the reply's "storageEngine" field.
StorageEngine StorageEngine `bson:"storageEngine"`
38+
// OplogTruncation is decoded from the reply's "oplogTruncation" field.
OplogTruncation OplogTruncation `bson:"oplogTruncation"`
39+
}
40+
41+
// GetServerStatus runs the 'serverStatus' command through runCommand and
// returns the reply decoded into a ServerStatus.
func GetServerStatus(ctx context.Context, client *mongo.Client) (ServerStatus, error) {
42+
return runCommand[ServerStatus](ctx, client, "serverStatus")
43+
}
44+
45+
// ConnectionStatus holds the subset of the 'connectionStatus' command reply
// that is decoded by GetConnectionStatus.
type ConnectionStatus struct {
46+
// AuthInfo is decoded from the reply's "authInfo" field.
AuthInfo AuthInfo `bson:"authInfo"`
47+
}
48+
49+
// AuthInfo is the "authInfo" sub-document of ConnectionStatus.
type AuthInfo struct {
50+
// AuthenticatedUserRoles is decoded from the "authenticatedUserRoles" array.
AuthenticatedUserRoles []Role `bson:"authenticatedUserRoles"`
51+
}
52+
53+
// Role is one element of AuthInfo.AuthenticatedUserRoles.
type Role struct {
54+
// Role is decoded from the element's "role" field.
Role string `bson:"role"`
55+
// DB is decoded from the element's "db" field.
DB string `bson:"db"`
56+
}
57+
58+
// GetConnectionStatus runs the 'connectionStatus' command through runCommand
// and returns the reply decoded into a ConnectionStatus.
func GetConnectionStatus(ctx context.Context, client *mongo.Client) (ConnectionStatus, error) {
59+
return runCommand[ConnectionStatus](ctx, client, "connectionStatus")
60+
}
61+
62+
// HelloResponse holds the subset of the 'hello' command reply that is decoded
// by GetHelloResponse.
type HelloResponse struct {
63+
// Msg is decoded from the reply's "msg" field; empty when the field is absent.
Msg string `bson:"msg,omitempty"`
64+
// Hosts is decoded from the reply's "hosts" field; empty when the field is absent.
Hosts []string `bson:"hosts,omitempty"`
65+
}
66+
67+
// GetHelloResponse runs the 'hello' command through runCommand and returns the
// reply decoded into a HelloResponse.
func GetHelloResponse(ctx context.Context, client *mongo.Client) (HelloResponse, error) {
68+
return runCommand[HelloResponse](ctx, client, "hello")
69+
}
70+
71+
func runCommand[T any](ctx context.Context, client *mongo.Client, command string) (T, error) {
72+
var result T
73+
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
74+
bson.E{Key: command, Value: 1},
75+
})
76+
if singleResult.Err() != nil {
77+
return result, fmt.Errorf("'%s' failed: %v", command, singleResult.Err())
78+
}
79+
80+
if err := singleResult.Decode(&result); err != nil {
81+
return result, fmt.Errorf("'%s' failed: %v", command, err)
82+
}
83+
return result, nil
84+
}

flow/shared/mongo/validation.go

Lines changed: 89 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,75 +2,119 @@ package mongo
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
7+
"slices"
68

7-
"go.mongodb.org/mongo-driver/v2/bson"
89
"go.mongodb.org/mongo-driver/v2/mongo"
910
)
1011

1112
const (
1213
MinSupportedVersion = "5.1.0"
1314
MinOplogRetentionHours = 24
15+
16+
ReplicaSet = "ReplicaSet"
17+
ShardedCluster = "ShardedCluster"
1418
)
1519

16-
type BuildInfo struct {
17-
Version string `bson:"version"`
18-
}
20+
var RequiredRoles = [...]string{"readAnyDatabase", "clusterMonitor"}
1921

20-
type ReplSetGetStatus struct {
21-
Set string `bson:"set"`
22-
MyState int `bson:"myState"`
23-
}
22+
func ValidateServerCompatibility(ctx context.Context, client *mongo.Client) error {
23+
buildInfo, err := GetBuildInfo(ctx, client)
24+
if err != nil {
25+
return err
26+
}
2427

25-
type OplogTruncation struct {
26-
OplogMinRetentionHours float64 `bson:"oplogMinRetentionHours"`
27-
}
28+
if cmp, err := CompareServerVersions(buildInfo.Version, MinSupportedVersion); err != nil {
29+
return err
30+
} else if cmp < 0 {
31+
return fmt.Errorf("require minimum mongo version %s", MinSupportedVersion)
32+
}
2833

29-
type StorageEngine struct {
30-
Name string `bson:"name"`
31-
}
34+
validateStorageEngine := func(instanceCtx context.Context, instanceClient *mongo.Client) error {
35+
ss, err := GetServerStatus(instanceCtx, instanceClient)
36+
if err != nil {
37+
return err
38+
}
3239

33-
type ServerStatus struct {
34-
StorageEngine StorageEngine `bson:"storageEngine"`
35-
OplogTruncation OplogTruncation `bson:"oplogTruncation"`
40+
if ss.StorageEngine.Name != "wiredTiger" {
41+
return errors.New("only wiredTiger storage engine is supported")
42+
}
43+
return nil
44+
}
45+
46+
topologyType, err := GetTopologyType(ctx, client)
47+
if err != nil {
48+
return err
49+
}
50+
51+
if topologyType == ReplicaSet {
52+
return validateStorageEngine(ctx, client)
53+
} else {
54+
// TODO: run validation on shard
55+
return nil
56+
}
3657
}
3758

38-
func GetBuildInfo(ctx context.Context, client *mongo.Client) (*BuildInfo, error) {
39-
singleResult := client.Database("admin").RunCommand(ctx, bson.D{bson.E{Key: "buildInfo", Value: 1}})
40-
if singleResult.Err() != nil {
41-
return nil, fmt.Errorf("failed to run 'buildInfo' command: %w", singleResult.Err())
59+
func ValidateUserRoles(ctx context.Context, client *mongo.Client) error {
60+
connectionStatus, err := GetConnectionStatus(ctx, client)
61+
if err != nil {
62+
return err
4263
}
43-
var info BuildInfo
44-
if err := singleResult.Decode(&info); err != nil {
45-
return nil, fmt.Errorf("failed to decode BuildInfo: %w", err)
64+
65+
for _, requiredRole := range RequiredRoles {
66+
if !slices.ContainsFunc(connectionStatus.AuthInfo.AuthenticatedUserRoles, func(r Role) bool {
67+
return r.Role == requiredRole
68+
}) {
69+
return fmt.Errorf("missing required role: %s", requiredRole)
70+
}
4671
}
47-
return &info, nil
72+
73+
return nil
4874
}
4975

50-
func GetReplSetGetStatus(ctx context.Context, client *mongo.Client) (*ReplSetGetStatus, error) {
51-
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
52-
bson.E{Key: "replSetGetStatus", Value: 1},
53-
})
54-
if singleResult.Err() != nil {
55-
return nil, fmt.Errorf("failed to run 'replSetGetStatus' command: %w", singleResult.Err())
76+
func ValidateOplogRetention(ctx context.Context, client *mongo.Client) error {
77+
validateOplogRetention := func(instanceCtx context.Context, instanceClient *mongo.Client) error {
78+
ss, err := GetServerStatus(instanceCtx, instanceClient)
79+
if err != nil {
80+
return err
81+
}
82+
if ss.OplogTruncation.OplogMinRetentionHours == 0 ||
83+
ss.OplogTruncation.OplogMinRetentionHours < MinOplogRetentionHours {
84+
return fmt.Errorf("oplog retention must be set to >= 24 hours, but got %f",
85+
ss.OplogTruncation.OplogMinRetentionHours)
86+
}
87+
return nil
5688
}
57-
var status ReplSetGetStatus
58-
if err := singleResult.Decode(&status); err != nil {
59-
return nil, fmt.Errorf("failed to decode ReplSetGetStatus: %w", err)
89+
90+
topology, err := GetTopologyType(ctx, client)
91+
if err != nil {
92+
return err
93+
}
94+
if topology == ReplicaSet {
95+
return validateOplogRetention(ctx, client)
96+
} else {
97+
// TODO: run validation on shard
98+
return nil
6099
}
61-
return &status, nil
62100
}
63101

64-
func GetServerStatus(ctx context.Context, client *mongo.Client) (*ServerStatus, error) {
65-
singleResult := client.Database("admin").RunCommand(ctx, bson.D{
66-
bson.E{Key: "serverStatus", Value: 1},
67-
})
68-
if singleResult.Err() != nil {
69-
return nil, fmt.Errorf("failed to run 'serverStatus' command: %w", singleResult.Err())
102+
func GetTopologyType(ctx context.Context, client *mongo.Client) (string, error) {
103+
hello, err := GetHelloResponse(ctx, client)
104+
if err != nil {
105+
return "", err
106+
}
107+
108+
// Only replica set has 'hosts' field
109+
// https://www.mongodb.com/docs/manual/reference/command/hello/#mongodb-data-hello.hosts
110+
if len(hello.Hosts) > 0 {
111+
return ReplicaSet, nil
70112
}
71-
var status ServerStatus
72-
if err := singleResult.Decode(&status); err != nil {
73-
return nil, fmt.Errorf("failed to decode ServerStatus: %w", err)
113+
114+
// Only sharded cluster has 'msg' field, and equals to 'isdbgrid'
115+
// https://www.mongodb.com/docs/manual/reference/command/hello/#mongodb-data-hello.msg
116+
if hello.Msg == "isdbgrid" {
117+
return ShardedCluster, nil
74118
}
75-
return &status, nil
119+
return "", errors.New("topology type must be ReplicaSet or ShardedCluster")
76120
}

0 commit comments

Comments
 (0)