Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/pebble/record"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/vfs/atomicfs"
"github.com/cockroachdb/pebble/wal"
)

// checkpointOptions hold the optional parameters to construct checkpoint
Expand Down Expand Up @@ -207,6 +208,11 @@ func (d *DB) Checkpoint(
// before our call to List.
allLogicalLogs := d.mu.log.manager.List()

// Grab the visible sequence number. The checkpoint's view of the state of
// the database will be equivalent to an iterator that acquired this same
// visible sequence number.
visibleSeqNum := d.mu.versions.visibleSeqNum.Load()

// Release the manifest and DB.mu so we don't block other operations on the
// database.
//
Expand Down Expand Up @@ -391,24 +397,29 @@ func (d *DB) Checkpoint(
}
}

// Copy the WAL files. We copy rather than link because WAL file recycling
// will cause the WAL files to be reused which would invalidate the
// checkpoint. It's possible allLogicalLogs includes logs that are not
// relevant (beneath the version's MinUnflushedLogNum). These extra files
// are harmless. The earlier (wal.Manager).List call will not include
// obsolete logs that are sitting in the recycler or have already been
// passed off to the cleanup manager for deletion.
// Copy the WAL files. We copy rather than link for a few reasons:
// - WAL file recycling will cause the WAL files to be reused which
// would invalidate the checkpoint.
// - While we're performing our checkpoint, the latest WAL may still be
// receiving writes. We must exclude these writes, otherwise we'll
// violate the guarantee that the checkpoint is a consistent snapshot
// (we could copy a write that was committed after an ingest that was
// committed after we acquired our version).
//
// It's possible allLogicalLogs includes logs that are not relevant (beneath
// the version's MinUnflushedLogNum). These extra files are harmless. The
// earlier (wal.Manager).List call will not include obsolete logs that are
// sitting in the recycler or have already been passed off to the cleanup
// manager for deletion.
//
// TODO(jackson): It would be desirable to copy all recycling and obsolete
// WALs to aid corruption postmortem debugging should we need them.
for _, log := range allLogicalLogs {
for i := 0; i < log.NumSegments(); i++ {
srcFS, srcPath := log.SegmentLocation(i)
destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
if ckErr != nil {
return ckErr
}
ckErr = wal.Copy(fs, destDir, log, visibleSeqNum, record.LogWriterConfig{
WriteWALSyncOffsets: func() bool { return formatVers > FormatWALSyncChunks },
})
if ckErr != nil {
return ckErr
}
}

Expand Down
2 changes: 1 addition & 1 deletion replay/testdata/replay
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ tree
508 000007.sst
133 MANIFEST-000001
checkpoint/
25 000004.log
11 000004.log
480 000005.sst
85 MANIFEST-000001
2769 OPTIONS-000003
Expand Down
2 changes: 1 addition & 1 deletion replay/testdata/replay_ingest
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ tree
172 MANIFEST-000001
209 MANIFEST-000008
checkpoint/
25 000002.log
11 000002.log
678 000004.sst
661 000005.sst
172 MANIFEST-000001
Expand Down
2 changes: 1 addition & 1 deletion replay/testdata/replay_paced
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ tree
checkpoint/
758 000005.sst
454 000007.sst
39 000008.log
11 000008.log
454 000009.sst
157 MANIFEST-000010
2769 OPTIONS-000003
Expand Down
2 changes: 1 addition & 1 deletion replay/testdata/replay_val_sep
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ tree
101 000006.blob
766 000008.sst
97 000009.blob
64 000011.log
11 000011.log
660 000012.sst
187 MANIFEST-000013
2906 OPTIONS-000003
Expand Down
18 changes: 9 additions & 9 deletions testdata/checkpoint
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ sync-data: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001
close: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001
sync: checkpoints/checkpoint1
close: checkpoints/checkpoint1
open: db/000006.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint1/000006.log
open: db/000006.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint1/000006.log
close: checkpoints/checkpoint1/000006.log
close: db/000006.log
Expand Down Expand Up @@ -198,8 +198,8 @@ sync-data: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001
close: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001
sync: checkpoints/checkpoint2
close: checkpoints/checkpoint2
open: db/000006.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint2/000006.log
open: db/000006.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint2/000006.log
close: checkpoints/checkpoint2/000006.log
close: db/000006.log
Expand Down Expand Up @@ -238,8 +238,8 @@ sync-data: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001
close: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001
sync: checkpoints/checkpoint3
close: checkpoints/checkpoint3
open: db/000006.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint3/000006.log
open: db/000006.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint3/000006.log
close: checkpoints/checkpoint3/000006.log
close: db/000006.log
Expand Down Expand Up @@ -563,8 +563,8 @@ sync-data: checkpoints/checkpoint4/marker.manifest.000001.MANIFEST-000001
close: checkpoints/checkpoint4/marker.manifest.000001.MANIFEST-000001
sync: checkpoints/checkpoint4
close: checkpoints/checkpoint4
open: db/000008.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint4/000008.log
open: db/000008.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint4/000008.log
close: checkpoints/checkpoint4/000008.log
close: db/000008.log
Expand Down Expand Up @@ -672,8 +672,8 @@ sync-data: checkpoints/checkpoint5/marker.manifest.000001.MANIFEST-000001
close: checkpoints/checkpoint5/marker.manifest.000001.MANIFEST-000001
sync: checkpoints/checkpoint5
close: checkpoints/checkpoint5
open: db/000008.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint5/000008.log
open: db/000008.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint5/000008.log
close: checkpoints/checkpoint5/000008.log
close: db/000008.log
Expand Down Expand Up @@ -773,8 +773,8 @@ sync-data: checkpoints/checkpoint6/marker.manifest.000001.MANIFEST-000001
close: checkpoints/checkpoint6/marker.manifest.000001.MANIFEST-000001
sync: checkpoints/checkpoint6
close: checkpoints/checkpoint6
open: db/000008.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint6/000008.log
open: db/000008.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint6/000008.log
close: checkpoints/checkpoint6/000008.log
close: db/000008.log
Expand Down Expand Up @@ -1008,11 +1008,11 @@ sync-data: checkpoints/checkpoint8/marker.manifest.000001.MANIFEST-000001
close: checkpoints/checkpoint8/marker.manifest.000001.MANIFEST-000001
sync: checkpoints/checkpoint8
close: checkpoints/checkpoint8
open: valsepdb/000004.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint8/000004.log
open: valsepdb/000004.log (options: *vfs.sequentialReadsOption)
close: valsepdb/000004.log
sync-data: checkpoints/checkpoint8/000004.log
close: checkpoints/checkpoint8/000004.log
close: valsepdb/000004.log
sync: checkpoints/checkpoint8
close: checkpoints/checkpoint8

Expand Down Expand Up @@ -1122,8 +1122,8 @@ sync-data: checkpoints/checkpoint9/marker.manifest.000001.MANIFEST-000001
close: checkpoints/checkpoint9/marker.manifest.000001.MANIFEST-000001
sync: checkpoints/checkpoint9
close: checkpoints/checkpoint9
open: valsepdb/000007.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint9/000007.log
open: valsepdb/000007.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint9/000007.log
close: checkpoints/checkpoint9/000007.log
close: valsepdb/000007.log
Expand Down
6 changes: 3 additions & 3 deletions testdata/checkpoint_shared
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ sync-data: checkpoints/checkpoint1/marker.remote-obj-catalog.000001.REMOTE-OBJ-C
close: checkpoints/checkpoint1/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001
sync: checkpoints/checkpoint1
close: checkpoints/checkpoint1
open: db/000006.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint1/000006.log
open: db/000006.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint1/000006.log
close: checkpoints/checkpoint1/000006.log
close: db/000006.log
Expand Down Expand Up @@ -205,8 +205,8 @@ sync-data: checkpoints/checkpoint2/marker.remote-obj-catalog.000001.REMOTE-OBJ-C
close: checkpoints/checkpoint2/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001
sync: checkpoints/checkpoint2
close: checkpoints/checkpoint2
open: db/000006.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint2/000006.log
open: db/000006.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint2/000006.log
close: checkpoints/checkpoint2/000006.log
close: db/000006.log
Expand Down Expand Up @@ -254,8 +254,8 @@ sync-data: checkpoints/checkpoint3/marker.remote-obj-catalog.000001.REMOTE-OBJ-C
close: checkpoints/checkpoint3/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001
sync: checkpoints/checkpoint3
close: checkpoints/checkpoint3
open: db/000006.log (options: *vfs.sequentialReadsOption)
create: checkpoints/checkpoint3/000006.log
open: db/000006.log (options: *vfs.sequentialReadsOption)
sync-data: checkpoints/checkpoint3/000006.log
close: checkpoints/checkpoint3/000006.log
close: db/000006.log
Expand Down
4 changes: 2 additions & 2 deletions testdata/event_listener
Original file line number Diff line number Diff line change
Expand Up @@ -500,11 +500,11 @@ sync-data: checkpoint/marker.manifest.000001.MANIFEST-000023
close: checkpoint/marker.manifest.000001.MANIFEST-000023
sync: checkpoint
close: checkpoint
open: wal/000021.log (options: *vfs.sequentialReadsOption)
create: checkpoint/000021.log
open: wal/000021.log (options: *vfs.sequentialReadsOption)
close: wal/000021.log
sync-data: checkpoint/000021.log
close: checkpoint/000021.log
close: wal/000021.log
sync: checkpoint
close: checkpoint

Expand Down
81 changes: 75 additions & 6 deletions wal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,11 +247,16 @@ var _ Reader = (*virtualWALReader)(nil)
// are no more records. The reader returned becomes stale after the next
// NextRecord call, and should no longer be used.
func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
rec, _, off, err := r.nextRecord()
return rec, off, err
}

