Skip to content

Commit 310100b

Browse files
committed
use more ArcSwap
1 parent cd344ed commit 310100b

File tree

4 files changed

+18
-26
lines changed

4 files changed

+18
-26
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ ahash = { version = "0.8", default-features = false, features = [
9090
"runtime-rng",
9191
] }
9292
apache-avro = { version = "0.17", default-features = false }
93+
arc-swap = "1.7.1"
9394
arrow = { version = "56.0.0", features = [
9495
"prettyprint",
9596
"chrono-tz",

datafusion/physical-expr/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ name = "datafusion_physical_expr"
3939

4040
[dependencies]
4141
ahash = { workspace = true }
42+
arc-swap = { workspace = true }
4243
arrow = { workspace = true }
4344
datafusion-common = { workspace = true, default-features = true }
4445
datafusion-expr = { workspace = true }

datafusion/physical-expr/src/expressions/dynamic_filters.rs

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::{
2222
sync::{Arc, RwLock},
2323
};
2424

25+
use arc_swap::ArcSwap;
26+
2527
use crate::PhysicalExpr;
2628
use arrow::datatypes::{DataType, Schema};
2729
use datafusion_common::{
@@ -47,7 +49,7 @@ pub struct DynamicFilterPhysicalExpr {
4749
/// so that when we update `current()` in subsequent iterations we can re-apply the replacements.
4850
remapped_children: Option<Vec<Arc<dyn PhysicalExpr>>>,
4951
/// The source of dynamic filters.
50-
inner: Arc<RwLock<Inner>>,
52+
inner: Arc<ArcSwap<Inner>>,
5153
/// For testing purposes track the data type and nullability to make sure they don't change.
5254
/// If they do, there's a bug in the implementation.
5355
/// But this can have overhead in production, so it's only included in our tests.
@@ -137,7 +139,7 @@ impl DynamicFilterPhysicalExpr {
137139
Self {
138140
children,
139141
remapped_children: None, // Initially no remapped children
140-
inner: Arc::new(RwLock::new(Inner::new(inner))),
142+
inner: Arc::new(ArcSwap::from_pointee(Inner::new(inner))),
141143
data_type: Arc::new(RwLock::new(None)),
142144
nullable: Arc::new(RwLock::new(None)),
143145
}
@@ -176,17 +178,8 @@ impl DynamicFilterPhysicalExpr {
176178
/// This will return the current expression with any children
177179
/// remapped to match calls to [`PhysicalExpr::with_new_children`].
178180
pub fn current(&self) -> Result<Arc<dyn PhysicalExpr>> {
179-
let inner = Arc::clone(
180-
&self
181-
.inner
182-
.read()
183-
.map_err(|_| {
184-
datafusion_common::DataFusionError::Execution(
185-
"Failed to acquire read lock for inner".to_string(),
186-
)
187-
})?
188-
.expr,
189-
);
181+
let inner_ref = self.inner.load();
182+
let inner = Arc::clone(&inner_ref.expr);
190183
let inner =
191184
Self::remap_children(&self.children, self.remapped_children.as_ref(), inner)?;
192185
Ok(inner)
@@ -199,11 +192,6 @@ impl DynamicFilterPhysicalExpr {
199192
/// - When we've computed the probe side's hash table in a HashJoinExec
200193
/// - After every batch is processed if we update the TopK heap in a SortExec using a TopK approach.
201194
pub fn update(&self, new_expr: Arc<dyn PhysicalExpr>) -> Result<()> {
202-
let mut current = self.inner.write().map_err(|_| {
203-
datafusion_common::DataFusionError::Execution(
204-
"Failed to acquire write lock for inner".to_string(),
205-
)
206-
})?;
207195
// Remap the children of the new expression to match the original children
208196
// We still do this again in `current()` but doing it preventively here
209197
// reduces the work needed in some cases if `current()` is called multiple times
@@ -213,10 +201,14 @@ impl DynamicFilterPhysicalExpr {
213201
self.remapped_children.as_ref(),
214202
new_expr,
215203
)?;
216-
// Update the inner expression to the new expression.
217-
current.expr = new_expr;
218-
// Increment the generation to indicate that the expression has changed.
219-
current.generation += 1;
204+
205+
// Load the current inner, increment generation, and store the new one
206+
let current = self.inner.load();
207+
let new_inner = Inner {
208+
generation: current.generation + 1,
209+
expr: new_expr,
210+
};
211+
self.inner.store(Arc::new(new_inner));
220212
Ok(())
221213
}
222214
}
@@ -324,10 +316,7 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
324316

325317
fn snapshot_generation(&self) -> u64 {
326318
// Return the current generation of the expression.
327-
self.inner
328-
.read()
329-
.expect("Failed to acquire read lock for inner")
330-
.generation
319+
self.inner.load().generation
331320
}
332321
}
333322

0 commit comments

Comments
 (0)