Skip to content

Commit 084b89b

Browse files
committed
db: ensure checkpoints copy a consistent snapshot
Adapt Checkpoint to copy a consistent snapshot of the database. Previously, Checkpoint acquired a Version, ensuring a consistent snapshot of the sstables within the version, but copied the WALs arbitrarily. This allowed Checkpoint to race with in-flight WAL writes and allowed for a few sources of inconsistency in checkpoints:

- The checkpointing goroutine, racing with the LogWriter, could copy an old block from a WAL file's recycled history, followed by a new block containing recent writes. If a chunk in the new block indicated that the old block should have been durably synced, the WAL was considered corrupt on replay when the checkpoint was opened.
- The checkpointing goroutine could copy WAL records containing batches that were committed after version edits (e.g., ingests, excises) that were themselves committed after the checkpointed Version. This would violate the checkpoint's consistency.

Now Checkpoint reads the current visible sequence number and only copies the prefix of the WAL that is visible at that sequence number. This ensures that a Checkpoint's snapshot isolation semantics mirror those of an Iterator.
1 parent e52a9b1 commit 084b89b

File tree

11 files changed

+380
-42
lines changed

11 files changed

+380
-42
lines changed

checkpoint.go

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/pebble/record"
1717
"github.com/cockroachdb/pebble/vfs"
1818
"github.com/cockroachdb/pebble/vfs/atomicfs"
19+
"github.com/cockroachdb/pebble/wal"
1920
)
2021

2122
// checkpointOptions hold the optional parameters to construct checkpoint
@@ -207,6 +208,11 @@ func (d *DB) Checkpoint(
207208
// before our call to List.
208209
allLogicalLogs := d.mu.log.manager.List()
209210

211+
// Grab the visible sequence number. The checkpoint's view of the state of
212+
// the database will be equivalent to an iterator that acquired this same
213+
// visible sequence number.
214+
visibleSeqNum := d.mu.versions.visibleSeqNum.Load()
215+
210216
// Release the manifest and DB.mu so we don't block other operations on the
211217
// database.
212218
//
@@ -391,24 +397,29 @@ func (d *DB) Checkpoint(
391397
}
392398
}
393399

