Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/physical-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ log = { workspace = true }
parking_lot = { workspace = true }
paste = "^1.0"
petgraph = "0.8.2"
rand = { workspace = true }

[dev-dependencies]
arrow = { workspace = true, features = ["test_utils"] }
Expand Down
183 changes: 182 additions & 1 deletion datafusion/physical-expr/src/expressions/dynamic_filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

use parking_lot::RwLock;
use rand::Rng;
use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};

use crate::PhysicalExpr;
Expand All @@ -27,13 +28,18 @@ use datafusion_common::{
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash};

/// Callback function type for update notifications.
/// The callback receives a reference to the entire DynamicFilterPhysicalExpr after it has been updated.
type UpdateCallback = Box<dyn Fn(&DynamicFilterPhysicalExpr) -> Result<()> + Send + Sync>;

/// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it.
///
/// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also
/// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where
/// the same `ExecutionPlan` is reused with different data.
#[derive(Debug)]
pub struct DynamicFilterPhysicalExpr {
/// A unique identifier for this dynamic filter expression, used for wire protocol communication.
id: u64,
/// The original children of this PhysicalExpr, if any.
/// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`)
/// and later remapped to the actual expressions that are being filtered.
Expand All @@ -49,6 +55,25 @@ pub struct DynamicFilterPhysicalExpr {
/// But this can have overhead in production, so it's only included in our tests.
data_type: Arc<RwLock<Option<DataType>>>,
nullable: Arc<RwLock<Option<bool>>>,
/// Callbacks that are triggered when the expression is updated.
update_callbacks: Arc<RwLock<Vec<UpdateCallback>>>,
}

impl std::fmt::Debug for DynamicFilterPhysicalExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DynamicFilterPhysicalExpr")
.field("id", &self.id)
.field("children", &self.children)
.field("remapped_children", &self.remapped_children)
.field("inner", &self.inner)
.field("data_type", &self.data_type)
.field("nullable", &self.nullable)
.field(
"update_callbacks",
&format_args!("[{} callbacks]", self.update_callbacks.read().len()),
)
.finish()
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -77,6 +102,7 @@ impl Inner {

impl Hash for DynamicFilterPhysicalExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
let inner = self.current().expect("Failed to get current expression");
inner.dyn_hash(state);
self.children.dyn_hash(state);
Expand All @@ -86,6 +112,9 @@ impl Hash for DynamicFilterPhysicalExpr {

impl PartialEq for DynamicFilterPhysicalExpr {
fn eq(&self, other: &Self) -> bool {
if self.id != other.id {
return false;
}
let inner = self.current().expect("Failed to get current expression");
let our_children = self.remapped_children.as_ref().unwrap_or(&self.children);
let other_children = other.remapped_children.as_ref().unwrap_or(&other.children);
Expand Down Expand Up @@ -136,14 +165,28 @@ impl DynamicFilterPhysicalExpr {
inner: Arc<dyn PhysicalExpr>,
) -> Self {
Self {
id: rand::rng().random(),
children,
remapped_children: None, // Initially no remapped children
inner: Arc::new(RwLock::new(Inner::new(inner))),
data_type: Arc::new(RwLock::new(None)),
nullable: Arc::new(RwLock::new(None)),
update_callbacks: Arc::new(RwLock::new(Vec::new())),
}
}

/// Get the unique identifier for this dynamic filter expression.
/// This can be used to serialize updates across the wire.
pub fn id(&self) -> u64 {
self.id
}

/// Register a callback that will be triggered whenever the expression is updated.
/// The callback receives a reference to the entire DynamicFilterPhysicalExpr after the update.
pub fn register_update_callback(&self, callback: UpdateCallback) {
self.update_callbacks.write().push(callback);
}

fn remap_children(
children: &[Arc<dyn PhysicalExpr>],
remapped_children: Option<&Vec<Arc<dyn PhysicalExpr>>>,
Expand Down Expand Up @@ -204,6 +247,18 @@ impl DynamicFilterPhysicalExpr {
generation: current.generation + 1,
expr: new_expr,
};
// Release the write lock before triggering callbacks
drop(current);

// Trigger all registered update callbacks
let callbacks = self.update_callbacks.read();
for callback in callbacks.iter() {
if let Err(e) = callback(self) {
// Log callback errors but don't fail the update
log::warn!("Update callback failed: {e}");
}
}

Ok(())
}
}
Expand All @@ -226,11 +281,13 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr {
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(Arc::new(Self {
id: self.id,
children: self.children.clone(),
remapped_children: Some(children),
inner: Arc::clone(&self.inner),
data_type: Arc::clone(&self.data_type),
nullable: Arc::clone(&self.nullable),
update_callbacks: Arc::clone(&self.update_callbacks),
}))
}

Expand Down Expand Up @@ -448,6 +505,130 @@ mod test {
assert_eq!(snapshot, Some(new_expr));
}

#[test]
fn test_unique_id_generation() {
let expr = lit(42) as Arc<dyn PhysicalExpr>;
let dynamic_filter1 = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr));
let dynamic_filter2 = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr));

// Each instance should have a unique ID
assert_ne!(dynamic_filter1.id(), dynamic_filter2.id());

// ID should be preserved through with_new_children
let dynamic_filter1_clone =
Arc::new(dynamic_filter1).with_new_children(vec![]).unwrap();
let dynamic_filter1_downcast = dynamic_filter1_clone
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.unwrap();

// ID should be the same after with_new_children
assert_eq!(dynamic_filter1_downcast.id(), dynamic_filter1_downcast.id());
}

