@@ -25,11 +25,18 @@ mod data_utils;
25
25
use crate :: criterion:: Criterion ;
26
26
use arrow:: array:: { ArrayRef , RecordBatch } ;
27
27
use arrow:: datatypes:: { DataType , Field , Fields , Schema } ;
28
+ use arrow_schema:: TimeUnit :: Nanosecond ;
28
29
use criterion:: Bencher ;
29
30
use datafusion:: datasource:: MemTable ;
30
31
use datafusion:: execution:: context:: SessionContext ;
32
+ use datafusion:: prelude:: DataFrame ;
31
33
use datafusion_common:: ScalarValue ;
32
- use datafusion_expr:: col;
34
+ use datafusion_expr:: Expr :: Literal ;
35
+ use datafusion_expr:: { cast, col, lit, not, try_cast, when} ;
36
+ use datafusion_functions:: expr_fn:: {
37
+ btrim, length, regexp_like, regexp_replace, to_timestamp, upper,
38
+ } ;
39
+ use std:: ops:: Rem ;
33
40
use std:: path:: PathBuf ;
34
41
use std:: sync:: Arc ;
35
42
use test_utils:: tpcds:: tpcds_schemas;
@@ -58,6 +65,150 @@ fn physical_plan(ctx: &SessionContext, rt: &Runtime, sql: &str) {
58
65
} ) ) ;
59
66
}
60
67
68
+ /// Build a dataframe for testing logical plan optimization
69
+ fn build_test_data_frame ( ctx : & SessionContext , rt : & Runtime ) -> DataFrame {
70
+ register_string_table ( ctx, 100 , 1000 ) ;
71
+
72
+ rt. block_on ( async {
73
+ let mut df = ctx. table ( "t" ) . await . unwrap ( ) ;
74
+ // add some columns in
75
+ for i in 100 ..150 {
76
+ df = df
77
+ . with_column ( & format ! ( "c{i}" ) , Literal ( ScalarValue :: Utf8 ( None ) , None ) )
78
+ . unwrap ( ) ;
79
+ }
80
+ // add in some columns with string encoded timestamps
81
+ for i in 150 ..175 {
82
+ df = df
83
+ . with_column (
84
+ & format ! ( "c{i}" ) ,
85
+ Literal ( ScalarValue :: Utf8 ( Some ( "2025-08-21 09:43:17" . into ( ) ) ) , None ) ,
86
+ )
87
+ . unwrap ( ) ;
88
+ }
89
+ // do a bunch of ops on the columns
90
+ for i in 0 ..175 {
91
+ // trim the columns
92
+ df = df
93
+ . with_column ( & format ! ( "c{i}" ) , btrim ( vec ! [ col( format!( "c{i}" ) ) ] ) )
94
+ . unwrap ( ) ;
95
+ }
96
+
97
+ for i in 0 ..175 {
98
+ let c_name = format ! ( "c{i}" ) ;
99
+ let c = col ( & c_name) ;
100
+
101
+ // random ops
102
+ if i % 5 == 0 && i < 150 {
103
+ // the actual ops here are largely unimportant as they are just a sample
104
+ // of ops that could occur on a dataframe
105
+ df = df
106
+ . with_column ( & c_name, cast ( c. clone ( ) , DataType :: Utf8 ) )
107
+ . unwrap ( )
108
+ . with_column (
109
+ & c_name,
110
+ when (
111
+ cast ( c. clone ( ) , DataType :: Int32 ) . gt ( lit ( 135 ) ) ,
112
+ cast (
113
+ cast ( c. clone ( ) , DataType :: Int32 ) - lit ( i + 3 ) ,
114
+ DataType :: Utf8 ,
115
+ ) ,
116
+ )
117
+ . otherwise ( c. clone ( ) )
118
+ . unwrap ( ) ,
119
+ )
120
+ . unwrap ( )
121
+ . with_column (
122
+ & c_name,
123
+ when (
124
+ c. clone ( ) . is_not_null ( ) . and (
125
+ cast ( c. clone ( ) , DataType :: Int32 )
126
+ . between ( lit ( 120 ) , lit ( 130 ) ) ,
127
+ ) ,
128
+ Literal ( ScalarValue :: Utf8 ( None ) , None ) ,
129
+ )
130
+ . otherwise (
131
+ when (
132
+ c. clone ( ) . is_not_null ( ) . and ( regexp_like (
133
+ cast ( c. clone ( ) , DataType :: Utf8View ) ,
134
+ lit ( "[0-9]*" ) ,
135
+ None ,
136
+ ) ) ,
137
+ upper ( c. clone ( ) ) ,
138
+ )
139
+ . otherwise ( c. clone ( ) )
140
+ . unwrap ( ) ,
141
+ )
142
+ . unwrap ( ) ,
143
+ )
144
+ . unwrap ( )
145
+ . with_column (
146
+ & c_name,
147
+ when (
148
+ c. clone ( ) . is_not_null ( ) . and (
149
+ cast ( c. clone ( ) , DataType :: Int32 )
150
+ . between ( lit ( 90 ) , lit ( 100 ) ) ,
151
+ ) ,
152
+ cast ( c. clone ( ) , DataType :: Utf8View ) ,
153
+ )
154
+ . otherwise ( Literal ( ScalarValue :: Date32 ( None ) , None ) )
155
+ . unwrap ( ) ,
156
+ )
157
+ . unwrap ( )
158
+ . with_column (
159
+ & c_name,
160
+ when (
161
+ c. clone ( ) . is_not_null ( ) . and (
162
+ cast ( c. clone ( ) , DataType :: Int32 ) . rem ( lit ( 10 ) ) . gt ( lit ( 7 ) ) ,
163
+ ) ,
164
+ regexp_replace (
165
+ cast ( c. clone ( ) , DataType :: Utf8View ) ,
166
+ lit ( "1" ) ,
167
+ lit ( "a" ) ,
168
+ None ,
169
+ ) ,
170
+ )
171
+ . otherwise ( Literal ( ScalarValue :: Date32 ( None ) , None ) )
172
+ . unwrap ( ) ,
173
+ )
174
+ . unwrap ( )
175
+ }
176
+ if i >= 150 {
177
+ df = df
178
+ . with_column (
179
+ & c_name,
180
+ try_cast (
181
+ to_timestamp ( vec ! [ c. clone( ) , lit( "%Y-%m-%d %H:%M:%S" ) ] ) ,
182
+ DataType :: Timestamp ( Nanosecond , Some ( "UTC" . into ( ) ) ) ,
183
+ ) ,
184
+ )
185
+ . unwrap ( )
186
+ . with_column ( & c_name, try_cast ( c. clone ( ) , DataType :: Date32 ) )
187
+ . unwrap ( )
188
+ }
189
+
190
+ // add in a few unions
191
+ if i % 30 == 0 {
192
+ let df1 = df
193
+ . clone ( )
194
+ . filter ( length ( c. clone ( ) ) . gt ( lit ( 2 ) ) )
195
+ . unwrap ( )
196
+ . with_column ( & format ! ( "c{i}_filtered" ) , lit ( true ) )
197
+ . unwrap ( ) ;
198
+ let df2 = df
199
+ . filter ( not ( length ( c. clone ( ) ) . gt ( lit ( 2 ) ) ) )
200
+ . unwrap ( )
201
+ . with_column ( & format ! ( "c{i}_filtered" ) , lit ( false ) )
202
+ . unwrap ( ) ;
203
+
204
+ df = df1. union_by_name ( df2) . unwrap ( )
205
+ }
206
+ }
207
+
208
+ df
209
+ } )
210
+ }
211
+
61
212
/// Create schema with the specified number of columns
62
213
fn create_schema ( column_prefix : & str , num_columns : usize ) -> Schema {
63
214
let fields: Fields = ( 0 ..num_columns)
@@ -180,13 +331,40 @@ fn register_union_order_table(ctx: &SessionContext, num_columns: usize, num_rows
180
331
ctx. register_table ( "t" , Arc :: new ( table) ) . unwrap ( ) ;
181
332
}
182
333
334
+ /// Registers a table like this:
335
+ /// c0,c1,c2...,c99
336
+ /// "0","100"..."9900"
337
+ /// "0","200"..."19800"
338
+ /// "0","300"..."29700"
339
+ fn register_string_table ( ctx : & SessionContext , num_columns : usize , num_rows : usize ) {
340
+ // ("c0", ["0", "0", ...])
341
+ // ("c1": ["100", "200", ...])
342
+ // etc
343
+ let iter = ( 0 ..num_columns) . map ( |i| i as u64 ) . map ( |i| {
344
+ let array: ArrayRef = Arc :: new ( arrow:: array:: StringViewArray :: from_iter_values (
345
+ ( 0 ..num_rows)
346
+ . map ( |j| format ! ( "c{}" , j as u64 * 100 + i) )
347
+ . collect :: < Vec < _ > > ( ) ,
348
+ ) ) ;
349
+ ( format ! ( "c{i}" ) , array)
350
+ } ) ;
351
+ let batch = RecordBatch :: try_from_iter ( iter) . unwrap ( ) ;
352
+ let schema = batch. schema ( ) ;
353
+ let partitions = vec ! [ vec![ batch] ] ;
354
+
355
+ // create the table
356
+ let table = MemTable :: try_new ( schema, partitions) . unwrap ( ) ;
357
+
358
+ ctx. register_table ( "t" , Arc :: new ( table) ) . unwrap ( ) ;
359
+ }
360
+
183
361
/// return a query like
184
362
/// ```sql
185
- /// select c1, null as c2, ... null as cn from t ORDER BY c1
363
+ /// select c1, 2 as c2, ... n as cn from t ORDER BY c1
186
364
/// UNION ALL
187
- /// select null as c1, c2, ... null as cn from t ORDER BY c2
365
+ /// select 1 as c1, c2, ... n as cn from t ORDER BY c2
188
366
/// ...
189
- /// select null as c1, null as c2, ... cn from t ORDER BY cn
367
+ /// select 1 as c1, 2 as c2, ... cn from t ORDER BY cn
190
368
/// ORDER BY c1, c2 ... CN
191
369
/// ```
192
370
fn union_orderby_query ( n : usize ) -> String {
@@ -200,7 +378,7 @@ fn union_orderby_query(n: usize) -> String {
200
378
if i == j {
201
379
format ! ( "c{j}" )
202
380
} else {
203
- format ! ( "null as c{j}" )
381
+ format ! ( "{j} as c{j}" )
204
382
}
205
383
} )
206
384
. collect :: < Vec < _ > > ( )
@@ -370,16 +548,37 @@ fn criterion_benchmark(c: &mut Criterion) {
370
548
} ) ;
371
549
372
550
// -- Sorted Queries --
373
- register_union_order_table ( & ctx, 100 , 1000 ) ;
374
-
375
- // this query has many expressions in its sort order so stresses
376
- // order equivalence validation
377
- c. bench_function ( "physical_sorted_union_orderby" , |b| {
378
- // SELECT ... UNION ALL ...
379
- let query = union_orderby_query ( 20 ) ;
380
- b. iter ( || physical_plan ( & ctx, & rt, & query) )
551
+ for column_count in [ 10 , 50 , 100 , 200 , 300 ] {
552
+ register_union_order_table ( & ctx, column_count, 1000 ) ;
553
+
554
+ // this query has many expressions in its sort order so stresses
555
+ // order equivalence validation
556
+ c. bench_function (
557
+ & format ! ( "physical_sorted_union_order_by_{column_count}" ) ,
558
+ |b| {
559
+ // SELECT ... UNION ALL ...
560
+ let query = union_orderby_query ( column_count) ;
561
+ b. iter ( || physical_plan ( & ctx, & rt, & query) )
562
+ } ,
563
+ ) ;
564
+
565
+ let _ = ctx. deregister_table ( "t" ) ;
566
+ }
567
+
568
+ // -- validate logical plan optimize performance
569
+ let df = build_test_data_frame ( & ctx, & rt) ;
570
+
571
+ c. bench_function ( "logical_plan_optimize" , |b| {
572
+ b. iter ( || {
573
+ let df_clone = df. clone ( ) ;
574
+ criterion:: black_box (
575
+ rt. block_on ( async { df_clone. into_optimized_plan ( ) . unwrap ( ) } ) ,
576
+ ) ;
577
+ } )
381
578
} ) ;
382
579
580
+ let _ = ctx. deregister_table ( "t" ) ;
581
+
383
582
// --- TPC-H ---
384
583
385
584
let tpch_ctx = register_defs ( SessionContext :: new ( ) , tpch_schemas ( ) ) ;
0 commit comments