Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/common/meta/src/instruction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,18 @@ pub struct FlushRegions {
pub region_ids: Vec<RegionId>,
}

/// Payload of [Instruction::GcRegions]: asks a datanode to run garbage
/// collection for the listed regions.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GcRegions {
    /// Regions to garbage-collect.
    pub region_ids: Vec<RegionId>,
    /// Timestamp in milliseconds.
    // NOTE(review): presumably the reference time the GC cutoff is computed
    // from — the consumer of this instruction is not visible here; confirm.
    pub ts_millis: i64,
}

/// Payload of [Instruction::CollectFileRefs]: asks a datanode to collect the
/// file references of a region and upload them to object storage.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct CollectFileRefs {
    /// The region whose file references should be collected.
    pub region_id: RegionId,
    /// Timestamp in milliseconds.
    // NOTE(review): assumed to stamp the collected snapshot — confirm against
    // the handler that consumes this instruction.
    pub ts_millis: i64,
}

#[derive(Debug, Clone, Serialize, Deserialize, Display, PartialEq)]
pub enum Instruction {
/// Opens a region.
Expand All @@ -245,6 +257,10 @@ pub enum Instruction {
FlushRegions(FlushRegions),
/// Flushes a single region.
FlushRegion(RegionId),
/// Triggers garbage collection for the given regions.
GcRegions(GcRegions),
/// Triggers the datanode to collect and upload file references to object storage.
CollectFileRefs(CollectFileRefs),
}

/// The reply of [UpgradeRegion].
Expand Down Expand Up @@ -276,6 +292,8 @@ pub enum InstructionReply {
UpgradeRegion(UpgradeRegionReply),
DowngradeRegion(DowngradeRegionReply),
FlushRegion(SimpleReply),
GcRegions(SimpleReply),
CollectFileRefs(SimpleReply),
}

impl Display for InstructionReply {
Expand All @@ -288,6 +306,10 @@ impl Display for InstructionReply {
write!(f, "InstructionReply::DowngradeRegion({})", reply)
}
Self::FlushRegion(reply) => write!(f, "InstructionReply::FlushRegion({})", reply),
Self::GcRegions(reply) => write!(f, "InstructionReply::GcRegions({})", reply),
Self::CollectFileRefs(reply) => {
write!(f, "InstructionReply::CollectFileRefs({})", reply)
}
}
}
}
Expand Down
41 changes: 40 additions & 1 deletion src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::sync::Arc;

use futures::stream::BoxStream;
use futures_util::TryStreamExt;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
Expand All @@ -33,8 +35,9 @@ use crate::key::{
};
use crate::kv_backend::txn::Txn;
use crate::kv_backend::KvBackendRef;
use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use crate::rpc::router::{region_distribution, RegionRoute};
use crate::rpc::store::BatchGetRequest;
use crate::rpc::store::{BatchGetRequest, RangeRequest};

/// The key stores table routes
///
Expand Down Expand Up @@ -554,9 +557,41 @@ impl TableRouteManager {
pub fn table_route_storage(&self) -> &TableRouteStorage {
&self.storage
}

/// Returns a stream of all physical table ids paired with their
/// `PhysicalTableRouteValue`s.
///
/// Logical table routes are filtered out: only entries whose stored value is
/// `TableRouteValue::Physical` are yielded.
pub fn physical_table_values(
    &self,
) -> BoxStream<'static, Result<(TableId, PhysicalTableRouteValue)>> {
    // Range-scan every key under the table-route prefix, page by page.
    let key = TableRouteKey::range_prefix();
    let req = RangeRequest::new().with_prefix(key);

    let stream = PaginationStream::new(
        self.storage.kv_backend().clone(),
        req,
        DEFAULT_PAGE_SIZE,
        // Decode only the table id from each key; the route value itself is
        // fetched per-id below.
        |kv| {
            let key = TableRouteKey::from_bytes(&kv.key)?;
            Ok(key.table_id)
        },
    )
    .into_stream();
    // Clone the storage handle so the returned stream can be 'static.
    let storage = self.storage.clone();

    Box::pin(stream.try_filter_map(move |table_id| {
        // Each async closure invocation needs its own handle.
        let storage = storage.clone();
        async move {
            let table_route = storage.get_inner(table_id).await?;
            match table_route {
                Some(TableRouteValue::Physical(val)) => Ok(Some((table_id, val))),
                // Logical routes — and entries removed between the key scan
                // and this lookup — are silently skipped.
                _ => Ok(None),
            }
        }
    }))
}
}

/// Low-level operations of [TableRouteValue].
#[derive(Clone)]
pub struct TableRouteStorage {
    // Backing key-value store holding the serialized table-route values.
    kv_backend: KvBackendRef,
}
Expand All @@ -568,6 +603,10 @@ impl TableRouteStorage {
Self { kv_backend }
}

/// Returns a reference to the underlying key-value backend.
pub fn kv_backend(&self) -> &KvBackendRef {
    &self.kv_backend
}

/// Builds a create table route transaction,
/// it expected the `__table_route/{table_id}` wasn't occupied.
pub fn build_create_txn(
Expand Down
15 changes: 14 additions & 1 deletion src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use mito2::sst::file_purger::{FileReferenceManager, FileReferenceManagerRef};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
Expand Down Expand Up @@ -238,8 +239,13 @@ impl DatanodeBuilder {
table_id_schema_cache,
schema_cache,
));
let file_ref_manager = Arc::new(FileReferenceManager::new(node_id));
let region_server = self
.new_region_server(schema_metadata_manager, region_event_listener)
.new_region_server(
schema_metadata_manager,
region_event_listener,
file_ref_manager,
)
.await?;

