Skip to content

Commit 73d525b

Browse files
authored
feat(storage): support write anytime and spill to disk (#2919)
* implement write anytime and spill to disk * add unit tests for write any time
1 parent 1c467da commit 73d525b

File tree

9 files changed

+713
-260
lines changed

9 files changed

+713
-260
lines changed

src/storage/src/hummock/compactor.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ use super::{HummockResult, SSTableBuilder, SSTableIterator, SSTableIteratorType,
4141
use crate::hummock::compaction_executor::CompactionExecutor;
4242
use crate::hummock::iterator::ReadOptions;
4343
use crate::hummock::shared_buffer::shared_buffer_uploader::UploadTaskPayload;
44+
use crate::hummock::shared_buffer::{build_ordered_merge_iter, UncommittedData};
4445
use crate::hummock::sstable_store::SstableStoreRef;
46+
use crate::hummock::state_store::ForwardIter;
4547
use crate::hummock::utils::can_concat;
4648
use crate::hummock::vacuum::Vacuum;
4749
use crate::hummock::{CachePolicy, HummockError};
@@ -115,9 +117,11 @@ impl Compactor {
115117
pub async fn compact_shared_buffer(
116118
context: Arc<CompactorContext>,
117119
payload: &UploadTaskPayload,
118-
stats: Arc<StateStoreMetrics>,
119120
) -> HummockResult<Vec<(Sstable, Vec<VNodeBitmap>)>> {
120-
let mut start_user_keys = payload.iter().map(|m| m.start_user_key()).collect_vec();
121+
let mut start_user_keys = payload
122+
.iter()
123+
.flat_map(|data_list| data_list.iter().map(UncommittedData::start_user_key))
124+
.collect_vec();
121125
start_user_keys.sort();
122126
start_user_keys.dedup();
123127
let mut splits = Vec::with_capacity(start_user_keys.len());
@@ -158,6 +162,9 @@ impl Compactor {
158162
vnode_mappings: vec![],
159163
};
160164

165+
let sstable_store = context.sstable_store.clone();
166+
let stats = context.stats.clone();
167+
161168
let parallelism = compact_task.splits.len();
162169
let mut compact_success = true;
163170
let mut output_ssts = Vec::with_capacity(parallelism);
@@ -166,14 +173,17 @@ impl Compactor {
166173

167174
let vnode2unit: Arc<HashMap<u32, Vec<u32>>> = Arc::new(HashMap::new());
168175

176+
let mut local_stats = StoreLocalStatistic::default();
169177
for (split_index, _) in compact_task.splits.iter().enumerate() {
170178
let compactor = compactor.clone();
171-
let iter = {
172-
let iters = payload.iter().map(|m| {
173-
Box::new(m.clone().into_forward_iter()) as BoxedForwardHummockIterator
174-
});
175-
Box::new(MergeIterator::new(iters, stats.clone()))
176-
};
179+
let iter = build_ordered_merge_iter::<ForwardIter>(
180+
payload,
181+
sstable_store.clone(),
182+
stats.clone(),
183+
&mut local_stats,
184+
Arc::new(ReadOptions::default()),
185+
)
186+
.await? as BoxedForwardHummockIterator;
177187
let vnode2unit = vnode2unit.clone();
178188
let compaction_executor = compactor.context.compaction_executor.as_ref().cloned();
179189
let split_task = async move {
@@ -184,6 +194,7 @@ impl Compactor {
184194
let rx = Compactor::request_execution(compaction_executor, split_task)?;
185195
compaction_futures.push(rx);
186196
}
197+
local_stats.report(stats.as_ref());
187198

188199
let mut buffered = stream::iter(compaction_futures).buffer_unordered(parallelism);
189200
let mut err = None;

src/storage/src/hummock/iterator/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub use forward_merge::*;
2929
pub mod forward_user;
3030
mod merge_inner;
3131
pub use forward_user::*;
32+
pub use merge_inner::{OrderedMergeIteratorInner, UnorderedMergeIteratorInner};
3233

3334
#[cfg(test)]
3435
pub(crate) mod test_utils;

src/storage/src/hummock/local_version.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use risingwave_pb::hummock::{HummockVersion, Level};
2222
use tokio::sync::mpsc::UnboundedSender;
2323

2424
use super::shared_buffer::SharedBuffer;
25+
use crate::hummock::shared_buffer::shared_buffer_uploader::UploadTaskPayload;
26+
use crate::hummock::shared_buffer::{OrderIndex, UploadTaskType};
2527

2628
#[derive(Debug, Clone)]
2729
pub struct LocalVersion {
@@ -90,6 +92,16 @@ impl LocalVersion {
9092
pinned_version: self.pinned_version.clone(),
9193
}
9294
}
95+
96+
pub fn new_upload_task(
97+
&self,
98+
epoch: HummockEpoch,
99+
task_type: UploadTaskType,
100+
) -> Option<(OrderIndex, UploadTaskPayload)> {
101+
self.shared_buffer
102+
.get(&epoch)
103+
.and_then(|shared_buffer| shared_buffer.write().new_upload_task(task_type))
104+
}
93105
}
94106

95107
#[derive(Debug)]

src/storage/src/hummock/local_version_manager.rs

Lines changed: 108 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use super::SstableStoreRef;
3737
use crate::hummock::conflict_detector::ConflictDetector;
3838
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferItem;
3939
use crate::hummock::shared_buffer::shared_buffer_uploader::UploadTask;
40-
use crate::hummock::shared_buffer::SharedBuffer;
40+
use crate::hummock::shared_buffer::UploadTaskType::{FlushWriteBatch, SyncEpoch};
4141
use crate::hummock::utils::validate_table_key_range;
4242
use crate::hummock::{
4343
HummockEpoch, HummockError, HummockResult, HummockVersionId, INVALID_VERSION_ID,
@@ -231,7 +231,8 @@ impl LocalVersionManager {
231231

232232
let batch_size = SharedBufferBatch::measure_batch_size(&sorted_items);
233233
while !self.buffer_tracker.can_write() {
234-
self.sync_shared_buffer(None).await?;
234+
// TODO: consider applying a high-low memory threshold here to avoid always awaiting.
235+
self.flush_shared_buffer().await?;
235236
}
236237

237238
let batch = SharedBufferBatch::new_with_size(
@@ -265,90 +266,112 @@ impl LocalVersionManager {
265266
Ok(batch_size)
266267
}
267268

268-
pub async fn sync_shared_buffer(&self, epoch: Option<HummockEpoch>) -> HummockResult<()> {
269-
if self.buffer_tracker.is_empty() {
270-
return Ok(());
269+
pub async fn flush_shared_buffer(&self) -> HummockResult<()> {
270+
// The current implementation is a trivial one, which issues only one flush task and waits for
271+
// the task to finish.
272+
//
273+
// TODO: apply a high-low threshold here to avoid always awaiting.
274+
let mut task = None;
275+
for (epoch, shared_buffer) in self.local_version.read().iter_shared_buffer() {
276+
if let Some((order_index, task_data)) =
277+
shared_buffer.write().new_upload_task(FlushWriteBatch)
278+
{
279+
// TODO: may apply different `is_local` according to whether local spill is enabled.
280+
task = Some(UploadTask::new(order_index, *epoch, task_data, true));
281+
break;
282+
}
271283
}
284+
let task = match task {
285+
Some(task) => task,
286+
None => return Ok(()),
287+
};
272288

273-
let mut tasks = vec![];
289+
let epoch = task.epoch;
274290

275-
let mut handle_epoch = |epoch: &HummockEpoch, shared_buffer: &Arc<RwLock<SharedBuffer>>| {
276-
let mut guard = shared_buffer.write();
277-
if let Some((task_id, task_data)) = guard.new_upload_task() {
278-
tasks.push(UploadTask::new(task_id, *epoch, task_data));
279-
}
280-
};
291+
let (tx, rx) = oneshot::channel();
292+
self.worker_context
293+
.shared_buffer_uploader_tx
294+
.send(UploadItem::new(vec![task], tx))
295+
.map_err(HummockError::shared_buffer_error)?;
296+
let ((_, order_index), task_result) = rx
297+
.await
298+
.map_err(HummockError::shared_buffer_error)?
299+
.pop_first()
300+
.expect("the result should not be empty");
281301

282-
{
283-
let guard = self.local_version.read();
284-
match epoch {
285-
Some(epoch) => match guard.get_shared_buffer(epoch) {
286-
None => return Ok(()),
287-
Some(shared_buffer) => {
288-
handle_epoch(&epoch, shared_buffer);
289-
}
290-
},
291-
None => {
292-
for (epoch, shared_buffer) in guard.iter_shared_buffer() {
293-
handle_epoch(epoch, shared_buffer);
294-
}
295-
}
302+
let local_version_guard = self.local_version.read();
303+
let mut shared_buffer_guard = local_version_guard
304+
.get_shared_buffer(epoch)
305+
.expect("shared buffer should exist since some uncommitted data is not committed yet")
306+
.write();
307+
308+
match task_result {
309+
Ok(ssts) => {
310+
shared_buffer_guard.succeed_upload_task(order_index, ssts);
311+
Ok(())
312+
}
313+
Err(e) => {
314+
shared_buffer_guard.fail_upload_task(order_index);
315+
Err(e)
296316
}
297317
}
318+
}
298319

299-
if tasks.is_empty() {
300-
return Ok(());
320+
pub async fn sync_shared_buffer(&self, epoch: Option<HummockEpoch>) -> HummockResult<()> {
321+
let epochs = match epoch {
322+
Some(epoch) => vec![epoch],
323+
None => self
324+
.local_version
325+
.read()
326+
.iter_shared_buffer()
327+
.map(|(epoch, _)| *epoch)
328+
.collect(),
329+
};
330+
for epoch in epochs {
331+
self.sync_shared_buffer_epoch(epoch).await?;
301332
}
333+
Ok(())
334+
}
335+
336+
pub async fn sync_shared_buffer_epoch(&self, epoch: HummockEpoch) -> HummockResult<()> {
337+
let task = {
338+
match self.local_version.read().new_upload_task(epoch, SyncEpoch) {
339+
Some((order_index, task_data)) => {
340+
UploadTask::new(order_index, epoch, task_data, false)
341+
}
342+
None => return Ok(()),
343+
}
344+
};
302345

303346
let (tx, rx) = oneshot::channel();
304347
self.worker_context
305348
.shared_buffer_uploader_tx
306-
.send(UploadItem::new(tasks, tx))
349+
.send(UploadItem::new(vec![task], tx))
307350
.map_err(HummockError::shared_buffer_error)?;
308-
let upload_result = rx.await.map_err(HummockError::shared_buffer_error)?;
309-
310-
let failed_epoch = upload_result
311-
.iter()
312-
.filter_map(
313-
|((epoch, _), result)| {
314-
if result.is_err() {
315-
Some(*epoch)
316-
} else {
317-
None
318-
}
319-
},
320-
)
321-
.collect_vec();
322-
323-
if failed_epoch.len() < upload_result.len() {
324-
// only acquire the lock when any of the upload task succeed.
325-
let guard = self.local_version.read();
326-
for ((epoch, task_id), result) in upload_result {
327-
match result {
328-
Ok(ssts) => {
329-
if let Some(shared_buffer) = guard.get_shared_buffer(epoch) {
330-
shared_buffer.write().succeed_upload_task(task_id, ssts);
331-
}
332-
if let Some(conflict_detector) = self.write_conflict_detector.as_ref() {
333-
conflict_detector.archive_epoch(epoch);
334-
}
335-
}
336-
Err(_) => {
337-
if let Some(shared_buffer) = guard.get_shared_buffer(epoch) {
338-
shared_buffer.write().fail_upload_task(task_id);
339-
}
340-
}
351+
let ((_, order_index), task_result) = rx
352+
.await
353+
.map_err(HummockError::shared_buffer_error)?
354+
.pop_first()
355+
.expect("the result should not be empty");
356+
357+
let local_version_guard = self.local_version.read();
358+
let mut shared_buffer_guard = local_version_guard
359+
.get_shared_buffer(epoch)
360+
.expect("shared buffer should exist since some uncommitted data is not committed yet")
361+
.write();
362+
363+
match task_result {
364+
Ok(ssts) => {
365+
shared_buffer_guard.succeed_upload_task(order_index, ssts);
366+
if let Some(conflict_detector) = self.write_conflict_detector.as_ref() {
367+
conflict_detector.archive_epoch(epoch);
341368
}
369+
Ok(())
370+
}
371+
Err(e) => {
372+
shared_buffer_guard.fail_upload_task(order_index);
373+
Err(e)
342374
}
343-
};
344-
345-
if failed_epoch.is_empty() {
346-
Ok(())
347-
} else {
348-
Err(HummockError::shared_buffer_error(format!(
349-
"Failed to sync epochs: {:?}",
350-
failed_epoch
351-
)))
352375
}
353376
}
354377

@@ -546,48 +569,21 @@ mod tests {
546569
use std::sync::Arc;
547570

548571
use bytes::Bytes;
549-
use risingwave_hummock_sdk::HummockSSTableId;
550572
use risingwave_meta::hummock::test_utils::setup_compute_env;
551573
use risingwave_meta::hummock::MockHummockMetaClient;
552-
use risingwave_pb::hummock::{HummockVersion, KeyRange, SstableInfo};
574+
use risingwave_pb::hummock::HummockVersion;
553575

554576
use super::LocalVersionManager;
555577
use crate::hummock::conflict_detector::ConflictDetector;
556-
use crate::hummock::iterator::test_utils::{iterator_test_key_of_epoch, mock_sstable_store};
578+
use crate::hummock::iterator::test_utils::mock_sstable_store;
557579
use crate::hummock::shared_buffer::shared_buffer_batch::SharedBufferBatch;
558-
use crate::hummock::test_utils::default_config_for_test;
580+
use crate::hummock::shared_buffer::UncommittedData;
581+
use crate::hummock::shared_buffer::UploadTaskType::SyncEpoch;
582+
use crate::hummock::test_utils::{
583+
default_config_for_test, gen_dummy_batch, gen_dummy_sst_info,
584+
};
559585
use crate::monitor::StateStoreMetrics;
560-
use crate::storage_value::{StorageValue, ValueMeta};
561-
562-
fn gen_dummy_batch(epoch: u64) -> Vec<(Bytes, StorageValue)> {
563-
vec![(
564-
iterator_test_key_of_epoch(0, epoch).into(),
565-
StorageValue::new_put(ValueMeta::default(), b"value1".to_vec()),
566-
)]
567-
}
568-
569-
fn gen_dummy_sst_info(id: HummockSSTableId, batches: Vec<SharedBufferBatch>) -> SstableInfo {
570-
let mut min_key: Vec<u8> = batches[0].start_key().to_vec();
571-
let mut max_key: Vec<u8> = batches[0].end_key().to_vec();
572-
for batch in batches.iter().skip(1) {
573-
if min_key.as_slice() > batch.start_key() {
574-
min_key = batch.start_key().to_vec();
575-
}
576-
if max_key.as_slice() < batch.end_key() {
577-
max_key = batch.end_key().to_vec();
578-
}
579-
}
580-
SstableInfo {
581-
id,
582-
key_range: Some(KeyRange {
583-
left: min_key,
584-
right: max_key,
585-
inf: false,
586-
}),
587-
file_size: batches.len() as u64,
588-
vnode_bitmaps: vec![],
589-
}
590-
}
586+
use crate::storage_value::StorageValue;
591587

592588
#[tokio::test]
593589
async fn test_update_pinned_version() {
@@ -733,11 +729,11 @@ mod tests {
733729
.get_shared_buffer(epochs[0])
734730
.unwrap()
735731
.write();
736-
let (task_id, mut payload) = shared_buffer_guard.new_upload_task().unwrap();
732+
let (task_id, payload) = shared_buffer_guard.new_upload_task(SyncEpoch).unwrap();
737733
{
738734
assert_eq!(1, payload.len());
739-
let batch = payload.pop().unwrap();
740-
assert_eq!(batch, batches[0]);
735+
assert_eq!(1, payload[0].len());
736+
assert_eq!(payload[0][0], UncommittedData::Batch(batches[0].clone()));
741737
}
742738
shared_buffer_guard.succeed_upload_task(task_id, vec![sst1.clone()]);
743739
}
@@ -781,11 +777,11 @@ mod tests {
781777
.get_shared_buffer(epochs[1])
782778
.unwrap()
783779
.write();
784-
let (task_id, mut payload) = shared_buffer_guard.new_upload_task().unwrap();
780+
let (task_id, payload) = shared_buffer_guard.new_upload_task(SyncEpoch).unwrap();
785781
{
786782
assert_eq!(1, payload.len());
787-
let batch = payload.pop().unwrap();
788-
assert_eq!(batch, batches[1]);
783+
assert_eq!(1, payload[0].len());
784+
assert_eq!(payload[0][0], UncommittedData::Batch(batches[1].clone()));
789785
}
790786
shared_buffer_guard.succeed_upload_task(task_id, vec![sst2.clone()]);
791787
}

0 commit comments

Comments
 (0)