1 change: 1 addition & 0 deletions .dockerignore
@@ -6,6 +6,7 @@
/build
artifacts
**/*.pprof
benchmark/ycsb/data

# Module caches
.gocache
96 changes: 82 additions & 14 deletions cmd/nokv-fsmeta-history/main.go
@@ -2,27 +2,30 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"log"
"os"
"time"

nokverrors "github.com/feichai0017/NoKV/errors"
"github.com/feichai0017/NoKV/fsmeta"
fsmetaclient "github.com/feichai0017/NoKV/fsmeta/client"
fsmetacontract "github.com/feichai0017/NoKV/fsmeta/contract"
)

func main() {
var (
addr = flag.String("addr", "127.0.0.1:8090", "FSMetadata gRPC address")
mount = flag.String("mount", "default", "registered mount ID")
seeds = flag.Int("seeds", 1, "number of deterministic seeds to run")
start = flag.Int64("seed-start", 1, "first deterministic seed")
steps = flag.Int("steps", 64, "generated operations per seed before external filtering")
batch = flag.Int("batch", 3, "concurrent history batch size")
timeout = flag.Duration("timeout", 60*time.Second, "overall command timeout")
scope = flag.String("scope-prefix", "history", "unique root directory prefix for isolating each generated history")
addr = flag.String("addr", "127.0.0.1:8090", "FSMetadata gRPC address")
mount = flag.String("mount", "default", "registered mount ID")
seeds = flag.Int("seeds", 1, "number of deterministic seeds to run")
start = flag.Int64("seed-start", 1, "first deterministic seed")
steps = flag.Int("steps", 64, "generated operations per seed before external filtering")
batch = flag.Int("batch", 3, "concurrent history batch size")
timeout = flag.Duration("timeout", 60*time.Second, "overall command timeout")
scope = flag.String("scope-prefix", "history", "unique root directory prefix for isolating each generated history")
allowIndeterminate = flag.Bool("allow-indeterminate-errors", false, "treat retryable availability errors as operations with unknown commit outcome")
)
flag.Parse()
if *seeds <= 0 || *start <= 0 || *steps <= 0 {
@@ -47,29 +50,76 @@ func main() {
unique := time.Now().UnixNano()
scopeName := fmt.Sprintf("%s-%06d-%d", *scope, seed, unique)
scopeInode := fsmeta.InodeID(9_000_000_000 + seed*1_000_000 + unique%1_000_000)
ops := externalHistoryOps(fsmetacontract.GenerateScript(seed, *steps), mountID, scopeName, scopeInode)
scopeOp := scopeCreateOperation(mountID, scopeName, scopeInode)
if err := createScopeWithRetry(ctx, cli, scopeOp); err != nil {
log.Fatalf("create history scope seed=%d: %v", seed, err)
}
if got := model.Apply(scopeOp); got.Err != nil {
log.Fatalf("apply history scope seed=%d: %v", seed, got.Err)
}
ops := externalHistoryOps(fsmetacontract.GenerateScript(seed, *steps), mountID, scopeInode, scopeInode)
if len(ops) == 0 {
log.Fatalf("seed %d generated no external-safe operations", seed)
}
if err := fsmetacontract.RunConcurrentBatches(ctx, cli, model, ops, *batch); err != nil {
opts := fsmetacontract.HistoryOptions{AllowIndeterminateErrors: *allowIndeterminate}
if err := fsmetacontract.RunConcurrentBatches(ctx, cli, model, ops, *batch, opts); err != nil {
fmt.Fprintf(os.Stderr, "fsmeta history failed seed=%d steps=%d filtered_ops=%d\n", seed, *steps, len(ops))
log.Fatal(err)
}
log.Printf("fsmeta history passed seed=%d filtered_ops=%d", seed, len(ops))
}
}

func externalHistoryOps(in []fsmetacontract.Operation, mount fsmeta.MountID, scopeName string, scopeInode fsmeta.InodeID) []fsmetacontract.Operation {
out := make([]fsmetacontract.Operation, 0, len(in)+1)
out = append(out, fsmetacontract.Operation{
func scopeCreateOperation(mount fsmeta.MountID, scopeName string, scopeInode fsmeta.InodeID) fsmetacontract.Operation {
return fsmetacontract.Operation{
Kind: fsmetacontract.OpCreate,
Mount: mount,
Parent: fsmeta.RootInode,
Name: scopeName,
Inode: scopeInode,
Type: fsmeta.InodeTypeDirectory,
Mode: 0o755,
})
}
}

func createScopeWithRetry(ctx context.Context, cli fsmetaclient.Client, op fsmetacontract.Operation) error {
delay := 100 * time.Millisecond
for {
err := cli.Create(ctx, fsmeta.CreateRequest{
Mount: op.Mount,
Parent: op.Parent,
Name: op.Name,
Inode: op.Inode,
}, fsmeta.InodeRecord{
Type: op.Type,
Mode: op.Mode,
LinkCount: 1,
})
if err == nil || errors.Is(err, fsmeta.ErrExists) {
return nil
}
// Compose startup can return NotFound until the fsmeta gateway observes
// rooted mount/root admission. The scope create is an admission barrier,
// so retrying here keeps startup synchronization out of the generated
// correctness history.
if !nokverrors.Retryable(err) && !nokverrors.IsKind(err, nokverrors.KindNotFound) {
return err
}
timer := time.NewTimer(delay)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
}
if delay < time.Second {
delay *= 2
}
}
}

func externalHistoryOps(in []fsmetacontract.Operation, mount fsmeta.MountID, scopeInode, inodeBase fsmeta.InodeID) []fsmetacontract.Operation {
out := make([]fsmetacontract.Operation, 0, len(in))
for _, op := range in {
switch op.Kind {
case fsmetacontract.OpOpenWriteSession,
@@ -80,17 +130,35 @@
continue
default:
op.Mount = mount
// The generated inodes are unique only within one in-memory script.
// Docker chaos runs multiple seeds against the same mounted system,
// so external histories must shift inode ids into the per-seed scope
// to avoid cross-seed namespace pollution.
op.Inode = scopeGeneratedInode(inodeBase, op.Inode)
if op.Parent == fsmeta.RootInode {
op.Parent = scopeInode
} else {
op.Parent = scopeGeneratedInode(inodeBase, op.Parent)
}
if op.FromParent == fsmeta.RootInode {
op.FromParent = scopeInode
} else {
op.FromParent = scopeGeneratedInode(inodeBase, op.FromParent)
}
if op.ToParent == fsmeta.RootInode {
op.ToParent = scopeInode
} else {
op.ToParent = scopeGeneratedInode(inodeBase, op.ToParent)
}
out = append(out, op)
}
}
return out
}

func scopeGeneratedInode(base, inode fsmeta.InodeID) fsmeta.InodeID {
if inode == 0 {
return 0
}
return base + inode
}
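
An aside on the retry loop above: createScopeWithRetry treats fsmeta.ErrExists as success (so reruns stay idempotent) and backs off exponentially, doubling the delay until it passes one second and then holding. A minimal standalone sketch of that shape, with a hypothetical isTransient callback standing in for the nokverrors.Retryable / nokverrors.IsKind classification used in the PR:

package sketch

import (
	"context"
	"time"
)

// withBackoff retries an idempotent attempt with capped exponential backoff.
// isTransient is a hypothetical stand-in for the error classification the
// command performs with the nokverrors helpers.
func withBackoff(ctx context.Context, attempt func() error, isTransient func(error) bool) error {
	delay := 100 * time.Millisecond
	for {
		err := attempt()
		if err == nil {
			return nil
		}
		if !isTransient(err) {
			return err
		}
		timer := time.NewTimer(delay)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		if delay < time.Second {
			delay *= 2 // 100ms, 200ms, ..., then holds once past one second
		}
	}
}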
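
To make the inode-scoping invariant concrete, here is an illustrative, runnable reduction of the remap. inodeID and the rootInode value are simplified stand-ins for the fsmeta types; the real root inode value is not shown in this diff.

package main

import "fmt"

type inodeID uint64

const rootInode inodeID = 1 // assumed stand-in for fsmeta.RootInode

// scopeGeneratedInode mirrors the helper above: 0 means "no inode in this
// operation" and must never be shifted; everything else moves into the
// per-seed range.
func scopeGeneratedInode(base, inode inodeID) inodeID {
	if inode == 0 {
		return 0
	}
	return base + inode
}

func main() {
	scope := inodeID(9001)
	fmt.Println(scopeGeneratedInode(scope, 11)) // 9012: generated inode lands inside the seed's scope
	fmt.Println(scopeGeneratedInode(scope, 0))  // 0: unset inode stays unset
	parent := rootInode                         // a generated op whose parent was the root...
	if parent == rootInode {
		parent = scope // ...is re-pointed at the scope directory, not shifted
	}
	fmt.Println(parent) // 9001
}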
21 changes: 13 additions & 8 deletions cmd/nokv-fsmeta-history/main_test.go
@@ -13,23 +13,28 @@ func TestExternalHistoryOpsScopesRootOperationsAndFiltersInternalSessions(t *tes
scopeName = "history-scope"
scopeInode = fsmeta.InodeID(9001)
)
scopeOp := scopeCreateOperation(mount, scopeName, scopeInode)
requireOp(t, scopeOp, fsmetacontract.OpCreate, mount, fsmeta.RootInode, scopeName, scopeInode)

ops := externalHistoryOps([]fsmetacontract.Operation{
{Kind: fsmetacontract.OpOpenWriteSession, Mount: "vol", Parent: fsmeta.RootInode, Inode: 10},
{Kind: fsmetacontract.OpCreate, Mount: "vol", Parent: fsmeta.RootInode, Name: "alpha", Inode: 11},
{Kind: fsmetacontract.OpRenameSubtree, Mount: "vol", FromParent: fsmeta.RootInode, FromName: "alpha", ToParent: fsmeta.RootInode, ToName: "beta"},
{Kind: fsmetacontract.OpLink, Mount: "vol", Parent: fsmeta.RootInode, Name: "link", Inode: 11},
{Kind: fsmetacontract.OpAdvanceTime, Mount: "vol", AdvanceNs: 1},
}, mount, scopeName, scopeInode)
}, mount, scopeInode, scopeInode)

if len(ops) != 4 {
t.Fatalf("filtered op count=%d, want 4: %#v", len(ops), ops)
if len(ops) != 3 {
t.Fatalf("filtered op count=%d, want 3: %#v", len(ops), ops)
}
requireOp(t, ops[0], fsmetacontract.OpCreate, mount, scopeInode, "alpha", scopeInode+11)
if ops[1].Mount != mount || ops[1].FromParent != scopeInode || ops[1].ToParent != scopeInode {
t.Fatalf("rename was not scoped into generated root: %#v", ops[1])
}
requireOp(t, ops[0], fsmetacontract.OpCreate, mount, fsmeta.RootInode, scopeName, scopeInode)
requireOp(t, ops[1], fsmetacontract.OpCreate, mount, scopeInode, "alpha", 11)
if ops[2].Mount != mount || ops[2].FromParent != scopeInode || ops[2].ToParent != scopeInode {
t.Fatalf("rename was not scoped into generated root: %#v", ops[2])
requireOp(t, ops[2], fsmetacontract.OpLink, mount, scopeInode, "link", scopeInode+11)
if got := scopeGeneratedInode(scopeInode, 0); got != 0 {
t.Fatalf("zero inode remapped to %d", got)
}
requireOp(t, ops[3], fsmetacontract.OpLink, mount, scopeInode, "link", 11)
}

func requireOp(t *testing.T, op fsmetacontract.Operation, kind fsmetacontract.OperationKind, mount fsmeta.MountID, parent fsmeta.InodeID, name string, inode fsmeta.InodeID) {
2 changes: 1 addition & 1 deletion cmd/nokv-fsmeta-soak/main.go
@@ -58,7 +58,7 @@ func runRound(ctx context.Context, addr string, mount fsmeta.MountID, seed int64
if len(ops) == 0 {
return fmt.Errorf("generated no namespace operations")
}
if err := fsmetacontract.RunConcurrentBatches(roundCtx, cli, model, ops, batch); err != nil {
if err := fsmetacontract.RunConcurrentBatches(roundCtx, cli, model, ops, batch, fsmetacontract.HistoryOptions{}); err != nil {
return fmt.Errorf("namespace history: %w", err)
}
if err := runSessionProbe(roundCtx, cli, mount, seed); err != nil {
8 changes: 8 additions & 0 deletions coordinator/catalog/cluster.go
@@ -793,6 +793,14 @@ func (c *Cluster) validateRootEventAgainstSnapshot(snapshot rootstate.Snapshot,
return applyRootEventToRegionView(regions, event)
}

// ValidateRootEventAgainstSnapshot validates an event against a caller-provided
// rooted snapshot. Storage-backed coordinators use this to keep validation on
// the same authority view that will receive the append, even when the local
// cache is still catching up through watch/replay.
func (c *Cluster) ValidateRootEventAgainstSnapshot(snapshot rootstate.Snapshot, event rootevent.Event) error {
return c.validateRootEventAgainstSnapshot(snapshot, event)
}

func (c *Cluster) clonePendingPeerChanges() map[uint64]rootstate.PendingPeerChange {
if c == nil {
return make(map[uint64]rootstate.PendingPeerChange)
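
A sketch of the call pattern the doc comment implies, using hypothetical interface stand-ins for the storage handle and cluster view (none of these names are from the PR; the real types are rootstate.Snapshot, rootevent.Event, and the coordinator's storage):

package sketch

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for the real types.
type (
	snapshot struct{}
	event    struct{}
)

type rootStorage interface {
	Load() (snapshot, error)
	AppendRootEvent(ctx context.Context, ev event) error
}

type clusterView interface {
	ValidateRootEventAgainstSnapshot(s snapshot, ev event) error
}

// publish sketches the ordering the doc comment describes: load the durable
// snapshot once, validate the event against that same view, then append.
func publish(ctx context.Context, st rootStorage, c clusterView, ev event) error {
	snap, err := st.Load()
	if err != nil {
		return fmt.Errorf("load rooted snapshot: %w", err)
	}
	if err := c.ValidateRootEventAgainstSnapshot(snap, ev); err != nil {
		return err // reject before touching the durable log
	}
	return st.AppendRootEvent(ctx, ev)
}

The exported wrapper keeps validateRootEventAgainstSnapshot private while letting the gateway reuse it, as service_gateway.go below shows.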
24 changes: 17 additions & 7 deletions coordinator/server/service_gateway.go
@@ -368,7 +368,7 @@ func (s *Service) PublishRootEvent(ctx context.Context, req *coordpb.PublishRoot
if err := s.requireExpectedClusterEpoch(req.GetExpectedClusterEpoch()); err != nil {
return nil, err
}
assessment, err := s.assessRootEventLifecycle(event)
assessment, validationSnapshot, storageBacked, err := s.assessRootEventLifecycle(event)
if err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}
@@ -379,7 +379,17 @@
resp.Accepted = true
return resp, nil
}
if err := s.cluster.ValidateRootEvent(event); err != nil {
var validationErr error
if storageBacked {
// Durable root storage is the write authority. A coordinator can receive
// a valid follow-up event before its local cache has replayed the earlier
// event, so validate against the loaded storage snapshot used for the
// lifecycle decision instead of the potentially stale cache.
validationErr = s.cluster.ValidateRootEventAgainstSnapshot(validationSnapshot, event)
} else {
validationErr = s.cluster.ValidateRootEvent(event)
}
if err := validationErr; err != nil {
switch {
case errors.Is(err, catalog.ErrInvalidRegionID), errors.Is(err, catalog.ErrInvalidMountID):
return nil, status.Error(codes.InvalidArgument, err.Error())
@@ -410,16 +420,16 @@ func (s *Service) PublishRootEvent(ctx context.Context, req *coordpb.PublishRoot
return resp, nil
}

func (s *Service) assessRootEventLifecycle(event rootevent.Event) (rootstate.TransitionAssessment, error) {
func (s *Service) assessRootEventLifecycle(event rootevent.Event) (rootstate.TransitionAssessment, rootstate.Snapshot, bool, error) {
if s == nil || s.storage == nil {
if s == nil || s.cluster == nil {
return rootstate.TransitionAssessment{}, nil
return rootstate.TransitionAssessment{}, rootstate.Snapshot{}, false, nil
}
return s.cluster.ObserveRootEventLifecycle(event), nil
return s.cluster.ObserveRootEventLifecycle(event), rootstate.Snapshot{}, false, nil
}
snapshot, err := s.storage.Load()
if err != nil {
return rootstate.TransitionAssessment{}, fmt.Errorf("load rooted snapshot: %w", err)
return rootstate.TransitionAssessment{}, rootstate.Snapshot{}, false, fmt.Errorf("load rooted snapshot: %w", err)
}
rooted := rootstate.Snapshot{
Stores: snapshot.Stores,
@@ -432,7 +442,7 @@
}
assessment := rootstate.AssessTransition(rooted, event)
_, err = rootstate.EvaluateRootEventLifecycle(rooted, event)
return assessment, err
return assessment, rooted, true, err
}

// RemoveRegion deletes region metadata from the Coordinator in-memory catalog.
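
The comment in PublishRootEvent describes a replay race; the sketch below reduces it to a toy lifecycle rule to show why the cache view and the storage view can disagree on the same valid event (all names here are illustrative, not taken from the PR):

package sketch

import "errors"

// Simplified stand-ins: the real state lives in rootstate.Snapshot and the
// events are rootevent.SubtreeHandoffStarted / SubtreeHandoffCompleted.
type (
	view struct{ handoffStarted bool }
	ev   struct{ kind string }
)

var errNoHandoff = errors.New("subtree handoff not in progress")

// validateAgainst mimics the shape of the lifecycle rule: a completion event
// is valid only on a view that has already observed the matching start event.
func validateAgainst(v view, e ev) error {
	if e.kind == "handoff-completed" && !v.handoffStarted {
		return errNoHandoff
	}
	return nil
}

// Storage has durably applied HandoffStarted, but the local cache has not
// replayed it yet. Validating against the cache wrongly rejects the valid
// follow-up; validating against the loaded storage snapshot accepts it.
func race() (cacheErr, storageErr error) {
	staleCache := view{handoffStarted: false}
	durable := view{handoffStarted: true}
	completed := ev{kind: "handoff-completed"}
	return validateAgainst(staleCache, completed), validateAgainst(durable, completed)
}

The new TestServicePublishRootEventValidatesAgainstStorageSnapshot in coordinator/server/service_test.go (next file) exercises exactly this scenario end to end.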
29 changes: 29 additions & 0 deletions coordinator/server/service_test.go
@@ -133,7 +133,10 @@ func (f *fakeStorage) AppendRootEvent(_ context.Context, event rootevent.Event)
LastCommitted: rootstate.Cursor{Term: 1, Index: uint64(f.eventCalls)},
},
Stores: rootstate.CloneStoreMemberships(f.snapshot.Stores),
SnapshotEpochs: rootstate.CloneSnapshotEpochs(f.snapshot.SnapshotEpochs),
Mounts: rootstate.CloneMounts(f.snapshot.Mounts),
Subtrees: rootstate.CloneSubtreeAuthorities(f.snapshot.Subtrees),
Quotas: rootstate.CloneQuotaFences(f.snapshot.Quotas),
Descriptors: rootCloneDescriptorsForTest(f.snapshot.Descriptors),
PendingPeerChanges: rootstate.ClonePendingPeerChanges(f.snapshot.PendingPeerChanges),
PendingRangeChanges: rootstate.ClonePendingRangeChanges(f.snapshot.PendingRangeChanges),
@@ -1674,6 +1677,32 @@ func TestServiceAssessRootEventUsesStorageSnapshot(t *testing.T) {
require.Equal(t, "peer:171:add:2:201", resp.GetAssessment().GetTransitionId())
}

func TestServicePublishRootEventValidatesAgainstStorageSnapshot(t *testing.T) {
cluster := catalog.NewCluster()
require.NoError(t, cluster.PublishRootEvent(rootevent.MountRegistered("default", 1, 1)))

var rooted rootstate.Snapshot
rootstate.ApplyEventToSnapshot(&rooted, rootstate.Cursor{Term: 1, Index: 1}, rootevent.MountRegistered("default", 1, 1))
rootstate.ApplyEventToSnapshot(&rooted, rootstate.Cursor{Term: 1, Index: 2}, rootevent.SubtreeHandoffStarted("default", 1, 21))
store := &fakeStorage{
leader: true,
snapshot: rootview.SnapshotFromRoot(rooted),
}
svc := NewService(cluster, idalloc.NewIDAllocator(1), tso.NewAllocator(1), store)

resp, err := svc.PublishRootEvent(context.Background(), &coordpb.PublishRootEventRequest{
Event: metawire.RootEventToProto(rootevent.SubtreeHandoffCompleted("default", 1, 21)),
})
require.NoError(t, err)
require.True(t, resp.GetAccepted())
require.Equal(t, 1, store.eventCalls)

subtree := store.snapshot.Subtrees[rootstate.SubtreeAuthorityKey("default", 1)]
require.Equal(t, rootstate.SubtreeAuthorityActive, subtree.State)
require.Equal(t, uint64(21), subtree.Frontier)
require.Equal(t, "default/1#1", subtree.AuthorityID)
}

func TestServicePublishRootEventSkipsCompletedSplitPlan(t *testing.T) {
cluster := catalog.NewCluster()
left := testDescriptor(141, []byte("a"), []byte("m"), metaregion.Epoch{Version: 2, ConfVersion: 1}, nil)
2 changes: 1 addition & 1 deletion coordinator/server/transition_service.go
@@ -43,7 +43,7 @@ func (s *Service) AssessRootEvent(_ context.Context, req *coordpb.AssessRootEven
if err != nil {
return nil, status.Error(codes.Internal, "normalize root event: "+err.Error())
}
assessment, err := s.assessRootEventLifecycle(event)
assessment, _, _, err := s.assessRootEventLifecycle(event)
if err != nil {
return nil, status.Error(codes.FailedPrecondition, err.Error())
}