-
|
Consider this code (note: a better example is in the comments below):
use std::sync::Arc;
use datafusion::{
config::ConfigOptions,
error::DataFusionError,
physical_plan::{ExecutionPlan, displayable},
prelude::*,
};
/// Recursively rewrites `plan` bottom-up, asking every node to repartition
/// itself into 4 partitions.
///
/// Each child is rewritten first and the results are re-attached with
/// `with_new_children`. Then `repartitioned(4, config)` is called on the
/// rebuilt node. If that returns `None`, the rebuilt node is returned as-is.
///
/// # Errors
/// Propagates any `DataFusionError` raised while rewriting a child, by
/// `with_new_children`, or by `repartitioned`.
fn repartition_subtree(
    plan: Arc<dyn ExecutionPlan>,
    config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
    // Partition count requested from every node in this example.
    const PARTITIONS: usize = 4;

    let mut rewritten_children = Vec::new();
    for child in plan.children() {
        rewritten_children.push(repartition_subtree(Arc::clone(child), config)?);
    }
    let rebuilt = plan.with_new_children(rewritten_children)?;

    let maybe_repartitioned = rebuilt.repartitioned(PARTITIONS, config)?;
    Ok(maybe_repartitioned.unwrap_or(rebuilt))
}
/// Plans a SQL query over a Parquet table, then prints the physical plan
/// before and after `repartition_subtree` rewrites it.
///
/// # Errors
/// Returns any `DataFusionError` from table registration, SQL planning,
/// physical planning, or the repartitioning rewrite.
#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Plan with 2 target partitions; `repartition_subtree` later asks each
    // node for 4.
    let config = SessionConfig::new().with_target_partitions(2);
    let ctx = SessionContext::new_with_config(config);

    // Register the table. Replace the placeholder with a local checkout path.
    ctx.register_parquet(
        "example",
        "<path to datafusion repo>/datafusion/core/tests/data/test_statistics_per_partition/",
        ParquetReadOptions::new(),
    )
    .await?;

    // Create a plan to run a SQL query.
    let df = ctx
        .sql("SELECT id, MIN(0) FROM example GROUP BY id LIMIT 100")
        .await?;
    // Propagate planning errors through `main`'s `Result` instead of panicking.
    let plan = df.create_physical_plan().await?;
    println!("plan: {}", displayable(plan.as_ref()).indent(false));

    let state = ctx.state();
    let new_plan = repartition_subtree(plan, state.config_options())?;
    println!("new_plan: {}", displayable(new_plan.as_ref()).indent(false));
    Ok(())
}
The output is this: I plan the query with a Notice that the number of It seems to be because of this code: datafusion/datafusion/datasource/src/file_groups.rs Lines 192 to 199 in d24eb4a Even though, I'm scanning entire files in this case, the range is set in these file groups and repartitioning is skipped! |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 6 replies
-
|
I don't think it's the ranges check. From what I can see, it's due to this condition: datafusion/datafusion/datasource/src/file_groups.rs Lines 219 to 225 in 6ea305e Specifically, this check in: datafusion/datafusion/datasource/src/source.rs Lines 250 to 263 in 6ea305e
That value comes from this config: datafusion/datafusion/common/src/config.rs Lines 931 to 932 in 6ea305e
If we set it to something smaller than the total size of the scanned files, for example:

let config = SessionConfig::new()
    .with_target_partitions(2)
    .with_repartition_file_min_size(1024); // Here

Output: |
Beta Was this translation helpful? Give feedback.
From what I can tell, it was mainly done to simplify the implementation. I can't see a correctness reason to disallow repartitioning when the file groups already have ranges. For reference, the PR that introduced this logic is quite old: #5057
Feel free to open an issue and a PR if you'd like to tackle it.