Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions datafusion-examples/examples/async_udf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use datafusion::common::cast::as_string_view_array;
use datafusion::common::error::Result;
use datafusion::common::not_impl_err;
use datafusion::common::utils::take_function_args;
use datafusion::config::ConfigOptions;
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::async_udf::{AsyncScalarUDF, AsyncScalarUDFImpl};
use datafusion::logical_expr::{
Expand Down Expand Up @@ -195,11 +194,7 @@ impl AsyncScalarUDFImpl for AskLLM {
/// is processing the query, so you may wish to make actual network requests
/// on a different `Runtime`, as explained in the `thread_pools.rs` example
/// in this directory.
async fn invoke_async_with_args(
&self,
args: ScalarFunctionArgs,
_option: &ConfigOptions,
) -> Result<ArrayRef> {
async fn invoke_async_with_args(&self, args: ScalarFunctionArgs) -> Result<ArrayRef> {
// in a real UDF you would likely want to special case constant
// arguments to improve performance, but this example converts the
// arguments to arrays for simplicity.
Expand Down
2 changes: 1 addition & 1 deletion datafusion-examples/examples/sql_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub fn main() -> Result<()> {
let config = OptimizerContext::default().with_skip_failing_rules(false);
let analyzed_plan = Analyzer::new().execute_and_check(
logical_plan,
config.options(),
&config.options(),
observe_analyzer,
)?;
// Note that the Analyzer has added a CAST to the plan to align the types
Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,7 +923,7 @@ impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions
}

/// A key value pair, with a corresponding description
#[derive(Debug)]
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
/// A unique string to identify this config value
pub key: String,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1646,7 +1646,7 @@ impl SessionContext {
/// [`ConfigOptions`]: crate::config::ConfigOptions
pub fn state(&self) -> SessionState {
let mut state = self.state.read().clone();
state.execution_props_mut().start_execution();
state.mark_start_execution();
state
}

Expand Down
16 changes: 11 additions & 5 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ impl SessionState {
// analyze & capture output of each rule
let analyzer_result = self.analyzer.execute_and_check(
e.plan.as_ref().clone(),
self.options(),
&self.options(),
|analyzed_plan, analyzer| {
let analyzer_name = analyzer.name().to_string();
let plan_type = PlanType::AnalyzedLogicalPlan { analyzer_name };
Expand Down Expand Up @@ -636,7 +636,7 @@ impl SessionState {
} else {
let analyzed_plan = self.analyzer.execute_and_check(
plan.clone(),
self.options(),
&self.options(),
|_, _| {},
)?;
self.optimizer.optimize(analyzed_plan, self, |_, _| {})
Expand Down Expand Up @@ -738,10 +738,16 @@ impl SessionState {
}

/// return the configuration options
pub fn config_options(&self) -> &ConfigOptions {
pub fn config_options(&self) -> &Arc<ConfigOptions> {
self.config.options()
}

/// Mark the start of the execution
pub fn mark_start_execution(&mut self) {
let config = Arc::clone(self.config.options());
self.execution_props.mark_start_execution(config);
}

/// Return the table options
pub fn table_options(&self) -> &TableOptions {
&self.table_options
Expand Down Expand Up @@ -1891,8 +1897,8 @@ impl OptimizerConfig for SessionState {
&self.execution_props.alias_generator
}

fn options(&self) -> &ConfigOptions {
self.config_options()
fn options(&self) -> Arc<ConfigOptions> {
Arc::clone(self.config.options())
}

fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/fuzz_cases/equivalence/ordering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::fuzz_cases::equivalence::utils::{
is_table_same_after_sort, TestScalarUDF,
};
use arrow::compute::SortOptions;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_expr::{Operator, ScalarUDF};
use datafusion_physical_expr::equivalence::{
Expand Down Expand Up @@ -110,6 +111,7 @@ fn test_ordering_satisfy_with_equivalence_complex_random() -> Result<()> {
Arc::clone(&test_fun),
vec![col_a],
&test_schema,
Arc::new(ConfigOptions::default()),
)?);
let a_plus_b = Arc::new(BinaryExpr::new(
col("a", &test_schema)?,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/tests/fuzz_cases/equivalence/projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::fuzz_cases::equivalence::utils::{
is_table_same_after_sort, TestScalarUDF,
};
use arrow::compute::SortOptions;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_expr::{Operator, ScalarUDF};
use datafusion_physical_expr::equivalence::ProjectionMapping;
Expand Down Expand Up @@ -49,6 +50,7 @@ fn project_orderings_random() -> Result<()> {
Arc::clone(&test_fun),
vec![col_a],
&test_schema,
Arc::new(ConfigOptions::default()),
)?);
// a + b
let a_plus_b = Arc::new(BinaryExpr::new(
Expand Down Expand Up @@ -122,6 +124,7 @@ fn ordering_satisfy_after_projection_random() -> Result<()> {
Arc::clone(&test_fun),
vec![col_a],
&test_schema,
Arc::new(ConfigOptions::default()),
)?) as PhysicalExprRef;
// a + b
let a_plus_b = Arc::new(BinaryExpr::new(
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/tests/fuzz_cases/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use datafusion_physical_expr::expressions::{col, BinaryExpr};
use datafusion_physical_expr::{LexOrdering, ScalarFunctionExpr};
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

use datafusion_common::config::ConfigOptions;
use itertools::Itertools;

#[test]
Expand All @@ -49,6 +50,7 @@ fn test_find_longest_permutation_random() -> Result<()> {
Arc::clone(&test_fun),
vec![col_a],
&test_schema,
Arc::new(ConfigOptions::default()),
)?) as _;

let a_plus_b = Arc::new(BinaryExpr::new(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ fn test_sql(sql: &str) -> Result<LogicalPlan> {
let analyzer = Analyzer::new();
let optimizer = Optimizer::new();
// analyze and optimize the logical plan
let plan = analyzer.execute_and_check(plan, config.options(), |_, _| {})?;
let plan = analyzer.execute_and_check(plan, &config.options(), |_, _| {})?;
optimizer.optimize(plan, &config, |_, _| {})
}

Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use datafusion_expr::{col, lit, Expr};

use datafusion::datasource::physical_plan::FileScanConfig;
use datafusion_common::config::ConfigOptions;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::filter::FilterExec;
Expand All @@ -55,7 +56,7 @@ async fn check_stats_precision_with_filter_pushdown() {
let table = get_listing_table(&table_path, None, &opt).await;

let (_, _, state) = get_cache_runtime_state();
let mut options = state.config().options().clone();
let mut options: ConfigOptions = state.config().options().as_ref().clone();
options.execution.parquet.pushdown_filters = true;

// Scan without filter, stats are exact
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ fn test_pushdown_volatile_functions_not_allowed() {
// Test that we do not push down filters with volatile functions
// Use random() as an example of a volatile function
let scan = TestScanBuilder::new(schema()).with_support(true).build();
let cfg = Arc::new(ConfigOptions::default());
let predicate = Arc::new(BinaryExpr::new(
Arc::new(Column::new_with_schema("a", &schema()).unwrap()),
Operator::Eq,
Expand All @@ -93,6 +94,7 @@ fn test_pushdown_volatile_functions_not_allowed() {
Arc::new(ScalarUDF::from(RandomFunc::new())),
vec![],
&schema(),
cfg,
)
.unwrap(),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ fn test_update_matching_exprs() -> Result<()> {
)),
],
Field::new("f", DataType::Int32, true).into(),
Arc::new(ConfigOptions::default()),
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
Expand Down Expand Up @@ -193,6 +194,7 @@ fn test_update_matching_exprs() -> Result<()> {
)),
],
Field::new("f", DataType::Int32, true).into(),
Arc::new(ConfigOptions::default()),
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 3))),
Expand Down Expand Up @@ -261,6 +263,7 @@ fn test_update_projected_exprs() -> Result<()> {
)),
],
Field::new("f", DataType::Int32, true).into(),
Arc::new(ConfigOptions::default()),
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d", 2))),
Expand Down Expand Up @@ -326,6 +329,7 @@ fn test_update_projected_exprs() -> Result<()> {
)),
],
Field::new("f", DataType::Int32, true).into(),
Arc::new(ConfigOptions::default()),
)),
Arc::new(CaseExpr::try_new(
Some(Arc::new(Column::new("d_new", 3))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use arrow::array::{
use arrow::compute::kernels::numeric::add;
use arrow::datatypes::{DataType, Field, Schema};
use arrow_schema::extension::{Bool8, CanonicalExtensionType, ExtensionType};
use arrow_schema::{ArrowError, FieldRef};
use arrow_schema::{ArrowError, FieldRef, SchemaRef};
use datafusion::common::test_util::batches_to_string;
use datafusion::execution::context::{FunctionFactory, RegisterFunction, SessionState};
use datafusion::prelude::*;
Expand Down Expand Up @@ -1786,3 +1786,58 @@ async fn test_extension_based_udf() -> Result<()> {
ctx.deregister_table("t")?;
Ok(())
}

#[tokio::test]
async fn test_config_options_work_for_scalar_func() -> Result<()> {
#[derive(Debug)]
struct TestScalarUDF {
signature: Signature,
}

impl ScalarUDFImpl for TestScalarUDF {
fn as_any(&self) -> &dyn Any {
self
}
fn name(&self) -> &str {
"TestScalarUDF"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Utf8)
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
let tz = args.config_options.execution.time_zone.clone();
Ok(ColumnarValue::Scalar(ScalarValue::from(tz)))
}
}

let udf = ScalarUDF::from(TestScalarUDF {
signature: Signature::uniform(1, vec![DataType::Utf8], Volatility::Stable),
});

let mut config = SessionConfig::new();
config.options_mut().execution.time_zone = "AEST".into();

let ctx = SessionContext::new_with_config(config);

ctx.register_udf(udf.clone());

let df = ctx.read_empty()?;
let df = df.select(vec![udf.call(vec![lit("a")]).alias("a")])?;
let actual = df.collect().await?;

let expected_schema = Schema::new(vec![Field::new("a", DataType::Utf8, false)]);
let expected = RecordBatch::try_new(
SchemaRef::from(expected_schema),
vec![create_array!(Utf8, ["AEST"])],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

)?;

assert_eq!(expected, actual[0]);

Ok(())
}
Loading