Skip to content

Commit 1206019

Browse files
Rewrite Nested Loop Join executor for 5× speed and 1% memory usage (#16996)
* Rewrite NestedLoopJoin for better performance and memory efficiency --------- Co-authored-by: Matt Butrovich <[email protected]>
1 parent 5c370fa commit 1206019

File tree

8 files changed

+1634
-772
lines changed

8 files changed

+1634
-772
lines changed

benchmarks/README.md

Lines changed: 47 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -440,37 +440,6 @@ Your benchmark should create and use an instance of `BenchmarkRun` defined in `b
440440

441441
The output of `dfbench` help includes a description of each benchmark, which is reproduced here for convenience.
442442

443-
## Cancellation
444-
445-
Test performance of cancelling queries.
446-
447-
Queries in DataFusion should stop executing "quickly" after they are
448-
cancelled (the output stream is dropped).
449-
450-
The queries are executed on a synthetic dataset generated during
451-
the benchmark execution that is an anonymized version of a
452-
real-world data set.
453-
454-
The query is an anonymized version of a real-world query, and the
455-
test starts the query then cancels it and reports how long it takes
456-
for the runtime to fully exit.
457-
458-
Example output:
459-
460-
```
461-
Using 7 files found on disk
462-
Starting to load data into in-memory object store
463-
Done loading data into in-memory object store
464-
in main, sleeping
465-
Starting spawned
466-
Creating logical plan...
467-
Creating physical plan...
468-
Executing physical plan...
469-
Getting results...
470-
cancelling thread
471-
done dropping runtime in 83.531417ms
472-
```
473-
474443
## ClickBench
475444

476445
The ClickBench[1] benchmarks are widely cited in the industry and
@@ -741,3 +710,50 @@ For example, to run query 1 with the small data generated above:
741710
```bash
742711
cargo run --release --bin dfbench -- h2o --join-paths ./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv --queries-path ./benchmarks/queries/h2o/window.sql --query 1
743712
```
713+
714+
# Micro-Benchmarks
715+
716+
## Nested Loop Join
717+
718+
This benchmark focuses on the performance of queries with nested loop joins, minimizing other overheads such as scanning data sources or evaluating predicates.
719+
720+
Different queries are included to test nested loop joins under various workloads.
721+
722+
### Example Run
723+
724+
```bash
725+
# No need to generate data: this benchmark uses table function `range()` as the data source
726+
727+
./bench.sh run nlj
728+
```
729+
730+
## Cancellation
731+
732+
Test performance of cancelling queries.
733+
734+
Queries in DataFusion should stop executing "quickly" after they are
735+
cancelled (the output stream is dropped).
736+
737+
The queries are executed on a synthetic dataset generated during
738+
the benchmark execution that is an anonymized version of a
739+
real-world data set.
740+
741+
The query is an anonymized version of a real-world query, and the
742+
test starts the query then cancels it and reports how long it takes
743+
for the runtime to fully exit.
744+
745+
Example output:
746+
747+
```
748+
Using 7 files found on disk
749+
Starting to load data into in-memory object store
750+
Done loading data into in-memory object store
751+
in main, sleeping
752+
Starting spawned
753+
Creating logical plan...
754+
Creating physical plan...
755+
Executing physical plan...
756+
Getting results...
757+
cancelling thread
758+
done dropping runtime in 83.531417ms
759+
```

benchmarks/bench.sh

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ imdb: Join Order Benchmark (JOB) using the IMDB dataset conver
124124
125125
# Micro-Benchmarks (specific operators and features)
126126
cancellation: How long cancelling a query takes
127+
nlj: Benchmark for simple nested loop joins, testing various join scenarios
127128
128129
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
129130
Supported Configuration (Environment Variables)
@@ -196,6 +197,7 @@ main() {
196197
data_clickbench_1
197198
data_clickbench_partitioned
198199
data_imdb
200+
# nlj uses range() function, no data generation needed
199201
;;
200202
tpch)
201203
data_tpch "1"
@@ -298,6 +300,10 @@ main() {
298300
# same data as for tpch
299301
data_tpch "1"
300302
;;
303+
nlj)
304+
# nlj uses range() function, no data generation needed
305+
echo "NLJ benchmark does not require data generation"
306+
;;
301307
*)
302308
echo "Error: unknown benchmark '$BENCHMARK' for data generation"
303309
usage
@@ -354,6 +360,7 @@ main() {
354360
run_h2o_join "BIG" "PARQUET" "join"
355361
run_imdb
356362
run_external_aggr
363+
run_nlj
357364
;;
358365
tpch)
359366
run_tpch "1" "parquet"
@@ -458,6 +465,9 @@ main() {
458465
topk_tpch)
459466
run_topk_tpch
460467
;;
468+
nlj)
469+
run_nlj
470+
;;
461471
*)
462472
echo "Error: unknown benchmark '$BENCHMARK' for run"
463473
usage
@@ -1085,6 +1095,14 @@ run_topk_tpch() {
10851095
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" --limit 100 ${QUERY_ARG}
10861096
}
10871097

1098+
# Runs the nlj benchmark
1099+
run_nlj() {
1100+
RESULTS_FILE="${RESULTS_DIR}/nlj.json"
1101+
echo "RESULTS_FILE: ${RESULTS_FILE}"
1102+
echo "Running nlj benchmark..."
1103+
debug_run $CARGO_COMMAND --bin dfbench -- nlj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG}
1104+
}
1105+
10881106

10891107
compare_benchmarks() {
10901108
BASE_RESULTS_DIR="${SCRIPT_DIR}/results"

benchmarks/src/bin/dfbench.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
3333
#[global_allocator]
3434
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
3535

36-
use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, sort_tpch, tpch};
36+
use datafusion_benchmarks::{cancellation, clickbench, h2o, imdb, nlj, sort_tpch, tpch};
3737

3838
#[derive(Debug, StructOpt)]
3939
#[structopt(about = "benchmark command")]
@@ -42,6 +42,7 @@ enum Options {
4242
Clickbench(clickbench::RunOpt),
4343
H2o(h2o::RunOpt),
4444
Imdb(imdb::RunOpt),
45+
Nlj(nlj::RunOpt),
4546
SortTpch(sort_tpch::RunOpt),
4647
Tpch(tpch::RunOpt),
4748
TpchConvert(tpch::ConvertOpt),
@@ -57,6 +58,7 @@ pub async fn main() -> Result<()> {
5758
Options::Clickbench(opt) => opt.run().await,
5859
Options::H2o(opt) => opt.run().await,
5960
Options::Imdb(opt) => Box::pin(opt.run()).await,
61+
Options::Nlj(opt) => opt.run().await,
6062
Options::SortTpch(opt) => opt.run().await,
6163
Options::Tpch(opt) => Box::pin(opt.run()).await,
6264
Options::TpchConvert(opt) => opt.run().await,

benchmarks/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub mod cancellation;
2020
pub mod clickbench;
2121
pub mod h2o;
2222
pub mod imdb;
23+
pub mod nlj;
2324
pub mod sort_tpch;
2425
pub mod tpch;
2526
pub mod util;

0 commit comments

Comments
 (0)