19 changes: 19 additions & 0 deletions tsdb/head.go
@@ -1865,6 +1865,25 @@ func (h *Head) mmapHeadChunks() {
h.metrics.mmapChunksTotal.Add(float64(count))
}

func (h *Head) FsyncWLSegments() error {
if h.wal == nil {
return errors.New("wal not initialized")
}
err := h.wal.FsyncSegmentsUntilCurrent()
if err != nil {
return fmt.Errorf("could not fsync wal segments: %w", err)
}
if h.wbl == nil {
// WBL is not initialized, which is expected when OOO is disabled
return nil
}
err = h.wbl.FsyncSegmentsUntilCurrent()
if err != nil {
return fmt.Errorf("could not fsync wbl segments: %w", err)
}
return nil
}

// seriesHashmap lets TSDB find a memSeries by its label set, via a 64-bit hash.
// There is one map for the common case where the hash value is unique, and a
// second map for the case that two series have the same hash value.
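For context, a minimal sketch of how code outside the tsdb package might call the new method. The flushBeforeSnapshot helper, its package name, and the snapshot motivation are hypothetical illustrations, not part of this PR:

package walusage

import (
	"fmt"

	"github.com/prometheus/prometheus/tsdb"
)

// flushBeforeSnapshot is a hypothetical caller: it forces every WAL (and,
// when out-of-order ingestion is enabled, WBL) segment written so far onto
// disk before a durability-sensitive operation proceeds.
func flushBeforeSnapshot(head *tsdb.Head) error {
	// Records appended after FsyncWLSegments returns are not guaranteed to
	// be on disk; callers must not rely on that.
	if err := head.FsyncWLSegments(); err != nil {
		return fmt.Errorf("flush before snapshot: %w", err)
	}
	return nil
}

The returned error wraps the underlying wlog error with either a wal or wbl prefix, as exercised by the tests below.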
90 changes: 90 additions & 0 deletions tsdb/head_test.go
@@ -7414,6 +7414,96 @@ func testHeadAppendHistogramAndCommitConcurrency(t *testing.T, appendFn func(sto
wg.Wait()
}

func TestFsyncWLSegments(t *testing.T) {
t.Run("sync fails when WAL is not initialized", func(t *testing.T) {
h, err := NewHead(nil, nil, nil, nil, DefaultHeadOptions(), NewHeadStats())
require.NoError(t, err)
defer h.Close()

err = h.FsyncWLSegments()
require.EqualError(t, err, "wal not initialized")
})

t.Run("sync succeeds with WAL only", func(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32*1024, compression.None)
require.NoError(t, err)

h, err := NewHead(nil, nil, wal, nil, DefaultHeadOptions(), NewHeadStats())
require.NoError(t, err)
defer h.Close()

app := h.Appender(context.Background())
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 1, 1)
require.NoError(t, err)
require.NoError(t, app.Commit())

err = h.FsyncWLSegments()
require.NoError(t, err)
})

t.Run("sync succeeds with both WAL and WBL", func(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32*1024, compression.None)
require.NoError(t, err)
wbl, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wbl"), 32*1024, compression.None)
require.NoError(t, err)

opts := DefaultHeadOptions()
opts.OutOfOrderTimeWindow.Store(1 * time.Hour.Milliseconds())
h, err := NewHead(nil, nil, wal, wbl, opts, NewHeadStats())
require.NoError(t, err)
defer h.Close()

app := h.Appender(context.Background())
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 1, 1)
require.NoError(t, err)
require.NoError(t, app.Commit())

app = h.Appender(context.Background())
_, err = app.Append(0, labels.FromStrings("foo", "bar"), 0, 0)
require.NoError(t, err)
require.NoError(t, app.Commit())

err = h.FsyncWLSegments()
require.NoError(t, err)
})

t.Run("sync fails when WAL sync fails", func(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32*1024, compression.None)
require.NoError(t, err)
// Close the WAL to make sync fail
require.NoError(t, wal.Close())

h, err := NewHead(nil, nil, wal, nil, DefaultHeadOptions(), NewHeadStats())
require.NoError(t, err)
defer h.Close()

err = h.FsyncWLSegments()
require.EqualError(t, err, "could not fsync wal segments: unable to fsync segments: write log is closed")
})

t.Run("sync fails when WBL sync fails", func(t *testing.T) {
dir := t.TempDir()
wal, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wal"), 32*1024, compression.None)
require.NoError(t, err)
wbl, err := wlog.NewSize(nil, nil, filepath.Join(dir, "wbl"), 32*1024, compression.None)
require.NoError(t, err)
// Close the WBL to make sync fail
require.NoError(t, wbl.Close())

opts := DefaultHeadOptions()
opts.OutOfOrderTimeWindow.Store(1 * time.Hour.Milliseconds())
h, err := NewHead(nil, nil, wal, wbl, opts, NewHeadStats())
require.NoError(t, err)
defer h.Close()

err = h.FsyncWLSegments()
require.EqualError(t, err, "could not fsync wbl segments: unable to fsync segments: write log is closed")
})
}

func TestHead_NumStaleSeries(t *testing.T) {
head, _ := newTestHead(t, 1000, compression.None, false)
t.Cleanup(func() {
33 changes: 33 additions & 0 deletions tsdb/wlog/wlog.go
@@ -1046,3 +1046,36 @@ func (r *segmentBufReader) Read(b []byte) (n int, err error) {
func (w *WL) Size() (int64, error) {
return fileutil.DirSize(w.Dir())
}

// FsyncSegmentsUntilCurrent ensures all segments up to and including the current segment are fsynced.
// There may be more entries appended to the log after fsyncing completes and before FsyncSegmentsUntilCurrent returns. Those entries may not be fsynced.
func (w *WL) FsyncSegmentsUntilCurrent() error {
w.mtx.Lock()

if w.closed {
w.mtx.Unlock()
return errors.New("unable to fsync segments: write log is closed")
}

done := make(chan struct{})
// All segments before w.segment should either have been fsynced and closed already, or still be in the actorc channel.
// The function we are adding below will only execute when all previous segments have been fsynced.
if w.segment != nil {
currSegment := w.segment
w.actorc <- func() {
if err := w.fsync(currSegment); err != nil {
w.logger.Error("unable to fsync current segment", "err", err)
}
close(done)
}
} else {
w.actorc <- func() {
close(done)
}
}

w.mtx.Unlock()

Review comment (Contributor):

I'm not sure about where we acquire and release the lock.

The potential problems I can think of right now are:

  1. There's a race between taking a copy of w.segment, unlocking the mutex, and running the fsync. It's possible another goroutine replaced w.segment, so at that point we no longer have the current segment fsync'd. It may not be a huge problem, since there's still the same race right before returning from FsyncSegmentsUntilCurrent: we can get a new segment.
  2. We end up with a deadlock where pushing to the channel blocks because the currently executing function from actorc is trying to acquire the lock. This currently doesn't happen, but I don't see anything that guarantees it never will.

I don't think it's possible to solve both. From reading the only other place which uses actorc and the places which do operations on the WL while holding the lock, I think that case 2 is assumed to never happen. So I think we should release the lock only after done is closed (a rough sketch of this follows the hunk below).

Review comment (Contributor):

Something I forgot: if you end up taking a copy of w.segment, then you should do that while holding a lock inside the actor func instead of before creating the actor closure; this will help avoid races.


<-done
return nil
}
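
Following the review comments above, a rough sketch of the first suggestion: hold w.mtx until done is closed, relying on the stated assumption that functions queued on actorc never acquire w.mtx themselves. This is an illustration of the review feedback, not the code merged in this PR:

// Sketch only: FsyncSegmentsUntilCurrent reworked so the lock is released
// only after the queued fsync has run. Assumes functions on actorc never
// take w.mtx (the reviewer's concern 2); otherwise this would deadlock.
func (w *WL) FsyncSegmentsUntilCurrent() error {
	w.mtx.Lock()
	defer w.mtx.Unlock()

	if w.closed {
		return errors.New("unable to fsync segments: write log is closed")
	}

	done := make(chan struct{})
	if w.segment != nil {
		// Capturing w.segment here is safe: the lock is held until done is
		// closed, so no other goroutine can replace the current segment
		// before the queued fsync runs.
		currSegment := w.segment
		w.actorc <- func() {
			if err := w.fsync(currSegment); err != nil {
				w.logger.Error("unable to fsync current segment", "err", err)
			}
			close(done)
		}
	} else {
		w.actorc <- func() { close(done) }
	}

	// Wait for the queued fsync while still holding w.mtx.
	<-done
	return nil
}

The trade-off is that any other operation needing w.mtx blocks until the fsync completes.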
32 changes: 32 additions & 0 deletions tsdb/wlog/wlog_test.go
@@ -574,3 +574,35 @@ func TestUnregisterMetrics(t *testing.T) {
require.NoError(t, wl.Close())
}
}

func TestSyncSegmentsUntilCurrent(t *testing.T) {
t.Run("sync succeeds with active segments", func(t *testing.T) {
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, pageSize, compression.None)
require.NoError(t, err)
defer w.Close()

// Write large records to create multiple segments
record := make([]byte, pageSize/2)
for i := 0; i < 10; i++ {
require.NoError(t, w.Log(record))
}

require.NoError(t, w.FsyncSegmentsUntilCurrent())
})

t.Run("sync fails when WAL is closed", func(t *testing.T) {
dir := t.TempDir()
w, err := NewSize(nil, nil, dir, pageSize, compression.None)
require.NoError(t, err)

record := make([]byte, pageSize/2)
for i := 0; i < 10; i++ {
require.NoError(t, w.Log(record))
}

require.NoError(t, w.Close())
err = w.FsyncSegmentsUntilCurrent()
require.EqualError(t, err, "unable to fsync segments: write log is closed")
})
}