394-
// Copy the WAL files. We copy rather than link because WAL file recycling
395-
// will cause the WAL files to be reused which would invalidate the
396-
// checkpoint. It's possible allLogicalLogs includes logs that are not
397-
// relevant (beneath the version's MinUnflushedLogNum). These extra files
398-
// are harmless. The earlier (wal.Manager).List call will not include
399-
// obsolete logs that are sitting in the recycler or have already been
400-
// passed off to the cleanup manager for deletion.
400+
// Copy the WAL files. We copy rather than link for a few reasons:
401+
// - WAL file recycling will cause the WAL files to be reused which
402+
// would invalidate the checkpoint.
403+
// - While we're performing our checkpoint, the latest WAL may still be
404+
// receiving writes. We must exclude these writes, otherwise we'll
405+
// violate the guarantee that the checkpoint is a consistent snapshot
406+
// (we could copy a write that was committed after an ingest that was
407+
// committed after we acquired our version).
408+
//
409+
// It's possible allLogicalLogs includes logs that are not relevant (beneath
410+
// the version's MinUnflushedLogNum). These extra files are harmless. The
411+
// earlier (wal.Manager).List call will not include obsolete logs that are
412+
// sitting in the recycler or have already been passed off to the cleanup
413+
// manager for deletion.
401414
//
402415
// TODO(jackson): It would be desirable to copy all recycling and obsolete
403416
// WALs to aid corruption postmortem debugging should we need them.
404417
for _, log := range allLogicalLogs {
405-
for i := 0; i < log.NumSegments(); i++ {
406-
srcFS, srcPath := log.SegmentLocation(i)
407-
destPath := fs.PathJoin(destDir, srcFS.PathBase(srcPath))
408-
ckErr = vfs.CopyAcrossFS(srcFS, srcPath, fs, destPath)
409-
if ckErr != nil {
410-
return ckErr
411-
}
418+
ckErr = wal.Copy(fs, destDir, log, visibleSeqNum, record.LogWriterConfig{
419+
WriteWALSyncOffsets: func() bool { return formatVers > FormatWALSyncChunks },
420+
})
421+
if ckErr != nil {
422+
return ckErr
412423
}
413424
}
414425

replay/testdata/replay

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ tree
1818
508 000007.sst
1919
133 MANIFEST-000001
2020
checkpoint/
21-
25 000004.log
21+
11 000004.log
2222
480 000005.sst
2323
85 MANIFEST-000001
2424
2769 OPTIONS-000003

replay/testdata/replay_ingest

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ tree
2121
172 MANIFEST-000001
2222
209 MANIFEST-000008
2323
checkpoint/
24-
25 000002.log
24+
11 000002.log
2525
678 000004.sst
2626
661 000005.sst
2727
172 MANIFEST-000001

replay/testdata/replay_paced

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ tree
2323
checkpoint/
2424
758 000005.sst
2525
454 000007.sst
26-
39 000008.log
26+
11 000008.log
2727
454 000009.sst
2828
157 MANIFEST-000010
2929
2769 OPTIONS-000003

replay/testdata/replay_val_sep

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ tree
2929
101 000006.blob
3030
766 000008.sst
3131
97 000009.blob
32-
64 000011.log
32+
11 000011.log
3333
660 000012.sst
3434
187 MANIFEST-000013
3535
2906 OPTIONS-000003

testdata/checkpoint

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ sync-data: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001
155155
close: checkpoints/checkpoint1/marker.manifest.000001.MANIFEST-000001
156156
sync: checkpoints/checkpoint1
157157
close: checkpoints/checkpoint1
158-
open: db/000006.log (options: *vfs.sequentialReadsOption)
159158
create: checkpoints/checkpoint1/000006.log
159+
open: db/000006.log (options: *vfs.sequentialReadsOption)
160160
sync-data: checkpoints/checkpoint1/000006.log
161161
close: checkpoints/checkpoint1/000006.log
162162
close: db/000006.log
@@ -198,8 +198,8 @@ sync-data: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001
198198
close: checkpoints/checkpoint2/marker.manifest.000001.MANIFEST-000001
199199
sync: checkpoints/checkpoint2
200200
close: checkpoints/checkpoint2
201-
open: db/000006.log (options: *vfs.sequentialReadsOption)
202201
create: checkpoints/checkpoint2/000006.log
202+
open: db/000006.log (options: *vfs.sequentialReadsOption)
203203
sync-data: checkpoints/checkpoint2/000006.log
204204
close: checkpoints/checkpoint2/000006.log
205205
close: db/000006.log
@@ -238,8 +238,8 @@ sync-data: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001
238238
close: checkpoints/checkpoint3/marker.manifest.000001.MANIFEST-000001
239239
sync: checkpoints/checkpoint3
240240
close: checkpoints/checkpoint3
241-
open: db/000006.log (options: *vfs.sequentialReadsOption)
242241
create: checkpoints/checkpoint3/000006.log
242+
open: db/000006.log (options: *vfs.sequentialReadsOption)
243243
sync-data: checkpoints/checkpoint3/000006.log
244244
close: checkpoints/checkpoint3/000006.log
245245
close: db/000006.log
@@ -563,8 +563,8 @@ sync-data: checkpoints/checkpoint4/marker.manifest.000001.MANIFEST-000001
563563
close: checkpoints/checkpoint4/marker.manifest.000001.MANIFEST-000001
564564
sync: checkpoints/checkpoint4
565565
close: checkpoints/checkpoint4
566-
open: db/000008.log (options: *vfs.sequentialReadsOption)
567566
create: checkpoints/checkpoint4/000008.log
567+
open: db/000008.log (options: *vfs.sequentialReadsOption)
568568
sync-data: checkpoints/checkpoint4/000008.log
569569
close: checkpoints/checkpoint4/000008.log
570570
close: db/000008.log
@@ -672,8 +672,8 @@ sync-data: checkpoints/checkpoint5/marker.manifest.000001.MANIFEST-000001
672672
close: checkpoints/checkpoint5/marker.manifest.000001.MANIFEST-000001
673673
sync: checkpoints/checkpoint5
674674
close: checkpoints/checkpoint5
675-
open: db/000008.log (options: *vfs.sequentialReadsOption)
676675
create: checkpoints/checkpoint5/000008.log
676+
open: db/000008.log (options: *vfs.sequentialReadsOption)
677677
sync-data: checkpoints/checkpoint5/000008.log
678678
close: checkpoints/checkpoint5/000008.log
679679
close: db/000008.log
@@ -773,8 +773,8 @@ sync-data: checkpoints/checkpoint6/marker.manifest.000001.MANIFEST-000001
773773
close: checkpoints/checkpoint6/marker.manifest.000001.MANIFEST-000001
774774
sync: checkpoints/checkpoint6
775775
close: checkpoints/checkpoint6
776-
open: db/000008.log (options: *vfs.sequentialReadsOption)
777776
create: checkpoints/checkpoint6/000008.log
777+
open: db/000008.log (options: *vfs.sequentialReadsOption)
778778
sync-data: checkpoints/checkpoint6/000008.log
779779
close: checkpoints/checkpoint6/000008.log
780780
close: db/000008.log
@@ -1008,11 +1008,11 @@ sync-data: checkpoints/checkpoint8/marker.manifest.000001.MANIFEST-000001
10081008
close: checkpoints/checkpoint8/marker.manifest.000001.MANIFEST-000001
10091009
sync: checkpoints/checkpoint8
10101010
close: checkpoints/checkpoint8
1011-
open: valsepdb/000004.log (options: *vfs.sequentialReadsOption)
10121011
create: checkpoints/checkpoint8/000004.log
1012+
open: valsepdb/000004.log (options: *vfs.sequentialReadsOption)
1013+
close: valsepdb/000004.log
10131014
sync-data: checkpoints/checkpoint8/000004.log
10141015
close: checkpoints/checkpoint8/000004.log
1015-
close: valsepdb/000004.log
10161016
sync: checkpoints/checkpoint8
10171017
close: checkpoints/checkpoint8
10181018

@@ -1122,8 +1122,8 @@ sync-data: checkpoints/checkpoint9/marker.manifest.000001.MANIFEST-000001
11221122
close: checkpoints/checkpoint9/marker.manifest.000001.MANIFEST-000001
11231123
sync: checkpoints/checkpoint9
11241124
close: checkpoints/checkpoint9
1125-
open: valsepdb/000007.log (options: *vfs.sequentialReadsOption)
11261125
create: checkpoints/checkpoint9/000007.log
1126+
open: valsepdb/000007.log (options: *vfs.sequentialReadsOption)
11271127
sync-data: checkpoints/checkpoint9/000007.log
11281128
close: checkpoints/checkpoint9/000007.log
11291129
close: valsepdb/000007.log

testdata/checkpoint_shared

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,8 @@ sync-data: checkpoints/checkpoint1/marker.remote-obj-catalog.000001.REMOTE-OBJ-C
152152
close: checkpoints/checkpoint1/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001
153153
sync: checkpoints/checkpoint1
154154
close: checkpoints/checkpoint1
155-
open: db/000006.log (options: *vfs.sequentialReadsOption)
156155
create: checkpoints/checkpoint1/000006.log
156+
open: db/000006.log (options: *vfs.sequentialReadsOption)
157157
sync-data: checkpoints/checkpoint1/000006.log
158158
close: checkpoints/checkpoint1/000006.log
159159
close: db/000006.log
@@ -205,8 +205,8 @@ sync-data: checkpoints/checkpoint2/marker.remote-obj-catalog.000001.REMOTE-OBJ-C
205205
close: checkpoints/checkpoint2/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001
206206
sync: checkpoints/checkpoint2
207207
close: checkpoints/checkpoint2
208-
open: db/000006.log (options: *vfs.sequentialReadsOption)
209208
create: checkpoints/checkpoint2/000006.log
209+
open: db/000006.log (options: *vfs.sequentialReadsOption)
210210
sync-data: checkpoints/checkpoint2/000006.log
211211
close: checkpoints/checkpoint2/000006.log
212212
close: db/000006.log
@@ -254,8 +254,8 @@ sync-data: checkpoints/checkpoint3/marker.remote-obj-catalog.000001.REMOTE-OBJ-C
254254
close: checkpoints/checkpoint3/marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001
255255
sync: checkpoints/checkpoint3
256256
close: checkpoints/checkpoint3
257-
open: db/000006.log (options: *vfs.sequentialReadsOption)
258257
create: checkpoints/checkpoint3/000006.log
258+
open: db/000006.log (options: *vfs.sequentialReadsOption)
259259
sync-data: checkpoints/checkpoint3/000006.log
260260
close: checkpoints/checkpoint3/000006.log
261261
close: db/000006.log

testdata/event_listener

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -500,11 +500,11 @@ sync-data: checkpoint/marker.manifest.000001.MANIFEST-000023
500500
close: checkpoint/marker.manifest.000001.MANIFEST-000023
501501
sync: checkpoint
502502
close: checkpoint
503-
open: wal/000021.log (options: *vfs.sequentialReadsOption)
504503
create: checkpoint/000021.log
504+
open: wal/000021.log (options: *vfs.sequentialReadsOption)
505+
close: wal/000021.log
505506
sync-data: checkpoint/000021.log
506507
close: checkpoint/000021.log
507-
close: wal/000021.log
508508
sync: checkpoint
509509
close: checkpoint
510510

wal/reader.go

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -247,11 +247,16 @@ var _ Reader = (*virtualWALReader)(nil)
247247
// are no more records. The reader returned becomes stale after the next
248248
// NextRecord call, and should no longer be used.
249249
func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
250+
rec, _, off, err := r.nextRecord()
251+
return rec, off, err
252+
}
253+
254+
func (r *virtualWALReader) nextRecord() (io.Reader, batchrepr.Header, Offset, error) {
250255
// On the first call, we need to open the first file.
251256
if r.currIndex < 0 {
252257
err := r.nextFile()
253258
if err != nil {
254-
return nil, Offset{}, err
259+
return nil, batchrepr.Header{}, Offset{}, err
255260
}
256261
}
257262

@@ -264,7 +269,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
264269
// This file is exhausted; continue to the next.
265270
err := r.nextFile()
266271
if err != nil {
267-
return nil, r.off, err
272+
return nil, batchrepr.Header{}, r.off, err
268273
}
269274
continue
270275
}
@@ -290,11 +295,11 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
290295
// in-flight write at the time of process exit/crash. See #453.
291296
if record.IsInvalidRecord(err) && r.currIndex < len(r.segments)-1 {
292297
if err := r.nextFile(); err != nil {
293-
return nil, r.off, err
298+
return nil, batchrepr.Header{}, r.off, err
294299
}
295300
continue
296301
} else if err != nil {
297-
return nil, r.off, err
302+
return nil, batchrepr.Header{}, r.off, err
298303
}
299304

