|
17 | 17 |
|
18 | 18 | //! TopK: Combination of Sort / LIMIT
|
19 | 19 |
|
| 20 | +use arc_swap::ArcSwapOption; |
20 | 21 | use arrow::{
|
21 | 22 | array::{Array, AsArray},
|
22 | 23 | compute::{interleave_record_batch, prep_null_mask_filter, FilterBuilder},
|
23 | 24 | row::{RowConverter, Rows, SortField},
|
24 | 25 | };
|
25 | 26 | use datafusion_expr::{ColumnarValue, Operator};
|
26 |
| -use arc_swap::ArcSwapOption; |
27 | 27 | use std::mem::size_of;
|
28 | 28 | use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
|
29 | 29 |
|
@@ -375,24 +375,28 @@ impl TopK {
|
375 | 375 | let new_threshold_arc = Arc::new(new_threshold_row.to_vec());
|
376 | 376 |
|
377 | 377 | // Atomically update the threshold using compare-and-swap
|
378 |
| - let old_threshold = self.filter.threshold_row.compare_and_swap(¤t_threshold, Some(Arc::clone(&new_threshold_arc))); |
379 |
| - |
| 378 | + let old_threshold = self.filter.threshold_row.compare_and_swap( |
| 379 | + ¤t_threshold, |
| 380 | + Some(Arc::clone(&new_threshold_arc)), |
| 381 | + ); |
| 382 | + |
380 | 383 | // Only update filter if we successfully updated the threshold
|
381 | 384 | // (or if there was no previous threshold and we're the first)
|
382 |
| - let should_update_filter = match (old_threshold.as_ref(), current_threshold.as_ref()) { |
383 |
| - // We successfully swapped |
384 |
| - (Some(old), Some(expected)) if Arc::ptr_eq(old, expected) => true, |
385 |
| - // We were the first to set it |
386 |
| - (None, None) => true, |
387 |
| - // Another thread updated before us, check if our threshold is still better |
388 |
| - (Some(actual_old), _) => { |
389 |
| - actual_old.as_slice().cmp(new_threshold_row) == Ordering::Greater |
390 |
| - } |
391 |
| - _ => false, |
392 |
| - }; |
| 385 | + let should_update_filter = |
| 386 | + match (old_threshold.as_ref(), current_threshold.as_ref()) { |
| 387 | + // We successfully swapped |
| 388 | + (Some(old), Some(expected)) if Arc::ptr_eq(old, expected) => true, |
| 389 | + // We were the first to set it |
| 390 | + (None, None) => true, |
| 391 | + // Another thread updated before us, check if our threshold is still better |
| 392 | + (Some(actual_old), _) => { |
| 393 | + actual_old.as_slice().cmp(new_threshold_row) == Ordering::Greater |
| 394 | + } |
| 395 | + _ => false, |
| 396 | + }; |
393 | 397 |
|
394 | 398 | if should_update_filter {
|
395 |
| - // Update the filter expression |
| 399 | + // Update the filter expression |
396 | 400 | if let Some(pred) = predicate {
|
397 | 401 | if !pred.eq(&lit(true)) {
|
398 | 402 | filter_expr.update(pred)?;
|
|
0 commit comments