#[test]
fn test_update_callbacks() {
use std::sync::atomic::{AtomicU64, Ordering};

let expr = lit(42) as Arc<dyn PhysicalExpr>;
let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr));

// Counter to track callback invocations
let callback_count = Arc::new(AtomicU64::new(0));
let last_seen_id = Arc::new(AtomicU64::new(0));

// Register a callback
let callback_count_clone = Arc::clone(&callback_count);
let last_seen_id_clone = Arc::clone(&last_seen_id);
dynamic_filter.register_update_callback(Box::new(
move |filter: &DynamicFilterPhysicalExpr| {
callback_count_clone.fetch_add(1, Ordering::SeqCst);
last_seen_id_clone.store(filter.id(), Ordering::SeqCst);
Ok(())
},
));

// Initial state - no callbacks triggered yet
assert_eq!(callback_count.load(Ordering::SeqCst), 0);

// Update the expression - should trigger callback
let new_expr = lit(100) as Arc<dyn PhysicalExpr>;
dynamic_filter.update(Arc::clone(&new_expr)).unwrap();

// Callback should have been triggered once
assert_eq!(callback_count.load(Ordering::SeqCst), 1);
assert_eq!(last_seen_id.load(Ordering::SeqCst), dynamic_filter.id());

// Update again - should trigger callback again
let another_expr = lit(200) as Arc<dyn PhysicalExpr>;
dynamic_filter.update(Arc::clone(&another_expr)).unwrap();

// Callback should have been triggered twice
assert_eq!(callback_count.load(Ordering::SeqCst), 2);
assert_eq!(last_seen_id.load(Ordering::SeqCst), dynamic_filter.id());

// Test callback preservation through with_new_children
let cloned_filter = Arc::new(dynamic_filter).with_new_children(vec![]).unwrap();
let cloned_filter_downcast = cloned_filter
.as_any()
.downcast_ref::<DynamicFilterPhysicalExpr>()
.unwrap();

// Update the cloned filter - should still trigger the original callback
let final_expr = lit(300) as Arc<dyn PhysicalExpr>;
cloned_filter_downcast
.update(Arc::clone(&final_expr))
.unwrap();

// Callback should have been triggered a third time
assert_eq!(callback_count.load(Ordering::SeqCst), 3);
assert_eq!(
last_seen_id.load(Ordering::SeqCst),
cloned_filter_downcast.id()
);
}

#[test]
fn test_callback_error_handling() {
use std::sync::atomic::{AtomicU64, Ordering};

let expr = lit(42) as Arc<dyn PhysicalExpr>;
let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr));

let success_count = Arc::new(AtomicU64::new(0));
let success_count_clone = Arc::clone(&success_count);

// Register a callback that succeeds
dynamic_filter.register_update_callback(Box::new(move |_| {
success_count_clone.fetch_add(1, Ordering::SeqCst);
Ok(())
}));

// Register a callback that fails
dynamic_filter.register_update_callback(Box::new(|_| {
Err(datafusion_common::DataFusionError::Internal(
"Test error".to_string(),
))
}));

// Register another callback that succeeds
let success_count_clone2 = Arc::clone(&success_count);
dynamic_filter.register_update_callback(Box::new(move |_| {
success_count_clone2.fetch_add(1, Ordering::SeqCst);
Ok(())
}));

// Update should succeed even though one callback fails
let new_expr = lit(100) as Arc<dyn PhysicalExpr>;
let result = dynamic_filter.update(Arc::clone(&new_expr));

// Update operation should succeed
assert!(result.is_ok());

// Both successful callbacks should have been called
assert_eq!(success_count.load(Ordering::SeqCst), 2);
}

#[test]
fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() {
let dynamic_filter =
Expand Down
Loading