Skip to content

Commit 798852e

Browse files
authored
commitlog: Improve error context (#3506)
The commitlog creates new segments atomically, returning EEXIST if the segment already exists. This is to break a retry loop in case the filesystem becomes unwritable. This error did not contain any context about which segment already exists, so this patch adds some.

Also, an unhandled edge case has been discovered: When opening an existing log, the commitlog will try to resume the last segment for writing. If it finds a corrupt commit in that segment, it won't resume, but will instead create a new segment at the last good commit's offset + 1. However, if the first commit in the last segment is corrupted, that offset will be that of the last segment itself -- trying to start a new segment will thus fail with EEXIST.

Without additional recovery mechanisms, it is not obvious what to do in this case: the segment could contain valid data after the initial commit, so we certainly don't want to throw it away. Instead, we now detect this case and return `InvalidData` with some context.

# Expected complexity level and risk

1

# Testing

- [ ] A (regression) test is included
1 parent f2cf5d1 commit 798852e

File tree

5 files changed

+96
-8
lines changed

5 files changed

+96
-8
lines changed

crates/commitlog/src/commitlog.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,25 @@ impl<R: Repo, T> Generic<R, T> {
6161
}
6262
let head = if let Some(last) = tail.pop() {
6363
debug!("resuming last segment: {last}");
64+
// Resume the last segment for writing, or create a new segment
65+
// starting from the last good commit + 1.
6466
repo::resume_segment_writer(&repo, opts, last)?.or_else(|meta| {
67+
// The first commit in the last segment being corrupt is an
68+
// edge case: we'd try to start a new segment with an offset
69+
// equal to the already existing one, which would fail.
70+
//
71+
// We cannot just skip it either, as we don't know the reason
72+
// for the corruption (there could be more, potentially
73+
// recoverable commits in the segment).
74+
//
75+
// Thus, provide some context about what is wrong and refuse to
76+
// start.
77+
if meta.tx_range.is_empty() {
78+
return Err(io::Error::new(
79+
io::ErrorKind::InvalidData,
80+
format!("repo {}: first commit in resumed segment {} is corrupt", repo, last),
81+
));
82+
}
6583
tail.push(meta.tx_range.start);
6684
repo::create_segment_writer(&repo, opts, meta.max_epoch, meta.tx_range.end)
6785
})?

crates/commitlog/src/repo/fs.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::fmt;
12
use std::fs::{self, File};
23
use std::io;
34
use std::sync::Arc;
@@ -76,6 +77,12 @@ impl Fs {
7677
}
7778
}
7879

80+
impl fmt::Display for Fs {
81+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82+
write!(f, "{}", self.root.display())
83+
}
84+
}
85+
7986
impl SegmentLen for File {}
8087

