Skip to content

Commit e4f16dd

Browse files
Omega359 and alamb authored
feat: Add Arc<ConfigOptions> to ScalarFunctionArgs, don't copy ConfigOptions on each query (#16970)
* Add `ConfigOptions` to ExecutionProps when execution is started
* Add ConfigOptions to ScalarFunctionArgs, refactor AsyncScalarUDF.invoke_async_with_args to remove ConfigOptions arg.
* Updated OptimizerConfig.options() -> Arc<ConfigOptions> to eliminate clone() calls. Fixed an issue with SimplifyExpressions.rewrite(..) not adding config options to execution_props. Added test to verify it works
* Test update.
* Add note in upgrade guide
* Use Arc all the way down
* start_execution -> mark_start_execution
* Update datafusion/expr/src/execution_props.rs
  Co-authored-by: Andrew Lamb <[email protected]>
* Update comments
* Avoid API breakage via #deprecated
* Update upgrade guide for Arc<ConfigOptions> change
* Apply suggestions from code review
* fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 6d00734 commit e4f16dd

File tree

91 files changed

+831
-426
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

91 files changed

+831
-426
lines changed

datafusion-examples/examples/async_udf.rs

Lines changed: 1 addition & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@ use datafusion::common::cast::as_string_view_array;
2929
use datafusion::common::error::Result;
3030
use datafusion::common::not_impl_err;
3131
use datafusion::common::utils::take_function_args;
32-
use datafusion::config::ConfigOptions;
3332
use datafusion::execution::SessionStateBuilder;
3433
use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
3534
use datafusion::logical_expr::{
@@ -195,11 +194,7 @@ impl AsyncScalarUDFImpl for AskLLM {
195194
/// is processing the query, so you may wish to make actual network requests
196195
/// on a different `Runtime`, as explained in the `thread_pools.rs` example
197196
/// in this directory.
198-
async fn invoke_async_with_args(
199-
&self,
200-
args: ScalarFunctionArgs,
201-
_option: &ConfigOptions,
202-
) -> Result<ArrayRef> {
197+
async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ArrayRef> {
203198
// in a real UDF you would likely want to special case constant
204199
// arguments to improve performance, but this example converts the
205200
// arguments to arrays for simplicity.

datafusion-examples/examples/sql_frontend.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -83,7 +83,7 @@ pub fn main() -> Result<()> {
8383
let config = OptimizerContext::default().with_skip_failing_rules(false);
8484
let analyzed_plan = Analyzer::new().execute_and_check(
8585
logical_plan,
86-
config.options(),
86+
&config.options(),
8787
observe_analyzer,
8888
)?;
8989
// Note that the Analyzer has added a CAST to the plan to align the types

datafusion/common/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -923,7 +923,7 @@ impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions
923923
}
924924

925925
/// A key value pair, with a corresponding description
926-
#[derive(Debug)]
926+
#[derive(Debug, Hash, PartialEq, Eq)]
927927
pub struct ConfigEntry {
928928
/// A unique string to identify this config value
929929
pub key: String,

datafusion/core/src/execution/context/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1646,7 +1646,7 @@ impl SessionContext {
16461646
/// [`ConfigOptions`]: crate::config::ConfigOptions
16471647
pub fn state(&self) -> SessionState {
16481648
let mut state = self.state.read().clone();
1649-
state.execution_props_mut().start_execution();
1649+
state.mark_start_execution();
16501650
state
16511651
}
16521652

datafusion/core/src/execution/session_state.rs

Lines changed: 11 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -574,7 +574,7 @@ impl SessionState {
574574
// analyze & capture output of each rule
575575
let analyzer_result = self.analyzer.execute_and_check(
576576
e.plan.as_ref().clone(),
577-
self.options(),
577+
&self.options(),
578578
|analyzed_plan, analyzer| {
579579
let analyzer_name = analyzer.name().to_string();
580580
let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
@@ -636,7 +636,7 @@ impl SessionState {
636636
} else {
637637
let analyzed_plan = self.analyzer.execute_and_check(
638638
plan.clone(),
639-
self.options(),
639+
&self.options(),
640640
|_, _| {},
641641
)?;
642642
self.optimizer.optimize(analyzed_plan, self, |_, _| {})
@@ -738,10 +738,16 @@ impl SessionState {
738738
}
739739

740740
/// return the configuration options
741-
pub fn config_options(&self) -> &ConfigOptions {
741+
pub fn config_options(&self) -> &Arc<ConfigOptions> {
742742
self.config.options()
743743
}
744744

745+
/// Mark the start of the execution
746+
pub fn mark_start_execution(&mut self) {
747+
let config = Arc::clone(self.config.options());
748+
self.execution_props.mark_start_execution(config);
749+
}
750+
745751
/// Return the table options
746752
pub fn table_options(&self) -> &TableOptions {
747753
&self.table_options
@@ -1891,8 +1897,8 @@ impl OptimizerConfig for SessionState {
18911897
&self.execution_props.alias_generator
18921898
}
18931899

1894-
fn options(&self) -> &ConfigOptions {
1895-
self.config_options()
1900+
fn options(&self) -> Arc<ConfigOptions> {
1901+
Arc::clone(self.config.options())
18961902
}
18971903

18981904
fn function_registry(&self) -> Option<&dyn FunctionRegistry> {

datafusion/core/tests/fuzz_cases/equivalence/ordering.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ use crate::fuzz_cases::equivalence::utils::{
2121
is_table_same_after_sort, TestScalarUDF,
2222
};
2323
use arrow::compute::SortOptions;
24+
use datafusion_common::config::ConfigOptions;
2425
use datafusion_common::Result;
2526
use datafusion_expr::{Operator, ScalarUDF};
2627
use datafusion_physical_expr::equivalence::{
@@ -110,6 +111,7 @@ fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {
110111
Arc::clone(&test_fun),
111112
vec![col_a],
112113
&test_schema,
114+
Arc::new(ConfigOptions::default()),
113115
)?);
114116
let a_plus_b = Arc::new(BinaryExpr::new(
115117
col("a", &test_schema)?,

datafusion/core/tests/fuzz_cases/equivalence/projection.rs

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ use crate::fuzz_cases::equivalence::utils::{
2020
is_table_same_after_sort, TestScalarUDF,
2121
};
2222
use arrow::compute::SortOptions;
23+
use datafusion_common::config::ConfigOptions;
2324
use datafusion_common::Result;
2425
use datafusion_expr::{Operator, ScalarUDF};
2526
use datafusion_physical_expr::equivalence::ProjectionMapping;
@@ -49,6 +50,7 @@ fn project_orderings_random() -> Result<()> {
4950
Arc::clone(&test_fun),
5051
vec![col_a],
5152
&test_schema,
53+
Arc::new(ConfigOptions::default()),
5254
)?);
5355
// a + b
5456
let a_plus_b = Arc::new(BinaryExpr::new(
@@ -122,6 +124,7 @@ fn ordering_satisfy_after_projection_random() -> Result<()> {
122124
Arc::clone(&test_fun),
123125
vec![col_a],
124126
&test_schema,
127+
Arc::new(ConfigOptions::default()),
125128
)?) as PhysicalExprRef;
126129
// a + b
127130
let a_plus_b = Arc::new(BinaryExpr::new(

datafusion/core/tests/fuzz_cases/equivalence/properties.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@ use datafusion_physical_expr::expressions::{col, BinaryExpr};
2828
use datafusion_physical_expr::{LexOrdering, ScalarFunctionExpr};
2929
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
3030

31+
use datafusion_common::config::ConfigOptions;
3132
use itertools::Itertools;
3233

3334
#[test]
@@ -49,6 +50,7 @@ fn test_find_longest_permutation_random() -> Result<()> {
4950
Arc::clone(&test_fun),
5051
vec![col_a],
5152
&test_schema,
53+
Arc::new(ConfigOptions::default()),
5254
)?) as _;
5355

5456
let a_plus_b = Arc::new(BinaryExpr::new(

datafusion/core/tests/optimizer/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -160,7 +160,7 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
160160
let analyzer = Analyzer::new();
161161
let optimizer = Optimizer::new();
162162
// analyze and optimize the logical plan
163-
let plan = analyzer.execute_and_check(plan, config.options(), |_, _| {})?;
163+
let plan = analyzer.execute_and_check(plan, &config.options(), |_, _| {})?;
164164
optimizer.optimize(plan, &config, |_, _| {})
165165
}
166166

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
3838
use datafusion_expr::{col, lit, Expr};
3939

4040
use datafusion::datasource::physical_plan::FileScanConfig;
41+
use datafusion_common::config::ConfigOptions;
4142
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
4243
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4344
use datafusion_physical_plan::filter::FilterExec;
@@ -55,7 +56,7 @@ async fn check_stats_precision_with_filter_pushdown() {
5556
let table = get_listing_table(&table_path, None, &opt).await;
5657

5758
let (_, _, state) = get_cache_runtime_state();
58-
let mut options = state.config().options().clone();
59+
let mut options: ConfigOptions = state.config().options().as_ref().clone();
5960
options.execution.parquet.pushdown_filters = true;
6061

6162
// Scan without filter, stats are exact

0 commit comments

Comments
 (0)