Rewrite Nested Loop Join executor for 5× speed and 1% memory usage #16996
Conversation
Draft PR and early discussions: #16889. Thanks @UBarney @ding-young and @jonathanc-n for the help 🙏🏼
01)NestedLoopJoinExec: join_type=Inner, filter=a@0 < x@1
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--SortExec: expr=[x@0 ASC NULLS LAST], preserve_partitioning=[false]
01)SortExec: expr=[x@3 ASC NULLS LAST], preserve_partitioning=[false]
See the PR write-up's 'breaking change' section for the reason.
Memory Usage Benchmark: current vs. nlj-rewrite (reproducer branch & comments to be added)
Thanks @2010YOUY01, this is a great PR. I'm planning to have a look next week.
original_left_array.as_ref(),
build_side_index,
)?;
scalar_value.to_array_of_size(filtered_probe_batch.num_rows())?
I think it would be neat if we could avoid generating this array and directly apply the filter based on the scalar, rather than converting it to an array of the same size.
I was expecting this optimization to bring > 10% speedup to the NLJ micro-bench, based on the flamegraphs.
Though I think this change requires some transformation to JoinFilter, and we don't have such a utility function yet. I'll open an issue to track this idea later.
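To make the idea concrete, here is a rough sketch of what such a rewrite could look like, assuming DataFusion's TreeNode transform API. The function name, the build_row representation, and the column-index convention are made up for illustration; this is not an existing utility nor the code from my experiment branch.

```rust
// Hypothetical sketch: replace build-side column references in the join
// filter with literals taken from the current build-side row, so the filter
// can be evaluated against the probe batch alone (no N-row scalar array).
use std::sync::Arc;

use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::PhysicalExpr;

fn bind_build_side_row(
    filter_expr: Arc<dyn PhysicalExpr>,
    // Scalars of the current build-side row, indexed by the filter's
    // intermediate-schema column index; probe-side positions are `None`
    // (an assumed convention for this sketch).
    build_row: &[Option<ScalarValue>],
) -> Result<Arc<dyn PhysicalExpr>> {
    let rewritten = filter_expr.transform(|expr| {
        if let Some(col) = expr.as_any().downcast_ref::<Column>() {
            if let Some(Some(scalar)) = build_row.get(col.index()) {
                // Build-side column: substitute the row's value as a literal.
                return Ok(Transformed::yes(
                    Arc::new(Literal::new(scalar.clone())) as Arc<dyn PhysicalExpr>,
                ));
            }
        }
        // Probe-side column or non-column node: keep as is.
        Ok(Transformed::no(expr))
    })?;
    Ok(rewritten.data)
}
```

The rewritten expression could then be evaluated against the probe batch directly, avoiding the scalar-to-array materialization.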
I tried, but it only gets slightly faster; I think this optimization might not be worth the extra complexity.
The reasons for not getting significantly faster:
- This step is not the bottleneck (for this simple NLJ micro-bench, the actual bottleneck is now evaluating the expression, especially the modulo)
- I used a utility in the optimizer to rewrite the join filter, and this step is slow (6% of total time)
Benchmark result:
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ nlj-rewrite ┃ optimize-join-filter ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1 │ 87.14 ms │ 83.12 ms │ no change │
│ QQuery 2 │ 118.03 ms │ 110.51 ms │ +1.07x faster │
│ QQuery 3 │ 164.04 ms │ 160.60 ms │ no change │
│ QQuery 4 │ 373.67 ms │ 358.43 ms │ no change │
│ QQuery 5 │ 232.66 ms │ 259.75 ms │ 1.12x slower │
│ QQuery 6 │ 1708.19 ms │ 1633.82 ms │ no change │
│ QQuery 7 │ 236.27 ms │ 257.24 ms │ 1.09x slower │
│ QQuery 8 │ 1710.80 ms │ 1634.83 ms │ no change │
│ QQuery 9 │ 270.86 ms │ 261.67 ms │ no change │
│ QQuery 10 │ 499.70 ms │ 493.96 ms │ no change │
└──────────────┴─────────────┴──────────────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary ┃ ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (nlj-rewrite) │ 5401.37ms │
│ Total Time (optimize-join-filter) │ 5253.94ms │
│ Average Time (nlj-rewrite) │ 540.14ms │
│ Average Time (optimize-join-filter) │ 525.39ms │
│ Queries Faster │ 1 │
│ Queries Slower │ 2 │
│ Queries with No Change │ 7 │
│ Queries with Failure │ 0 │
└─────────────────────────────────────┴───────────┘
My experiment branch: https://github.com/2010YOUY01/arrow-datafusion/tree/optimize-join-filter
Very nice, thanks for trying.
I wonder if we can try with a different expression, e.g. greater than?
For such simple expressions, it's 20% faster on top of this PR. I think we should include this optimization as a follow-up.
After that, the bottleneck shifts to filtering plus concatenating the final output; this TODO might help:
https://github.com/apache/arrow-rs/blob/a9b6077b2d8d5b5aee0e97e7d4335878e8da1876/arrow-select/src/coalesce.rs#L206
Hi @2010YOUY01 Thanks for creating this amazing PR and for the detailed explanation of why we don't need to maintain the right_side order. This is a great optimization!

Just to provide some context for future collaboration: this approach (joining only one left row at a time) was actually the initial direction I considered. Critically, I was hesitant to proceed because I wanted to avoid making what seemed like a potential breaking change without getting confirmation. I believe we should be very cautious about such changes. That's precisely why I raised this concern and @-mentioned you in my comment, seeking clarification. Since I didn't get a response, I proceeded with the alternative implementation to keep things moving. A quick clarification on that point back then would have saved the effort on my merged PR and allowed us to get to this optimal solution directly.

No worries, the main thing is that we've landed the best solution for the project. Just thought this feedback on the process could be helpful for our future interactions. Great work on this!
@UBarney I'm so sorry about that — I completely let that issue discussion slip through.
Thanks @2010YOUY01 I'll continue later, I'm out of my mental capacity :)
Appreciate the comments, they made it much easier to navigate.
It's a good sign that we got fuzz testing working with the new implementation.
For the post-filtering, it would probably be possible to split the filter-stage evaluation for certain types of join:
Inner and outer join filters can be evaluated early, whereas SEMI and ANTI would stay at a late stage like now.
/// unmatched build-side data.
/// ## 1. Buffering Left Input
/// - The operator eagerly buffers all left-side input batches into memory,
/// unless a memory limit is reached (see 'Memory-limited Execution').
What would happen if the memory limit is reached?
I have updated the comment to explain it.
pub(crate) join_metrics: BuildProbeJoinMetrics,

/// `batch_size` from configuration
cfg_batch_size: usize,
Let's just use batch_size?
I actually want to enforce this naming style at the project scope.
For rare configuration options that have been propagated through many layers to some utility function, it would be easier to figure out the purpose of such an argument if it is prefixed with cfg_; usually we can find a more detailed explanation on the config page.
We can open an issue and write a style guide for it, WDYT?
It might be a good idea to standardize naming, although it is typically a painful process. IMO I'd prefer batch_size for now, as people are used to it and it would be confusing for the community to figure out whether there is any difference between batch_size and cfg_batch_size.
Makes sense, let's use batch_size for now.
I really appreciate your detailed review. I have addressed the comments in 4c111c3. BTW, I take code understandability very seriously, and I'm happy to make any small changes that improve it. I encourage others to do similar reviews -- point out anything that doesn't make sense and share even small nitpicks.
I don't get this idea of changing when the filter is evaluated, could you elaborate?
As far as I understood, the filter evaluation happens in a post phase, after tuples are joined by key, and this makes a lot of sense, especially for SEMI and ANTI joins where you have to track filter evaluation results for the same key across all batches. For example:
So for the right record batch you can evaluate the filter
Btw, would it be possible to calculate the cost of the join like in https://www.youtube.com/watch?v=RcEW0P8iVTc ? The video shows multiple implementations of NLJ and how to calculate their cost, with pseudo code; it would be super useful for the community and further improvements. From what I understood, the left side is scanned once and entirely saved in memory; what about right scans?
I see, but this step should be handled below the join operator by filter pushdown:
Perhaps it's better to rename the variable. I'm not sure if such filter pushdowns would be incorrect for SEMI/ANTI joins; I'll double-check that.
Yes, that's exactly the idea for the future memory-limited NLJ implementation -- for each group of buffered left batches (under the memory limit), do one round of right scan.
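For reference, a minimal sketch of that block-style strategy is below. Under the classic textbook cost model from the linked video, it performs roughly |L| + ceil(|L| / B) * |R| page reads, where B is how much of the left side fits in the memory budget. The function, closures, and memory-accounting policy are illustrative only, not code from this PR.

```rust
// Hypothetical sketch of a memory-limited (block) nested loop join driver;
// the function, closures, and memory-accounting policy are illustrative only.
use arrow::compute::concat_batches;
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn block_nested_loop_join(
    left_schema: &SchemaRef,
    left_batches: &[RecordBatch],
    memory_budget: usize, // max bytes of left input buffered at once
    // Re-scans the right side once and joins it against one buffered left block.
    mut join_block_with_right_scan: impl FnMut(&RecordBatch) -> Result<(), ArrowError>,
) -> Result<(), ArrowError> {
    let mut block: Vec<RecordBatch> = vec![];
    let mut block_bytes = 0;

    for batch in left_batches {
        block_bytes += batch.get_array_memory_size();
        block.push(batch.clone());

        // Once the buffered left block hits the memory budget, do one full
        // scan of the right side for this block, then start a new block.
        if block_bytes >= memory_budget {
            join_block_with_right_scan(&concat_batches(left_schema, &block)?)?;
            block.clear();
            block_bytes = 0;
        }
    }
    // The final (possibly partial) block also needs one right scan.
    if !block.is_empty() {
        join_block_with_right_scan(&concat_batches(left_schema, &block)?)?;
    }
    Ok(())
}
```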
Thanks @2010YOUY01, I think this PR is good the way it is.
Given the change, I would probably ask for another pair of eyes, like @jonathanc-n, who is also working on joins in #16660.
// PROPERTIES:
// Operator's properties that remain constant
//
// Note: The implementation uses the terms left/inner/build-side table and
I don't agree with this. "Inner" and "outer" table, in particular for a nested loop join, have fairly strict definitions. We should not confuse the issue by referring to what is historically referred to as the outer table as the "inner" table in this implementation. It makes it harder for contributors to understand the implementation.
I see. What do you think about using only the terms left/build and right/probe in the implementation?
I think build and probe are unambiguous, and in DataFusion, left == build is a convention: it always swaps the smaller table to the left as the build side.
Yes, I think we should just keep it as left/build + right/probe. I can see the appeal of referring to it as inner/outer, but it gets confusing because in academia (as @mbutrovich says), the term 'outer table' refers to the table which drives the outer loop. Despite this, I've noticed that people do associate the left table with being the inner one, so just removing that ambiguity is probably the best idea.
I believe Snowflake flips the convention and calls the left table "inner" which confuses the issue, regardless of physical join implementation. I think left/build and right/probe is consistent and fine, even though build and probe don't have as much meaning in a NLJ. I just was concerned about inner and outer getting confused particularly for a NLJ. Thanks!
Updated in 8cc3654. Thanks!
Just feedback on the comments, but I'd like a discussion before merging. The implementation looked clean to me. Nicely done!
I'll try to take a look today
This LGTM, thank you @2010YOUY01. Nice clean design!
There are no new changes included. The speedup reaches 5× simply because the NLJ micro-benchmark is extended with cases where the join predicate is very cheap to evaluate (see #16996 (comment)), and such cases favor the new implementation more. I changed the title to make it more click-baity 🤣
Thanks for updating the comments and variable names. This LGTM now.
Thanks @2010YOUY01 @mbutrovich @jonathanc-n @UBarney @ding-young for the fantastic teamwork. I'm going to merge this PR at the end of the day.
Can someone please run the extended tests on this branch? I am pretty sure that the extended tests are failing against this branch, but I am having issues with git locally so I'd like confirmation. Edit: you'll likely have to merge the latest main to get the latest datafusion-testing pin and update your submodules to make sure they reflect that pin. Then:
Here is an example of a query that passes in main and fails on this branch:
CREATE TABLE tab0(col0 INTEGER, col1 INTEGER, col2 INTEGER);
CREATE TABLE tab1(col0 INTEGER, col1 INTEGER, col2 INTEGER);
CREATE TABLE tab2(col0 INTEGER, col1 INTEGER, col2 INTEGER);
INSERT INTO tab0 VALUES(97,1,99);
INSERT INTO tab0 VALUES(15,81,47);
INSERT INTO tab0 VALUES(87,21,10);
INSERT INTO tab1 VALUES(51,14,96);
INSERT INTO tab1 VALUES(85,5,59);
INSERT INTO tab1 VALUES(91,47,68);
INSERT INTO tab2 VALUES(64,77,40);
INSERT INTO tab2 VALUES(75,67,58);
INSERT INTO tab2 VALUES(46,51,23);
SELECT DISTINCT 32 AS col2 FROM tab0 AS cor0 LEFT OUTER JOIN tab2 AS cor1 ON ( NULL ) IS NULL;
Arrow error: Invalid argument error: must either specify a row count or at least one column
There are a bunch of other failures where the result counts between main and this branch do not match. I verified that a bunch of the queries do have NestedLoopJoinExec in the explain result.
Thanks for the report, I'll work on that today. I ran the extended tests on an early version; those failures must have been introduced by my recent changes 🤔
Run extended tests
That was removed recently, unfortunately.
I have run the sqlite test suite locally, and it's passing now.
This PR is ready to merge again.
@2010YOUY01 I think you can run extended tests on your forked repo from |
Ah, yes. I forgot that pushing to a private clone's main branch can also trigger it. I have started it:
It passed. I think it's safe to merge now, thanks everyone!
Which issue does this PR close?
Rationale for this change
Summary
This PR rewrites the NLJ operator from scratch with a different approach, to limit the extra intermediate data overhead to only one RecordBatch and to eliminate other redundant conversions in the old implementation. Using the NLJ micro-bench introduced in this PR, this rewrite provides up to a 5X speed-up and uses only 1% of the memory in extreme cases.
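At a high level, the rewritten probe path processes one left row against one right batch at a time and accumulates output until the batch_size threshold (see the comparison under "What changes are included in this PR?"). The sketch below is an illustrative outline only -- hypothetical names, closures, and simplified buffering, not the actual executor code.

```rust
// Illustrative outline of the per-left-row probe strategy; not the PR's code.
use arrow::array::BooleanArray;
use arrow::compute::{concat_batches, filter_record_batch};
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Probes one right batch against all buffered build-side rows, one left row
/// at a time, accumulating output until `batch_size` rows can be emitted.
fn probe_one_right_batch(
    output_schema: &SchemaRef,
    num_left_rows: usize,      // rows buffered on the build side
    right_batch: &RecordBatch, // current probe-side batch
    // Evaluates the join predicate for (one left row x right batch) and
    // returns one boolean per right row.
    eval_filter: impl Fn(usize, &RecordBatch) -> Result<BooleanArray, ArrowError>,
    // Builds joined output rows from one left row and its matching right rows.
    build_output: impl Fn(usize, &RecordBatch) -> Result<RecordBatch, ArrowError>,
    batch_size: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut pending: Vec<RecordBatch> = vec![];
    let mut pending_rows = 0;
    let mut completed = vec![];

    for left_idx in 0..num_left_rows {
        // The intermediate here is bounded by one right batch, never by the
        // full (all left rows x right batch) cross product.
        let mask = eval_filter(left_idx, right_batch)?;
        let matched = filter_record_batch(right_batch, &mask)?;
        if matched.num_rows() == 0 {
            continue;
        }
        pending_rows += matched.num_rows();
        pending.push(build_output(left_idx, &matched)?);

        // Emit an output batch once enough rows have accumulated.
        if pending_rows >= batch_size {
            completed.push(concat_batches(output_schema, &pending)?);
            pending.clear();
            pending_rows = 0;
        }
    }
    if !pending.is_empty() {
        completed.push(concat_batches(output_schema, &pending)?);
    }
    Ok(completed)
}
```

The key point is that intermediate data is bounded by a single probe batch, and output is emitted in batch_size-sized chunks instead of one potentially huge batch.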
Speed benchmark
Memory Usage Benchmark
Main
PR
Next Steps
I think it is ready for review; the major potential optimizations have all been done. Though there is one minor chore to be done:
Follow-ups
Why a Rewrite?
(TLDR: it's the easiest way to address the existing problem)
The original implementation performs a Cartesian product of (all-left-batches x right-batch), materializes that intermediate result for predicate evaluation, and then materializes the (potentially very large) final result all at once. This design is inherently inefficient, and although many patches have attempted to alleviate the problem, the fundamental issue remains.
A key challenge is that the original design and the ideal design (i.e., one that produces small intermediates during execution) are fundamentally different. As a result, it's practically impossible to make small incremental changes that fully address the inefficiency. These patches may also increase code complexity, making long-term maintenance more difficult.
Example of Prior Work
Here's a recent example of a small patch intended to improve the situation:
#16443
Even with careful engineering, I still feel the entropy in the code increases.
Since NLJ is a relatively straightforward operator, a full rewrite seemed worthwhile. This allows for a clean, simplified design focused on current goals—performance and memory efficiency—without being constrained by the legacy implementation.
What changes are included in this PR?
Implementation
The implementation/design doc can be found in the source code.
A brief comparison between the old implementation and this PR:
Old implementation
- Performs a Cartesian product (all_buffered_left_batch x one_right_batch) to calculate indices
This PR
- Works on one small intermediate at a time (one_left_row x one_right_batch), and does the filtering and output construction directly on this small intermediate
- Accumulates output until it reaches the batch_size threshold
Old v.s. PR
- The old implementation has to convert indices <--> batch, while this PR can use the right batch directly. This avoids unnecessary transformations and makes the implementation more cache-friendly
- The old implementation's intermediate index buffer is left_buffered_batches_total_rows * right_batch_size (default 8192) * 12 Bytes (left and right indices are represented by uint64 and uint32), which can be significant for a large dataset (for example, one million buffered left rows works out to about 1,000,000 * 8,192 * 12 B ≈ 98 GB of index data per probe batch)
Are these changes tested?
Existing tests
Are there any user-facing changes?
The old implementation can maintain the right input order in certain cases; this PR's new design is not able to maintain that property -- preserving it would require a different design, with significant performance and memory overhead.
If this property is important to some users, we can keep the old implementation (maybe rename it to RightOrderPreservingNLJ and use a configuration option to control it).