func (r *virtualWALReader) nextRecord() (io.Reader, batchrepr.Header, Offset, error) {
// On the first call, we need to open the first file.
if r.currIndex < 0 {
err := r.nextFile()
if err != nil {
return nil, Offset{}, err
return nil, batchrepr.Header{}, Offset{}, err
}
}

Expand All @@ -264,7 +269,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
// This file is exhausted; continue to the next.
err := r.nextFile()
if err != nil {
return nil, r.off, err
return nil, batchrepr.Header{}, r.off, err
}
continue
}
Expand All @@ -290,11 +295,11 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
// in-flight write at the time of process exit/crash. See #453.
if record.IsInvalidRecord(err) && r.currIndex < len(r.segments)-1 {
if err := r.nextFile(); err != nil {
return nil, r.off, err
return nil, batchrepr.Header{}, r.off, err
}
continue
} else if err != nil {
return nil, r.off, err
return nil, batchrepr.Header{}, r.off, err
}

// We may observe repeat records between the physical files that make up
Expand All @@ -314,7 +319,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
// corruption. We could return the record to the caller, allowing
// the caller to interpret it as corruption, but it seems safer to
// be explicit and surface the corruption error here.
return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
return nil, h, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
r.Num, errors.Safe(r.segments[r.currIndex].logNameIndex))
}

