Skip to content

Commit 1cb85dd

Browse files
committed
Fix: Passed all tests and make sure not roundrobinbatch after setting
1 parent b93fa5c commit 1cb85dd

File tree

1 file changed

+46
-21
lines changed

1 file changed

+46
-21
lines changed

datafusion/physical-plan/src/repartition/on_demand_repartition.rs

Lines changed: 46 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,7 @@ use super::{
3535
use crate::common::SharedMemoryReservation;
3636
use crate::execution_plan::CardinalityEffect;
3737
use crate::metrics::{self, BaselineMetrics, MetricBuilder};
38+
use crate::projection::{all_columns, make_with_child, ProjectionExec};
3839
use crate::repartition::distributor_channels::{
3940
DistributionReceiver, DistributionSender,
4041
};
@@ -202,7 +203,7 @@ impl ExecutionPlan for OnDemandRepartitionExec {
202203
}
203204

204205
fn benefits_from_input_partitioning(&self) -> Vec<bool> {
205-
vec![matches!(self.partitioning(), Partitioning::Hash(_, _))]
206+
vec![false]
206207
}
207208

208209
fn maintains_input_order(&self) -> Vec<bool> {
@@ -247,10 +248,10 @@ impl ExecutionPlan for OnDemandRepartitionExec {
247248
.get_or_init(|| async move {
248249
let (txs, rxs) = if preserve_order {
249250
(0..num_input_partitions)
250-
.map(|_| async_channel::bounded(2))
251+
.map(|_| async_channel::unbounded())
251252
.unzip::<_, _, Vec<_>, Vec<_>>()
252253
} else {
253-
let (tx, rx) = async_channel::bounded(2);
254+
let (tx, rx) = async_channel::unbounded();
254255
(vec![tx], vec![rx])
255256
};
256257
Mutex::new((txs, rxs))
@@ -365,6 +366,30 @@ impl ExecutionPlan for OnDemandRepartitionExec {
365366
fn cardinality_effect(&self) -> CardinalityEffect {
366367
CardinalityEffect::Equal
367368
}
369+
370+
fn try_swapping_with_projection(
371+
&self,
372+
projection: &ProjectionExec,
373+
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
374+
// If the projection does not narrow the schema, we should not try to push it down.
375+
if projection.expr().len() >= projection.input().schema().fields().len() {
376+
return Ok(None);
377+
}
378+
379+
// If pushdown is not beneficial or applicable, break it.
380+
if projection.benefits_from_input_partitioning()[0]
381+
|| !all_columns(projection.expr())
382+
{
383+
return Ok(None);
384+
}
385+
386+
let new_projection = make_with_child(projection, self.input())?;
387+
388+
Ok(Some(Arc::new(OnDemandRepartitionExec::try_new(
389+
new_projection,
390+
self.partitioning().clone(),
391+
)?)))
392+
}
368393
}
369394

370395
impl OnDemandRepartitionExec {
@@ -396,7 +421,7 @@ impl OnDemandRepartitionExec {
396421
async fn process_input(
397422
input: Arc<dyn ExecutionPlan>,
398423
partition: usize,
399-
buffer_tx: tokio::sync::mpsc::Sender<RecordBatch>,
424+
buffer_tx: Sender<RecordBatch>,
400425
context: Arc<TaskContext>,
401426
fetch_time: metrics::Time,
402427
send_buffer_time: metrics::Time,
@@ -452,7 +477,7 @@ impl OnDemandRepartitionExec {
452477
context: Arc<TaskContext>,
453478
) -> Result<()> {
454479
// execute the child operator in a separate task
455-
let (buffer_tx, mut buffer_rx) = tokio::sync::mpsc::channel::<RecordBatch>(2);
480+
let (buffer_tx, buffer_rx) = async_channel::bounded::<RecordBatch>(2);
456481
let processing_task = SpawnedTask::spawn(Self::process_input(
457482
Arc::clone(&input),
458483
partition,
@@ -467,8 +492,8 @@ impl OnDemandRepartitionExec {
467492
while !output_channels.is_empty() {
468493
// When the input is done, break the loop
469494
let batch = match buffer_rx.recv().await {
470-
Some(result) => result,
471-
None => break,
495+
Ok(batch) => batch,
496+
_ => break,
472497
};
473498

474499
// Get the partition number from the output partition
@@ -595,13 +620,13 @@ impl Stream for OnDemandPerPartitionStream {
595620
mut self: Pin<&mut Self>,
596621
cx: &mut Context<'_>,
597622
) -> Poll<Option<Self::Item>> {
598-
if !self.is_requested {
599-
match self.sender.try_send(self.partition) {
600-
Ok(_) => {}
601-
Err(_) => {
602-
return Poll::Ready(None);
603-
}
604-
}
623+
if !self.is_requested && !self.sender.is_closed() {
624+
self.sender.try_send(self.partition).map_err(|_| {
625+
internal_datafusion_err!(
626+
"Error sending partition number to the receiver for partition {}",
627+
self.partition
628+
)
629+
})?;
605630
self.is_requested = true;
606631
}
607632

@@ -667,13 +692,13 @@ impl Stream for OnDemandRepartitionStream {
667692
) -> Poll<Option<Self::Item>> {
668693
loop {
669694
// Send partition number to input partitions
670-
if !self.is_requested {
671-
match self.sender.try_send(self.partition) {
672-
Ok(_) => {}
673-
Err(_) => {
674-
return Poll::Ready(None);
675-
}
676-
}
695+
if !self.is_requested && !self.sender.is_closed() {
696+
self.sender.try_send(self.partition).map_err(|_| {
697+
internal_datafusion_err!(
698+
"Error sending partition number to the receiver for partition {}",
699+
self.partition
700+
)
701+
})?;
677702
self.is_requested = true;
678703
}
679704

0 commit comments

Comments (0)