Skip to content

Commit 4aa6d56

Browse files
authored
Run OOO compaction after restart if there is OOO data from WBL (#320)
Signed-off-by: Ganesh Vernekar <[email protected]> Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent 54196bb commit 4aa6d56

File tree

2 files changed

+89
-0
lines changed

2 files changed

+89
-0
lines changed

tsdb/db.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,11 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
859859
}
860860
}
861861

862+
if db.head.MinOOOTime() != int64(math.MaxInt64) {
863+
// Some OOO data was replayed from the disk that needs compaction and cleanup.
864+
db.oooWasEnabled.Store(true)
865+
}
866+
862867
go db.run()
863868

864869
return db, nil

tsdb/db_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5435,3 +5435,87 @@ func TestPanicOnApplyConfig(t *testing.T) {
54355435
})
54365436
require.NoError(t, err)
54375437
}
5438+
5439+
func TestDiskFillingUpAfterDisablingOOO(t *testing.T) {
5440+
dir := t.TempDir()
5441+
5442+
opts := DefaultOptions()
5443+
opts.OutOfOrderTimeWindow = 60 * time.Minute.Milliseconds()
5444+
opts.AllowOverlappingQueries = true
5445+
5446+
db, err := Open(dir, nil, nil, opts, nil)
5447+
require.NoError(t, err)
5448+
db.DisableCompactions()
5449+
t.Cleanup(func() {
5450+
require.NoError(t, db.Close())
5451+
})
5452+
5453+
series1 := labels.FromStrings("foo", "bar1")
5454+
var allSamples []tsdbutil.Sample
5455+
addSamples := func(fromMins, toMins int64) {
5456+
app := db.Appender(context.Background())
5457+
for min := fromMins; min <= toMins; min++ {
5458+
ts := min * time.Minute.Milliseconds()
5459+
_, err := app.Append(0, series1, ts, float64(ts))
5460+
require.NoError(t, err)
5461+
allSamples = append(allSamples, sample{t: ts, v: float64(ts)})
5462+
}
5463+
require.NoError(t, app.Commit())
5464+
}
5465+
5466+
// In-order samples.
5467+
addSamples(290, 300)
5468+
// OOO samples.
5469+
addSamples(250, 299)
5470+
5471+
// Restart DB with OOO disabled.
5472+
require.NoError(t, db.Close())
5473+
opts.OutOfOrderTimeWindow = 0
5474+
db, err = Open(db.dir, nil, prometheus.NewRegistry(), opts, nil)
5475+
require.NoError(t, err)
5476+
db.DisableCompactions()
5477+
5478+
ms := db.head.series.getByHash(series1.Hash(), series1)
5479+
require.Greater(t, len(ms.oooMmappedChunks), 0, "OOO mmap chunk was not replayed")
5480+
5481+
checkMmapFileContents := func(contains, notContains []string) {
5482+
mmapDir := mmappedChunksDir(db.head.opts.ChunkDirRoot)
5483+
files, err := os.ReadDir(mmapDir)
5484+
require.NoError(t, err)
5485+
5486+
fnames := make([]string, 0, len(files))
5487+
for _, f := range files {
5488+
fnames = append(fnames, f.Name())
5489+
}
5490+
5491+
for _, f := range contains {
5492+
require.Contains(t, fnames, f)
5493+
}
5494+
for _, f := range notContains {
5495+
require.NotContains(t, fnames, f)
5496+
}
5497+
}
5498+
5499+
// Add in-order samples until ready for compaction..
5500+
addSamples(301, 500)
5501+
5502+
// Check that m-map files gets deleted properly after compactions.
5503+
5504+
checkMmapFileContents([]string{"000001", "000002"}, nil)
5505+
require.NoError(t, db.Compact())
5506+
checkMmapFileContents([]string{"000002"}, []string{"000001"})
5507+
require.Equal(t, 0, len(ms.oooMmappedChunks), "OOO mmap chunk was not compacted")
5508+
5509+
addSamples(501, 650)
5510+
checkMmapFileContents([]string{"000002", "000003"}, []string{"000001"})
5511+
require.NoError(t, db.Compact())
5512+
checkMmapFileContents(nil, []string{"000001", "000002", "000003"})
5513+
5514+
// Verify that WBL is empty.
5515+
files, err := os.ReadDir(db.head.wbl.Dir())
5516+
require.NoError(t, err)
5517+
require.Len(t, files, 1) // Last empty file after compaction.
5518+
finfo, err := files[0].Info()
5519+
require.NoError(t, err)
5520+
require.Equal(t, int64(0), finfo.Size())
5521+
}

0 commit comments

Comments
 (0)