8188
impl FileLike for NamedTempFile {
@@ -101,11 +108,18 @@ impl Repo for Fs {
101108
.or_else(|e| {
102109
if e.kind() == io::ErrorKind::AlreadyExists {
103110
debug!("segment {offset} already exists");
111+
// If the segment is completely empty, we can resume writing.
104112
let file = self.open_segment_writer(offset)?;
105113
if file.metadata()?.len() == 0 {
106114
debug!("segment {offset} is empty");
107115
return Ok(file);
108116
}
117+
118+
// Otherwise, provide some context.
119+
return Err(io::Error::new(
120+
io::ErrorKind::AlreadyExists,
121+
format!("repo {}: segment {} already exists and is non-empty", self, offset),
122+
));
109123
}
110124

111125
Err(e)
@@ -131,7 +145,10 @@ impl Repo for Fs {
131145

132146
fn remove_segment(&self, offset: u64) -> io::Result<()> {
133147
let _ = self.remove_offset_index(offset).map_err(|e| {
134-
warn!("failed to remove offset index for segment {offset}, error: {e}");
148+
warn!(
149+
"repo {}: failed to remove offset index for segment {}: {}",
150+
self, offset, e
151+
);
135152
});
136153
fs::remove_file(self.segment_path(offset))
137154
}

crates/commitlog/src/repo/mem.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::{
22
collections::{btree_map, BTreeMap},
3-
io,
3+
fmt, io,
44
sync::{Arc, RwLock, RwLockWriteGuard},
55
};
66

@@ -213,6 +213,12 @@ impl Memory {
213213
}
214214
}
215215

216+
impl fmt::Display for Memory {
217+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218+
f.write_str("<memory>")
219+
}
220+
}
221+
216222
impl Repo for Memory {
217223
type SegmentWriter = Segment;
218224
type SegmentReader = io::BufReader<Segment>;

crates/commitlog/src/repo/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::io;
1+
use std::{fmt, io};
22

33
use log::{debug, warn};
44

@@ -59,7 +59,10 @@ impl<T: FileLike + io::Read + io::Write + SegmentLen + Send + Sync> SegmentWrite
5959
///
6060
/// This is mainly an internal trait to allow testing against an in-memory
6161
/// representation.
62-
pub trait Repo: Clone {
62+
///
63+
/// The [fmt::Display] should provide context about the location of the repo,
64+
/// e.g. the root directory for a filesystem-based implementation.
65+
pub trait Repo: Clone + fmt::Display {
6366
/// The type of log segments managed by this repo, which must behave like a file.
6467
type SegmentWriter: SegmentWriter + 'static;
6568
type SegmentReader: SegmentReader + 'static;

crates/commitlog/src/tests/partial.rs

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
use std::{
22
cmp,
3-
fmt::Debug,
3+
fmt::{self, Debug},
44
io::{self, Seek as _, SeekFrom},
5-
iter::repeat,
5+
iter::{self, repeat},
6+
num::NonZeroU16,
67
sync::RwLockWriteGuard,
78
};
89

910
use log::debug;
11+
use pretty_assertions::assert_matches;
1012

1113
use crate::{
1214
commitlog, error, payload,
1315
repo::{self, Repo, SegmentLen},
14-
segment::FileLike,
15-
tests::helpers::enable_logging,
16+
segment::{self, FileLike},
17+
tests::helpers::{enable_logging, fill_log_with},
1618
Commit, Encode, Options, DEFAULT_LOG_FORMAT_VERSION,
1719
};
1820

@@ -140,6 +142,42 @@ fn overwrite_reopen() {
140142
);
141143
}
142144

145+
/// Edge case surfaced in production:
146+
///
147+
/// If the first commit in the last segment is corrupt, creating a new segment
148+
/// would fail because its offset would be the same as the corrupt segment's.
149+
///
150+
/// We don't automatically recover from that, but test that `open` returns an
151+
/// error providing some context.
152+
#[test]
153+
fn first_commit_in_last_segment_corrupt() {
154+
enable_logging();
155+
156+
let repo = repo::Memory::new();
157+
let options = Options {
158+
max_segment_size: 512,
159+
max_records_in_commit: NonZeroU16::new(1).unwrap(),
160+
..<_>::default()
161+
};
162+
{
163+
let mut log = commitlog::Generic::open(repo.clone(), options).unwrap();
164+
fill_log_with(&mut log, iter::once([b'x'; 64]).cycle().take(9));
165+
}
166+
let segments = repo.existing_offsets().unwrap();
167+
assert_eq!(2, segments.len(), "repo should contain 2 segments");
168+
169+
{
170+
let last_segment = repo.open_segment_writer(*segments.last().unwrap()).unwrap();
171+
let mut data = last_segment.buf_mut();
172+
data[segment::Header::LEN + 1..].fill(0);
173+
}
174+
175+
assert_matches!(
176+
commitlog::Generic::<_, [u8; 64]>::open(repo, options),
177+
Err(e) if e.kind() == io::ErrorKind::InvalidData,
178+
);
179+
}
180+
143181
fn open_log<T>(repo: ShortMem) -> commitlog::Generic<ShortMem, T> {
144182
commitlog::Generic::open(
145183
repo,
@@ -229,6 +267,12 @@ impl ShortMem {
229267
}
230268
}
231269

270+
impl fmt::Display for ShortMem {
271+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
272+
fmt::Display::fmt(&self.inner, f)
273+
}
274+
}
275+
232276
impl Repo for ShortMem {
233277
type SegmentWriter = ShortSegment;
234278
type SegmentReader = io::BufReader<repo::mem::Segment>;

0 commit comments

Comments
 (0)