// TODO(weny): Considering introducing a readonly kv_backend trait.
Expand Down Expand Up @@ -361,6 +367,7 @@ impl DatanodeBuilder {
&mut self,
schema_metadata_manager: SchemaMetadataManagerRef,
event_listener: RegionServerEventListenerRef,
file_ref_manager: FileReferenceManagerRef,
) -> Result<RegionServer> {
let opts: &DatanodeOptions = &self.opts;

Expand Down Expand Up @@ -399,6 +406,7 @@ impl DatanodeBuilder {
.build_store_engines(
object_store_manager,
schema_metadata_manager,
file_ref_manager,
self.plugins.clone(),
)
.await?;
Expand All @@ -419,6 +427,7 @@ impl DatanodeBuilder {
&mut self,
object_store_manager: ObjectStoreManagerRef,
schema_metadata_manager: SchemaMetadataManagerRef,
file_ref_manager: FileReferenceManagerRef,
plugins: Plugins,
) -> Result<Vec<RegionEngineRef>> {
let mut metric_engine_config = metric_engine::config::EngineConfig::default();
Expand All @@ -444,6 +453,7 @@ impl DatanodeBuilder {
object_store_manager.clone(),
mito_engine_config,
schema_metadata_manager.clone(),
file_ref_manager.clone(),
plugins.clone(),
)
.await?;
Expand All @@ -469,6 +479,7 @@ impl DatanodeBuilder {
object_store_manager: ObjectStoreManagerRef,
mut config: MitoConfig,
schema_metadata_manager: SchemaMetadataManagerRef,
file_ref_manager: FileReferenceManagerRef,
plugins: Plugins,
) -> Result<MitoEngine> {
let opts = &self.opts;
Expand All @@ -490,6 +501,7 @@ impl DatanodeBuilder {
log_store,
object_store_manager,
schema_metadata_manager,
file_ref_manager,
plugins,
);

Expand Down Expand Up @@ -530,6 +542,7 @@ impl DatanodeBuilder {
log_store,
object_store_manager,
schema_metadata_manager,
file_ref_manager,
plugins,
);

Expand Down
10 changes: 9 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ pub enum Error {
location: Location,
},

#[snafu(display("Failed to run gc for mito engine"))]
GcMitoEngine {
region_id: RegionId,
source: mito2::error::Error,
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Failed to build metric engine"))]
BuildMetricEngine {
source: metric_engine::error::Error,
Expand Down Expand Up @@ -451,7 +459,7 @@ impl ErrorExt for Error {
StopRegionEngine { source, .. } => source.status_code(),

FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } | GcMitoEngine { source, .. } => source.status_code(),
BuildMetricEngine { source, .. } => source.status_code(),
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
StatusCode::RegionBusy
Expand Down
17 changes: 17 additions & 0 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use store_api::storage::RegionId;
mod close_region;
mod downgrade_region;
mod flush_region;
mod gc_regions;
mod open_region;
mod upgrade_region;

Expand All @@ -40,6 +41,7 @@ pub struct RegionHeartbeatResponseHandler {
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
gc_tasks: TaskTracker<()>,
}

/// Handler of the instruction.
Expand All @@ -52,6 +54,7 @@ pub struct HandlerContext {
catchup_tasks: TaskTracker<()>,
downgrade_tasks: TaskTracker<()>,
flush_tasks: TaskTracker<()>,
gc_tasks: TaskTracker<()>,
}

impl HandlerContext {
Expand All @@ -66,6 +69,7 @@ impl HandlerContext {
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
gc_tasks: TaskTracker::new(),
}
}
}
Expand All @@ -78,6 +82,7 @@ impl RegionHeartbeatResponseHandler {
catchup_tasks: TaskTracker::new(),
downgrade_tasks: TaskTracker::new(),
flush_tasks: TaskTracker::new(),
gc_tasks: TaskTracker::new(),
}
}

Expand Down Expand Up @@ -105,6 +110,14 @@ impl RegionHeartbeatResponseHandler {
Instruction::FlushRegion(flush_region) => Ok(Box::new(move |handler_context| {
handler_context.handle_flush_region_instruction(flush_region)
})),
Instruction::GcRegions(gc_regions) => Ok(Box::new(move |handler_context| {
handler_context.handle_gc_regions_instruction(gc_regions)
})),
Instruction::CollectFileRefs(collect_file_refs) => {
Ok(Box::new(move |handler_context| {
handler_context.handle_collect_file_refs_instruction(collect_file_refs)
}))
}
}
}
}
Expand All @@ -120,6 +133,8 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
| Some((_, Instruction::UpgradeRegion { .. }))
| Some((_, Instruction::FlushRegion { .. }))
| Some((_, Instruction::FlushRegions { .. }))
| Some((_, Instruction::GcRegions { .. }))
| Some((_, Instruction::CollectFileRefs { .. }))
)
}

Expand All @@ -134,13 +149,15 @@ impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
let catchup_tasks = self.catchup_tasks.clone();
let downgrade_tasks = self.downgrade_tasks.clone();
let flush_tasks = self.flush_tasks.clone();
let gc_tasks = self.gc_tasks.clone();
let handler = Self::build_handler(instruction)?;
let _handle = common_runtime::spawn_global(async move {
let reply = handler(HandlerContext {
region_server,
catchup_tasks,
downgrade_tasks,
flush_tasks,
gc_tasks,
})
.await;

Expand Down
Loading
Loading