300305
// We may observe repeat records between the physical files that make up
@@ -314,7 +319,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
314319
// corruption. We could return the record to the caller, allowing
315320
// the caller to interpret it as corruption, but it seems safer to
316321
// be explicit and surface the corruption error here.
317-
return nil, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
322+
return nil, h, r.off, base.CorruptionErrorf("pebble: corrupt log file logNum=%d, logNameIndex=%s: invalid batch",
318323
r.Num, errors.Safe(r.segments[r.currIndex].logNameIndex))
319324
}
320325

@@ -337,7 +342,7 @@ func (r *virtualWALReader) NextRecord() (io.Reader, Offset, error) {
337342
continue
338343
}
339344
r.lastSeqNum = h.SeqNum
340-
return &r.recordBuf, r.off, nil
345+
return &r.recordBuf, h, r.off, nil
341346
}
342347
}
343348

@@ -376,3 +381,52 @@ func (r *virtualWALReader) nextFile() error {
376381
r.currReader = record.NewReader(r.currFile, base.DiskFileNum(r.Num))
377382
return nil
378383
}
384+
385+
// Copy copies the contents of the provided LogicalLog to a new WAL file on fs
386+
// within dstDir. The copy will be a logical copy and will not include the
387+
// source WAL's structure if split across multiple physical segment files. Copy
388+
// will only copy the prefix of the WAL up until visibleSeqNum (exclusive).
389+
//
390+
// Copy does NOT sync the destination directory, and the caller must explicitly
391+
// sync it if they require the new WAL file to be durably linked.
392+
func Copy(
393+
fs vfs.FS, dstDir string, ll LogicalLog, visibleSeqNum base.SeqNum, cfg record.LogWriterConfig,
394+
) (err error) {
395+
dstPath := fs.PathJoin(dstDir, makeLogFilename(ll.Num, 0))
396+
var dstFile vfs.File
397+
dstFile, err = fs.Create(dstPath, vfs.WriteCategoryUnspecified)
398+
if err != nil {
399+
return err
400+
}
401+
w := record.NewLogWriter(dstFile, base.DiskFileNum(ll.Num), cfg)
402+
403+
r := newVirtualWALReader(ll)
404+
defer func() { err = errors.CombineErrors(err, r.Close()) }()
405+
for {
406+
rec, h, _, err := r.nextRecord()
407+
if errors.Is(err, io.EOF) || errors.Is(err, record.ErrUnexpectedEOF) {
408+
break
409+
}
410+
if err != nil {
411+
return errors.Wrapf(err, "copying WAL file %s", ll.Num)
412+
}
413+
// A batch assigns sequence numbers beginning at the header's SeqNum,
414+
// increasing for successive internal keys. We only copy the batch if
415+
// the entirety of the batch's keys are visible.
416+
batchLargestSeqNum := h.SeqNum + base.SeqNum(h.Count-1)
417+
if !base.Visible(batchLargestSeqNum, visibleSeqNum, base.SeqNumMax) {
418+
break
419+
}
420+
b, err := io.ReadAll(rec)
421+
if err != nil {
422+
return errors.Wrapf(err, "copying WAL file %s", ll.Num)
423+
}
424+
if _, err := w.WriteRecord(b); err != nil {
425+
return errors.Wrapf(err, "copying WAL file %s", ll.Num)
426+
}
427+
}
428+
if err := w.Close(); err != nil {
429+
return errors.Wrapf(err, "copying WAL file %s", ll.Num)
430+
}
431+
return err
432+
}

0 commit comments

Comments
 (0)