jonathanc-n
Contributor

jonathanc-n commented Jul 2, 2025

Rationale for this change

PiecewiseMergeJoin is a nice precursor to the implementation of ASOF, inequality, and similar joins (multiple range predicates). PiecewiseMergeJoin is specialized for the case where there is only one range filter and can perform much faster in this case, especially for semi, anti, and mark joins.

What changes are included in this PR?

This PR contains the PiecewiseMergeJoin implementation only; the physical planner does not yet produce PiecewiseMergeJoinExec.

ExecutionPlan has been implemented for PiecewiseMergeJoinExec

  • Currently compute_properties and swap_inputs are not implemented
  • Builds execution plan for piecewise merge join exec
  • Buffered plan gets built at this step.

PiecewiseMergeJoinStream has been implemented for the actual batch emission logic

  • Behaviour is different for regular joins and existence joins.

Examples have been provided for the PiecewiseMergeJoinExec and PiecewiseMergeJoinStream implementations.

Benchmark Results

The benchmarks were tested on a random batch of values (streamed side) against a sorted batch (buffered side).

When compared to NestedLoopJoin the queries for classic joins (left, right, inner, full) were about 10x faster 🚀

  • However, when both sides were large and of equal size (l=10000, r=10000), it performed slower than NestedLoopJoin.

For existence joins (semi, anti), the join performed about 1000x faster 🚀

  • Just as a quick note to explain the ridiculous speedup: instead of a cartesian product, all we need to do is find the max/min value of the unsorted streamed side, then do an O(n) scan of the sorted buffered side to find the first match and emit all rows after it.
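
The shortcut described above can be sketched in a few lines. This is a minimal illustration and not the code in this PR: it assumes the predicate is `streamed < buffered`, that the buffered side is sorted ascending, and it uses plain `i32` slices in place of Arrow arrays. The function names are hypothetical.

```rust
/// Returns the index of the first buffered row that has at least one match
/// under the predicate `streamed < buffered`, or `None` if nothing matches.
/// Assumption: `buffered_sorted` is sorted in ascending order.
fn first_matching_buffered_index(streamed: &[i32], buffered_sorted: &[i32]) -> Option<usize> {
    // A buffered row `b` has a match iff some streamed value is smaller than `b`,
    // which is equivalent to `min(streamed) < b`.
    let min_streamed = *streamed.iter().min()?;
    // Single O(n) scan over the sorted side to find the first match.
    buffered_sorted.iter().position(|&b| min_streamed < b)
}

/// Semi join on the buffered side: every row from the first match onward qualifies.
fn semi_join_buffered<'a>(streamed: &[i32], buffered_sorted: &'a [i32]) -> &'a [i32] {
    match first_matching_buffered_index(streamed, buffered_sorted) {
        Some(i) => &buffered_sorted[i..],
        None => &[],
    }
}
```

For the anti join under the same assumptions, the result is the complementary prefix, i.e. the rows before the first match.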
Benchmark Results for normal joins

joins/PiecewiseMergeJoin/l=1000_r=1000
                        time:   [345.69 µs 351.99 µs 361.11 µs]
                        change: [-4.0041% -2.4315% -0.4405%] (p = 0.01 < 0.05)
                        Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
  7 (7.00%) high mild
  3 (3.00%) high severe
joins/NestedLoopJoin/l=1000_r=1000
                        time:   [2.6237 ms 2.6518 ms 2.6870 ms]
                        change: [-4.0439% +0.5217% +4.4183%] (p = 0.84 > 0.05)
                        No change in performance detected.
Found 15 outliers among 100 measurements (15.00%)
  4 (4.00%) high mild
  11 (11.00%) high severe
Benchmarking joins/PiecewiseMergeJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 52.4s, or reduce sample count to 10.
joins/PiecewiseMergeJoin/l=10000_r=10000
                        time:   [490.26 ms 501.24 ms 513.75 ms]
                        change: [-14.807% -9.4227% -4.1141%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  8 (8.00%) high mild
  4 (4.00%) high severe
Benchmarking joins/NestedLoopJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.9s, or reduce sample count to 10.
joins/NestedLoopJoin/l=10000_r=10000
                        time:   [325.74 ms 330.41 ms 335.76 ms]
                        change: [-30.701% -25.545% -20.089%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) high mild
  8 (8.00%) high severe
joins/PiecewiseMergeJoin/l=100000_r=1000
                        time:   [46.738 ms 47.037 ms 47.348 ms]
                        change: [+6.8565% +7.8729% +8.8987%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
Benchmarking joins/NestedLoopJoin/l=100000_r=1000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 34.7s, or reduce sample count to 10.
joins/NestedLoopJoin/l=100000_r=1000
                        time:   [337.92 ms 355.00 ms 375.33 ms]
                        change: [+3.4274% +8.8931% +15.219%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 17 outliers among 100 measurements (17.00%)
  4 (4.00%) high mild
  13 (13.00%) high severe
joins/PiecewiseMergeJoin/l=10000_r=100
                        time:   [353.07 µs 356.19 µs 359.16 µs]
                        change: [-20.427% -19.045% -17.788%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) low mild
joins/NestedLoopJoin/l=10000_r=100
                        time:   [2.4624 ms 2.4690 ms 2.4759 ms]
                        change: [-35.277% -26.644% -17.558%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
joins/PiecewiseMergeJoin/l=1000000_r=100
                        time:   [49.569 ms 49.788 ms 50.071 ms]
                        change: [-11.268% -8.9464% -7.0861%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  4 (4.00%) high mild
  2 (2.00%) high severe
Benchmarking joins/NestedLoopJoin/l=1000000_r=100: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.6s, or reduce sample count to 10.
joins/NestedLoopJoin/l=1000000_r=100
                        time:   [318.73 ms 321.16 ms 324.12 ms]
                        change: [-2.3069% -0.7454% +0.6191%] (p = 0.35 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

Benchmark Results for existence joins

joins/PiecewiseMergeJoin/l=1000_r=1000
                        time:   [17.562 µs 17.856 µs 18.368 µs]
                        change: [-95.034% -94.834% -94.578%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  5 (5.00%) high severe
joins/NestedLoopJoin/l=1000_r=1000
                        time:   [2.5747 ms 2.6143 ms 2.6718 ms]
                        change: [-3.5382% -1.4140% +1.1788%] (p = 0.24 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe
joins/PiecewiseMergeJoin/l=10000_r=10000
                        time:   [126.97 µs 130.34 µs 133.60 µs]
                        change: [-99.975% -99.974% -99.973%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 21 outliers among 100 measurements (21.00%)
  16 (16.00%) low mild
  5 (5.00%) high mild
Benchmarking joins/NestedLoopJoin/l=10000_r=10000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 32.1s, or reduce sample count to 10.
joins/NestedLoopJoin/l=10000_r=10000
                        time:   [324.45 ms 329.32 ms 335.64 ms]
                        change: [-2.5195% -0.3276% +2.0344%] (p = 0.79 > 0.05)
                        No change in performance detected.
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) high mild
  5 (5.00%) high severe
Benchmarking joins/PiecewiseMergeJoin/l=100000_r=1000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.7s, enable flat sampling, or reduce sample count to 60.
  
joins/PiecewiseMergeJoin/l=1000_r=10000
                        time:   [21.704 µs 21.851 µs 22.039 µs]
                        change: [-99.951% -99.951% -99.951%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  3 (3.00%) high severe
joins/NestedLoopJoin/l=1000_r=10000
                        time:   [26.852 ms 27.166 ms 27.482 ms]
                        change: [-34.057% -28.293% -22.241%] (p = 0.00 < 0.05)
                        Performance has improved.
joins/PiecewiseMergeJoin/l=100_r=100000
                        time:   [74.249 µs 74.381 µs 74.516 µs]
                        change: [-99.952% -99.952% -99.951%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe
joins/NestedLoopJoin/l=100_r=100000
                        time:   [25.960 ms 26.343 ms 26.807 ms]
                        change: [-1.0541% +0.8379% +2.8866%] (p = 0.41 > 0.05)
                        No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high severe
joins/PiecewiseMergeJoin/l=1000_r=100000
                        time:   [82.470 µs 83.025 µs 83.761 µs]
                        change: [-99.996% -99.996% -99.996%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
  6 (6.00%) high mild
  6 (6.00%) high severe
Benchmarking joins/NestedLoopJoin/l=1000_r=100000: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 33.0s, or reduce sample count to 10.
joins/NestedLoopJoin/l=1000_r=100000
                        time:   [322.35 ms 323.86 ms 325.53 ms]
                        change: [-33.068% -26.620% -19.778%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  1 (1.00%) high mild
  1 (1.00%) high severe
  

If you want to replicate:

Code

use std::sync::Arc;

use arrow::array::{
  ArrayRef, Date32Builder, Decimal128Builder, Int32Builder, RecordBatch, StringBuilder,
};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion};
use datafusion_common::{JoinSide, Result};
use datafusion_execution::TaskContext;
use datafusion_expr::{JoinType, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column};

use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::joins::utils::{ColumnIndex, JoinFilter};
use datafusion_physical_plan::joins::{NestedLoopJoinExec, PiecewiseMergeJoinExec};
use datafusion_physical_plan::test::TestMemoryExec;
use datafusion_physical_plan::{collect, ExecutionPlan};
use rand::{rng, Rng};
use tokio::runtime::Runtime;

/// Creates a RecordBatch of `num_rows` with completely random values in [0, 100_000].
pub fn create_random_batch(num_rows: usize) -> RecordBatch {
  let schema = Arc::new(Schema::new(vec![
      Field::new("c0", DataType::Int32, true),
      Field::new("c1", DataType::Utf8, true),
      Field::new("c2", DataType::Date32, true),
      Field::new("c3", DataType::Decimal128(11, 2), true),
  ]));

  let mut rng = rng();
  let mut a = Int32Builder::new();
  let mut b = StringBuilder::new();
  let mut c = Date32Builder::new();
  let mut d = Decimal128Builder::new()
      .with_precision_and_scale(11, 2)
      .unwrap();

  for _ in 0..num_rows {
      let int_val = rng.random_range(0..=100_000);
      a.append_value(int_val);
      b.append_value(format!("string_{int_val}"));
      c.append_value(int_val);
      let dec_val = (rng.random_range(0..=100_000) as i128) * 100;
      d.append_value(dec_val);
  }

  let a = Arc::new(a.finish()) as ArrayRef;
  let b = Arc::new(b.finish()) as ArrayRef;
  let c = Arc::new(c.finish()) as ArrayRef;
  let d = Arc::new(d.finish()) as ArrayRef;

  RecordBatch::try_new(schema.clone(), vec![a, b, c, d]).unwrap()
}

pub fn create_sorted_batch(num_rows: usize, max_increment: i32) -> RecordBatch {
  let schema = Arc::new(Schema::new(vec![
      Field::new("c0", DataType::Int32, true),
      Field::new("c1", DataType::Utf8, true),
      Field::new("c2", DataType::Date32, true),
      Field::new("c3", DataType::Decimal128(11, 2), true),
  ]));

  let mut rng = rng();
  let mut a = Int32Builder::new();
  let mut b = StringBuilder::new();
  let mut c = Date32Builder::new();
  let mut d = Decimal128Builder::new()
      .with_precision_and_scale(11, 2)
      .unwrap();

  let mut current = rng.random_range(0..=max_increment);
  for _ in 0..num_rows {
      let inc = rng.random_range(0..=max_increment);
      current = current.saturating_add(inc);
      a.append_value(current);
      b.append_value(format!("string_{current}"));
      c.append_value(current);
      d.append_value((current as i128) * 100);
  }

  let a = Arc::new(a.finish()) as ArrayRef;
  let b = Arc::new(b.finish()) as ArrayRef;
  let c = Arc::new(c.finish()) as ArrayRef;
  let d = Arc::new(d.finish()) as ArrayRef;

  RecordBatch::try_new(schema.clone(), vec![a, b, c, d]).unwrap()
}

fn make_memory_execs(
  left_rows: usize,
  right_rows: usize,
) -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>, SchemaRef) {
  let left_batch = create_random_batch(left_rows);
  let schema = left_batch.schema();
  let left_partitions = vec![vec![left_batch]];

  let right_batch = create_sorted_batch(right_rows, 10);
  let right_partitions = vec![vec![right_batch]];

  let left_mem =
      TestMemoryExec::try_new_exec(&left_partitions, schema.clone(), None).unwrap();
  let right_mem =
      TestMemoryExec::try_new_exec(&right_partitions, schema.clone(), None).unwrap();

  (left_mem, right_mem, schema)
}

fn build_two_joins(
  left: Arc<dyn ExecutionPlan>,
  right: Arc<dyn ExecutionPlan>,
) -> Result<(
  Arc<dyn ExecutionPlan>, // pwmj
  Arc<dyn ExecutionPlan>, // nlj
)> {
  let left_on: Arc<dyn PhysicalExpr> = Arc::new(
      Column::new_with_schema("c0", &left.schema())
          .expect("left schema must contain 'c0'"),
  );
  let right_on: Arc<dyn PhysicalExpr> = Arc::new(
      Column::new_with_schema("c0", &right.schema())
          .expect("right schema must contain 'c0'"),
  );

  let hj = PiecewiseMergeJoinExec::try_new(
      left.clone(),
      right.clone(),
      (left_on.clone(), right_on.clone()),
      Operator::Lt,
      JoinType::Left,
  )?;

  let filter_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
      left_on.clone(),
      Operator::Lt,
      right_on.clone(),
  ));

  let column_indices = vec![
      ColumnIndex {
          index: 0,
          side: JoinSide::Left,
      },
      ColumnIndex {
          index: 0,
          side: JoinSide::Right,
      },
  ];

  let intermediate_schema = Arc::new(Schema::new(vec![
      Field::new("c0_left", DataType::Int32, false),
      Field::new("c0_right", DataType::Int32, false),
  ]));

  let join_filter = JoinFilter::new(filter_expr, column_indices, intermediate_schema);
  let nlj = NestedLoopJoinExec::try_new(
      left,
      right,
      Some(join_filter),
      &JoinType::Left,
      None,
  )?;

  Ok((Arc::new(hj), Arc::new(nlj)))
}

fn bench_joins(c: &mut Criterion) {
  let rt = Runtime::new().unwrap();
  let mut group = c.benchmark_group("joins");

  // row pairs for each side in benchmarks
  let size_pairs = &[
      (1000, 1000),
      (10000, 10000),
      (100000, 1000),
      (10000, 100),
      (1000000, 100),
      (1000, 10000),
      (100, 100000),
      (1000, 100000),
  ];

  for &(left_rows, right_rows) in size_pairs.iter() {
      let (left_mem, right_mem, _schema) = make_memory_execs(left_rows, right_rows);

      let (pwmj_join, nested_loop_join) =
          build_two_joins(left_mem.clone(), right_mem.clone()).unwrap();

      group.bench_with_input(
          BenchmarkId::new(
              "PiecewiseMergeJoin",
              format!("l={}_r={}", left_rows, right_rows),
          ),
          &pwmj_join,
          |b, plan| {
              b.iter_batched(
                  || (),
                  |_setup| {
                      let ctx = TaskContext::default();
                      let fut = collect(plan.clone(), Arc::new(ctx));
                      rt.block_on(async {
                          let _ = fut.await.unwrap();
                      });
                  },
                  BatchSize::SmallInput,
              )
          },
      );

      group.bench_with_input(
          BenchmarkId::new(
              "NestedLoopJoin",
              format!("l={}_r={}", left_rows, right_rows),
          ),
          &nested_loop_join,
          |b, plan| {
              b.iter_batched(
                  || (),
                  |_setup| {
                      let ctx = TaskContext::default();
                      let fut = collect(plan.clone(), Arc::new(ctx));
                      rt.block_on(async {
                          let _ = fut.await.unwrap();
                      });
                  },
                  BatchSize::SmallInput,
              )
          },
      );
  }

  group.finish();
}

criterion_group!(benches, bench_joins);
criterion_main!(benches);

Next Steps

The pull request was getting large, so the following steps are left for follow-up work:

  • Serialization
  • Mark join support
  • Physical planner
  • Fuzz tests
  • Refactor to compare on same key similar to sort merge join

Are these changes tested?

Yes, with unit tests.

github-actions bot added the physical-plan (Changes to the physical-plan crate) label Jul 2, 2025
@jonathanc-n
Contributor Author

jonathanc-n commented Jul 5, 2025

cc @alamb @ozankabak Seems like you guys were part of the discussion for range joins, this is a nice start to it? @Dandandan @comphead @my-vegetable-has-exploded might be interested

@alamb
Contributor

alamb commented Jul 7, 2025

I will try to review this PR later this week

@comphead
Contributor

comphead commented Jul 7, 2025

Thanks @jonathanc-n let me first get familiar with this kind of join

      let left_indices = (0..left_size as u64).collect::<UInt64Array>();
      let right_indices = (0..left_size)
          .map(|idx| left_bit_map.get_bit(idx).then_some(0))
          .collect::<UInt32Array>();
      return (left_indices, right_indices);
  }
- let left_indices = if join_type == JoinType::LeftSemi {
+ let left_indices = if join_type == JoinType::LeftSemi
+     || (join_type == JoinType::RightSemi && piecewise)
Contributor

so piecewise works only together with RightSemi?

Contributor Author

No, it falls through with left semi as well. In left and right semi/anti/mark joins we use the bitmap to mark all matched rows on the buffered side (this is done in process_unmatched_buffered_batch); the flag only allows right semi/anti/mark to follow through when called from the piecewise join. Usually the bitmap is only used to mark the unmatched rows on the left side, which is why it originally only supported left semi/anti/mark. I'll add a comment at the beginning of process_unmatched_buffered_batch to explain this.

@alamb
Contributor

alamb commented Aug 7, 2025

Thanks @jonathanc-n -- unfortunately I am not likely to have time to review this as my focus hasn't been on the join implementation

@comphead do you know anyone who is more focused on joins these days who might be able to help review this?

@comphead
Contributor

comphead commented Aug 7, 2025

Hey, sorry I missed this. This join is quite an interesting concept. I'm planning to finish reviewing #16996 this week and switch to this PR next.

@2010YOUY01
Contributor

Thanks for this great work, as always! I got some high-level questions.

Is it possible that IE Join could be faster for the same workload where PMJ performs well? Should we implement IE Join directly? (I’m willing to help review or implement it at this point)

There was an old attempt (#12754), but unfortunately it didn’t get reviewed at the time.

@jonathanc-n
Contributor Author

Yes, PMJ should perform better than IE join. They are used to tackle different things regardless: IE joins are used for multiple range predicates, while PMJ is for a single range predicate. You can see that DuckDB does the same thing here: https://github.com/duckdb/duckdb/tree/main/src/execution/operator/join

Contributor

2010YOUY01 left a comment

Thanks for the great doc explaining the high-level ideas! I took a read and left some comments on it, and I'm looking forward to your feedback.

I'll continue and review the implementations soon.

/// predicate.
///
/// # Execution Plan Inputs
/// For `PiecewiseMergeJoin` we label all left inputs as the `streamed' side and the right outputs as the
Contributor

Now in DF all join operators seem to use left side as the buffer side 🤔 we should follow this convention.

Contributor Author

Ah yes, this should be done; I had mixed them up in my head. Is it fine to do this in a follow-up pull request? It shouldn't be too big of an issue for review, as the inputs are labelled as streamed and buffered throughout the code.

/// For `PiecewiseMergeJoin` we label all left inputs as the `streamed' side and the right outputs as the
/// 'buffered' side.
///
/// `PiecewiseMergeJoin` takes a sorted input for the side to be buffered and is able to sort streamed record
Contributor

Just a clarification, this means for the buffer side: its input is a SortExec, right?

Contributor Author

Yes

/// For Left Semi, Anti, and Mark joins we swap the inputs so that the marked side is on the buffered side.
///
/// Here is an example:
/// We perform a `JoinType::Left` with these two batches and the operator being `Operator::Lt`(<). Because
Contributor

The join type on this line seems to be a typo? It should be one of the existence joins.

One suggestion to make this example even clearer is to include the join expression, such as t1 LEFT JOIN t2 ON (t1.c1 < t2.c1), and also mark the column names in the figures.

@2010YOUY01
Contributor

Please bear with me — I might need more time to digest and outline the tasks required to get this great operator merged. Here are some updates.

I looked through the implementation, I think the high-level idea is great! It's definitely efficient for joins with IE predicate.

# High-level control flow for PiecewiseMergeJoin

Buffer all buffer-side batches

for probe_batch in probe_side_input:
    sort(probe_batch)
    for probe_row in probe_batch:
        do a linear scan of the buffer side to find the pivot
        (since both sides are sorted, we can remember the previous position in the buffer side, so only one scan is needed on each side!)
        output (probe_row x range_in_buffer_side (like [0, pivot]))
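
The control flow above can be made concrete with a small sketch. It is illustrative only and is not the operator's implementation: it assumes the predicate `buffered < probe`, a buffer side already sorted ascending, and plain `i32` vectors instead of record batches. The function name is hypothetical.

```rust
/// Joins each probe batch against an ascending-sorted buffer side under the
/// predicate `buffered < probe`, returning `(probe, buffered)` pairs.
fn piecewise_merge_join(buffered_sorted: &[i32], probe_batches: &[Vec<i32>]) -> Vec<(i32, i32)> {
    let mut out = Vec::new();
    for batch in probe_batches {
        // Sort the probe batch so the pivot only ever moves forward within it.
        let mut probe = batch.clone();
        probe.sort_unstable();

        // `pivot` is the number of buffered rows matching the current probe row.
        // It is remembered across probe rows, so the buffer side is scanned
        // at most once per probe batch.
        let mut pivot = 0;
        for &p in &probe {
            while pivot < buffered_sorted.len() && buffered_sorted[pivot] < p {
                pivot += 1;
            }
            // Every buffered row in [0, pivot) satisfies `buffered < p`.
            for &b in &buffered_sorted[..pivot] {
                out.push((p, b));
            }
        }
    }
    out
}
```

Note that the inner output loop materializes every matching pair at once, which is the buffering concern raised in the suggestion that follows.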

Question

Theoretically, as long as both the probe and build sides are sorted on the compare key (regardless of ASC or DESC), this operator should be able to execute, right?
I think the current implementation is designed to enforce a certain ordering according to the inequality join predicate (by inserting a SortExec during planning).
However, the buffer-side input might have the opposite order. For example, PMJ requires t1 to be ORDER BY c1 ASC, but the existing order might be ORDER BY c1 DESC.

It would be more efficient to preserve any existing order, though it would require more logic in the implementation, which can be hard to wrap your head around. I’ll think about whether there’s a simpler way to implement this idea.

Suggestion

I think the biggest problem in the implementation right now is that it might be buffering too many output results.
In the ProcessStreamBatch state, it is currently buffering all the pairs in buffered_batches × single_right_batch that pass the join predicate. I think we should change it so that it can yield output incrementally.

At the moment, this operator only tracks the memory size of all buffered batches, so the extra memory usage must be around constant * single_batch_mem_size.
For single_probe_side_row × buffered_batches, the extra memory usage for materializing the output result can be O(buffer_side_total_mem_size). For instance, if there are 100k rows in the buffer side and the join predicate is not very selective (e.g., 50%), joining it with a single probe-side row will output a 50k-row batch.

One approach to solve this is to add some state management after joining a single row in the probe side: for example, if the join result is probe_row × buffered_batch[0..pivot], we can use a state to incrementally process this range, up to 8192 rows at a time, and put the final result inside the output buffer. Once the output buffer reaches 8192 rows, eagerly output it. Possibly it can jump to a new state for incremental output.
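
A rough sketch of that state management, under stated assumptions: plain `Vec<(i32, i32)>` values stand in for Arrow record batches, `RangeEmitter` and `emit_in_batches` are hypothetical names, and `pivot` must not exceed the buffer length. It only shows how a single `probe_row × buffered[0..pivot]` result can be drained in bounded chunks.

```rust
/// Hypothetical cursor over the pending result `probe_row x buffered[next..pivot]`.
struct RangeEmitter {
    probe_row: i32,
    next: usize,  // next buffered index to emit
    pivot: usize, // exclusive end of the matching range
}

impl RangeEmitter {
    fn new(probe_row: i32, pivot: usize) -> Self {
        Self { probe_row, next: 0, pivot }
    }

    /// Appends pairs to `output` until it holds `batch_size` rows or the range
    /// is exhausted. Returns `true` once the whole range has been consumed.
    fn fill(&mut self, buffered: &[i32], output: &mut Vec<(i32, i32)>, batch_size: usize) -> bool {
        let room = batch_size.saturating_sub(output.len());
        let end = (self.next + room).min(self.pivot);
        let probe_row = self.probe_row;
        output.extend(buffered[self.next..end].iter().map(|&b| (probe_row, b)));
        self.next = end;
        self.next == self.pivot
    }
}

/// Drives the emitter, eagerly handing off the output buffer whenever it is full.
fn emit_in_batches(
    buffered: &[i32],
    probe_row: i32,
    pivot: usize,
    batch_size: usize,
) -> Vec<Vec<(i32, i32)>> {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut emitter = RangeEmitter::new(probe_row, pivot);
    let mut batches = Vec::new();
    let mut output = Vec::with_capacity(batch_size);
    loop {
        let done = emitter.fill(buffered, &mut output, batch_size);
        if output.len() == batch_size || (done && !output.is_empty()) {
            batches.push(std::mem::take(&mut output));
        }
        if done {
            break;
        }
    }
    batches
}
```

In a streaming operator the `batches.push` step would instead yield the batch to the consumer, so at most about `batch_size` output rows are held at any time.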

This util can be helpful: https://docs.rs/arrow/latest/arrow/compute/struct.BatchCoalescer.html

Summary

I still need some time to understand and think about the existence join cases, but so far I suggest including these two changes in the initial PR. I think they would be very hard to do as follow-up patches (they need significant structural changes):

  1. Incremental result output mentioned above
  2. Support additional join predicates (feat: Support PiecewiseMergeJoin to speed up single range predicate joins #16660 (comment))

@jonathanc-n
Contributor Author

Theoretically, as long as both the probe and build sides are sorted on the compare key (regardless of ASC or DESC), this operator should be able to execute, right? I think the current implementation is designed to enforce a certain ordering according to the inequality join predicate (by inserting a SortExec during planning). However, the buffer-side input might have the opposite order. For example, PMJ requires t1 to be ORDER BY c1 ASC, but the existing order might be ORDER BY c1 DESC.

I think this is a good idea, though the logic may be a bit difficult to follow for someone reading the implementation.

@jonathanc-n
Contributor Author

jonathanc-n commented Aug 11, 2025

Support additional join predicates #16660 (comment)

I don't think this should be done. I looked into it, and the overhead and complexity of checking selectivity do not seem worth it for a workload that is very unlikely (the equijoin filtering less than the single range filter). DuckDB also doesn't support this, if that is any indicator.

@2010YOUY01
Contributor

Support additional join predicates #16660 (comment)

I don't think this should be done. I looked into it, and the overhead and complexity of checking selectivity do not seem worth it for a workload that is very unlikely (the equijoin filtering less than the single range filter). DuckDB also doesn't support this, if that is any indicator.

Perhaps I didn't express it clearly. The idea was not to check the selectivity and reorder the filters; it is to always evaluate the primary join predicate first (equality for HJ, and the single IE predicate for PMJ), and then support optional remaining ANDed filters.
Let's say we're doing

 SELECT *
          FROM generate_series(10000) AS t1(v1)
          JOIN generate_series(10000) AS t2(v1)
          ON ((t1.v1 + t2.v1) % 1000 = 0) AND (t1.v1 > t2.v1);

Then it should be executed as a PMJ with the IE predicate t1.v1 > t2.v1. After this IE predicate is evaluated, the PMJ operator can continue filtering with the remaining ANDed filter ((t1.v1 + t2.v1) % 1000 = 0).
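
As a rough sketch of that evaluation order (not DataFusion code): the primary predicate `t1.v1 > t2.v1` drives the sorted scan, and a residual filter is then applied to each candidate pair. The function name `pmj_with_residual` and the closure parameter are hypothetical, and `t2_sorted` is assumed to be sorted ascending.

```rust
/// Evaluates `t1 > t2` with a sorted scan, then applies `residual` to each
/// candidate pair. Returns the surviving `(t1, t2)` pairs.
fn pmj_with_residual<F>(t1: &[i64], t2_sorted: &[i64], residual: F) -> Vec<(i64, i64)>
where
    F: Fn(i64, i64) -> bool,
{
    let mut probe = t1.to_vec();
    probe.sort_unstable();

    let mut out = Vec::new();
    let mut pivot = 0;
    for &a in &probe {
        // Primary IE predicate: all rows in t2_sorted[..pivot] satisfy `b < a`.
        while pivot < t2_sorted.len() && t2_sorted[pivot] < a {
            pivot += 1;
        }
        // Remaining ANDed filter, evaluated only on pairs that passed the
        // primary predicate.
        out.extend(
            t2_sorted[..pivot]
                .iter()
                .filter(|&&b| residual(a, b))
                .map(|&b| (a, b)),
        );
    }
    out
}
```

With this shape, the query above would correspond to calling `pmj_with_residual(&t1, &t2, |a, b| (a + b) % 1000 == 0)`.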

The SMJ and HJ in DataFusion are both implemented this way: they fuse the general join conditions into the operator. DuckDB, however, breaks this post-filtering step out into a separate filter. DuckDB's approach is probably a good way to simplify the join operators.

Example: DuckDB is evaluating the remaining join filter outside the join operator, however DF is evaluating it inside the join operator

D explain SELECT *
          FROM generate_series(10000) AS t1(v1)
          JOIN generate_series(10000) AS t2(v1)
          ON ((t1.v1 + t2.v1) % 1000 = 0) AND (t1.v1 > t2.v1);

┌─────────────────────────────┐
│┌───────────────────────────┐│
││       Physical Plan       ││
│└───────────────────────────┘│
└─────────────────────────────┘
┌───────────────────────────┐
│           FILTER          │
│    ────────────────────   │
│  (((v1 + v1) % 1000) = 0) │
│                           │
│        ~215472 Rows       │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│    PIECEWISE_MERGE_JOIN   │
│    ────────────────────   │
│      Join Type: INNER     │
│    Conditions: v1 > v1    ├──────────────┐
│                           │              │
│        ~215472 Rows       │              │
└─────────────┬─────────────┘              │
┌─────────────┴─────────────┐┌─────────────┴─────────────┐
│      GENERATE_SERIES      ││      GENERATE_SERIES      │
│    ────────────────────   ││    ────────────────────   │
│         Function:         ││         Function:         │
│      GENERATE_SERIES      ││      GENERATE_SERIES      │
│                           ││                           │
│        ~10001 Rows        ││        ~10001 Rows        │
└───────────────────────────┘└───────────────────────────┘
> explain SELECT *
          FROM generate_series(10000) AS t1(v1)
          JOIN generate_series(10000) AS t2(v1)
          ON (t1.v1=t2.v1) AND (t1.v1 > t2.v1);
+---------------+------------------------------------------------------------+
| plan_type     | plan                                                       |
+---------------+------------------------------------------------------------+
| physical_plan | ┌───────────────────────────┐                              |
|               | │    CoalesceBatchesExec    │                              |
|               | │    --------------------   │                              |
|               | │     target_batch_size:    │                              |
|               | │            8192           │                              |
|               | └─────────────┬─────────────┘                              |
|               | ┌─────────────┴─────────────┐                              |
|               | │        HashJoinExec       │                              |
|               | │    --------------------   ├──────────────┐               |
|               | │       on: (v1 = v1)       │              │               |
|               | └─────────────┬─────────────┘              │               |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │    CoalesceBatchesExec    ││    CoalesceBatchesExec    │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │     target_batch_size:    ││     target_batch_size:    │ |
|               | │            8192           ││            8192           │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │      RepartitionExec      ││      RepartitionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │ partition_count(in->out): ││ partition_count(in->out): │ |
|               | │          14 -> 14         ││          14 -> 14         │ |
|               | │                           ││                           │ |
|               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
|               | │      Hash([v1@0], 14)     ││      Hash([v1@0], 14)     │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │      RepartitionExec      ││      RepartitionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │ partition_count(in->out): ││ partition_count(in->out): │ |
|               | │          1 -> 14          ││          1 -> 14          │ |
|               | │                           ││                           │ |
|               | │    partitioning_scheme:   ││    partitioning_scheme:   │ |
|               | │    RoundRobinBatch(14)    ││    RoundRobinBatch(14)    │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │       ProjectionExec      ││       ProjectionExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │         v1: value         ││         v1: value         │ |
|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
|               | │       LazyMemoryExec      ││       LazyMemoryExec      │ |
|               | │    --------------------   ││    --------------------   │ |
|               | │     batch_generators:     ││     batch_generators:     │ |
|               | │ generate_series: start=0, ││ generate_series: start=0, │ |
|               | │    end=10000, batch_size  ││    end=10000, batch_size  │ |
|               | │           =8192           ││           =8192           │ |
|               | └───────────────────────────┘└───────────────────────────┘ |
|               |                                                            |
+---------------+------------------------------------------------------------+
1 row(s) fetched.
Elapsed 0.005 seconds.

@jonathanc-n
Contributor Author

jonathanc-n commented Aug 12, 2025

@2010YOUY01 Wouldn't you need a cost model to decide which one to use when both are viable, though? For example, Hash Join (apply the equi-condition, then the residual filter) vs. PWMJ (apply the range filter, then the equi-condition as the residual). You could estimate the selectivity of the equi predicate vs. the residual predicate, factor in whether the key is sorted, etc. when making the decision. Sorry if I misinterpreted this; thanks for bearing with me.

@2010YOUY01
Contributor

> @2010YOUY01 Wouldn't you need a cost model to decide which one to use when both are viable, though? For example, Hash Join (apply the equi-condition, then the residual filter) vs. PWMJ (apply the range filter, then the equi-condition as the residual). You could estimate the selectivity of the equi predicate vs. the residual predicate, factor in whether the key is sorted, etc. when making the decision. Sorry if I misinterpreted this; thanks for bearing with me.

Ah, I get it now. How about using the following simple heuristic:

If the predicate contains an equality check, e.g. (t1.c1 = t2.c1) AND (t1.c2 > t2.c2) --> Hash Join
Else if the predicate contains an inequality check, e.g. (t1.c1 > t2.c1) AND ((t1.c2 + t2.c2) % 10 = 1) --> PWMJ
Otherwise --> NLJ
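
The rule above can be sketched as a small selection function. This is only an illustration: `Conjunct`, `JoinAlgorithm`, and `choose_join` are hypothetical names, not DataFusion APIs, and the sketch assumes the predicate has already been split into ANDed conjuncts and classified.

```rust
/// Hypothetical classification of one ANDed conjunct of a join predicate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Conjunct {
    /// A column-to-column equality across both inputs, e.g. `t1.c1 = t2.c1`.
    Equality,
    /// A single range comparison (<, <=, >, >=), e.g. `t1.c1 > t2.c1`.
    Range,
    /// Anything else, e.g. `(t1.c2 + t2.c2) % 10 = 1`.
    Other,
}

/// Hypothetical label for the operator the planner would pick.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinAlgorithm {
    HashJoin,
    PiecewiseMergeJoin,
    NestedLoopJoin,
}

/// Applies the three-step rule: equality first, then range, else NLJ.
pub fn choose_join(conjuncts: &[Conjunct]) -> JoinAlgorithm {
    if conjuncts.contains(&Conjunct::Equality) {
        JoinAlgorithm::HashJoin
    } else if conjuncts.contains(&Conjunct::Range) {
        JoinAlgorithm::PiecewiseMergeJoin
    } else {
        JoinAlgorithm::NestedLoopJoin
    }
}
```

For instance, `choose_join(&[Conjunct::Range, Conjunct::Other])` picks `PiecewiseMergeJoin`, with the remaining conjunct left as a residual filter. No selectivity estimate is involved, which matches the intent of avoiding a cost model for now.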

I was thinking PWMJ should cover more cases originally handled by NLJ, since in the general case it should be faster than NLJ. I don't want to implement some cost model beyond the above rule at the moment.

@jonathanc-n
Contributor Author

Yes, I think so too. I don't know if it will be worth the complexity, though, since this is a very niche workload (single range filter + higher selectivity for the filter).

I think the plan for now is that I can implement support for AND expressions in this pull request. But when we include the planner changes in a follow up PR we can discuss this there.

@2010YOUY01
Contributor

> Yes, I think so too. I don't know if it will be worth the complexity, though, since this is a very niche workload (single range filter + higher selectivity for the filter).
>
> I think the plan for now is that I can implement support for AND expressions in this pull request. But when we include the planner changes in a follow up PR we can discuss this there.

I think in-the-wild join workloads most commonly involve lengthy ANDed expressions, so it’s indeed challenging to make a smart planner.
It’s a good idea to have this PR focus on execution, and handle the planning in another PR.

@comphead
Contributor

@jonathanc-n please correct my understanding of the PMJ join; it's fairly new to me.

The PiecewiseMergeJoin is specifically designed for scenarios with only one range filter using the operators <, <=, >, and >=. It achieves significant performance improvements by:

Buffering one side: The right side (buffered) is loaded into memory and must be sorted
Streaming the other side: The left side (streamed) is processed incrementally and sorted during execution
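
A minimal sketch of the idea described above, for the predicate `streamed < buffered` on plain `i64` keys. It is not the PR's implementation: the function names are hypothetical, it works on slices instead of Arrow batches, and it uses a binary search per streamed row rather than the operator's merge scan. The second function illustrates the existence-join shortcut mentioned in the PR description, where only the minimum streamed value matters.

```rust
/// Joins on `streamed < buffered`, returning (streamed_idx, buffered_idx) pairs.
/// Assumes `buffered_sorted` is sorted ascending.
pub fn piecewise_merge_join_lt(
    streamed: &[i64],
    buffered_sorted: &[i64],
) -> Vec<(usize, usize)> {
    let mut out = Vec::new();
    for (s_idx, &s) in streamed.iter().enumerate() {
        // First buffered position whose value is strictly greater than `s`.
        let first = buffered_sorted.partition_point(|&b| b <= s);
        // Because the buffer is sorted, every row from `first` onward matches.
        out.extend((first..buffered_sorted.len()).map(|b_idx| (s_idx, b_idx)));
    }
    out
}

/// Semi-join variant: indices of buffered rows that have at least one
/// streamed row with `s < b`. Only the minimum streamed value is needed.
pub fn semi_join_buffered_lt(streamed: &[i64], buffered_sorted: &[i64]) -> Vec<usize> {
    match streamed.iter().min() {
        None => Vec::new(),
        Some(&min_s) => {
            let first = buffered_sorted.partition_point(|&b| b <= min_s);
            (first..buffered_sorted.len()).collect()
        }
    }
}
```

With `streamed = [5, 1]` and `buffered_sorted = [1, 3, 7]`, the first function pairs streamed row 0 with buffered row 2, and streamed row 1 with buffered rows 1 and 2. No per-pair comparison is needed once the first matching position is known.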

On a separate note, would it be possible to find a formula to calculate the cost? Referring to https://cs186berkeley.net/resources/static/notes/n09-Joins.pdf,
for SMJ it is:

The average I/O cost is: cost to sort R + cost to sort S + ([R] + [S]) (though it is important to note that this is not the worst case!). In the worst case, if each record of R matches every record of S, the last term becomes |R|∗[S]. The worst-case cost is then: cost to sort R + cost to sort S + ([R] + |R|∗[S]). (That generally doesn’t happen, though.)

For the simple NLJ case, without optimizations (no left prebuffering, look up S for every row of R):

The I/O cost of this would then be [R] + |R|∗[S],
where [R] is the number of pages in R and |R| is the number of records in R

Having a cost formula would give people a better understanding of the benefits of using PMJ.
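
The quoted I/O formulas can be written down directly, which makes it easy to compare them for concrete sizes. This is a sketch only: `Relation` and the function names are hypothetical, and the sort costs are passed in as parameters because the quoted text does not spell them out.

```rust
/// Sizes of one relation: `pages` is [R], `records` is |R|.
#[derive(Debug, Clone, Copy)]
pub struct Relation {
    pub pages: u64,
    pub records: u64,
}

/// SMJ average case: cost to sort R + cost to sort S + ([R] + [S]).
pub fn smj_average_cost(r: Relation, s: Relation, sort_r: u64, sort_s: u64) -> u64 {
    sort_r + sort_s + (r.pages + s.pages)
}

/// SMJ worst case: cost to sort R + cost to sort S + ([R] + |R| * [S]).
pub fn smj_worst_cost(r: Relation, s: Relation, sort_r: u64, sort_s: u64) -> u64 {
    sort_r + sort_s + (r.pages + r.records * s.pages)
}

/// Simple NLJ: [R] + |R| * [S].
pub fn simple_nlj_cost(r: Relation, s: Relation) -> u64 {
    r.pages + r.records * s.pages
}
```

As an example with made-up numbers, for R with 10 pages and 1000 records, S with 20 pages, and sort costs of 40 and 80, the SMJ average cost is 150 while the simple NLJ cost is 20010.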

get_final_indices_from_shared_bitmap(visited_left_side, self.join_type);

let (left_side, right_side) = get_final_indices_from_shared_bitmap(
visited_left_side,
Contributor

visited_left_side awesome name

Contributor

@comphead left a comment

Thanks @jonathanc-n, I did an initial review. Thanks for the comments; again, you and @2010YOUY01 saved hours of reviewing by commenting on the logic.

Before going forward we definitely need to include PMJ in the fuzz tests (join_fuzz.rs). Do you think it can also be tested by slt?

// Tracks the state of the `PiecewiseMergeJoin`
state: PiecewiseMergeJoinStreamState,
// Flag for whether or not the join_type is an existence join.
existence_join: bool,
Contributor

Do we really need this flag? It could be calculated on the fly, or would that be too expensive?

Contributor Author

Not expensive, but it makes more sense to store it since the check is used everywhere, plus you get a small speedup.

Contributor

I suggest calculating it on the fly (or at least pointing to the util function in the comment). I think that makes the logic easier to follow, and it can't be the bottleneck.
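
A sketch of what "calculating it on the fly" could look like. The `JoinType` enum here is a local stand-in for illustration, and `is_existence_join` and `StreamSketch` are hypothetical names rather than the PR's actual helpers; the set of existence joins (semi, anti, mark) follows the PR description.

```rust
/// Local stand-in for a join-type enum, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
    LeftMark,
    RightMark,
}

/// Hypothetical helper: true for semi, anti, and mark joins.
pub fn is_existence_join(join_type: JoinType) -> bool {
    matches!(
        join_type,
        JoinType::LeftSemi
            | JoinType::RightSemi
            | JoinType::LeftAnti
            | JoinType::RightAnti
            | JoinType::LeftMark
            | JoinType::RightMark
    )
}

/// Hypothetical stream state that stores only the join type, not a cached flag.
pub struct StreamSketch {
    pub join_type: JoinType,
}

impl StreamSketch {
    /// Derived on each call; a single `matches!` is a cheap comparison.
    pub fn is_existence_join(&self) -> bool {
        is_existence_join(self.join_type)
    }
}
```

The trade-off is the one discussed in this thread: a stored `bool` saves a trivial comparison, while deriving it keeps a single source of truth and makes it obvious where the classification is defined.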

@jonathanc-n
Contributor Author

jonathanc-n commented Aug 20, 2025

@comphead @2010YOUY01 I have added incremental processing, swapped the left/right sides to correspond to buffered/streamed, and added performance cost docs as well as diagrams in the tests to make correctness easier to verify. I will make the incremental processing batch limit configurable as a runtime config once the planner is implemented.

I will do the following as follow-up PRs + put it in an EPIC issue (this pull request was getting quite large and cluttered, probably better to merge this for now if it looks ready):

  • Fuzz test
  • Planner + AND predicate handling -> SLT tests (I'd like to do the AND expression + planner together, as right now DataFusion doesn't have a good way to separate IE predicates from equality predicates)
  • Mark joins
  • Serialization

Thank you very much so far for the reviews, really appreciate it.

@2010YOUY01
Contributor

Awesome! It's on my list.

To ensure correctness, I recommend writing a POC planner first so that this operator can run through the SQL interface, and making sure the extended tests pass. (The planner part is expected to be made in a separate PR.)

INCLUDE_SQLITE=true cargo test --profile release-nonlto --test sqllogictests

I think the strongest test for join edge cases so far is the sqlite test suite. My recent PR passed all DF tests and fuzz tests, but the sqlite test suite still found 3 additional bugs.

@jonathanc-n
Contributor Author

@2010YOUY01 I'll verify that it works locally, then make a pull request after this merges.

Contributor

@2010YOUY01 left a comment

I have gone through part of it and left some suggestions. Will continue soon.

/// - |R|, |S|: number of tuples in `R` and `S`
/// - `B`: number of buffer pages
///
/// # Performance (cost)
Contributor

I think this cost model is not correct. In the past, disks were very slow, so the number of page fetches was used to model performance. Today, however, the bottleneck for OLAP systems has shifted to the CPU, so I think it's better to model performance by the work done by the CPU.
e.g.
NLJ cost = buffer-side-scan * probe-side-row-count
PWMJ cost = buffer-side-scan * probe-side-batch-count
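
To make the difference between the two formulas concrete, here is a small sketch. It assumes, purely for illustration, that one buffer-side scan costs as many units as there are buffered rows and that the probe side arrives in fixed-size batches; the function names are hypothetical.

```rust
/// NLJ cost = buffer-side-scan * probe-side-row-count.
/// Assumption: one buffer-side scan costs `buffer_rows` units.
pub fn nlj_cpu_cost(buffer_rows: u64, probe_rows: u64) -> u64 {
    buffer_rows * probe_rows
}

/// PWMJ cost = buffer-side-scan * probe-side-batch-count.
/// Assumption: the probe side is split into batches of `batch_size` rows
/// (`batch_size` must be non-zero).
pub fn pwmj_cpu_cost(buffer_rows: u64, probe_rows: u64, batch_size: u64) -> u64 {
    // Ceiling division: a partially filled last batch still costs one scan.
    let probe_batches = (probe_rows + batch_size - 1) / batch_size;
    buffer_rows * probe_batches
}
```

With 10,000 buffered rows, 100,000 probe rows, and a batch size of 8192, the probe side has 13 batches, so this model gives 130,000 units for PWMJ against 1,000,000,000 for NLJ. The model ignores the cost of sorting each probe batch, so it is an optimistic bound for PWMJ.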

// For left existence joins the inputs will be swapped so the sort
// options are switched
if is_right_existence_join(join_type) {
SortOptions::new(false, false)
Contributor

This defines a null order. I think those nulls should be handled specially later; let's add a comment pointing to the location where they are handled.

};

/// Batch emits this number of rows when processing
pub const DEFAULT_INCREMENTAL_BATCH_VALUE: usize = 8192;
Contributor

I recommend using the batch_size configuration from the context.

@2010YOUY01
Contributor

I tend to think it's better to include the planner part in the initial PR. If we do it in two steps, the executor may turn out to be incompatible with other operators, so the follow-on PR would also have a large diff.
E.g. I think projections are required (say the left input has 2 columns a, b and the right input has 2 columns c, d; the output might only contain a, c, since b and d are only used to evaluate the join condition and are not required in the output), but they are not implemented now.
Also, if we have a SQL interface to run, there are many existing test cases to cover it, which makes it easier to get merged.
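
The projection example can be illustrated on a single joined row. This is a sketch under simplifying assumptions: `project_row` is a hypothetical helper operating on a slice, whereas a real operator would project columns of record batches.

```rust
/// Keeps only the columns listed in `projection`, in that order.
/// Panics if an index is out of range for `joined_row`.
pub fn project_row<T: Clone>(joined_row: &[T], projection: &[usize]) -> Vec<T> {
    projection.iter().map(|&i| joined_row[i].clone()).collect()
}
```

For a joined row with columns `[a, b, c, d]` and `projection = [0, 2]`, the output keeps only `a` and `c`, while `b` and `d` remain available to the join condition before projection.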

To make this task easier we want to shrink this PR; here are some ideas:

  • Could some of the preparations for setting up the planner be split into individual PRs?
  • I think the execution logic for existence joins (semi/anti/mark) is fundamentally different from traditional joins. It might be cleaner to split them into separate stream implementations, since a unified execution path can make the state management complex. For the initial PR, we could focus on including only one.
PiecewiseMergeJoinExec
--existence-join?--> ExistencePWMJStream
--not-existence-join?-->TraditionalPWMJStream
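
The proposed split could be sketched as a dispatch at stream-construction time. Everything here is hypothetical scaffolding: the `JoinType` enum is a local stand-in, `PwmjStream` is an invented trait, and the two stream types are empty placeholders named after the diagram above.

```rust
/// Local stand-in for a join-type enum, for illustration only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    RightSemi,
    LeftAnti,
    RightAnti,
    LeftMark,
    RightMark,
}

/// Hypothetical common interface for both stream implementations.
pub trait PwmjStream {
    fn name(&self) -> &'static str;
}

/// Placeholder for the semi/anti/mark execution path.
pub struct ExistencePWMJStream;
/// Placeholder for the inner/left/right/full execution path.
pub struct TraditionalPWMJStream;

impl PwmjStream for ExistencePWMJStream {
    fn name(&self) -> &'static str {
        "ExistencePWMJStream"
    }
}

impl PwmjStream for TraditionalPWMJStream {
    fn name(&self) -> &'static str {
        "TraditionalPWMJStream"
    }
}

/// Picks the stream once, so neither implementation needs to branch
/// on the join category inside its state machine.
pub fn build_stream(join_type: JoinType) -> Box<dyn PwmjStream> {
    match join_type {
        JoinType::LeftSemi
        | JoinType::RightSemi
        | JoinType::LeftAnti
        | JoinType::RightAnti
        | JoinType::LeftMark
        | JoinType::RightMark => Box::new(ExistencePWMJStream),
        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {
            Box::new(TraditionalPWMJStream)
        }
    }
}
```

Using an exhaustive `match` rather than a boolean flag means that adding a new join type forces a decision about which stream handles it.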

Labels
physical-plan Changes to the physical-plan crate
5 participants