Skip to content

Commit 75e9a56

Browse files
committed
PhysicalOptimizerRule now takes SessionConfig instead of ConfigOptions as the second argument to "optimizer()"
1 parent 35b2e35 commit 75e9a56

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+326
-208
lines changed

datafusion/core/benches/push_down_filter.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use arrow::array::RecordBatch;
1919
use arrow::datatypes::{DataType, Field, Schema};
2020
use bytes::{BufMut, BytesMut};
2121
use criterion::{criterion_group, criterion_main, Criterion};
22-
use datafusion::config::ConfigOptions;
2322
use datafusion::prelude::{ParquetReadOptions, SessionContext};
23+
use datafusion_execution::config::SessionConfig;
2424
use datafusion_execution::object_store::ObjectStoreUrl;
2525
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
2626
use datafusion_physical_optimizer::PhysicalOptimizerRule;
@@ -88,7 +88,7 @@ async fn create_plan() -> Arc<dyn ExecutionPlan> {
8888
#[derive(Clone)]
8989
struct BenchmarkPlan {
9090
plan: Arc<dyn ExecutionPlan>,
91-
config: ConfigOptions,
91+
config: SessionConfig,
9292
}
9393

9494
impl std::fmt::Display for BenchmarkPlan {
@@ -102,8 +102,8 @@ fn bench_push_down_filter(c: &mut Criterion) {
102102
let plan = tokio::runtime::Runtime::new()
103103
.unwrap()
104104
.block_on(create_plan());
105-
let mut config = ConfigOptions::default();
106-
config.execution.parquet.pushdown_filters = true;
105+
let mut config = SessionConfig::default();
106+
config.options_mut().execution.parquet.pushdown_filters = true;
107107
let plan = BenchmarkPlan { plan, config };
108108
let optimizer = FilterPushdown::new();
109109

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2118,7 +2118,7 @@ impl DefaultPhysicalPlanner {
21182118
for optimizer in optimizers {
21192119
let before_schema = new_plan.schema();
21202120
new_plan = optimizer
2121-
.optimize(new_plan, session_state.config_options())
2121+
.optimize(new_plan, session_state.config())
21222122
.map_err(|e| {
21232123
DataFusionError::Context(optimizer.name().to_string(), Box::new(e))
21242124
})?;
@@ -2441,7 +2441,6 @@ mod tests {
24412441
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
24422442
use arrow::datatypes::{DataType, Field, Int32Type};
24432443
use arrow_schema::SchemaRef;
2444-
use datafusion_common::config::ConfigOptions;
24452444
use datafusion_common::{
24462445
assert_contains, DFSchemaRef, TableReference, ToDFSchema as _,
24472446
};
@@ -3545,7 +3544,7 @@ digraph {
35453544
fn optimize(
35463545
&self,
35473546
plan: Arc<dyn ExecutionPlan>,
3548-
_config: &ConfigOptions,
3547+
_config: &SessionConfig,
35493548
) -> Result<Arc<dyn ExecutionPlan>> {
35503549
Ok(plan)
35513550
}

datafusion/core/tests/execution/coop.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -810,8 +810,7 @@ async fn query_yields(
810810
task_ctx: Arc<TaskContext>,
811811
) -> Result<(), Box<dyn Error>> {
812812
// Run plan through EnsureCooperative
813-
let optimized =
814-
EnsureCooperative::new().optimize(plan, task_ctx.session_config().options())?;
813+
let optimized = EnsureCooperative::new().optimize(plan, task_ctx.session_config())?;
815814

816815
// Get the stream
817816
let stream = physical_plan::execute_stream(optimized, task_ctx)?;

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
3838
use datafusion_expr::{col, lit, Expr};
3939

4040
use datafusion::datasource::physical_plan::FileScanConfig;
41-
use datafusion_common::config::ConfigOptions;
4241
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
4342
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4443
use datafusion_physical_plan::filter::FilterExec;
@@ -56,8 +55,13 @@ async fn check_stats_precision_with_filter_pushdown() {
5655
let table = get_listing_table(&table_path, None, &opt).await;
5756

5857
let (_, _, state) = get_cache_runtime_state();
59-
let mut options: ConfigOptions = state.config().options().as_ref().clone();
60-
options.execution.parquet.pushdown_filters = true;
58+
let mut session_config =
59+
SessionConfig::from(state.config().options().as_ref().clone());
60+
session_config
61+
.options_mut()
62+
.execution
63+
.parquet
64+
.pushdown_filters = true;
6165

6266
// Scan without filter, stats are exact
6367
let exec = table.scan(&state, None, &[], None).await.unwrap();
@@ -85,7 +89,7 @@ async fn check_stats_precision_with_filter_pushdown() {
8589
as Arc<dyn ExecutionPlan>;
8690

8791
let optimized_exec = FilterPushdown::new()
88-
.optimize(filtered_exec, &options)
92+
.optimize(filtered_exec, &session_config)
8993
.unwrap();
9094

9195
assert!(

datafusion/core/tests/physical_optimizer/aggregate_statistics.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use datafusion::datasource::source::DataSourceExec;
2727
use datafusion_common::cast::as_int64_array;
2828
use datafusion_common::config::ConfigOptions;
2929
use datafusion_common::Result;
30+
use datafusion_execution::config::SessionConfig;
3031
use datafusion_execution::TaskContext;
3132
use datafusion_expr::Operator;
3233
use datafusion_physical_expr::expressions::{self, cast};
@@ -67,7 +68,7 @@ async fn assert_count_optim_success(
6768
let task_ctx = Arc::new(TaskContext::default());
6869
let plan: Arc<dyn ExecutionPlan> = Arc::new(plan);
6970

70-
let config = ConfigOptions::new();
71+
let config = SessionConfig::new();
7172
let optimized = AggregateStatistics::new().optimize(Arc::clone(&plan), &config)?;
7273

7374
// A ProjectionExec is a sign that the count optimization was applied
@@ -264,7 +265,7 @@ async fn test_count_inexact_stat() -> Result<()> {
264265
Arc::clone(&schema),
265266
)?;
266267

267-
let conf = ConfigOptions::new();
268+
let conf = SessionConfig::new();
268269
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
269270

270271
// check that the original ExecutionPlan was not replaced
@@ -308,7 +309,7 @@ async fn test_count_with_nulls_inexact_stat() -> Result<()> {
308309
Arc::clone(&schema),
309310
)?;
310311

311-
let conf = ConfigOptions::new();
312+
let conf = SessionConfig::new();
312313
let optimized = AggregateStatistics::new().optimize(Arc::new(final_agg), &conf)?;
313314

314315
// check that the original ExecutionPlan was not replaced

datafusion/core/tests/physical_optimizer/combine_partial_final_agg.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::physical_optimizer::test_utils::parquet_exec;
2727

2828
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
2929
use datafusion_common::config::ConfigOptions;
30+
use datafusion_execution::config::SessionConfig;
3031
use datafusion_functions_aggregate::count::count_udaf;
3132
use datafusion_functions_aggregate::sum::sum_udaf;
3233
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
@@ -47,7 +48,7 @@ macro_rules! assert_optimized {
4748
($PLAN: expr, @ $EXPECTED_LINES: literal $(,)?) => {
4849
// run optimizer
4950
let optimizer = CombinePartialFinalAggregate {};
50-
let config = ConfigOptions::new();
51+
let config = SessionConfig::new();
5152
let optimized = optimizer.optimize($PLAN, &config)?;
5253
// Now format correctly
5354
let plan = displayable(optimized.as_ref()).indent(true).to_string();

datafusion/core/tests/physical_optimizer/enforce_distribution.rs

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -404,41 +404,56 @@ const SORT_DISTRIB_DISTRIB: [Run; 3] =
404404

405405
#[derive(Clone)]
406406
struct TestConfig {
407-
config: ConfigOptions,
407+
session_config: SessionConfig,
408408
}
409409

410410
impl Default for TestConfig {
411411
fn default() -> Self {
412-
Self {
413-
config: test_suite_default_config_options(),
414-
}
412+
let config = test_suite_default_config_options();
413+
let session_config = SessionConfig::from(config.clone());
414+
Self { session_config }
415415
}
416416
}
417417

418418
impl TestConfig {
419419
/// If preferred, will not repartition / resort data if it is already sorted.
420420
fn with_prefer_existing_sort(mut self) -> Self {
421-
self.config.optimizer.prefer_existing_sort = true;
421+
self.session_config
422+
.options_mut()
423+
.optimizer
424+
.prefer_existing_sort = true;
422425
self
423426
}
424427

425428
/// If preferred, will not attempt to convert Union to Interleave.
426429
fn with_prefer_existing_union(mut self) -> Self {
427-
self.config.optimizer.prefer_existing_union = true;
430+
self.session_config
431+
.options_mut()
432+
.optimizer
433+
.prefer_existing_union = true;
428434
self
429435
}
430436

431437
/// If preferred, will repartition file scans.
432438
/// Accepts a minimum file size to repartition.
433439
fn with_prefer_repartition_file_scans(mut self, file_min_size: usize) -> Self {
434-
self.config.optimizer.repartition_file_scans = true;
435-
self.config.optimizer.repartition_file_min_size = file_min_size;
440+
self.session_config
441+
.options_mut()
442+
.optimizer
443+
.repartition_file_scans = true;
444+
self.session_config
445+
.options_mut()
446+
.optimizer
447+
.repartition_file_min_size = file_min_size;
436448
self
437449
}
438450

439451
/// Set the preferred target partitions for query execution concurrency.
440452
fn with_query_execution_partitions(mut self, target_partitions: usize) -> Self {
441-
self.config.execution.target_partitions = target_partitions;
453+
self.session_config
454+
.options_mut()
455+
.execution
456+
.target_partitions = target_partitions;
442457
self
443458
}
444459

@@ -455,13 +470,18 @@ impl TestConfig {
455470

456471
// Add the ancillary output requirements operator at the start:
457472
let optimizer = OutputRequirements::new_add_mode();
458-
let mut optimized = optimizer.optimize(plan.clone(), &self.config)?;
473+
let mut optimized = optimizer.optimize(plan.clone(), &self.session_config)?;
459474

460475
// This file has 2 rules that use tree node, apply these rules to original plan consecutively
461476
// After these operations tree nodes should be in a consistent state.
462477
// This code block makes sure that these rules doesn't violate tree node integrity.
463478
{
464-
let adjusted = if self.config.optimizer.top_down_join_key_reordering {
479+
let adjusted = if self
480+
.session_config
481+
.options()
482+
.optimizer
483+
.top_down_join_key_reordering
484+
{
465485
// Run adjust_input_keys_ordering rule
466486
let plan_requirements =
467487
PlanWithKeyRequirements::new_default(plan.clone());
@@ -483,7 +503,10 @@ impl TestConfig {
483503
// Then run ensure_distribution rule
484504
DistributionContext::new_default(adjusted)
485505
.transform_up(|distribution_context| {
486-
ensure_distribution(distribution_context, &self.config)
506+
ensure_distribution(
507+
distribution_context,
508+
&self.session_config.options(),
509+
)
487510
})
488511
.data()
489512
.and_then(check_integrity)?;
@@ -494,18 +517,18 @@ impl TestConfig {
494517
optimized = match run {
495518
Run::Distribution => {
496519
let optimizer = EnforceDistribution::new();
497-
optimizer.optimize(optimized, &self.config)?
520+
optimizer.optimize(optimized, &self.session_config)?
498521
}
499522
Run::Sorting => {
500523
let optimizer = EnforceSorting::new();
501-
optimizer.optimize(optimized, &self.config)?
524+
optimizer.optimize(optimized, &self.session_config)?
502525
}
503526
};
504527
}
505528

506529
// Remove the ancillary output requirements operator when done:
507530
let optimizer = OutputRequirements::new_remove_mode();
508-
let optimized = optimizer.optimize(optimized, &self.config)?;
531+
let optimized = optimizer.optimize(optimized, &self.session_config)?;
509532

510533
// Now format correctly
511534
let actual_lines = get_plan_string(&optimized);
@@ -3340,10 +3363,13 @@ fn do_not_put_sort_when_input_is_invalid() -> Result<()> {
33403363
" DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], file_type=parquet",
33413364
];
33423365

3343-
let mut config = ConfigOptions::new();
3344-
config.execution.target_partitions = 10;
3345-
config.optimizer.enable_round_robin_repartition = true;
3346-
config.optimizer.prefer_existing_sort = false;
3366+
let mut config = SessionConfig::new();
3367+
config.options_mut().execution.target_partitions = 10;
3368+
config
3369+
.options_mut()
3370+
.optimizer
3371+
.enable_round_robin_repartition = true;
3372+
config.options_mut().optimizer.prefer_existing_sort = false;
33473373
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
33483374
assert_plan_txt!(expected, dist_plan);
33493375

@@ -3378,10 +3404,13 @@ fn put_sort_when_input_is_valid() -> Result<()> {
33783404
" DataSourceExec: file_groups={10 groups: [[x:0..20], [y:0..20], [x:20..40], [y:20..40], [x:40..60], [y:40..60], [x:60..80], [y:60..80], [x:80..100], [y:80..100]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet",
33793405
];
33803406

3381-
let mut config = ConfigOptions::new();
3382-
config.execution.target_partitions = 10;
3383-
config.optimizer.enable_round_robin_repartition = true;
3384-
config.optimizer.prefer_existing_sort = false;
3407+
let mut config = SessionConfig::new();
3408+
config.options_mut().execution.target_partitions = 10;
3409+
config
3410+
.options_mut()
3411+
.optimizer
3412+
.enable_round_robin_repartition = true;
3413+
config.options_mut().optimizer.prefer_existing_sort = false;
33853414
let dist_plan = EnforceDistribution::new().optimize(physical_plan, &config)?;
33863415
assert_plan_txt!(expected, dist_plan);
33873416

@@ -3503,7 +3532,11 @@ async fn test_distribute_sort_parquet() -> Result<()> {
35033532
let test_config: TestConfig =
35043533
TestConfig::default().with_prefer_repartition_file_scans(1000);
35053534
assert!(
3506-
test_config.config.optimizer.repartition_file_scans,
3535+
test_config
3536+
.session_config
3537+
.options()
3538+
.optimizer
3539+
.repartition_file_scans,
35073540
"should enable scans to be repartitioned"
35083541
);
35093542

@@ -3542,7 +3575,11 @@ async fn test_distribute_sort_memtable() -> Result<()> {
35423575
let test_config: TestConfig =
35433576
TestConfig::default().with_prefer_repartition_file_scans(1000);
35443577
assert!(
3545-
test_config.config.optimizer.repartition_file_scans,
3578+
test_config
3579+
.session_config
3580+
.options()
3581+
.optimizer
3582+
.repartition_file_scans,
35463583
"should enable scans to be repartitioned"
35473584
);
35483585

datafusion/core/tests/physical_optimizer/enforce_sorting.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use arrow::datatypes::{DataType, SchemaRef};
3434
use datafusion_common::config::ConfigOptions;
3535
use datafusion_common::tree_node::{TreeNode, TransformedResult};
3636
use datafusion_common::{Result, TableReference};
37+
use datafusion_execution::config::SessionConfig;
3738
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
3839
use datafusion_datasource::source::DataSourceExec;
3940
use datafusion_expr_common::operator::Operator;
@@ -110,8 +111,9 @@ impl EnforceSortingTest {
110111
/// Runs the enforce sorting test and returns a string with the input and
111112
/// optimized plan as strings for snapshot comparison using insta
112113
pub(crate) fn run(&self) -> String {
113-
let mut config = ConfigOptions::new();
114-
config.optimizer.repartition_sorts = self.repartition_sorts;
114+
let mut session_config = SessionConfig::new();
115+
session_config.options_mut().optimizer.repartition_sorts = self.repartition_sorts;
116+
let config = session_config.options();
115117

116118
// This file has 4 rules that use tree node, apply these rules as in the
117119
// EnforceSorting::optimize implementation
@@ -171,7 +173,7 @@ impl EnforceSortingTest {
171173

172174
// Run the actual optimizer
173175
let optimized_physical_plan = EnforceSorting::new()
174-
.optimize(Arc::clone(&self.plan), &config)
176+
.optimize(Arc::clone(&self.plan), &session_config)
175177
.expect("enforce_sorting failed");
176178

177179
// Get string representation of the plan
@@ -2277,7 +2279,7 @@ async fn test_commutativity() -> Result<()> {
22772279
DataSourceExec: partitions=1, partition_sizes=[0]
22782280
"#);
22792281

2280-
let config = ConfigOptions::new();
2282+
let config = SessionConfig::new();
22812283
let rules = vec![
22822284
Arc::new(EnforceDistribution::new()) as Arc<dyn PhysicalOptimizerRule>,
22832285
Arc::new(EnforceSorting::new()) as Arc<dyn PhysicalOptimizerRule>,

0 commit comments

Comments
 (0)