diff --git a/docs/features.md b/docs/features.md
index 9f6f88ff..49d859f7 100644
--- a/docs/features.md
+++ b/docs/features.md
@@ -105,7 +105,8 @@ let kite_sql = DataBaseBuilder::path("./data")
   - [x] View
 - Drop
   - [x] Table
-  - [ ] Index
+  - [x] Index
+    - Tips: `Drop Index table_name.index_name`
   - [x] View
 - Alter
   - [x] Add Column
diff --git a/src/binder/drop_index.rs b/src/binder/drop_index.rs
new file mode 100644
index 00000000..17bd2c45
--- /dev/null
+++ b/src/binder/drop_index.rs
@@ -0,0 +1,35 @@
+use crate::binder::{lower_ident, Binder};
+use crate::errors::DatabaseError;
+use crate::planner::operator::drop_index::DropIndexOperator;
+use crate::planner::operator::Operator;
+use crate::planner::{Childrens, LogicalPlan};
+use crate::storage::Transaction;
+use crate::types::value::DataValue;
+use sqlparser::ast::ObjectName;
+use std::sync::Arc;
+
+impl<T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'_, '_, T, A> {
+    pub(crate) fn bind_drop_index(
+        &mut self,
+        name: &ObjectName,
+        if_exists: &bool,
+    ) -> Result<LogicalPlan, DatabaseError> {
+        let table_name = name
+            .0
+            .first()
+            .ok_or(DatabaseError::InvalidTable(name.to_string()))?;
+        let index_name = name.0.get(1).ok_or(DatabaseError::InvalidIndex)?;
+
+        let table_name = Arc::new(lower_ident(table_name));
+        let index_name = lower_ident(index_name);
+
+        Ok(LogicalPlan::new(
+            Operator::DropIndex(DropIndexOperator {
+                table_name,
+                index_name,
+                if_exists: *if_exists,
+            }),
+            Childrens::None,
+        ))
+    }
+}
diff --git a/src/binder/expr.rs b/src/binder/expr.rs
index 890d6d62..a7bd9ebf 100644
--- a/src/binder/expr.rs
+++ b/src/binder/expr.rs
@@ -7,7 +7,7 @@ use sqlparser::ast::{
     BinaryOperator, CharLengthUnits, DataType, Expr, Function, FunctionArg, FunctionArgExpr,
     Ident, Query, UnaryOperator, Value,
 };
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::slice;
 use std::sync::Arc;
 
@@ -293,6 +293,7 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T, A> {
             self.args,
             Some(self),
         );
+
        let mut sub_query = binder.bind_query(subquery)?;
        let sub_query_schema = sub_query.output_schema();
 
@@ -368,7 +369,15 @@ impl<'a, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, '_, T, A> {
             try_default!(&full_name.0, full_name.1);
         }
         if let Some(table) = full_name.0.or(bind_table_name) {
-            let source = self.context.bind_source(&table)?;
+            let (source, is_parent) =
+                self.context.bind_source::<A>(self.parent, &table, false)?;
+
+            if is_parent {
+                self.parent_table_col
+                    .entry(Arc::new(table.clone()))
+                    .or_default()
+                    .insert(full_name.1.clone());
+            }
+
             let schema_buf = self.table_schema_buf.entry(Arc::new(table)).or_default();
 
             Ok(ScalarExpression::ColumnRef(
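
The binder resolves `DROP INDEX` names positionally: the first ident of the `ObjectName` is the table, the second the index, which is why the statement is spelled `DROP INDEX table_name.index_name`. A minimal sketch of driving it through the embedded API — the `DataBaseBuilder` entry point is taken from docs/features.md, but the exact import path and `run` signature are assumptions:

    use kite_sql::db::DataBaseBuilder; // import path assumed

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let db = DataBaseBuilder::path("./data").build()?;
        db.run("create table t1 (id int primary key, v1 int)")?;
        db.run("create index i1 on t1 (v1)")?;
        // Bound by `bind_drop_index`: `t1` -> table_name, `i1` -> index_name.
        db.run("drop index t1.i1")?;
        Ok(())
    }
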
diff --git a/src/binder/mod.rs b/src/binder/mod.rs
index e70c4d2d..2f845b9f 100644
--- a/src/binder/mod.rs
+++ b/src/binder/mod.rs
@@ -8,6 +8,7 @@ mod create_view;
 mod delete;
 mod describe;
 mod distinct;
+mod drop_index;
 mod drop_table;
 mod drop_view;
 mod explain;
@@ -275,12 +276,19 @@ impl<'a, T: Transaction> BinderContext<'a, T> {
         Ok(source)
     }
 
-    pub fn bind_source<'b: 'a>(&self, table_name: &str) -> Result<&Source, DatabaseError> {
+    pub fn bind_source<'b: 'a, A: AsRef<[(&'static str, DataValue)]>>(
+        &self,
+        parent: Option<&'a Binder<'a, 'b, T, A>>,
+        table_name: &str,
+        is_parent: bool,
+    ) -> Result<(&'b Source, bool), DatabaseError> {
         if let Some(source) = self.bind_table.iter().find(|((t, alias, _), _)| {
             t.as_str() == table_name
                 || matches!(alias.as_ref().map(|a| a.as_str() == table_name), Some(true))
         }) {
-            Ok(source.1)
+            Ok((source.1, is_parent))
+        } else if let Some(binder) = parent {
+            binder.context.bind_source(binder.parent, table_name, true)
         } else {
             Err(DatabaseError::InvalidTable(table_name.into()))
         }
@@ -322,6 +330,7 @@ pub struct Binder<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> {
     args: &'a A,
     with_pk: Option,
     pub(crate) parent: Option<&'b Binder<'a, 'b, T, A>>,
+    pub(crate) parent_table_col: HashMap<TableName, HashSet<String>>,
 }
 
 impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, 'b, T, A> {
@@ -336,6 +345,7 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, 'b, T, A> {
             args,
             with_pk: None,
             parent,
+            parent_table_col: Default::default(),
         }
     }
 
@@ -375,6 +385,7 @@ impl<'a, 'b, T: Transaction, A: AsRef<[(&'static str, DataValue)]>> Binder<'a, 'b, T, A> {
         match object_type {
             ObjectType::Table => self.bind_drop_table(&names[0], if_exists)?,
             ObjectType::View => self.bind_drop_view(&names[0], if_exists)?,
+            ObjectType::Index => self.bind_drop_index(&names[0], if_exists)?,
             _ => {
                 return Err(DatabaseError::UnsupportedStmt(
-                    "only `Table` and `View` are allowed to be Dropped".to_string(),
+                    "only `Table`, `View` and `Index` are allowed to be Dropped".to_string(),
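
The new `bind_source` signature is effectively lexical scope resolution: if a table name is not bound in the current query, the lookup retries in the parent binder with `is_parent` latched to true, and the flag tells the caller it just saw a correlated reference. A self-contained model of that recursion, with hypothetical names:

    use std::collections::HashMap;

    struct Scope<'p> {
        tables: HashMap<String, &'static str>, // table name -> bound source (stand-in)
        parent: Option<&'p Scope<'p>>,
    }

    impl Scope<'_> {
        // Mirrors the `(source, is_parent)` pair returned by `bind_source`.
        fn resolve(&self, name: &str, is_parent: bool) -> Option<(&'static str, bool)> {
            if let Some(&source) = self.tables.get(name) {
                Some((source, is_parent))
            } else {
                // Not bound here: retry one scope out, with the flag set.
                self.parent.and_then(|p| p.resolve(name, true))
            }
        }
    }

    fn main() {
        let outer = Scope { tables: HashMap::from([("t1".to_string(), "scan(t1)")]), parent: None };
        let inner = Scope { tables: HashMap::from([("t2".to_string(), "scan(t2)")]), parent: Some(&outer) };
        // `t1` is only visible through the parent scope: a correlated reference.
        assert_eq!(inner.resolve("t1", false), Some(("scan(t1)", true)));
        assert_eq!(inner.resolve("t2", false), Some(("scan(t2)", false)));
    }
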
diff --git a/src/db.rs b/src/db.rs
index ff08a4fa..29aed351 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -166,13 +166,18 @@ impl State {
         let best_plan = Self::default_optimizer(source_plan)
             .find_best(Some(&transaction.meta_loader(meta_cache)))?;
         // println!("best_plan plan: {:#?}", best_plan);
         Ok(best_plan)
     }
 
     pub(crate) fn default_optimizer(source_plan: LogicalPlan) -> HepOptimizer {
         HepOptimizer::new(source_plan)
+            .batch(
+                "Correlated Subquery".to_string(),
+                HepBatchStrategy::once_topdown(),
+                vec![NormalizationRuleImpl::CorrelateSubquery],
+            )
             .batch(
                 "Column Pruning".to_string(),
                 HepBatchStrategy::once_topdown(),
diff --git a/src/execution/ddl/drop_index.rs b/src/execution/ddl/drop_index.rs
new file mode 100644
index 00000000..f0d5f316
--- /dev/null
+++ b/src/execution/ddl/drop_index.rs
@@ -0,0 +1,43 @@
+use crate::execution::{Executor, WriteExecutor};
+use crate::planner::operator::drop_index::DropIndexOperator;
+use crate::storage::{StatisticsMetaCache, TableCache, Transaction, ViewCache};
+use crate::throw;
+use crate::types::tuple_builder::TupleBuilder;
+
+pub struct DropIndex {
+    op: DropIndexOperator,
+}
+
+impl From<DropIndexOperator> for DropIndex {
+    fn from(op: DropIndexOperator) -> Self {
+        Self { op }
+    }
+}
+
+impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for DropIndex {
+    fn execute_mut(
+        self,
+        (table_cache, _, _): (&'a TableCache, &'a ViewCache, &'a StatisticsMetaCache),
+        transaction: *mut T,
+    ) -> Executor<'a> {
+        Box::new(
+            #[coroutine]
+            move || {
+                let DropIndexOperator {
+                    table_name,
+                    index_name,
+                    if_exists,
+                } = self.op;
+
+                throw!(unsafe { &mut (*transaction) }.drop_index(
+                    table_cache,
+                    table_name,
+                    &index_name,
+                    if_exists
+                ));
+
+                yield Ok(TupleBuilder::build_result(index_name.to_string()));
+            },
+        )
+    }
+}
diff --git a/src/execution/ddl/mod.rs b/src/execution/ddl/mod.rs
index 23769f7c..294c6c39 100644
--- a/src/execution/ddl/mod.rs
+++ b/src/execution/ddl/mod.rs
@@ -3,6 +3,7 @@ pub(crate) mod create_index;
 pub(crate) mod create_table;
 pub(crate) mod create_view;
 pub(crate) mod drop_column;
+pub(crate) mod drop_index;
 pub(crate) mod drop_table;
 pub(crate) mod drop_view;
 pub(crate) mod truncate;
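
Executors in this engine are coroutines that lazily yield tuples; `DropIndex` is the degenerate one-shot case: perform the mutation, then yield a single status row. A reduced, self-contained sketch of that shape — the engine's `Executor` box, caches, and `throw!` macro are elided, and it needs the nightly features the crate already enables:

    #![feature(coroutines, coroutine_trait, stmt_expr_attributes)]
    use std::ops::{Coroutine, CoroutineState};

    fn one_shot(index_name: String) -> impl Coroutine<Yield = String, Return = ()> {
        #[coroutine]
        move || {
            // the side effect (e.g. `drop_index`) would run here, before yielding
            yield index_name; // the single "result tuple"
        }
    }

    fn main() {
        let mut co = Box::pin(one_shot("i1".to_string()));
        while let CoroutineState::Yielded(row) = co.as_mut().resume(()) {
            println!("{row}");
        }
    }
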
diff --git a/src/execution/dml/analyze.rs b/src/execution/dml/analyze.rs
index 69be1397..3dfd1c90 100644
--- a/src/execution/dml/analyze.rs
+++ b/src/execution/dml/analyze.rs
@@ -19,6 +19,7 @@ use std::fmt::Formatter;
 use std::fs::DirEntry;
 use std::ops::Coroutine;
 use std::ops::CoroutineState;
+use std::path::PathBuf;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::{fmt, fs};
@@ -98,10 +99,7 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
             }
             drop(coroutine);
             let mut values = Vec::with_capacity(builders.len());
-            let dir_path = dirs::home_dir()
-                .expect("Your system does not have a Config directory!")
-                .join(DEFAULT_STATISTICS_META_PATH)
-                .join(table_name.as_str());
+            let dir_path = Self::build_statistics_meta_path(&table_name);
             // For DEBUG
             // println!("Statistics Path: {:#?}", dir_path);
             throw!(fs::create_dir_all(&dir_path).map_err(DatabaseError::IO));
@@ -149,6 +147,15 @@ impl<'a, T: Transaction + 'a> WriteExecutor<'a, T> for Analyze {
     }
 }
 
+impl Analyze {
+    pub fn build_statistics_meta_path(table_name: &TableName) -> PathBuf {
+        dirs::home_dir()
+            .expect("Your system does not have a Config directory!")
+            .join(DEFAULT_STATISTICS_META_PATH)
+            .join(table_name.as_str())
+    }
+}
+
 impl fmt::Display for AnalyzeOperator {
     fn fmt(&self, f: &mut Formatter) -> fmt::Result {
         let indexes = self.index_metas.iter().map(|index| &index.name).join(", ");
diff --git a/src/execution/mod.rs b/src/execution/mod.rs
index 9c4ed8c6..1df7b16c 100644
--- a/src/execution/mod.rs
+++ b/src/execution/mod.rs
@@ -10,6 +10,7 @@ use crate::execution::ddl::create_index::CreateIndex;
 use crate::execution::ddl::create_table::CreateTable;
 use crate::execution::ddl::create_view::CreateView;
 use crate::execution::ddl::drop_column::DropColumn;
+use crate::execution::ddl::drop_index::DropIndex;
 use crate::execution::ddl::drop_table::DropTable;
 use crate::execution::ddl::drop_view::DropView;
 use crate::execution::ddl::truncate::Truncate;
@@ -194,6 +195,7 @@ pub fn build_write<'a, T: Transaction + 'a>(
         Operator::CreateView(op) => CreateView::from(op).execute_mut(cache, transaction),
         Operator::DropTable(op) => DropTable::from(op).execute_mut(cache, transaction),
         Operator::DropView(op) => DropView::from(op).execute_mut(cache, transaction),
+        Operator::DropIndex(op) => DropIndex::from(op).execute_mut(cache, transaction),
         Operator::Truncate(op) => Truncate::from(op).execute_mut(cache, transaction),
         Operator::CopyFromFile(op) => CopyFromFile::from(op).execute_mut(cache, transaction),
         Operator::CopyToFile(op) => {
diff --git a/src/optimizer/heuristic/graph.rs b/src/optimizer/heuristic/graph.rs
index f6de8c61..6695bfdc 100644
--- a/src/optimizer/heuristic/graph.rs
+++ b/src/optimizer/heuristic/graph.rs
@@ -79,7 +79,7 @@ impl HepGraph {
         source_id: HepNodeId,
         children_option: Option<HepNodeId>,
         new_node: Operator,
-    ) {
+    ) -> HepNodeId {
         let new_index = self.graph.add_node(new_node);
         let mut order = self.graph.edges(source_id).count();
 
@@ -95,6 +95,7 @@ impl HepGraph {
         self.graph.add_edge(source_id, new_index, order);
 
         self.version += 1;
+        new_index
     }
 
     pub fn replace_node(&mut self, source_id: HepNodeId, new_node: Operator) {
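
Returning the fresh `HepNodeId` from `add_node` is what lets the decorrelation rule below build a two-step splice: insert a cross `Join` between a node and its child, then hang the outer table's scan under that join. In sketch form (crate-internal types as used in this patch; not a standalone program, and the body is illustrative):

    use crate::optimizer::heuristic::graph::{HepGraph, HepNodeId};
    use crate::planner::operator::join::{JoinCondition, JoinOperator, JoinType};
    use crate::planner::operator::Operator;

    fn splice_cross_join(graph: &mut HepGraph, node_id: HepNodeId, right_scan: Operator) {
        let join = Operator::Join(JoinOperator {
            on: JoinCondition::None,
            join_type: JoinType::Cross,
        });
        let left_id = graph.eldest_child_at(node_id).unwrap();
        // Re-parent: node_id -> join -> (old child, spliced scan).
        let join_id = graph.add_node(node_id, Some(left_id), join);
        graph.add_node(join_id, None, right_scan);
    }
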
trans_references { + ($columns:expr) => {{ + let mut column_references = HashSet::with_capacity($columns.len()); + for column in $columns { + column_references.insert(column); + } + column_references + }}; +} + +impl CorrelatedSubquery { + fn _apply( + column_references: HashSet<&ColumnRef>, + scan_columns: HashMap, HashMap, Vec)>, + node_id: HepNodeId, + graph: &mut HepGraph, + ) -> Result< + HashMap, HashMap, Vec)>, + DatabaseError, + > { + let operator = &graph.operator(node_id).clone(); + + match operator { + Operator::Aggregate(op) => { + let is_distinct = op.is_distinct; + let referenced_columns = operator.referenced_columns(false); + let mut new_column_references = trans_references!(&referenced_columns); + // on distinct + if is_distinct { + for summary in column_references { + new_column_references.insert(summary); + } + } + + Self::recollect_apply(new_column_references, scan_columns, node_id, graph) + } + Operator::Project(op) => { + let mut has_count_star = HasCountStar::default(); + for expr in &op.exprs { + has_count_star.visit(expr)?; + } + let referenced_columns = operator.referenced_columns(false); + let new_column_references = trans_references!(&referenced_columns); + + Self::recollect_apply(new_column_references, scan_columns, node_id, graph) + } + Operator::TableScan(op) => { + let table_column: HashSet<&ColumnRef> = op.columns.values().collect(); + let mut new_scan_columns = scan_columns.clone(); + new_scan_columns.insert( + op.table_name.clone(), + ( + op.primary_keys.clone(), + op.columns + .iter() + .map(|(num, col)| (col.id().unwrap(), *num)) + .collect(), + op.index_infos.clone(), + ), + ); + let mut parent_col = HashMap::new(); + for col in column_references { + match ( + table_column.contains(col), + scan_columns.get(col.table_name().unwrap_or(&Arc::new("".to_string()))), + ) { + (false, Some(..)) => { + parent_col + .entry(col.table_name().unwrap()) + .or_insert(HashSet::new()) + .insert(col); + } + _ => continue, + } + } + for (table_name, table_columns) in parent_col { + let table_columns = table_columns.into_iter().collect_vec(); + let (primary_keys, columns, index_infos) = + scan_columns.get(table_name).unwrap(); + let map: BTreeMap = table_columns + .into_iter() + .map(|col| (*columns.get(&col.id().unwrap()).unwrap(), col.clone())) + .collect(); + let left_operator = graph.operator(node_id).clone(); + let right_operator = TableScan(TableScanOperator { + table_name: table_name.clone(), + primary_keys: primary_keys.clone(), + columns: map, + limit: (None, None), + index_infos: index_infos.clone(), + with_pk: false, + }); + let join_operator = Join(JoinOperator { + on: JoinCondition::None, + join_type: JoinType::Cross, + }); + + match &left_operator { + TableScan(_) => { + graph.replace_node(node_id, join_operator); + graph.add_node(node_id, None, left_operator); + graph.add_node(node_id, None, right_operator); + } + Join(_) => { + let left_id = graph.eldest_child_at(node_id).unwrap(); + let left_id = graph.add_node(node_id, Some(left_id), join_operator); + graph.add_node(left_id, None, right_operator); + } + _ => unreachable!(), + } + } + Ok(new_scan_columns) + } + Operator::Sort(_) | Operator::Limit(_) | Operator::Filter(_) | Operator::Union(_) => { + let mut new_scan_columns = scan_columns.clone(); + let temp_columns = operator.referenced_columns(false); + // why? 
+            Operator::Sort(_) | Operator::Limit(_) | Operator::Filter(_) | Operator::Union(_) => {
+                let mut new_scan_columns = scan_columns.clone();
+                let temp_columns = operator.referenced_columns(false);
+                // Fold this operator's referenced columns into the set so that
+                // correlated references keep propagating down to the scans below.
+                let mut column_references = column_references;
+                for column in temp_columns.iter() {
+                    column_references.insert(column);
+                }
+                for child_id in graph.children_at(node_id).collect_vec() {
+                    let copy_references = column_references.clone();
+                    let copy_scan = scan_columns.clone();
+                    if let Ok(scan) = Self::_apply(copy_references, copy_scan, child_id, graph) {
+                        new_scan_columns.extend(scan);
+                    };
+                }
+                Ok(new_scan_columns)
+            }
+            Operator::Join(_) => {
+                let mut new_scan_columns = scan_columns.clone();
+                for child_id in graph.children_at(node_id).collect_vec() {
+                    let copy_references = column_references.clone();
+                    let copy_scan = new_scan_columns.clone();
+                    if let Ok(scan) = Self::_apply(copy_references, copy_scan, child_id, graph) {
+                        new_scan_columns.extend(scan);
+                    };
+                }
+                Ok(new_scan_columns)
+            }
+            // Last Operator
+            Operator::Dummy | Operator::Values(_) | Operator::FunctionScan(_) => Ok(scan_columns),
+            Operator::Explain => {
+                if let Some(child_id) = graph.eldest_child_at(node_id) {
+                    Self::_apply(column_references, scan_columns, child_id, graph)
+                } else {
+                    unreachable!()
+                }
+            }
+            // DDL Based on Other Plan
+            Operator::Insert(_)
+            | Operator::Update(_)
+            | Operator::Delete(_)
+            | Operator::Analyze(_) => {
+                let referenced_columns = operator.referenced_columns(false);
+                let new_column_references = trans_references!(&referenced_columns);
+
+                if let Some(child_id) = graph.eldest_child_at(node_id) {
+                    Self::recollect_apply(new_column_references, scan_columns, child_id, graph)
+                } else {
+                    unreachable!();
+                }
+            }
+            // DDL Single Plan
+            Operator::CreateTable(_)
+            | Operator::CreateIndex(_)
+            | Operator::CreateView(_)
+            | Operator::DropTable(_)
+            | Operator::DropView(_)
+            | Operator::DropIndex(_)
+            | Operator::Truncate(_)
+            | Operator::ShowTable
+            | Operator::ShowView
+            | Operator::CopyFromFile(_)
+            | Operator::CopyToFile(_)
+            | Operator::AddColumn(_)
+            | Operator::DropColumn(_)
+            | Operator::Describe(_) => Ok(scan_columns),
+        }
+    }
+
+    fn recollect_apply(
+        referenced_columns: HashSet<&ColumnRef>,
+        scan_columns: HashMap<TableName, (Vec<ColumnId>, HashMap<ColumnId, usize>, Vec<IndexInfo>)>,
+        node_id: HepNodeId,
+        graph: &mut HepGraph,
+    ) -> Result<
+        HashMap<TableName, (Vec<ColumnId>, HashMap<ColumnId, usize>, Vec<IndexInfo>)>,
+        DatabaseError,
+    > {
+        let mut new_scan_columns = scan_columns.clone();
+        for child_id in graph.children_at(node_id).collect_vec() {
+            let copy_references = referenced_columns.clone();
+            let copy_scan = scan_columns.clone();
+
+            if let Ok(scan) = Self::_apply(copy_references, copy_scan, child_id, graph) {
+                new_scan_columns.extend(scan);
+            };
+        }
+        Ok(new_scan_columns)
+    }
+}
+
+impl MatchPattern for CorrelatedSubquery {
+    fn pattern(&self) -> &Pattern {
+        &CORRELATED_SUBQUERY_RULE
+    }
+}
+
+impl NormalizationRule for CorrelatedSubquery {
+    fn apply(&self, node_id: HepNodeId, graph: &mut HepGraph) -> Result<(), DatabaseError> {
+        Self::_apply(HashSet::new(), HashMap::new(), node_id, graph)?;
+        // mark changed to skip this rule batch
+        graph.version += 1;
+
+        Ok(())
+    }
+}
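
What the rule accomplishes, in one picture: a column that a subquery reads from its outer query (flagged as `is_parent` during binding, then resolved here through `scan_columns`) becomes satisfiable by splicing a scan of the outer table under a cross `Join`. Hand-drawn plan shapes for the first query in tests/slt/correlated_subquery.slt, not engine output:

    before (t1.v2 unresolvable)        after CorrelatedSubquery
    Filter(t2.v2 < t1.v2)              Filter(t2.v2 < t1.v2)
      `- TableScan(t2)                   `- Join(Cross)
                                             |- TableScan(t2)
                                             `- TableScan(t1)   <- spliced outer scan

The previously unbindable predicate now filters the cross product, which is also why this rule runs as the first batch in src/db.rs: Column Pruning and predicate pushdown can then treat the spliced scan like any other.
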
diff --git a/src/optimizer/rule/normalization/mod.rs b/src/optimizer/rule/normalization/mod.rs
index 8c30fd4a..b7fc6d4b 100644
--- a/src/optimizer/rule/normalization/mod.rs
+++ b/src/optimizer/rule/normalization/mod.rs
@@ -10,6 +10,7 @@ use crate::optimizer::rule::normalization::combine_operators::{
 use crate::optimizer::rule::normalization::compilation_in_advance::{
     EvaluatorBind, ExpressionRemapper,
 };
+use crate::optimizer::rule::normalization::correlated_subquery::CorrelatedSubquery;
 use crate::optimizer::rule::normalization::pushdown_limit::{
     LimitProjectTranspose, PushLimitIntoScan, PushLimitThroughJoin,
 };
@@ -21,6 +22,7 @@ use crate::optimizer::rule::normalization::simplification::SimplifyFilter;
 mod column_pruning;
 mod combine_operators;
 mod compilation_in_advance;
+mod correlated_subquery;
 mod pushdown_limit;
 mod pushdown_predicates;
 mod simplification;
@@ -32,6 +34,7 @@ pub enum NormalizationRuleImpl {
     CollapseProject,
     CollapseGroupByAgg,
     CombineFilter,
+    CorrelateSubquery,
     // PushDown limit
     LimitProjectTranspose,
     PushLimitThroughJoin,
@@ -55,6 +58,7 @@ impl MatchPattern for NormalizationRuleImpl {
             NormalizationRuleImpl::CollapseProject => CollapseProject.pattern(),
             NormalizationRuleImpl::CollapseGroupByAgg => CollapseGroupByAgg.pattern(),
             NormalizationRuleImpl::CombineFilter => CombineFilter.pattern(),
+            NormalizationRuleImpl::CorrelateSubquery => CorrelatedSubquery.pattern(),
             NormalizationRuleImpl::LimitProjectTranspose => LimitProjectTranspose.pattern(),
             NormalizationRuleImpl::PushLimitThroughJoin => PushLimitThroughJoin.pattern(),
             NormalizationRuleImpl::PushLimitIntoTableScan => PushLimitIntoScan.pattern(),
@@ -75,6 +79,7 @@ impl NormalizationRule for NormalizationRuleImpl {
             NormalizationRuleImpl::CollapseProject => CollapseProject.apply(node_id, graph),
             NormalizationRuleImpl::CollapseGroupByAgg => CollapseGroupByAgg.apply(node_id, graph),
             NormalizationRuleImpl::CombineFilter => CombineFilter.apply(node_id, graph),
+            NormalizationRuleImpl::CorrelateSubquery => CorrelatedSubquery.apply(node_id, graph),
             NormalizationRuleImpl::LimitProjectTranspose => {
                 LimitProjectTranspose.apply(node_id, graph)
             }
diff --git a/src/optimizer/rule/normalization/simplification.rs b/src/optimizer/rule/normalization/simplification.rs
index a220c065..02d66ef5 100644
--- a/src/optimizer/rule/normalization/simplification.rs
+++ b/src/optimizer/rule/normalization/simplification.rs
@@ -298,7 +298,7 @@ mod test {
                         op: BinaryOperator::Plus,
                         left_expr: Box::new(ScalarExpression::ColumnRef(ColumnRef::from(
                             c1_col
-                        ))),
+                        ),)),
                         right_expr: Box::new(ScalarExpression::Constant(DataValue::Int32(1))),
                         evaluator: None,
                         ty: LogicalType::Integer,
@@ -306,7 +306,7 @@ mod test {
                     evaluator: None,
                     ty: LogicalType::Integer,
                 }),
-                right_expr: Box::new(ScalarExpression::ColumnRef(ColumnRef::from(c2_col))),
+                right_expr: Box::new(ScalarExpression::ColumnRef(ColumnRef::from(c2_col),)),
                 evaluator: None,
                 ty: LogicalType::Boolean,
             }
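
For contributors adding further rules, the dispatch above is the whole contract: a rule supplies a match `Pattern` plus a graph rewrite, and is then registered as an enum variant and an optimizer batch. A hedged sketch with a hypothetical `MyRule` (trait and type names from this crate; the body is illustrative):

    use crate::errors::DatabaseError;
    use crate::optimizer::core::pattern::Pattern;
    use crate::optimizer::core::rule::{MatchPattern, NormalizationRule};
    use crate::optimizer::heuristic::graph::{HepGraph, HepNodeId};

    pub struct MyRule; // hypothetical

    impl MatchPattern for MyRule {
        fn pattern(&self) -> &Pattern {
            &MY_RULE_PATTERN // a LazyLock<Pattern>, like CORRELATED_SUBQUERY_RULE
        }
    }

    impl NormalizationRule for MyRule {
        fn apply(&self, node_id: HepNodeId, graph: &mut HepGraph) -> Result<(), DatabaseError> {
            // rewrite the plan graph here; bump `graph.version` when it changes
            // so the optimizer can tell whether the batch made progress
            Ok(())
        }
    }

A new rule then needs the three touch points shown in this patch: a `NormalizationRuleImpl` variant, the two dispatch arms, and a `.batch(...)` registration in src/db.rs.
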
diff --git a/src/planner/mod.rs b/src/planner/mod.rs
index 1220af74..c6f20e90 100644
--- a/src/planner/mod.rs
+++ b/src/planner/mod.rs
@@ -221,6 +221,9 @@ impl LogicalPlan {
             Operator::DropView(_) => SchemaOutput::Schema(vec![ColumnRef::from(
                 ColumnCatalog::new_dummy("DROP VIEW SUCCESS".to_string()),
             )]),
+            Operator::DropIndex(_) => SchemaOutput::Schema(vec![ColumnRef::from(
+                ColumnCatalog::new_dummy("DROP INDEX SUCCESS".to_string()),
+            )]),
             Operator::Truncate(_) => SchemaOutput::Schema(vec![ColumnRef::from(
                 ColumnCatalog::new_dummy("TRUNCATE TABLE SUCCESS".to_string()),
             )]),
diff --git a/src/planner/operator/drop_index.rs b/src/planner/operator/drop_index.rs
new file mode 100644
index 00000000..695cd18f
--- /dev/null
+++ b/src/planner/operator/drop_index.rs
@@ -0,0 +1,43 @@
+use crate::catalog::TableName;
+use crate::planner::operator::Operator;
+use crate::planner::{Childrens, LogicalPlan};
+use kite_sql_serde_macros::ReferenceSerialization;
+use std::fmt;
+use std::fmt::Formatter;
+
+#[derive(Debug, PartialEq, Eq, Clone, Hash, ReferenceSerialization)]
+pub struct DropIndexOperator {
+    pub table_name: TableName,
+    pub index_name: String,
+    pub if_exists: bool,
+}
+
+impl DropIndexOperator {
+    pub fn build(
+        table_name: TableName,
+        index_name: String,
+        if_exists: bool,
+        childrens: Childrens,
+    ) -> LogicalPlan {
+        LogicalPlan::new(
+            Operator::DropIndex(DropIndexOperator {
+                table_name,
+                index_name,
+                if_exists,
+            }),
+            childrens,
+        )
+    }
+}
+
+impl fmt::Display for DropIndexOperator {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Drop Index {} On {}, If Exists: {}",
+            self.index_name, self.table_name, self.if_exists
+        )?;
+
+        Ok(())
+    }
+}
diff --git a/src/planner/operator/mod.rs b/src/planner/operator/mod.rs
index 620c7b31..ed6d5bd1 100644
--- a/src/planner/operator/mod.rs
+++ b/src/planner/operator/mod.rs
@@ -8,6 +8,7 @@ pub mod create_table;
 pub mod create_view;
 pub mod delete;
 pub mod describe;
+pub mod drop_index;
 pub mod drop_table;
 pub mod drop_view;
 pub mod filter;
@@ -39,6 +40,7 @@ use crate::planner::operator::create_table::CreateTableOperator;
 use crate::planner::operator::create_view::CreateViewOperator;
 use crate::planner::operator::delete::DeleteOperator;
 use crate::planner::operator::describe::DescribeOperator;
+use crate::planner::operator::drop_index::DropIndexOperator;
 use crate::planner::operator::drop_table::DropTableOperator;
 use crate::planner::operator::drop_view::DropViewOperator;
 use crate::planner::operator::function_scan::FunctionScanOperator;
@@ -85,6 +87,7 @@ pub enum Operator {
     CreateView(CreateViewOperator),
     DropTable(DropTableOperator),
     DropView(DropViewOperator),
+    DropIndex(DropIndexOperator),
     Truncate(TruncateOperator),
     // Copy
     CopyFromFile(CopyFromFileOperator),
@@ -148,7 +151,7 @@ impl Operator {
                 schema_ref
                     .iter()
                     .cloned()
-                    .map(ScalarExpression::ColumnRef)
+                    .map(|col| ScalarExpression::ColumnRef(col))
                     .collect_vec(),
             ),
             Operator::FunctionScan(op) => Some(
@@ -174,6 +177,7 @@ impl Operator {
             | Operator::CreateView(_)
             | Operator::DropTable(_)
             | Operator::DropView(_)
+            | Operator::DropIndex(_)
             | Operator::Truncate(_)
             | Operator::CopyFromFile(_)
             | Operator::CopyToFile(_) => None,
@@ -248,6 +252,7 @@ impl Operator {
             | Operator::CreateView(_)
             | Operator::DropTable(_)
             | Operator::DropView(_)
+            | Operator::DropIndex(_)
             | Operator::Truncate(_)
             | Operator::CopyFromFile(_)
             | Operator::CopyToFile(_) => vec![],
@@ -283,6 +288,7 @@ impl fmt::Display for Operator {
             Operator::CreateView(op) => write!(f, "{}", op),
             Operator::DropTable(op) => write!(f, "{}", op),
             Operator::DropView(op) => write!(f, "{}", op),
+            Operator::DropIndex(op) => write!(f, "{}", op),
             Operator::Truncate(op) => write!(f, "{}", op),
             Operator::CopyFromFile(op) => write!(f, "{}", op),
             Operator::CopyToFile(op) => write!(f, "{}", op),
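
For reference, the `Display` impl above is what the operator renders as in an `EXPLAIN` tree; a quick check, constructing the operator directly (`TableName` is an `Arc<String>` in this crate):

    use std::sync::Arc;
    use crate::planner::operator::drop_index::DropIndexOperator;

    #[test]
    fn drop_index_display() {
        let op = DropIndexOperator {
            table_name: Arc::new("t1".to_string()),
            index_name: "i1".to_string(),
            if_exists: false,
        };
        assert_eq!(op.to_string(), "Drop Index i1 On t1, If Exists: false");
    }
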
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 67aad752..f40f43c5 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -4,6 +4,7 @@ pub(crate) mod table_codec;
 use crate::catalog::view::View;
 use crate::catalog::{ColumnCatalog, ColumnRef, TableCatalog, TableMeta, TableName};
 use crate::errors::DatabaseError;
+use crate::execution::dml::analyze::Analyze;
 use crate::expression::range_detacher::Range;
 use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta};
 use crate::serdes::ReferenceTables;
@@ -16,10 +17,10 @@ use crate::utils::lru::SharedLruCache;
 use itertools::Itertools;
 use std::collections::{BTreeMap, Bound};
 use std::io::Cursor;
-use std::mem;
 use std::ops::SubAssign;
 use std::sync::Arc;
 use std::vec::IntoIter;
+use std::{fs, mem};
 use ulid::Generator;
 
 pub(crate) type StatisticsMetaCache = SharedLruCache<(TableName, IndexId), StatisticsMeta>;
@@ -302,7 +303,7 @@ pub trait Transaction: Sized {
         self.remove(&index_meta_key)?;
 
         let (index_min, index_max) =
-            unsafe { &*self.table_codec() }.index_bound(table_name, &index_meta.id)?;
+            unsafe { &*self.table_codec() }.index_bound(table_name, index_meta.id)?;
         self._drop_data(index_min, index_max)?;
 
         self.remove_table_meta(meta_cache, table_name, index_meta.id)?;
@@ -410,13 +411,55 @@ pub trait Transaction: Sized {
         Ok(())
     }
 
+    fn drop_index(
+        &mut self,
+        table_cache: &TableCache,
+        table_name: TableName,
+        index_name: &str,
+        if_exists: bool,
+    ) -> Result<(), DatabaseError> {
+        let table = self
+            .table(table_cache, table_name.clone())?
+            .ok_or(DatabaseError::TableNotFound)?;
+        let Some(index_meta) = table.indexes.iter().find(|index| index.name == index_name) else {
+            if if_exists {
+                return Ok(());
+            } else {
+                return Err(DatabaseError::TableNotFound);
+            }
+        };
+        match index_meta.ty {
+            IndexType::PrimaryKey { .. } | IndexType::Unique => {
+                return Err(DatabaseError::InvalidIndex)
+            }
+            IndexType::Normal | IndexType::Composite => (),
+        }
+
+        let index_id = index_meta.id;
+        let index_meta_key =
+            unsafe { &*self.table_codec() }.encode_index_meta_key(table_name.as_str(), index_id)?;
+        self.remove(&index_meta_key)?;
+
+        let (index_min, index_max) =
+            unsafe { &*self.table_codec() }.index_bound(table_name.as_str(), index_id)?;
+        self._drop_data(index_min, index_max)?;
+
+        let statistics_min_key = unsafe { &*self.table_codec() }
+            .encode_statistics_path_key(table_name.as_str(), index_id);
+        self.remove(&statistics_min_key)?;
+
+        table_cache.remove(&table_name);
+        // Note: the on-disk statistics file for this index is not removed here;
+        // `ANALYZE TABLE` cleans statistics up uniformly.
+
+        Ok(())
+    }
+
     fn drop_table(
         &mut self,
         table_cache: &TableCache,
         table_name: TableName,
         if_exists: bool,
     ) -> Result<(), DatabaseError> {
-        self.drop_name_hash(&table_name)?;
         if self.table(table_cache, table_name.clone())?.is_none() {
             if if_exists {
                 return Ok(());
@@ -424,6 +467,7 @@ pub trait Transaction: Sized {
                 return Err(DatabaseError::TableNotFound);
             }
         }
+        self.drop_name_hash(&table_name)?;
 
         self.drop_data(table_name.as_str())?;
 
         let (column_min, column_max) =
@@ -437,6 +481,8 @@ pub trait Transaction: Sized {
         self.remove(&unsafe { &*self.table_codec() }.encode_root_table_key(table_name.as_str()))?;
         table_cache.remove(&table_name);
 
+        let _ = fs::remove_dir(Analyze::build_statistics_meta_path(&table_name));
+
         Ok(())
     }
 
@@ -1143,7 +1189,7 @@ impl Iter for IndexIter<'_, T> {
                     unsafe { &*self.params.table_codec() }.tuple_bound(table_name)
                 } else {
                     unsafe { &*self.params.table_codec() }
-                        .index_bound(table_name, &index_meta.id)?
+                        .index_bound(table_name, index_meta.id)?
                 };
                 let mut encode_min = bound_encode(min, false)?;
                 check_bound(&mut encode_min, bound_min);
@@ -1548,6 +1594,32 @@ mod test {
             dbg!(value);
             assert!(iter.try_next()?.is_none());
         }
+        match transaction.drop_index(&table_cache, Arc::new("t1".to_string()), "pk_index", false) {
+            Err(DatabaseError::InvalidIndex) => (),
+            _ => unreachable!(),
+        }
+        transaction.drop_index(&table_cache, Arc::new("t1".to_string()), "i1", false)?;
+        {
+            let table = transaction
+                .table(&table_cache, Arc::new("t1".to_string()))?
+                .unwrap();
+            let i2_meta = table.indexes[1].clone();
+            assert_eq!(i2_meta.id, 2);
+            assert_eq!(i2_meta.column_ids, vec![c3_column_id, c2_column_id]);
+            assert_eq!(i2_meta.table_name, Arc::new("t1".to_string()));
+            assert_eq!(i2_meta.pk_ty, LogicalType::Integer);
+            assert_eq!(i2_meta.name, "i2".to_string());
+            assert_eq!(i2_meta.ty, IndexType::Composite);
+
+            let (min, max) = table_codec.index_meta_bound("t1");
+            let mut iter = transaction.range(Bound::Included(min), Bound::Included(max))?;
+
+            let (_, value) = iter.try_next()?.unwrap();
+            dbg!(value);
+            let (_, value) = iter.try_next()?.unwrap();
+            dbg!(value);
+            assert!(iter.try_next()?.is_none());
+        }
 
         Ok(())
     }
@@ -1638,7 +1710,7 @@ mod test {
         assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[2]);
         assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[1]);
 
-        let (min, max) = table_codec.index_bound("t1", &1)?;
+        let (min, max) = table_codec.index_bound("t1", 1)?;
         let mut iter = transaction.range(Bound::Included(min), Bound::Included(max))?;
 
         let (_, value) = iter.try_next()?.unwrap();
@@ -1656,7 +1728,7 @@ mod test {
         assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[2]);
         assert_eq!(index_iter.next_tuple()?.unwrap(), tuples[1]);
 
-        let (min, max) = table_codec.index_bound("t1", &1)?;
+        let (min, max) = table_codec.index_bound("t1", 1)?;
         let mut iter = transaction.range(Bound::Included(min), Bound::Included(max))?;
 
         let (_, value) = iter.try_next()?.unwrap();
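
For orientation, `drop_index` touches three key families in the KV layer, per the encoders in table_codec.rs (tag spellings approximate; the meta-key layout is the one documented on `encode_index_meta`):

    {table}{IndexMeta tag}{BOUND_MIN_TAG}{index_id}         -> IndexMeta        (removed)
    {table}{Index tag}{BOUND_MIN_TAG}{index_id}{min..max}   -> index entries    (range-dropped)
    {table}{Statistics tag}...{index_id}                    -> statistics path  (removed)

The table cache entry is then invalidated so the next `table()` call reloads a catalog without the dropped index.
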
diff --git a/src/storage/table_codec.rs b/src/storage/table_codec.rs
index 8df9e511..3d99c0f0 100644
--- a/src/storage/table_codec.rs
+++ b/src/storage/table_codec.rs
@@ -171,13 +171,13 @@ impl TableCodec {
     pub fn index_bound(
         &self,
         table_name: &str,
-        index_id: &IndexId,
+        index_id: IndexId,
     ) -> Result<(BumpBytes, BumpBytes), DatabaseError> {
         let op = |bound_id| -> Result<BumpBytes, DatabaseError> {
             let mut key_prefix = self.key_prefix(CodecType::Index, table_name);
 
             key_prefix.write_all(&[BOUND_MIN_TAG])?;
-            key_prefix.write_all(&index_id.to_be_bytes()[..])?;
+            key_prefix.write_all(&index_id.to_le_bytes()[..])?;
             key_prefix.write_all(&[bound_id])?;
             Ok(key_prefix)
         };
@@ -293,6 +293,18 @@ impl TableCodec {
         Tuple::deserialize_from(table_types, pk_indices, projections, schema, bytes, with_pk)
     }
 
+    pub fn encode_index_meta_key(
+        &self,
+        table_name: &str,
+        index_id: IndexId,
+    ) -> Result<BumpBytes, DatabaseError> {
+        let mut key_prefix = self.key_prefix(CodecType::IndexMeta, table_name);
+
+        key_prefix.write_all(&[BOUND_MIN_TAG])?;
+        key_prefix.write_all(&index_id.to_le_bytes()[..])?;
+        Ok(key_prefix)
+    }
+
     /// Key: {TableName}{INDEX_META_TAG}{BOUND_MIN_TAG}{IndexID}
     /// Value: IndexMeta
     pub fn encode_index_meta(
@@ -300,15 +312,12 @@ impl TableCodec {
         table_name: &str,
         index_meta: &IndexMeta,
     ) -> Result<(BumpBytes, BumpBytes), DatabaseError> {
-        let mut key_prefix = self.key_prefix(CodecType::IndexMeta, table_name);
-
-        key_prefix.write_all(&[BOUND_MIN_TAG])?;
-        key_prefix.write_all(&index_meta.id.to_be_bytes()[..])?;
+        let key_bytes = self.encode_index_meta_key(table_name, index_meta.id)?;
 
         let mut value_bytes = BumpBytes::new_in(&self.arena);
         index_meta.encode(&mut value_bytes, true, &mut ReferenceTables::new())?;
 
-        Ok((key_prefix, value_bytes))
+        Ok((key_bytes, value_bytes))
     }
 
     pub fn decode_index_meta(bytes: &[u8]) -> Result<IndexMeta, DatabaseError> {
@@ -347,7 +356,7 @@ impl TableCodec {
     ) -> Result<BumpBytes, DatabaseError> {
         let mut key_prefix = self.key_prefix(CodecType::Index, name);
         key_prefix.push(BOUND_MIN_TAG);
-        key_prefix.extend_from_slice(&index.id.to_be_bytes());
+        key_prefix.extend_from_slice(&index.id.to_le_bytes());
         key_prefix.push(BOUND_MIN_TAG);
 
        index.value.memcomparable_encode(&mut key_prefix)?;
@@ -900,7 +909,7 @@ mod tests {
 
         println!("{:#?}", set);
 
-        let (min, max) = table_codec.index_bound(&table_catalog.name, &1).unwrap();
+        let (min, max) = table_codec.index_bound(&table_catalog.name, 1).unwrap();
 
         println!("{:?}", min);
         println!("{:?}", max);
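
On the `to_be_bytes` -> `to_le_bytes` switch: in these paths the index id bytes are only ever matched as an exact key prefix (the encode side and both bound sides now use the same encoding), so byte order does not affect lookups; what changes is the enumeration order of ids within a table's index-meta range, which nothing here depends on. A small demonstration, assuming `IndexId` is a fixed-width unsigned integer (u32 used for illustration):

    fn main() {
        let id: u32 = 258; // 0x0102
        assert_eq!(id.to_be_bytes(), [0, 0, 1, 2]);
        assert_eq!(id.to_le_bytes(), [2, 1, 0, 0]);
        // Equal ids yield equal prefixes under either encoding, so
        // prefix-equality lookups behave identically.
        assert_eq!(258u32.to_le_bytes(), id.to_le_bytes());
    }
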
diff --git a/tests/slt/correlated_subquery.slt b/tests/slt/correlated_subquery.slt
new file mode 100644
index 00000000..1a6c0d62
--- /dev/null
+++ b/tests/slt/correlated_subquery.slt
@@ -0,0 +1,112 @@
+statement ok
+CREATE TABLE t1 (id INT PRIMARY KEY, v1 VARCHAR(50), v2 INT);
+
+statement ok
+CREATE TABLE t2 (id INT PRIMARY KEY, v1 VARCHAR(50), v2 INT);
+
+statement ok
+CREATE TABLE t3 (id INT PRIMARY KEY, v1 INT, v2 INT);
+
+statement ok
+insert into t1(id, v1, v2) values (1,'a',9)
+
+statement ok
+insert into t1(id, v1, v2) values (2,'b',6)
+
+statement ok
+insert into t1(id, v1, v2) values (3,'c',11)
+
+statement ok
+insert into t2(id, v1, v2) values (1,'A',10)
+
+statement ok
+insert into t2(id, v1, v2) values (2,'B',11)
+
+statement ok
+insert into t2(id, v1, v2) values (3,'C',9)
+
+statement ok
+insert into t3(id, v1, v2) values (1,6,10)
+
+statement ok
+insert into t3(id, v1, v2) values (2,5,10)
+
+statement ok
+insert into t3(id, v1, v2) values (3,4,10)
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE id IN ( SELECT t2.id FROM t2 WHERE t2.v2 < t1.v2 )
+----
+1 a
+3 c
+
+query I rowsort
+SELECT v1 FROM t1 WHERE EXISTS ( SELECT 1 FROM t3 WHERE t3.id = t1.id )
+----
+a
+b
+c
+
+query TT rowsort
+SELECT t1.v1, t2.v1 FROM t1 JOIN t2 ON t1.id = t2.id WHERE t2.v2 > ( SELECT AVG(v2) FROM t1 )
+----
+a A
+b B
+c C
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE NOT EXISTS ( SELECT 1 FROM t2 WHERE t2.id = t1.id AND t2.v2 = t1.v2 )
+----
+1 a
+2 b
+3 c
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE v2 > ( SELECT MIN(v2) FROM t2 WHERE t2.id = t1.id )
+----
+3 c
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE EXISTS ( SELECT 1 FROM t2 WHERE t2.id = t1.id AND EXISTS ( SELECT 1 FROM t3 WHERE t3.id = t1.id ) )
+----
+1 a
+2 b
+3 c
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE v2 - 5 > ( SELECT AVG(v1) FROM t3 WHERE t3.id <= t1.id )
+----
+3 c
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE id NOT IN ( SELECT t2.id FROM t2 WHERE t2.v2 > t1.v2 )
+----
+
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE EXISTS ( SELECT 1 FROM t2 WHERE t2.id = t1.id AND t2.v2 + t1.v2 > 15 )
+----
+1 a
+2 b
+3 c
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE v2 = ( SELECT MAX(v2) FROM t2 WHERE t2.id <= t1.id ) ORDER BY id
+----
+3 c
+
+query IT rowsort
+SELECT id, v1 FROM t1 WHERE ( SELECT COUNT(*) FROM t2 WHERE t2.v2 = t1.v2 ) = 2
+----
+1 a
+2 b
+3 c
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+DROP TABLE t3;
\ No newline at end of file
diff --git a/tests/slt/create_index.slt b/tests/slt/create_index.slt
index 546fe9a2..666032ff 100644
--- a/tests/slt/create_index.slt
+++ b/tests/slt/create_index.slt
@@ -4,6 +4,9 @@ create table t(id int primary key, v1 int, v2 int, v3 int);
 statement ok
 create index index_1 on t (v1);
 
+statement ok
+create index if not exists index_1 on t (v1);
+
 statement error
 create index index_1 on t (v1);
 
@@ -24,5 +27,17 @@ select * from t;
 ----
 0 0 0 0
 
+statement ok
+drop index t.index_1
+
+statement ok
+drop index t.index_2
+
+statement error
+drop index t.pk_index
+
+statement error
+drop index t.index_3
+
 statement ok
 drop table t
\ No newline at end of file