diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index c0eaa3e20528..5b6c69b52a93 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -53,6 +53,7 @@ log = { workspace = true } parking_lot = { workspace = true } paste = "^1.0" petgraph = "0.8.2" +rand = { workspace = true } [dev-dependencies] arrow = { workspace = true, features = ["test_utils"] } diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index eeb0c6e8028f..e156c88d88f6 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -16,6 +16,7 @@ // under the License. use parking_lot::RwLock; +use rand::Rng; use std::{any::Any, fmt::Display, hash::Hash, sync::Arc}; use crate::PhysicalExpr; @@ -27,13 +28,18 @@ use datafusion_common::{ use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; +/// Callback function type for update notifications. +/// The callback receives a reference to the entire DynamicFilterPhysicalExpr after it has been updated. +type UpdateCallback = Box Result<()> + Send + Sync>; + /// A dynamic [`PhysicalExpr`] that can be updated by anyone with a reference to it. /// /// Any `ExecutionPlan` that uses this expression and holds a reference to it internally should probably also /// implement `ExecutionPlan::reset_state` to remain compatible with recursive queries and other situations where /// the same `ExecutionPlan` is reused with different data. -#[derive(Debug)] pub struct DynamicFilterPhysicalExpr { + /// A unique identifier for this dynamic filter expression, used for wire protocol communication. + id: u64, /// The original children of this PhysicalExpr, if any. /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) /// and later remapped to the actual expressions that are being filtered. @@ -49,6 +55,25 @@ pub struct DynamicFilterPhysicalExpr { /// But this can have overhead in production, so it's only included in our tests. data_type: Arc>>, nullable: Arc>>, + /// Callbacks that are triggered when the expression is updated. + update_callbacks: Arc>>, +} + +impl std::fmt::Debug for DynamicFilterPhysicalExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DynamicFilterPhysicalExpr") + .field("id", &self.id) + .field("children", &self.children) + .field("remapped_children", &self.remapped_children) + .field("inner", &self.inner) + .field("data_type", &self.data_type) + .field("nullable", &self.nullable) + .field( + "update_callbacks", + &format_args!("[{} callbacks]", self.update_callbacks.read().len()), + ) + .finish() + } } #[derive(Debug)] @@ -77,6 +102,7 @@ impl Inner { impl Hash for DynamicFilterPhysicalExpr { fn hash(&self, state: &mut H) { + self.id.hash(state); let inner = self.current().expect("Failed to get current expression"); inner.dyn_hash(state); self.children.dyn_hash(state); @@ -86,6 +112,9 @@ impl Hash for DynamicFilterPhysicalExpr { impl PartialEq for DynamicFilterPhysicalExpr { fn eq(&self, other: &Self) -> bool { + if self.id != other.id { + return false; + } let inner = self.current().expect("Failed to get current expression"); let our_children = self.remapped_children.as_ref().unwrap_or(&self.children); let other_children = other.remapped_children.as_ref().unwrap_or(&other.children); @@ -136,14 +165,28 @@ impl DynamicFilterPhysicalExpr { inner: Arc, ) -> Self { Self { + id: rand::rng().random(), children, remapped_children: None, // Initially no remapped children inner: Arc::new(RwLock::new(Inner::new(inner))), data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), + update_callbacks: Arc::new(RwLock::new(Vec::new())), } } + /// Get the unique identifier for this dynamic filter expression. + /// This can be used to serialize updates across the wire. + pub fn id(&self) -> u64 { + self.id + } + + /// Register a callback that will be triggered whenever the expression is updated. + /// The callback receives a reference to the entire DynamicFilterPhysicalExpr after the update. + pub fn register_update_callback(&self, callback: UpdateCallback) { + self.update_callbacks.write().push(callback); + } + fn remap_children( children: &[Arc], remapped_children: Option<&Vec>>, @@ -204,6 +247,18 @@ impl DynamicFilterPhysicalExpr { generation: current.generation + 1, expr: new_expr, }; + // Release the write lock before triggering callbacks + drop(current); + + // Trigger all registered update callbacks + let callbacks = self.update_callbacks.read(); + for callback in callbacks.iter() { + if let Err(e) = callback(self) { + // Log callback errors but don't fail the update + log::warn!("Update callback failed: {e}"); + } + } + Ok(()) } } @@ -226,11 +281,13 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { children: Vec>, ) -> Result> { Ok(Arc::new(Self { + id: self.id, children: self.children.clone(), remapped_children: Some(children), inner: Arc::clone(&self.inner), data_type: Arc::clone(&self.data_type), nullable: Arc::clone(&self.nullable), + update_callbacks: Arc::clone(&self.update_callbacks), })) } @@ -448,6 +505,130 @@ mod test { assert_eq!(snapshot, Some(new_expr)); } + #[test] + fn test_unique_id_generation() { + let expr = lit(42) as Arc; + let dynamic_filter1 = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); + let dynamic_filter2 = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); + + // Each instance should have a unique ID + assert_ne!(dynamic_filter1.id(), dynamic_filter2.id()); + + // ID should be preserved through with_new_children + let dynamic_filter1_clone = + Arc::new(dynamic_filter1).with_new_children(vec![]).unwrap(); + let dynamic_filter1_downcast = dynamic_filter1_clone + .as_any() + .downcast_ref::() + .unwrap(); + + // ID should be the same after with_new_children + assert_eq!(dynamic_filter1_downcast.id(), dynamic_filter1_downcast.id()); + } + + #[test] + fn test_update_callbacks() { + use std::sync::atomic::{AtomicU64, Ordering}; + + let expr = lit(42) as Arc; + let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); + + // Counter to track callback invocations + let callback_count = Arc::new(AtomicU64::new(0)); + let last_seen_id = Arc::new(AtomicU64::new(0)); + + // Register a callback + let callback_count_clone = Arc::clone(&callback_count); + let last_seen_id_clone = Arc::clone(&last_seen_id); + dynamic_filter.register_update_callback(Box::new( + move |filter: &DynamicFilterPhysicalExpr| { + callback_count_clone.fetch_add(1, Ordering::SeqCst); + last_seen_id_clone.store(filter.id(), Ordering::SeqCst); + Ok(()) + }, + )); + + // Initial state - no callbacks triggered yet + assert_eq!(callback_count.load(Ordering::SeqCst), 0); + + // Update the expression - should trigger callback + let new_expr = lit(100) as Arc; + dynamic_filter.update(Arc::clone(&new_expr)).unwrap(); + + // Callback should have been triggered once + assert_eq!(callback_count.load(Ordering::SeqCst), 1); + assert_eq!(last_seen_id.load(Ordering::SeqCst), dynamic_filter.id()); + + // Update again - should trigger callback again + let another_expr = lit(200) as Arc; + dynamic_filter.update(Arc::clone(&another_expr)).unwrap(); + + // Callback should have been triggered twice + assert_eq!(callback_count.load(Ordering::SeqCst), 2); + assert_eq!(last_seen_id.load(Ordering::SeqCst), dynamic_filter.id()); + + // Test callback preservation through with_new_children + let cloned_filter = Arc::new(dynamic_filter).with_new_children(vec![]).unwrap(); + let cloned_filter_downcast = cloned_filter + .as_any() + .downcast_ref::() + .unwrap(); + + // Update the cloned filter - should still trigger the original callback + let final_expr = lit(300) as Arc; + cloned_filter_downcast + .update(Arc::clone(&final_expr)) + .unwrap(); + + // Callback should have been triggered a third time + assert_eq!(callback_count.load(Ordering::SeqCst), 3); + assert_eq!( + last_seen_id.load(Ordering::SeqCst), + cloned_filter_downcast.id() + ); + } + + #[test] + fn test_callback_error_handling() { + use std::sync::atomic::{AtomicU64, Ordering}; + + let expr = lit(42) as Arc; + let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); + + let success_count = Arc::new(AtomicU64::new(0)); + let success_count_clone = Arc::clone(&success_count); + + // Register a callback that succeeds + dynamic_filter.register_update_callback(Box::new(move |_| { + success_count_clone.fetch_add(1, Ordering::SeqCst); + Ok(()) + })); + + // Register a callback that fails + dynamic_filter.register_update_callback(Box::new(|_| { + Err(datafusion_common::DataFusionError::Internal( + "Test error".to_string(), + )) + })); + + // Register another callback that succeeds + let success_count_clone2 = Arc::clone(&success_count); + dynamic_filter.register_update_callback(Box::new(move |_| { + success_count_clone2.fetch_add(1, Ordering::SeqCst); + Ok(()) + })); + + // Update should succeed even though one callback fails + let new_expr = lit(100) as Arc; + let result = dynamic_filter.update(Arc::clone(&new_expr)); + + // Update operation should succeed + assert!(result.is_ok()); + + // Both successful callbacks should have been called + assert_eq!(success_count.load(Ordering::SeqCst), 2); + } + #[test] fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { let dynamic_filter =