Skip to content

Commit b93fa5c

Browse files
committed
feat: Added metric
1 parent 15caf95 commit b93fa5c

File tree

2 files changed

+81
-15
lines changed

2 files changed

+81
-15
lines changed

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ use datafusion_physical_expr_common::sort_expr::LexOrdering;
5757
use futures::stream::Stream;
5858
use futures::{ready, FutureExt, StreamExt, TryStreamExt};
5959
use log::trace;
60-
use on_demand_repartition::OnDemandRepartitionExec;
60+
use on_demand_repartition::{OnDemandRepartitionExec, OnDemandRepartitionMetrics};
6161
use parking_lot::Mutex;
6262

6363
mod distributor_channels;
@@ -220,8 +220,6 @@ impl RepartitionExecState {
220220
})
221221
.collect();
222222

223-
let r_metrics = RepartitionMetrics::new(i, num_output_partitions, &metrics);
224-
225223
let input_task = if enable_pull_based {
226224
let partition_rx = if preserve_order {
227225
partition_receivers.clone().expect(
@@ -233,6 +231,9 @@ impl RepartitionExecState {
233231
"partition_receivers must be provided when preserve_order is disabled",
234232
)[0].clone()
235233
};
234+
let r_metrics =
235+
OnDemandRepartitionMetrics::new(i, num_output_partitions, &metrics);
236+
236237
SpawnedTask::spawn(OnDemandRepartitionExec::pull_from_input(
237238
Arc::clone(&input),
238239
i,
@@ -243,6 +244,9 @@ impl RepartitionExecState {
243244
Arc::clone(&context),
244245
))
245246
} else {
247+
let r_metrics =
248+
RepartitionMetrics::new(i, num_output_partitions, &metrics);
249+
246250
SpawnedTask::spawn(RepartitionExec::pull_from_input(
247251
Arc::clone(&input),
248252
i,
@@ -612,7 +616,7 @@ pub struct RepartitionExec {
612616
}
613617

614618
#[derive(Debug, Clone)]
615-
pub(crate) struct RepartitionMetrics {
619+
struct RepartitionMetrics {
616620
/// Time in nanos to execute child operator and fetch batches
617621
fetch_time: metrics::Time,
618622
/// Repartitioning elapsed time in nanos

datafusion/physical-plan/src/repartition/on_demand_repartition.rs

Lines changed: 73 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,11 @@ use std::{any::Any, vec};
3030
use super::metrics::{ExecutionPlanMetricsSet, MetricsSet};
3131
use super::{
3232
DisplayAs, ExecutionPlanProperties, MaybeBatch, RecordBatchStream,
33-
RepartitionExecBase, RepartitionMetrics, SendableRecordBatchStream,
33+
RepartitionExecBase, SendableRecordBatchStream,
3434
};
3535
use crate::common::SharedMemoryReservation;
3636
use crate::execution_plan::CardinalityEffect;
37-
use crate::metrics::BaselineMetrics;
37+
use crate::metrics::{self, BaselineMetrics, MetricBuilder};
3838
use crate::repartition::distributor_channels::{
3939
DistributionReceiver, DistributionSender,
4040
};
@@ -398,22 +398,38 @@ impl OnDemandRepartitionExec {
398398
partition: usize,
399399
buffer_tx: tokio::sync::mpsc::Sender<RecordBatch>,
400400
context: Arc<TaskContext>,
401+
fetch_time: metrics::Time,
402+
send_buffer_time: metrics::Time,
401403
) -> Result<()> {
404+
let timer = fetch_time.timer();
402405
let mut stream = input.execute(partition, context).map_err(|e| {
403406
internal_datafusion_err!(
404407
"Error executing input partition {} for on demand repartitioning: {}",
405408
partition,
406409
e
407410
)
408411
})?;
409-
while let Some(batch) = stream.next().await {
410-
buffer_tx.send(batch?).await.map_err(|e| {
411-
internal_datafusion_err!(
412-
"Error sending batch to buffer channel for partition {}: {}",
413-
partition,
414-
e
415-
)
416-
})?;
412+
timer.done();
413+
414+
loop {
415+
let timer = fetch_time.timer();
416+
let batch = stream.next().await;
417+
timer.done();
418+
419+
// send the batch to the buffer channel
420+
if let Some(batch) = batch {
421+
let timer = send_buffer_time.timer();
422+
buffer_tx.send(batch?).await.map_err(|e| {
423+
internal_datafusion_err!(
424+
"Error sending batch to buffer channel for partition {}: {}",
425+
partition,
426+
e
427+
)
428+
})?;
429+
timer.done();
430+
} else {
431+
break;
432+
}
417433
}
418434

419435
Ok(())
@@ -432,7 +448,7 @@ impl OnDemandRepartitionExec {
432448
>,
433449
partitioning: Partitioning,
434450
output_partition_rx: Receiver<usize>,
435-
metrics: RepartitionMetrics,
451+
metrics: OnDemandRepartitionMetrics,
436452
context: Arc<TaskContext>,
437453
) -> Result<()> {
438454
// execute the child operator in a separate task
@@ -442,6 +458,8 @@ impl OnDemandRepartitionExec {
442458
partition,
443459
buffer_tx,
444460
Arc::clone(&context),
461+
metrics.fetch_time,
462+
metrics.send_buffer_time,
445463
));
446464

447465
// While there are still outputs to send to, keep pulling inputs
@@ -501,6 +519,50 @@ impl OnDemandRepartitionExec {
501519
}
502520
}
503521

522+
/// Timing metrics for the on-demand repartition path.
///
/// An instance is built by `new` for a single input partition; `send_time`
/// carries one timer per output partition.
#[derive(Debug, Clone)]
523+
pub(crate) struct OnDemandRepartitionMetrics {
524+
/// Time in nanos to execute the child operator and fetch batches from its stream
525+
fetch_time: metrics::Time,
526+
/// Time in nanos spent sending fetched batches into the buffer channel.
527+
send_buffer_time: metrics::Time,
528+
/// Time in nanos for sending resulting batches to channels.
529+
///
530+
/// One metric per output partition.
531+
send_time: Vec<metrics::Time>,
532+
}
533+
534+
impl OnDemandRepartitionMetrics {
535+
/// Builds the timers for one input partition of the on-demand repartition path.
///
/// Every timer is created through `MetricBuilder::new(metrics)` with
/// `subset_time(.., input_partition)`:
/// - `fetch_time` and `send_buffer_time`: one timer each.
/// - `send_time`: one timer per index in `0..num_output_partitions`, each
///   carrying an `outputPartition` label set to that index.
pub fn new(
536+
input_partition: usize,
537+
num_output_partitions: usize,
538+
metrics: &ExecutionPlanMetricsSet,
539+
) -> Self {
540+
// Time in nanos to execute child operator and fetch batches
541+
let fetch_time =
542+
MetricBuilder::new(metrics).subset_time("fetch_time", input_partition);
543+
544+
// Time in nanos for sending resulting batches to channels
// (one labelled timer per output partition, collected in index order)
545+
let send_time = (0..num_output_partitions)
546+
.map(|output_partition| {
547+
let label =
548+
metrics::Label::new("outputPartition", output_partition.to_string());
549+
MetricBuilder::new(metrics)
550+
.with_label(label)
551+
.subset_time("send_time", input_partition)
552+
})
553+
.collect();
554+
555+
// Time in nanos for sending resulting batches to buffer channels
556+
let send_buffer_time =
557+
MetricBuilder::new(metrics).subset_time("send_buffer_time", input_partition);
558+
Self {
559+
fetch_time,
560+
send_time,
561+
send_buffer_time,
562+
}
563+
}
564+
}
565+
504566
/// This struct converts a receiver to a stream.
505567
/// Receiver receives data on an SPSC channel.
506568
struct OnDemandPerPartitionStream {

0 commit comments

Comments
 (0)