@@ -23,7 +23,7 @@ use arrow::{
23
23
row:: { RowConverter , Rows , SortField } ,
24
24
} ;
25
25
use datafusion_expr:: { ColumnarValue , Operator } ;
26
- use parking_lot :: RwLock ;
26
+ use arc_swap :: ArcSwapOption ;
27
27
use std:: mem:: size_of;
28
28
use std:: { cmp:: Ordering , collections:: BinaryHeap , sync:: Arc } ;
29
29
@@ -134,7 +134,7 @@ pub struct TopKDynamicFilters {
134
134
/// The current *global* threshold for the dynamic filter.
135
135
/// This is shared across all partitions and is updated by any of them.
136
136
/// Stored as row bytes for efficient comparison.
137
- threshold_row : Arc < RwLock < Option < Vec < u8 > > > > ,
137
+ threshold_row : Arc < ArcSwapOption < Vec < u8 > > > ,
138
138
/// The expression used to evaluate the dynamic filter
139
139
expr : Arc < DynamicFilterPhysicalExpr > ,
140
140
}
@@ -143,7 +143,7 @@ impl TopKDynamicFilters {
143
143
/// Create a new `TopKDynamicFilters` with the given expression.
///
/// The shared threshold row starts out unset (`None`); `expr` is stored as-is.
144
144
pub fn new ( expr : Arc < DynamicFilterPhysicalExpr > ) -> Self {
145
145
Self {
146
- threshold_row : Arc :: new ( RwLock :: new ( None ) ) ,
146
+ threshold_row : Arc :: new ( ArcSwapOption :: from ( None ) ) ,
147
147
expr,
148
148
}
149
149
}
@@ -349,62 +349,67 @@ impl TopK {
349
349
350
350
let new_threshold_row = & max_row. row ;
351
351
352
+ // Extract scalar values up front, before the lock-free threshold check (NOTE(review): this now also runs when no update is needed)
353
+ let thresholds = match self . heap . get_threshold_values ( & self . expr ) ? {
354
+ Some ( t) => t,
355
+ None => return Ok ( ( ) ) ,
356
+ } ;
357
+
352
358
// Extract filter expression reference before entering critical section
353
359
let filter_expr = Arc :: clone ( & self . filter . expr ) ;
354
360
355
- // Check if we need to update and do both threshold and filter update atomically
356
- {
357
- let mut threshold_guard = self . filter . threshold_row . write ( ) ;
358
- if let Some ( current_row) = threshold_guard. as_ref ( ) {
359
- match current_row. as_slice ( ) . cmp ( new_threshold_row) {
360
- Ordering :: Greater => {
361
- // new < current, so new threshold is more selective
362
- // Update threshold and filter atomically to prevent race conditions
363
- * threshold_guard = Some ( new_threshold_row. to_vec ( ) ) ;
364
-
365
- // Extract scalar values for filter expression creation
366
- let thresholds =
367
- match self . heap . get_threshold_values ( & self . expr ) ? {
368
- Some ( t) => t,
369
- None => return Ok ( ( ) ) ,
370
- } ;
371
-
372
- // Update the filter expression while still holding the lock
373
- Self :: update_filter_expression (
374
- & filter_expr,
375
- & self . expr ,
376
- thresholds,
377
- ) ?;
378
- }
379
- _ => {
380
- // Same threshold or current is more selective, no need to update
381
- }
382
- }
383
- } else {
384
- // No current thresholds, so update with the new ones
385
- * threshold_guard = Some ( new_threshold_row. to_vec ( ) ) ;
361
+ // Check if we need to update the threshold (lock-free read)
362
+ let current_threshold = self . filter . threshold_row . load ( ) ;
363
+ let needs_update = match current_threshold. as_ref ( ) {
364
+ Some ( current_row) => {
365
+ // new < current means new threshold is more selective
366
+ current_row. as_slice ( ) . cmp ( new_threshold_row) == Ordering :: Greater
367
+ }
368
+ None => true , // No current threshold, so we need to set one
369
+ } ;
386
370
387
- // Extract scalar values for filter expression creation
388
- let thresholds = match self . heap . get_threshold_values ( & self . expr ) ? {
389
- Some ( t) => t,
390
- None => return Ok ( ( ) ) ,
391
- } ;
371
+ // Only proceed if we need to update
372
+ if needs_update {
373
+ // Build the filter expression OUTSIDE any synchronization
374
+ let predicate = Self :: build_filter_expression ( & self . expr , thresholds) ?;
375
+ let new_threshold_arc = Arc :: new ( new_threshold_row. to_vec ( ) ) ;
376
+
377
+ // Atomically update the threshold using compare-and-swap
378
+ let old_threshold = self . filter . threshold_row . compare_and_swap ( & current_threshold, Some ( Arc :: clone ( & new_threshold_arc) ) ) ;
379
+
380
+ // Only update filter if we successfully updated the threshold
381
+ // (we were the first to set it, or we lost the race but our row is still more selective)
382
+ let should_update_filter = match ( old_threshold. as_ref ( ) , current_threshold. as_ref ( ) ) {
383
+ // We successfully swapped
384
+ ( Some ( old) , Some ( expected) ) if Arc :: ptr_eq ( old, expected) => true ,
385
+ // We were the first to set it
386
+ ( None , None ) => true ,
387
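+ // Another thread updated before us: update the filter only if our threshold is still more selective. NOTE(review): the swap is not retried, so threshold_row keeps the other thread's row in this case — confirm this is intended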
388
+ ( Some ( actual_old) , _) => {
389
+ actual_old. as_slice ( ) . cmp ( new_threshold_row) == Ordering :: Greater
390
+ }
391
+ _ => false ,
392
+ } ;
392
393
393
- // Update the filter expression while still holding the lock
394
- Self :: update_filter_expression ( & filter_expr, & self . expr , thresholds) ?;
394
+ if should_update_filter {
395
+ // Update the filter expression
396
+ if let Some ( pred) = predicate {
397
+ if !pred. eq ( & lit ( true ) ) {
398
+ filter_expr. update ( pred) ?;
399
+ }
400
+ }
395
401
}
396
- } ;
402
+ }
397
403
398
404
Ok ( ( ) )
399
405
}
400
406
401
- /// Update the filter expression with the given thresholds.
402
- /// This should only be called while holding the threshold lock.
403
- fn update_filter_expression (
404
- filter_expr : & DynamicFilterPhysicalExpr ,
407
+ /// Build the filter expression with the given thresholds.
408
+ /// Returns the predicate (if any) instead of installing it; the caller decides whether to update the filter, so this can run outside any synchronization.
409
+ fn build_filter_expression (
405
410
sort_exprs : & [ PhysicalSortExpr ] ,
406
411
thresholds : Vec < ScalarValue > ,
407
- ) -> Result < ( ) > {
412
+ ) -> Result < Option < Arc < dyn PhysicalExpr > > > {
408
413
// Create filter expressions for each threshold
409
414
let mut filters: Vec < Arc < dyn PhysicalExpr > > =
410
415
Vec :: with_capacity ( thresholds. len ( ) ) ;
@@ -482,13 +487,7 @@ impl TopK {
482
487
. into_iter ( )
483
488
. reduce ( |a, b| Arc :: new ( BinaryExpr :: new ( a, Operator :: Or , b) ) ) ;
484
489
485
- if let Some ( predicate) = dynamic_predicate {
486
- if !predicate. eq ( & lit ( true ) ) {
487
- filter_expr. update ( predicate) ?;
488
- }
489
- }
490
-
491
- Ok ( ( ) )
490
+ Ok ( dynamic_predicate)
492
491
}
493
492
494
493
/// If input ordering shares a common sort prefix with the TopK, and if the TopK's heap is full,
0 commit comments