Expand All @@ -337,7 +342,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
continue
}
r.lastSeqNum = h.SeqNum
return &r.recordBuf, r.off, nil
return &r.recordBuf, h, r.off, nil
}
}

Expand Down Expand Up @@ -376,3 +381,67 @@ func (r *virtualWALReader) nextFile() error {
r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.Num))
return nil
}

// Copy copies the contents of the provided LogicalLog to a new WAL file on fs
// within dstDir. The copy will be a logical copy and will not include the
// source WAL's structure if split across multiple physical segment files. Copy
// will only copy the prefix of the WAL up until visibleSeqNum (exclusive).
//
// Copy does NOT sync the destination directory, and the caller must explicitly
// sync it if they require the new WAL file to be durably linked.
//
// The WAL file identified by ll may be written concurrently.
func Copy(
fs vfs.FS, dstDir string, ll LogicalLog, visibleSeqNum base.SeqNum, cfg record.LogWriterConfig,
) (err error) {
dstPath := fs.PathJoin(dstDir, makeLogFilename(ll.Num, 0))
var dstFile vfs.File
dstFile, err = fs.Create(dstPath, vfs.WriteCategoryUnspecified)
if err != nil {
return err
}
w := record.NewLogWriter(dstFile, base.DiskFileNum(ll.Num), cfg)

r := newVirtualWALReader(ll)
defer func() { err = errors.CombineErrors(err, r.Close()) }()
for {
rec, h, _, err := r.nextRecord()
if errors.Is(err, io.EOF) || errors.Is(err, record.ErrUnexpectedEOF) {
break
}
if err != nil {
return errors.Wrapf(err, "copying WAL file %s", ll.Num)
}
// The visible sequence number indicates that all keys written with
// sequence numbers strictly less than visibleSeqNum are visible. Any
// batches with sequence numbers greater than or equal to visibleSeqNum
// are not visible and indicate we can stop.
if h.SeqNum >= visibleSeqNum {
break
}
// A batch assigns sequence numbers beginning at the header's SeqNum,
// increasing for successive internal keys. We expect that if the
// batch's first sequence number is visible, all the keys should be
// visible, because the visible sequence number is ratcheted up
// atomically when a batch is committed. If this doeesn't hold, error
// out.
batchLargestSeqNum := h.SeqNum + base.SeqNum(h.Count-1)
if batchLargestSeqNum >= visibleSeqNum {
return errors.AssertionFailedf("batch's sequence numbers %s-%s unexpectedly straddle the visible sequence number %s",
h.SeqNum, batchLargestSeqNum, visibleSeqNum)
}
b, err := io.ReadAll(rec)
if err != nil {
return errors.Wrapf(err, "copying WAL file %s", ll.Num)
}
if _, err := w.WriteRecord(b); err != nil {
return errors.Wrapf(err, "copying WAL file %s", ll.Num)
}
}
// Close on the LogWriter will sync the file (but not the containing
// directory).
if err := w.Close(); err != nil {
return errors.Wrapf(err, "copying WAL file %s", ll.Num)
}
return err
}
Loading