diff --git a/checkpoint.go b/checkpoint.go index de3c9185fd..394dbb3708 100644 --- a/checkpoint.go +++ b/checkpoint.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/vfs" "github.com/cockroachdb/pebble/vfs/atomicfs" + "github.com/cockroachdb/pebble/wal" ) // checkpointOptions hold the optional parameters to construct checkpoint @@ -207,6 +208,11 @@ func (d *DB) Checkpoint( // before our call to List. allLogicalLogs := d.mu.log.manager.List() + // Grab the visible sequence number. The checkpoint's view of the state of + // the database will be equivalent to an iterator that acquired this same + // visible sequence number. + visibleSeqNum := d.mu.versions.visibleSeqNum.Load() + // Release the manifest and DB.mu so we don't block other operations on the // database. // @@ -391,24 +397,29 @@ func (d *DB) Checkpoint( } } - // Copy the WAL files. We copy rather than link because WAL file recycling - // will cause the WAL files to be reused which would invalidate the - // checkpoint. It's possible allLogicalLogs includes logs that are not - // relevant (beneath the version's MinUnflushedLogNum). These extra files - // are harmless. The earlier (wal.Manager).List call will not include - // obsolete logs that are sitting in the recycler or have already been - // passed off to the cleanup manager for deletion. + // Copy the WAL files. We copy rather than link for a few reasons: + // - WAL file recycling will cause the WAL files to be reused which + // would invalidate the checkpoint. + // - While we're performing our checkpoint, the latest WAL may still be + // receiving writes. We must exclude these writes, otherwise we'll + // violate the guarantee that the checkpoint is a consistent snapshot + // (we could copy a write that was committed after an ingest that was + // committed after we acquired our version). 
+ // + // It's possible allLogicalLogs includes logs that are not relevant (beneath + // the version's MinUnflushedLogNum). These extra files are harmless. The + // earlier (wal.Manager).List call will not include obsolete logs that are + // sitting in the recycler or have already been passed off to the cleanup + // manager for deletion. // // TODO(jackson): It would be desirable to copy all recycling and obsolete // WALs to aid corruption postmortem debugging should we need them. for _, log := range allLogicalLogs { - for i := 0; i < log.NumSegments(); i++ { - srcFS, srcPath := log.SegmentLocation(i) - destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath)) - ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath) - if ckErr != nil { - return ckErr - } + ckErr = wal.Copy(fs, destDir, log, visibleSeqNum, record.LogWriterConfig{ + WriteWALSyncOffsets: func() bool { return formatVers > FormatWALSyncChunks }, + }) + if ckErr != nil { + return ckErr } } diff --git a/replay/testdata/replay b/replay/testdata/replay index c523829d8d..5166d053f8 100644 --- a/replay/testdata/replay +++ b/replay/testdata/replay @@ -18,7 +18,7 @@ tree 508 000007.sst 133 MANIFEST-000001 checkpoint/ - 25 000004.log + 11 000004.log 480 000005.sst 85 MANIFEST-000001 2769 OPTIONS-000003 diff --git a/replay/testdata/replay_ingest b/replay/testdata/replay_ingest index 58cef1ed13..23dac2756b 100644 --- a/replay/testdata/replay_ingest +++ b/replay/testdata/replay_ingest @@ -21,7 +21,7 @@ tree 172 MANIFEST-000001 209 MANIFEST-000008 checkpoint/ - 25 000002.log + 11 000002.log 678 000004.sst 661 000005.sst 172 MANIFEST-000001 diff --git a/replay/testdata/replay_paced b/replay/testdata/replay_paced index d332e6a52e..74dff20f1d 100644 --- a/replay/testdata/replay_paced +++ b/replay/testdata/replay_paced @@ -23,7 +23,7 @@ tree checkpoint/ 758 000005.sst 454 000007.sst - 39 000008.log + 11 000008.log 454 000009.sst 157 MANIFEST-000010 2769 OPTIONS-000003 diff --git a/replay/testdata/replay_val_sep 
b/replay/testdata/replay_val_sep index e1ab194e28..d6b257aaca 100644 --- a/replay/testdata/replay_val_sep +++ b/replay/testdata/replay_val_sep @@ -29,7 +29,7 @@ tree 101 000006.blob 766 000008.sst 97 000009.blob - 64 000011.log + 11 000011.log 660 000012.sst 187 MANIFEST-000013 2906 OPTIONS-000003 diff --git a/testdata/checkpoint b/testdata/checkpoint index 4528da950e..31a4db135c 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -155,8 +155,8 @@ sync-data: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 -open: db/000006.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint1/000006.log +open: db/000006.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint1/000006.log close: checkpoints/checkpoint1/000006.log close: db/000006.log @@ -198,8 +198,8 @@ sync-data: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 -open: db/000006.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint2/000006.log +open: db/000006.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint2/000006.log close: checkpoints/checkpoint2/000006.log close: db/000006.log @@ -238,8 +238,8 @@ sync-data: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 -open: db/000006.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint3/000006.log +open: db/000006.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint3/000006.log close: checkpoints/checkpoint3/000006.log close: db/000006.log @@ -563,8 +563,8 @@ sync-data: 
checkpoints/checkpoint4/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint4/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint4 close: checkpoints/checkpoint4 -open: db/000008.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint4/000008.log +open: db/000008.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint4/000008.log close: checkpoints/checkpoint4/000008.log close: db/000008.log @@ -672,8 +672,8 @@ sync-data: checkpoints/checkpoint5/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint5/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint5 close: checkpoints/checkpoint5 -open: db/000008.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint5/000008.log +open: db/000008.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint5/000008.log close: checkpoints/checkpoint5/000008.log close: db/000008.log @@ -773,8 +773,8 @@ sync-data: checkpoints/checkpoint6/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint6/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint6 close: checkpoints/checkpoint6 -open: db/000008.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint6/000008.log +open: db/000008.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint6/000008.log close: checkpoints/checkpoint6/000008.log close: db/000008.log @@ -1008,11 +1008,11 @@ sync-data: checkpoints/checkpoint8/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint8/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint8 close: checkpoints/checkpoint8 -open: valsepdb/000004.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint8/000004.log +open: valsepdb/000004.log (options: *vfs.sequentialReadsOption) +close: valsepdb/000004.log sync-data: checkpoints/checkpoint8/000004.log close: checkpoints/checkpoint8/000004.log -close: valsepdb/000004.log 
sync: checkpoints/checkpoint8 close: checkpoints/checkpoint8 @@ -1122,8 +1122,8 @@ sync-data: checkpoints/checkpoint9/marker.manifest.000001.MANIFEST-000001 close: checkpoints/checkpoint9/marker.manifest.000001.MANIFEST-000001 sync: checkpoints/checkpoint9 close: checkpoints/checkpoint9 -open: valsepdb/000007.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint9/000007.log +open: valsepdb/000007.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint9/000007.log close: checkpoints/checkpoint9/000007.log close: valsepdb/000007.log diff --git a/testdata/checkpoint_shared b/testdata/checkpoint_shared index ab3cc140a1..5d8d100343 100644 --- a/testdata/checkpoint_shared +++ b/testdata/checkpoint_shared @@ -152,8 +152,8 @@ sync-data: checkpoints/checkpoint1/marker.remote-obj-catalog.000001.REMOTE-OBJ-C close: checkpoints/checkpoint1/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 -open: db/000006.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint1/000006.log +open: db/000006.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint1/000006.log close: checkpoints/checkpoint1/000006.log close: db/000006.log @@ -205,8 +205,8 @@ sync-data: checkpoints/checkpoint2/marker.remote-obj-catalog.000001.REMOTE-OBJ-C close: checkpoints/checkpoint2/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 -open: db/000006.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint2/000006.log +open: db/000006.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint2/000006.log close: checkpoints/checkpoint2/000006.log close: db/000006.log @@ -254,8 +254,8 @@ sync-data: checkpoints/checkpoint3/marker.remote-obj-catalog.000001.REMOTE-OBJ-C close: checkpoints/checkpoint3/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 sync: 
checkpoints/checkpoint3 close: checkpoints/checkpoint3 -open: db/000006.log (options: *vfs.sequentialReadsOption) create: checkpoints/checkpoint3/000006.log +open: db/000006.log (options: *vfs.sequentialReadsOption) sync-data: checkpoints/checkpoint3/000006.log close: checkpoints/checkpoint3/000006.log close: db/000006.log diff --git a/testdata/event_listener b/testdata/event_listener index 6661496b04..018f7494a2 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -500,11 +500,11 @@ sync-data: checkpoint/marker.manifest.000001.MANIFEST-000023 close: checkpoint/marker.manifest.000001.MANIFEST-000023 sync: checkpoint close: checkpoint -open: wal/000021.log (options: *vfs.sequentialReadsOption) create: checkpoint/000021.log +open: wal/000021.log (options: *vfs.sequentialReadsOption) +close: wal/000021.log sync-data: checkpoint/000021.log close: checkpoint/000021.log -close: wal/000021.log sync: checkpoint close: checkpoint diff --git a/wal/reader.go b/wal/reader.go index cfc2f7665f..6167e40835 100644 --- a/wal/reader.go +++ b/wal/reader.go @@ -247,11 +247,16 @@ var _ Reader = (*virtualWALReader)(nil) // are no more records. The reader returned becomes stale after the next // NextRecord call, and should no longer be used. func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) { + rec, _, off, err := r.nextRecord() + return rec, off, err +} + +func (r *virtualWALReader) nextRecord() (io.Reader, batchrepr.Header, Offset, error) { // On the first call, we need to open the first file. if r.currIndex < 0 { err := r.nextFile() if err != nil { - return nil, Offset{}, err + return nil, batchrepr.Header{}, Offset{}, err } } @@ -264,7 +269,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) { // This file is exhausted; continue to the next. 
err := r.nextFile() if err != nil { - return nil, r.off, err + return nil, batchrepr.Header{}, r.off, err } continue } @@ -290,11 +295,11 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) { // in-flight write at the time of process exit/crash. See #453. if record.IsInvalidRecord(err) && r.currIndex < len(r.segments)-1 { if err := r.nextFile(); err != nil { - return nil, r.off, err + return nil, batchrepr.Header{}, r.off, err } continue } else if err != nil { - return nil, r.off, err + return nil, batchrepr.Header{}, r.off, err } // We may observe repeat records between the physical files that make up @@ -314,7 +319,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) { // corruption. We could return the record to the caller, allowing // the caller to interpret it as corruption, but it seems safer to // be explicit and surface the corruption error here. - return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch", + return nil, h, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch", r.Num, errors.Safe(r.segments[r.currIndex].logNameIndex)) } @@ -337,7 +342,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) { continue } r.lastSeqNum = h.SeqNum - return &r.recordBuf, r.off, nil + return &r.recordBuf, h, r.off, nil } } @@ -376,3 +381,67 @@ func (r *virtualWALReader) nextFile() error { r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.Num)) return nil } + +// Copy copies the contents of the provided LogicalLog to a new WAL file on fs +// within dstDir. The copy will be a logical copy and will not include the +// source WAL's structure if split across multiple physical segment files. Copy +// will only copy the prefix of the WAL up until visibleSeqNum (exclusive). 
+// +// Copy does NOT sync the destination directory, and the caller must explicitly +// sync it if they require the new WAL file to be durably linked. +// +// The WAL file identified by ll may be written concurrently. +func Copy( + fs vfs.FS, dstDir string, ll LogicalLog, visibleSeqNum base.SeqNum, cfg record.LogWriterConfig, +) (err error) { + dstPath := fs.PathJoin(dstDir, makeLogFilename(ll.Num, 0)) + var dstFile vfs.File + dstFile, err = fs.Create(dstPath, vfs.WriteCategoryUnspecified) + if err != nil { + return err + } + w := record.NewLogWriter(dstFile, base.DiskFileNum(ll.Num), cfg) + + r := newVirtualWALReader(ll) + defer func() { err = errors.CombineErrors(err, r.Close()) }() + for { + rec, h, _, err := r.nextRecord() + if errors.Is(err, io.EOF) || errors.Is(err, record.ErrUnexpectedEOF) { + break + } + if err != nil { + return errors.Wrapf(err, "copying WAL file %s", ll.Num) + } + // The visible sequence number indicates that all keys written with + // sequence numbers strictly less than visibleSeqNum are visible. Any + // batches with sequence numbers greater than or equal to visibleSeqNum + // are not visible and indicate we can stop. + if h.SeqNum >= visibleSeqNum { + break + } + // A batch assigns sequence numbers beginning at the header's SeqNum, + // increasing for successive internal keys. We expect that if the + // batch's first sequence number is visible, all the keys should be + // visible, because the visible sequence number is ratcheted up + // atomically when a batch is committed. If this doesn't hold, error + // out.
+ batchLargestSeqNum := h.SeqNum + base.SeqNum(h.Count-1) + if batchLargestSeqNum >= visibleSeqNum { + return errors.AssertionFailedf("batch's sequence numbers %s-%s unexpectedly straddle the visible sequence number %s", + h.SeqNum, batchLargestSeqNum, visibleSeqNum) + } + b, err := io.ReadAll(rec) + if err != nil { + return errors.Wrapf(err, "copying WAL file %s", ll.Num) + } + if _, err := w.WriteRecord(b); err != nil { + return errors.Wrapf(err, "copying WAL file %s", ll.Num) + } + } + // Close on the LogWriter will sync the file (but not the containing + // directory). + if err := w.Close(); err != nil { + return errors.Wrapf(err, "copying WAL file %s", ll.Num) + } + return err +} diff --git a/wal/reader_test.go b/wal/reader_test.go index a5db56b68e..499526bb01 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -15,6 +15,7 @@ import ( "strings" "sync" "testing" + "unicode" "github.com/cockroachdb/datadriven" "github.com/cockroachdb/pebble/batchrepr" @@ -97,9 +98,18 @@ func TestList(t *testing.T) { // TestReader tests the virtual WAL reader that merges across multiple physical // log files. func TestReader(t *testing.T) { - fs := vfs.NewCrashableMem() rng := rand.New(rand.NewPCG(1, 1)) var buf bytes.Buffer + var fs vfs.FS + var memFS *vfs.MemFS + setFS := func(mem *vfs.MemFS) { + memFS = mem + fs = vfs.WithLogging(mem, func(format string, args ...interface{}) { + s := fmt.Sprintf("# "+format, args...) 
+ fmt.Fprintln(&buf, strings.TrimRightFunc(s, unicode.IsSpace)) + }) + } + setFS(vfs.NewCrashableMem()) datadriven.RunTest(t, "testdata/reader", func(t *testing.T, td *datadriven.TestData) string { buf.Reset() switch td.Cmd { @@ -202,19 +212,43 @@ } } if td.HasArg("close-unclean") { - crashFS := fs.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0}) + crashFS := memFS.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0}) require.NoError(t, w.Close()) - fs = crashFS + setFS(crashFS) } else { require.NoError(t, w.Close()) } return buf.String() + case "copy": + var logNum uint64 + var copyDir string + var visibleSeqNum uint64 + td.ScanArgs(t, "logNum", &logNum) + td.ScanArgs(t, "copyDir", &copyDir) + td.ScanArgs(t, "visibleSeqNum", &visibleSeqNum) + logs, err := Scan(Dir{FS: fs}) + require.NoError(t, err) + log, ok := logs.Get(NumWAL(logNum)) + if !ok { + return fmt.Sprintf("log with logNum %d not found", logNum) + } + require.NoError(t, fs.MkdirAll(copyDir, os.ModePerm)) + cfg := record.LogWriterConfig{ + WriteWALSyncOffsets: func() bool { return false }, + } + err = Copy(fs, copyDir, log, base.SeqNum(visibleSeqNum), cfg) + if err != nil { + return err.Error() + } + return buf.String() case "read": var logNum uint64 var forceLogNameIndexes []uint64 + dirname := "" td.ScanArgs(t, "logNum", &logNum) td.MaybeScanArgs(t, "forceLogNameIndexes", &forceLogNameIndexes) - logs, err := Scan(Dir{FS: fs}) + td.MaybeScanArgs(t, "dirname", &dirname) + logs, err := Scan(Dir{Dirname: dirname, FS: fs}) require.NoError(t, err) log, ok := logs.Get(NumWAL(logNum)) if !ok { diff --git a/wal/testdata/reader b/wal/testdata/reader index bc2fe6f6a5..09714a87e7 100644 --- a/wal/testdata/reader +++ b/wal/testdata/reader @@ -7,13 +7,21 @@ batch count=3 size=1024 seq=1 batch count=2 size=30 seq=20 batch count=50 size=512000 seq=21 sync ---- +# create: 000001.log created "000001.log" +# open-dir: +# sync: +# close: 0..1035: batch #1 1035..1076: batch #20 +# sync: 
000001.log 1076..513252: batch #21 +# sync: 000001.log +# close: 000001.log read logNum=000001 ---- +# open: 000001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000001.log: 0), ) io.ReadAll(rr) = ("0100000000000000030000004892e32448351fc1f9e2d52044496ca2cd51999f... <1024-byte record>", ) BatchHeader: [seqNum=1,count=3] @@ -23,6 +31,7 @@ r.NextRecord() = (rr, (000001.log: 1035), ) r.NextRecord() = (rr, (000001.log: 1076), ) io.ReadAll(rr) = ("150000000000000032000000fcbdf64fad60c5383b3f201ec0b234ca2f21a4f7... <512000-byte record>", ) BatchHeader: [seqNum=21,count=50] +# close: 000001.log r.NextRecord() = (rr, (000001.log: 513252), EOF) # Add a new physical file for the same logical log, this one with a batch that @@ -36,13 +45,20 @@ batch count=2 seq=22 size=412 batch count=0 seq=24 size=64 batch count=1 seq=24 size=100 ---- +# create: 000001-001.log created "000001-001.log" +# open-dir: +# sync: +# close: 0..423: batch #22 423..498: batch #24 498..609: batch #24 +# sync: 000001-001.log +# close: 000001-001.log read logNum=000001 ---- +# open: 000001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000001.log: 0), ) io.ReadAll(rr) = ("0100000000000000030000004892e32448351fc1f9e2d52044496ca2cd51999f... <1024-byte record>", ) BatchHeader: [seqNum=1,count=3] @@ -52,12 +68,15 @@ r.NextRecord() = (rr, (000001.log: 1035), ) r.NextRecord() = (rr, (000001.log: 1076), ) io.ReadAll(rr) = ("150000000000000032000000fcbdf64fad60c5383b3f201ec0b234ca2f21a4f7... <512000-byte record>", ) BatchHeader: [seqNum=21,count=50] +# close: 000001.log +# open: 000001-001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000001-001.log: 0), 513252 from previous files, ) io.ReadAll(rr) = ("160000000000000002000000eb2b0fd29c3e15ed510704c8c9ae977d8e6df815... 
<412-byte record>", ) BatchHeader: [seqNum=22,count=2] r.NextRecord() = (rr, (000001-001.log: 498), 513252 from previous files, ) io.ReadAll(rr) = ("1800000000000000010000004c163720a7957d7d24986efbc7e26d6194d09fdc... <100-byte record>", ) BatchHeader: [seqNum=24,count=1] +# close: 000001-001.log r.NextRecord() = (rr, (000001-001.log: 609), 513252 from previous files, EOF) # Test a recycled log file. Recycle 000001.log as 000002.log. This time, do not @@ -69,10 +88,19 @@ batch count=10 size=100 seq=10 sync batch count=22 size=150 seq=20 sync batch count=1 size=64000 seq=42 sync ---- +# reuseForWrite: 000001.log -> 000002.log recycled "000001.log" as "000002.log" +# open-dir: +# sync: +# close: +# sync: 000002.log 0..111: batch #10 +# sync: 000002.log 111..272: batch #20 +# sync: 000002.log 272..64294: batch #42 +# sync: 000002.log +# close: 000002.log # Reading a recycled log file with an unclean close can result in an error at # the tail of the file; eg, "invalid chunk." This is okay and ignored by Open as @@ -81,6 +109,7 @@ recycled "000001.log" as "000002.log" read logNum=000002 ---- +# open: 000002.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000002.log: 0), ) io.ReadAll(rr) = ("0a000000000000000a000000afd43820144bc9d4446adeaacc407cd91c971e79... <100-byte record>", ) BatchHeader: [seqNum=10,count=10] @@ -91,6 +120,52 @@ r.NextRecord() = (rr, (000002.log: 272), ) io.ReadAll(rr) = ("2a0000000000000001000000ec8367c42ebf0ffad5c57ece37b18559ba95ad78... 
<64000-byte record>", ) BatchHeader: [seqNum=42,count=1] r.NextRecord() = (rr, (000002.log: 64294), pebble/record: unexpected EOF) +# close: 000002.log + +copy logNum=000002 copyDir=fullcopy000002 visibleSeqNum=100 +---- +# mkdir-all: fullcopy000002 0777 +# create: fullcopy000002/000002.log +# open: 000002.log (options: *vfs.sequentialReadsOption) +# sync: fullcopy000002/000002.log +# close: fullcopy000002/000002.log +# close: 000002.log + +read logNum=000002 dirname=fullcopy000002 +---- +# open: fullcopy000002/000002.log (options: *vfs.sequentialReadsOption) +r.NextRecord() = (rr, (fullcopy000002/000002.log: 0), ) + io.ReadAll(rr) = ("0a000000000000000a000000afd43820144bc9d4446adeaacc407cd91c971e79... <100-byte record>", ) + BatchHeader: [seqNum=10,count=10] +r.NextRecord() = (rr, (fullcopy000002/000002.log: 111), ) + io.ReadAll(rr) = ("140000000000000016000000fe05c7d6c9bbd0d55b0da8b6943848245b384bf1... <150-byte record>", ) + BatchHeader: [seqNum=20,count=22] +r.NextRecord() = (rr, (fullcopy000002/000002.log: 272), ) + io.ReadAll(rr) = ("2a0000000000000001000000ec8367c42ebf0ffad5c57ece37b18559ba95ad78... <64000-byte record>", ) + BatchHeader: [seqNum=42,count=1] +# close: fullcopy000002/000002.log +r.NextRecord() = (rr, (fullcopy000002/000002.log: 64294), EOF) + +copy logNum=000002 copyDir=partialcopy000002 visibleSeqNum=42 +---- +# mkdir-all: partialcopy000002 0777 +# create: partialcopy000002/000002.log +# open: 000002.log (options: *vfs.sequentialReadsOption) +# sync: partialcopy000002/000002.log +# close: partialcopy000002/000002.log +# close: 000002.log + +read logNum=000002 dirname=partialcopy000002 +---- +# open: partialcopy000002/000002.log (options: *vfs.sequentialReadsOption) +r.NextRecord() = (rr, (partialcopy000002/000002.log: 0), ) + io.ReadAll(rr) = ("0a000000000000000a000000afd43820144bc9d4446adeaacc407cd91c971e79... 
<100-byte record>", ) + BatchHeader: [seqNum=10,count=10] +r.NextRecord() = (rr, (partialcopy000002/000002.log: 111), ) + io.ReadAll(rr) = ("140000000000000016000000fe05c7d6c9bbd0d55b0da8b6943848245b384bf1... <150-byte record>", ) + BatchHeader: [seqNum=20,count=22] +# close: partialcopy000002/000002.log +r.NextRecord() = (rr, (partialcopy000002/000002.log: 272), EOF) # Test a typical failure scenario. Start off with a recycled log file (000003) # that would be on the primary device. It closes "unclean" because we're unable @@ -101,10 +176,18 @@ batch count=10 size=100 seq=10 sync batch count=22 size=150 seq=20 batch count=1 size=20 seq=42 sync ---- +# reuseForWrite: 000002.log -> 000003.log recycled "000002.log" as "000003.log" +# open-dir: +# sync: +# close: +# sync: 000003.log 0..111: batch #10 111..272: batch #20 +# sync: 000003.log 272..303: batch #42 +# sync: 000003.log +# close: 000003.log # Then the WAL fails over to a new physical WAL file on a new device. The last # two batches of previous WAL are duplicated. @@ -115,16 +198,26 @@ batch count=1 size=20 seq=42 sync batch count=3 size=80 seq=43 sync batch count=9 size=2055 seq=46 sync ---- +# create: 000003-001.log created "000003-001.log" +# open-dir: +# sync: +# close: 0..161: batch #20 +# sync: 000003-001.log 161..192: batch #42 +# sync: 000003-001.log 192..283: batch #43 +# sync: 000003-001.log 283..2349: batch #46 +# sync: 000003-001.log +# close: 000003-001.log # Reading the log file should transparently deduplicate the repeated batches. read logNum=000003 ---- +# open: 000003.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000003.log: 0), ) io.ReadAll(rr) = ("0a000000000000000a00000062ab232e0552e5fd6091f6e21102bf30aaa79627... 
<100-byte record>", ) BatchHeader: [seqNum=10,count=10] @@ -134,12 +227,15 @@ r.NextRecord() = (rr, (000003.log: 111), ) r.NextRecord() = (rr, (000003.log: 272), ) io.ReadAll(rr) = ("2a00000000000000010000000000000000000000", ) BatchHeader: [seqNum=42,count=1] +# close: 000003.log +# open: 000003-001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000003-001.log: 192), 303 from previous files, ) io.ReadAll(rr) = ("2b0000000000000003000000f3eb964c6fd7dd151f60c3031282d500dad32aad... <80-byte record>", ) BatchHeader: [seqNum=43,count=3] r.NextRecord() = (rr, (000003-001.log: 283), 303 from previous files, ) io.ReadAll(rr) = ("2e00000000000000090000004d9b33adaebf7365f3192f4e20106a03e9241a88... <2055-byte record>", ) BatchHeader: [seqNum=46,count=9] +# close: 000003-001.log r.NextRecord() = (rr, (000003-001.log: 2349), 303 from previous files, EOF) # Extend logical log file 000003 with another log file, the result of failing @@ -150,13 +246,23 @@ batch count=3 size=80 seq=43 sync batch count=9 size=2055 seq=46 sync batch count=2 size=205 seq=55 sync ---- +# create: 000003-002.log created "000003-002.log" +# open-dir: +# sync: +# close: +# sync: 000003-002.log 0..91: batch #43 +# sync: 000003-002.log 91..2157: batch #46 +# sync: 000003-002.log 2157..2373: batch #55 +# sync: 000003-002.log +# close: 000003-002.log read logNum=000003 ---- +# open: 000003.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000003.log: 0), ) io.ReadAll(rr) = ("0a000000000000000a00000062ab232e0552e5fd6091f6e21102bf30aaa79627... 
<100-byte record>", ) BatchHeader: [seqNum=10,count=10] @@ -166,17 +272,59 @@ r.NextRecord() = (rr, (000003.log: 111), ) r.NextRecord() = (rr, (000003.log: 272), ) io.ReadAll(rr) = ("2a00000000000000010000000000000000000000", ) BatchHeader: [seqNum=42,count=1] +# close: 000003.log +# open: 000003-001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000003-001.log: 192), 303 from previous files, ) io.ReadAll(rr) = ("2b0000000000000003000000f3eb964c6fd7dd151f60c3031282d500dad32aad... <80-byte record>", ) BatchHeader: [seqNum=43,count=3] r.NextRecord() = (rr, (000003-001.log: 283), 303 from previous files, ) io.ReadAll(rr) = ("2e00000000000000090000004d9b33adaebf7365f3192f4e20106a03e9241a88... <2055-byte record>", ) BatchHeader: [seqNum=46,count=9] +# close: 000003-001.log +# open: 000003-002.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000003-002.log: 2157), 2652 from previous files, ) io.ReadAll(rr) = ("3700000000000000020000000f17a02ef83a97ceae6703f0d784c2497a2021a5... <205-byte record>", ) BatchHeader: [seqNum=55,count=2] +# close: 000003-002.log r.NextRecord() = (rr, (000003-002.log: 2373), 2652 from previous files, EOF) +copy logNum=000003 copyDir=fullcopy000003 visibleSeqNum=100 +---- +# mkdir-all: fullcopy000003 0777 +# create: fullcopy000003/000003.log +# open: 000003.log (options: *vfs.sequentialReadsOption) +# close: 000003.log +# open: 000003-001.log (options: *vfs.sequentialReadsOption) +# close: 000003-001.log +# open: 000003-002.log (options: *vfs.sequentialReadsOption) +# close: 000003-002.log +# sync: fullcopy000003/000003.log +# close: fullcopy000003/000003.log + +read logNum=000003 dirname=fullcopy000003 +---- +# open: fullcopy000003/000003.log (options: *vfs.sequentialReadsOption) +r.NextRecord() = (rr, (fullcopy000003/000003.log: 0), ) + io.ReadAll(rr) = ("0a000000000000000a00000062ab232e0552e5fd6091f6e21102bf30aaa79627... 
<100-byte record>", ) + BatchHeader: [seqNum=10,count=10] +r.NextRecord() = (rr, (fullcopy000003/000003.log: 111), ) + io.ReadAll(rr) = ("140000000000000016000000966b624625070dadfd22ddb0d5d7cc582718a5d6... <150-byte record>", ) + BatchHeader: [seqNum=20,count=22] +r.NextRecord() = (rr, (fullcopy000003/000003.log: 272), ) + io.ReadAll(rr) = ("2a00000000000000010000000000000000000000", ) + BatchHeader: [seqNum=42,count=1] +r.NextRecord() = (rr, (fullcopy000003/000003.log: 303), ) + io.ReadAll(rr) = ("2b0000000000000003000000f3eb964c6fd7dd151f60c3031282d500dad32aad... <80-byte record>", ) + BatchHeader: [seqNum=43,count=3] +r.NextRecord() = (rr, (fullcopy000003/000003.log: 394), ) + io.ReadAll(rr) = ("2e00000000000000090000004d9b33adaebf7365f3192f4e20106a03e9241a88... <2055-byte record>", ) + BatchHeader: [seqNum=46,count=9] +r.NextRecord() = (rr, (fullcopy000003/000003.log: 2460), ) + io.ReadAll(rr) = ("3700000000000000020000000f17a02ef83a97ceae6703f0d784c2497a2021a5... <205-byte record>", ) + BatchHeader: [seqNum=55,count=2] +# close: fullcopy000003/000003.log +r.NextRecord() = (rr, (fullcopy000003/000003.log: 2676), EOF) + # Test reading a log file that does not exist. read logNum=000004 @@ -190,18 +338,28 @@ define logNum=000004 batch count=1 seq=1 size=20 sync batch size=5 sync ---- +# create: 000004.log created "000004.log" +# open-dir: +# sync: +# close: +# sync: 000004.log 0..31: batch #1 +# sync: 000004.log 31..47: batch #0 +# sync: 000004.log +# close: 000004.log # Reading the corrupt batch should error with a corruption error. read logNum=000004 ---- +# open: 000004.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000004.log: 0), ) io.ReadAll(rr) = ("0100000000000000010000000000000000000000", ) BatchHeader: [seqNum=1,count=1] r.NextRecord() = (rr, (000004.log: 31), pebble: corrupt log file logNum=4, logNameIndex=000: invalid batch) +# close: 000004.log # Test a two segment log file where the second log file ends in garbage. 
The # invalid chunk error of the final log file should be propagated up. @@ -212,11 +370,21 @@ batch count=9 seq=95226 size=295 sync batch count=8 seq=95235 size=2525 sync batch count=256 seq=95243 size=2566 sync ---- +# create: 000005.log created "000005.log" +# open-dir: +# sync: +# close: +# sync: 000005.log 0..603: batch #95225 +# sync: 000005.log 603..909: batch #95226 +# sync: 000005.log 909..3445: batch #95235 +# sync: 000005.log 3445..6022: batch #95243 +# sync: 000005.log +# close: 000005.log define logNum=000005 logNameIndex=001 unclean-close batch count=2 seq=95499 size=44 sync @@ -224,14 +392,25 @@ batch count=5 seq=95501 size=416 sync batch count=29 seq=95506 size=199 sync write-garbage size=353 sync ---- +# create: 000005-001.log created "000005-001.log" +# open-dir: +# sync: +# close: +# sync: 000005-001.log 0..55: batch #95499 +# sync: 000005-001.log 55..482: batch #95501 +# sync: 000005-001.log 482..692: batch #95506 +# sync: 000005-001.log 692..1045: write-garbage +# sync: 000005-001.log +# close: 000005-001.log read logNum=000005 ---- +# open: 000005.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000005.log: 0), ) io.ReadAll(rr) = ("f97301000000000001000000602fdb1fc7d9daaa4dd5741e5a8b07423f81d57e... <592-byte record>", ) BatchHeader: [seqNum=95225,count=1] @@ -244,6 +423,8 @@ r.NextRecord() = (rr, (000005.log: 909), ) r.NextRecord() = (rr, (000005.log: 3445), ) io.ReadAll(rr) = ("0b7401000000000000010000313db6e445a8213e7224d87caf90dcfe6eecf25a... <2566-byte record>", ) BatchHeader: [seqNum=95243,count=256] +# close: 000005.log +# open: 000005-001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000005-001.log: 0), 6022 from previous files, ) io.ReadAll(rr) = ("0b75010000000000020000003faf62c0166a4c4efa66d3c426c1979f1ada38c0... 
<44-byte record>", ) BatchHeader: [seqNum=95499,count=2] @@ -254,6 +435,7 @@ r.NextRecord() = (rr, (000005-001.log: 482), 6022 from previous files, ) io.ReadAll(rr) = ("12750100000000001d0000007575c6296b096226e5e78b9760aa7c2ecfa913b6... <199-byte record>", ) BatchHeader: [seqNum=95506,count=29] r.NextRecord() = (rr, (000005-001.log: 692), 6022 from previous files, pebble/record: unexpected EOF) +# close: 000005-001.log # Read again, this time pretending we found a third segment with the # logNameIndex=002. This helps exercise error conditions switching to a new @@ -261,6 +443,7 @@ r.NextRecord() = (rr, (000005-001.log: 692), 6022 from previous files, pebble/re read logNum=000005 forceLogNameIndexes=(002) ---- +# open: 000005.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000005.log: 0), ) io.ReadAll(rr) = ("f97301000000000001000000602fdb1fc7d9daaa4dd5741e5a8b07423f81d57e... <592-byte record>", ) BatchHeader: [seqNum=95225,count=1] @@ -273,6 +456,8 @@ r.NextRecord() = (rr, (000005.log: 909), ) r.NextRecord() = (rr, (000005.log: 3445), ) io.ReadAll(rr) = ("0b7401000000000000010000313db6e445a8213e7224d87caf90dcfe6eecf25a... <2566-byte record>", ) BatchHeader: [seqNum=95243,count=256] +# close: 000005.log +# open: 000005-001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000005-001.log: 0), 6022 from previous files, ) io.ReadAll(rr) = ("0b75010000000000020000003faf62c0166a4c4efa66d3c426c1979f1ada38c0... <44-byte record>", ) BatchHeader: [seqNum=95499,count=2] @@ -282,6 +467,8 @@ r.NextRecord() = (rr, (000005-001.log: 55), 6022 from previous files, ) r.NextRecord() = (rr, (000005-001.log: 482), 6022 from previous files, ) io.ReadAll(rr) = ("12750100000000001d0000007575c6296b096226e5e78b9760aa7c2ecfa913b6... 
<199-byte record>", ) BatchHeader: [seqNum=95506,count=29] +# close: 000005-001.log +# open: 000005-002.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000005-002.log: 0), 6714 from previous files, opening WAL file segment "000005-002.log": open 000005-002.log: file does not exist) # Test a scenario where 4 unique batches are split across three physical log @@ -293,16 +480,32 @@ batch count=3 seq=535 size=395 sync batch count=2 seq=538 size=93666 sync batch count=1 seq=540 size=180 sync ---- +# create: 000006.log created "000006.log" +# open-dir: +# sync: +# close: +# sync: 000006.log 0..406: batch #535 +# sync: 000006.log 406..94105: batch #538 +# sync: 000006.log 94105..94296: batch #540 +# sync: 000006.log +# close: 000006.log define logNum=000006 logNameIndex=001 unclean-close batch count=2 seq=538 size=93666 sync ---- +# create: 000006-001.log created "000006-001.log" +# open-dir: +# sync: +# close: +# sync: 000006-001.log 0..93699: batch #538 +# sync: 000006-001.log +# close: 000006-001.log define logNum=000006 logNameIndex=001 unclean-close @@ -310,13 +513,23 @@ batch count=2 seq=538 size=93666 sync batch count=1 seq=540 size=180 sync batch count=5 seq=541 size=2055 sync ---- +# create: 000006-001.log created "000006-001.log" +# open-dir: +# sync: +# close: +# sync: 000006-001.log 0..93699: batch #538 +# sync: 000006-001.log 93699..93890: batch #540 +# sync: 000006-001.log 93890..95956: batch #541 +# sync: 000006-001.log +# close: 000006-001.log read logNum=000006 ---- +# open: 000006.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000006.log: 0), ) io.ReadAll(rr) = ("17020000000000000300000080d4f05e2b4d5d702d779b34ebc6e3202c8b3680... <395-byte record>", ) BatchHeader: [seqNum=535,count=3] @@ -326,9 +539,12 @@ r.NextRecord() = (rr, (000006.log: 406), ) r.NextRecord() = (rr, (000006.log: 94105), ) io.ReadAll(rr) = ("1c0200000000000001000000f36197eb67e130dc302fed040b972901649d2813... 
<180-byte record>", ) BatchHeader: [seqNum=540,count=1] +# close: 000006.log +# open: 000006-001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000006-001.log: 93890), 94296 from previous files, ) io.ReadAll(rr) = ("1d020000000000000500000071ce0052b80db1237c9bfa873ecdba4a8cf6f94e... <2055-byte record>", ) BatchHeader: [seqNum=541,count=5] +# close: 000006-001.log r.NextRecord() = (rr, (000006-001.log: 95956), 94296 from previous files, EOF) # Test corrupting the tail of a batch that's large enough to be split into @@ -341,23 +557,43 @@ batch count=29 seq=95506 size=199 sync batch count=19 seq=95535 size=45991 sync corrupt-tail len=1020 ---- +# create: 000007.log created "000007.log" +# open-dir: +# sync: +# close: +# sync: 000007.log 0..55: batch #95499 +# sync: 000007.log 55..482: batch #95501 +# sync: 000007.log 482..692: batch #95506 +# sync: 000007.log 692..46705: batch #95535 +# write-at(45685, 1020): 000007.log 45685..46705: corrupt-tail +# sync: 000007.log +# close: 000007.log define logNum=000007 logNameIndex=001 batch count=19 seq=95535 size=45991 sync batch count=19 seq=95554 size=292 sync ---- +# create: 000007-001.log created "000007-001.log" +# open-dir: +# sync: +# close: +# sync: 000007-001.log 0..46013: batch #95535 +# sync: 000007-001.log 46013..46316: batch #95554 +# sync: 000007-001.log +# close: 000007-001.log read logNum=000007 ---- +# open: 000007.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000007.log: 0), ) io.ReadAll(rr) = ("0b750100000000000200000012c40408ad4f401c0cf9c8bb22f284fdaba4c168... <44-byte record>", ) BatchHeader: [seqNum=95499,count=2] @@ -367,10 +603,13 @@ r.NextRecord() = (rr, (000007.log: 55), ) r.NextRecord() = (rr, (000007.log: 482), ) io.ReadAll(rr) = ("12750100000000001d000000d66726e6403e80c59c6e3c71add1b77eb8b7bdd2... 
<199-byte record>", ) BatchHeader: [seqNum=95506,count=29] +# close: 000007.log +# open: 000007-001.log (options: *vfs.sequentialReadsOption) r.NextRecord() = (rr, (000007-001.log: 0), 692 from previous files, ) io.ReadAll(rr) = ("2f7501000000000013000000130d8192532f4eade44dc7af7778d76fd3b28c90... <45991-byte record>", ) BatchHeader: [seqNum=95535,count=19] r.NextRecord() = (rr, (000007-001.log: 46013), 692 from previous files, ) io.ReadAll(rr) = ("427501000000000013000000bb15abda06f4a71f4aeae5998fcfdb5e4dad7789... <292-byte record>", ) BatchHeader: [seqNum=95554,count=19] +# close: 000007-001.log r.NextRecord() = (rr, (000007-001.log: 46316), 692 from previous files, EOF)