Skip to content

Commit 69a3c4f

Browse files
committed
chore
1 parent 1cb85dd commit 69a3c4f

File tree

2 files changed

+34
-15
lines changed

2 files changed

+34
-15
lines changed

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,10 +1071,20 @@ fn replace_round_robin_repartition_with_on_demand(
10711071
if let Some(repartition) = context.plan.as_any().downcast_ref::<RepartitionExec>() {
10721072
if let Partitioning::RoundRobinBatch(n) = repartition.partitioning() {
10731073
let child_plan = Arc::clone(&context.children[0].plan);
1074-
context.plan = Arc::new(OnDemandRepartitionExec::try_new(
1075-
child_plan,
1076-
Partitioning::OnDemand(*n),
1077-
)?);
1074+
context.plan = if repartition.preserve_order() {
1075+
Arc::new(
1076+
OnDemandRepartitionExec::try_new(
1077+
child_plan,
1078+
Partitioning::OnDemand(*n),
1079+
)?
1080+
.with_preserve_order(),
1081+
)
1082+
} else {
1083+
Arc::new(OnDemandRepartitionExec::try_new(
1084+
child_plan,
1085+
Partitioning::OnDemand(*n),
1086+
)?)
1087+
};
10781088
return Ok(context);
10791089
}
10801090
}

datafusion/physical-plan/src/repartition/on_demand_repartition.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl OnDemandRepartitionExec {
118118
}
119119

120120
/// Get preserve_order flag of the RepartitionExecutor
121-
/// `true` means `SortPreservingRepartitionExec`, `false` means `RepartitionExec`
121+
/// `true` means `SortPreservingRepartitionExec`, `false` means `OnDemandRepartitionExec`
122122
pub fn preserve_order(&self) -> bool {
123123
self.base.preserve_order
124124
}
@@ -129,7 +129,7 @@ impl OnDemandRepartitionExec {
129129
/// operator can take advantage of it.
130130
///
131131
/// If the input is not ordered, or has only one partition, this is a no op,
132-
/// and the node remains a `RepartitionExec`.
132+
/// and the node remains a `OnDemandRepartitionExec`.
133133
pub fn with_preserve_order(mut self) -> Self {
134134
self.base = self.base.with_preserve_order();
135135
self
@@ -239,7 +239,6 @@ impl ExecutionPlan for OnDemandRepartitionExec {
239239

240240
let stream = futures::stream::once(async move {
241241
let num_input_partitions = input.output_partitioning().partition_count();
242-
243242
let input_captured = Arc::clone(&input);
244243
let metrics_captured = metrics.clone();
245244
let name_captured = name.clone();
@@ -483,8 +482,8 @@ impl OnDemandRepartitionExec {
483482
partition,
484483
buffer_tx,
485484
Arc::clone(&context),
486-
metrics.fetch_time,
487-
metrics.send_buffer_time,
485+
metrics.fetch_time.clone(),
486+
metrics.send_buffer_time.clone(),
488487
));
489488

490489
// While there are still outputs to send to, keep pulling inputs
@@ -621,10 +620,11 @@ impl Stream for OnDemandPerPartitionStream {
621620
cx: &mut Context<'_>,
622621
) -> Poll<Option<Self::Item>> {
623622
if !self.is_requested && !self.sender.is_closed() {
624-
self.sender.try_send(self.partition).map_err(|_| {
623+
self.sender.send_blocking(self.partition).map_err(|e| {
625624
internal_datafusion_err!(
626-
"Error sending partition number to the receiver for partition {}",
627-
self.partition
625+
"Error sending partition number to the receiver for partition {}: {}",
626+
self.partition,
627+
e
628628
)
629629
})?;
630630
self.is_requested = true;
@@ -693,10 +693,11 @@ impl Stream for OnDemandRepartitionStream {
693693
loop {
694694
// Send partition number to input partitions
695695
if !self.is_requested && !self.sender.is_closed() {
696-
self.sender.try_send(self.partition).map_err(|_| {
696+
self.sender.send_blocking(self.partition).map_err(|e| {
697697
internal_datafusion_err!(
698-
"Error sending partition number to the receiver for partition {}",
699-
self.partition
698+
"Error sending partition number to the receiver for partition {}: {}",
699+
self.partition,
700+
e
700701
)
701702
})?;
702703
self.is_requested = true;
@@ -891,24 +892,32 @@ mod tests {
891892
"| 1 |",
892893
"| 1 |",
893894
"| 1 |",
895+
"| 1 |",
896+
"| 2 |",
894897
"| 2 |",
895898
"| 2 |",
896899
"| 2 |",
897900
"| 3 |",
898901
"| 3 |",
899902
"| 3 |",
903+
"| 3 |",
900904
"| 4 |",
901905
"| 4 |",
902906
"| 4 |",
907+
"| 4 |",
908+
"| 5 |",
903909
"| 5 |",
904910
"| 5 |",
905911
"| 5 |",
906912
"| 6 |",
907913
"| 6 |",
908914
"| 6 |",
915+
"| 6 |",
909916
"| 7 |",
910917
"| 7 |",
911918
"| 7 |",
919+
"| 7 |",
920+
"| 8 |",
912921
"| 8 |",
913922
"| 8 |",
914923
"| 8 |",

0 commit comments

Comments
 (0)