Skip to content
Merged
13 changes: 8 additions & 5 deletions src/catalog/src/system_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,21 +137,24 @@ impl DataSource for SystemTableDataSource {
&self,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
let projection = request.projection.clone();
let projected_schema = match &projection {
let projected_schema = match &request.projection {
Some(projection) => self.try_project(projection)?,
None => self.table.schema(),
};

let projection = request.projection.clone();
let stream = self
.table
.to_stream(request)
.map_err(BoxedError::new)
.context(TablesRecordBatchSnafu)
.map_err(BoxedError::new)?
.map(move |batch| match &projection {
Some(p) => batch.and_then(|b| b.try_project(p)),
None => batch,
.map(move |batch| match (&projection, batch) {
// Some tables (e.g., inspect tables) already honor projection in their inner stream;
// others ignore it and return full rows. We will only apply projection here if the
// inner batch width doesn't match the projection size.
(Some(p), Ok(b)) if b.num_columns() != p.len() => b.try_project(p),
(_, res) => res,
});

let stream = RecordBatchStreamWrapper {
Expand Down
18 changes: 18 additions & 0 deletions src/catalog/src/system_schema/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod region_peers;
mod region_statistics;
mod runtime_metrics;
pub mod schemata;
mod ssts;
mod table_constraints;
mod table_names;
pub mod tables;
Expand Down Expand Up @@ -66,6 +67,9 @@ use crate::system_schema::information_schema::partitions::InformationSchemaParti
use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
use crate::system_schema::information_schema::ssts::{
InformationSchemaSstsManifest, InformationSchemaSstsStorage,
};
use crate::system_schema::information_schema::table_constraints::InformationSchemaTableConstraints;
use crate::system_schema::information_schema::tables::InformationSchemaTables;
use crate::system_schema::memory_table::MemoryTable;
Expand Down Expand Up @@ -253,6 +257,12 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
.process_manager
.as_ref()
.map(|p| Arc::new(InformationSchemaProcessList::new(p.clone())) as _),
SSTS_MANIFEST => Some(Arc::new(InformationSchemaSstsManifest::new(
self.catalog_manager.clone(),
)) as _),
SSTS_STORAGE => Some(Arc::new(InformationSchemaSstsStorage::new(
self.catalog_manager.clone(),
)) as _),
_ => None,
}
}
Expand Down Expand Up @@ -324,6 +334,14 @@ impl InformationSchemaProvider {
REGION_STATISTICS.to_string(),
self.build_table(REGION_STATISTICS).unwrap(),
);
tables.insert(
SSTS_MANIFEST.to_string(),
self.build_table(SSTS_MANIFEST).unwrap(),
);
tables.insert(
SSTS_STORAGE.to_string(),
self.build_table(SSTS_STORAGE).unwrap(),
);
}

tables.insert(TABLES.to_string(), self.build_table(TABLES).unwrap());
Expand Down
142 changes: 142 additions & 0 deletions src/catalog/src/system_schema/information_schema/ssts.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::{Arc, Weak};

use common_catalog::consts::{
INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID, INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID,
};
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::adapter::AsyncRecordBatchStreamAdapter;
use datatypes::schema::SchemaRef;
use snafu::ResultExt;
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{ScanRequest, TableId};

use crate::CatalogManager;
use crate::error::{ProjectSchemaSnafu, Result};
use crate::information_schema::{
DatanodeInspectKind, DatanodeInspectRequest, InformationTable, SSTS_MANIFEST, SSTS_STORAGE,
};
use crate::system_schema::utils;

/// Information schema table for sst manifest.
pub struct InformationSchemaSstsManifest {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaSstsManifest {
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: ManifestSstEntry::schema(),
catalog_manager,
}
}
}

impl InformationTable for InformationSchemaSstsManifest {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID
}

fn table_name(&self) -> &'static str {
SSTS_MANIFEST
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = if let Some(p) = &request.projection {
Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
} else {
self.schema.clone()
};
let info_ext = utils::information_extension(&self.catalog_manager)?;
let req = DatanodeInspectRequest {
kind: DatanodeInspectKind::SstManifest,
scan: request,
};

let future = async move {
info_ext
.inspect_datanode(req)
.await
.map_err(BoxedError::new)
.context(common_recordbatch::error::ExternalSnafu)
};
Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
schema,
Box::pin(future),
)))
}
}

/// Information schema table for sst storage.
pub struct InformationSchemaSstsStorage {
schema: SchemaRef,
catalog_manager: Weak<dyn CatalogManager>,
}

impl InformationSchemaSstsStorage {
pub(super) fn new(catalog_manager: Weak<dyn CatalogManager>) -> Self {
Self {
schema: StorageSstEntry::schema(),
catalog_manager,
}
}
}

impl InformationTable for InformationSchemaSstsStorage {
fn table_id(&self) -> TableId {
INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID
}

fn table_name(&self) -> &'static str {
SSTS_STORAGE
}

fn schema(&self) -> SchemaRef {
self.schema.clone()
}

fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
let schema = if let Some(p) = &request.projection {
Arc::new(self.schema.try_project(p).context(ProjectSchemaSnafu)?)
} else {
self.schema.clone()
};

let info_ext = utils::information_extension(&self.catalog_manager)?;
let req = DatanodeInspectRequest {
kind: DatanodeInspectKind::SstStorage,
scan: request,
};

let future = async move {
info_ext
.inspect_datanode(req)
.await
.map_err(BoxedError::new)
.context(common_recordbatch::error::ExternalSnafu)
};
Ok(Box::pin(AsyncRecordBatchStreamAdapter::new(
schema,
Box::pin(future),
)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ pub const FLOWS: &str = "flows";
pub const PROCEDURE_INFO: &str = "procedure_info";
pub const REGION_STATISTICS: &str = "region_statistics";
pub const PROCESS_LIST: &str = "process_list";
pub const SSTS_MANIFEST: &str = "ssts_manifest";
pub const SSTS_STORAGE: &str = "ssts_storage";
4 changes: 4 additions & 0 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ pub const INFORMATION_SCHEMA_PROCEDURE_INFO_TABLE_ID: u32 = 34;
pub const INFORMATION_SCHEMA_REGION_STATISTICS_TABLE_ID: u32 = 35;
/// id for information_schema.process_list
pub const INFORMATION_SCHEMA_PROCESS_LIST_TABLE_ID: u32 = 36;
/// id for information_schema.ssts_manifest
pub const INFORMATION_SCHEMA_SSTS_MANIFEST_TABLE_ID: u32 = 37;
/// id for information_schema.ssts_storage
pub const INFORMATION_SCHEMA_SSTS_STORAGE_TABLE_ID: u32 = 38;

// ----- End of information_schema tables -----

Expand Down
Loading
Loading