Skip to content

Commit 4f53358

Browse files
authored
fix: PlaceholderRowExec::partition_statistics (#16851)
1 parent 871d4b5 commit 4f53358

File tree

2 files changed

+39
-5
lines changed

2 files changed

+39
-5
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ mod test {
4040
};
4141
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
4242
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
43+
use datafusion_physical_plan::common::compute_record_batch_statistics;
4344
use datafusion_physical_plan::empty::EmptyExec;
4445
use datafusion_physical_plan::filter::FilterExec;
4546
use datafusion_physical_plan::joins::CrossJoinExec;
4647
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
48+
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
4749
use datafusion_physical_plan::projection::ProjectionExec;
4850
use datafusion_physical_plan::sorts::sort::SortExec;
4951
use datafusion_physical_plan::union::UnionExec;
@@ -728,4 +730,32 @@ mod test {
728730

729731
Ok(())
730732
}
733+
734+
#[tokio::test]
735+
async fn test_statistic_by_partition_of_placeholder_rows() -> Result<()> {
736+
let schema =
737+
Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
738+
let plan = Arc::new(PlaceholderRowExec::new(schema).with_partitions(2))
739+
as Arc<dyn ExecutionPlan>;
740+
let schema = plan.schema();
741+
742+
let ctx = TaskContext::default();
743+
let partitions = execute_stream_partitioned(Arc::clone(&plan), Arc::new(ctx))?;
744+
745+
let mut all_batches = vec![];
746+
for (i, partition_stream) in partitions.into_iter().enumerate() {
747+
let batches: Vec<RecordBatch> = partition_stream.try_collect().await?;
748+
let actual = plan.partition_statistics(Some(i))?;
749+
let expected =
750+
compute_record_batch_statistics(&[batches.clone()], &schema, None);
751+
assert_eq!(actual, expected);
752+
all_batches.push(batches);
753+
}
754+
755+
let actual = plan.partition_statistics(None)?;
756+
let expected = compute_record_batch_statistics(&all_batches, &schema, None);
757+
assert_eq!(actual, expected);
758+
759+
Ok(())
760+
}
731761
}

datafusion/physical-plan/src/placeholder_row.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,18 @@ impl ExecutionPlan for PlaceholderRowExec {
171171
}
172172

173173
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
174-
if partition.is_some() {
175-
return Ok(Statistics::new_unknown(&self.schema()));
176-
}
177-
let batch = self
174+
let batches = self
178175
.data()
179176
.expect("Create single row placeholder RecordBatch should not fail");
177+
178+
let batches = match partition {
179+
Some(_) => vec![batches],
180+
// entire plan
181+
None => vec![batches; self.partitions],
182+
};
183+
180184
Ok(common::compute_record_batch_statistics(
181-
&[batch],
185+
&batches,
182186
&self.schema,
183187
None,
184188
))

0 commit comments

Comments
 (0)