diff --git a/doublets/src/data/traits.rs b/doublets/src/data/traits.rs index 2337fbe..e05b71b 100644 --- a/doublets/src/data/traits.rs +++ b/doublets/src/data/traits.rs @@ -638,12 +638,23 @@ impl + Sized> DoubletsExt for All { #[cfg(feature = "rayon")] fn par_each_iter(&self, query: impl ToQuery) -> Self::IdxParIter { - let mut vec = Vec::with_capacity(self.count_by(query.to_query()).as_usize()); - self.each_by(query, |link| { + // Stage matching links in a call-local bump arena; they are copied into an owned Vec before returning + let bump = Bump::new(); + let query = query.to_query(); + let estimated_count = self.count_by(&query).as_usize(); + + // Pre-size the staging vec with the count_by result for this query + let mut vec = bumpalo::collections::Vec::with_capacity_in(estimated_count, &bump); + + // Collect sequentially through each_by; no parallel traversal is started here + self.each_by(&query, |link| { vec.push(link); Flow::Continue }); - vec.into_par_iter() + + // Copy the staged links into an owned Vec and return its into_par_iter() + let results: Vec> = vec.into_iter().collect(); + results.into_par_iter() } type ImplIter = Self::ImplIterEach; diff --git a/doublets/src/mem/traits.rs b/doublets/src/mem/traits.rs index dd8df6f..3bc0987 100644 --- a/doublets/src/mem/traits.rs +++ b/doublets/src/mem/traits.rs @@ -13,6 +13,18 @@ pub trait LinksTree { fn each_usages) -> Flow + ?Sized>(&self, root: T, handler: &mut H) -> Flow; + #[cfg(feature = "rayon")] + fn par_each_usages(&self, root: T, buf: &mut bumpalo::collections::Vec<'_, Link>) -> Result<(), ()> { + // Default implementation: sequential each_usages that pushes every link into buf; Flow::Break maps to Err(()) + match self.each_usages(root, &mut |link| { + buf.push(link); + Flow::Continue + }) { + Flow::Continue => Ok(()), + Flow::Break => Err(()), + } + } + fn detach(&mut self, root: &mut T, index: T); fn attach(&mut self, root: &mut T, index: T); diff --git a/doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs b/doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs index 8dd920c..b9d567c 100644 --- 
a/doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs +++ b/doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs @@ -126,6 +126,65 @@ fn each_usages_core) -> Flow + ?Sized>( Flow::Continue } +#[cfg(feature = "rayon")] +fn par_each_usages_core( + this: &LinksSourcesRecursionlessSizeBalancedTree, + base: T, + link: T, + buf: &mut bumpalo::collections::Vec<'_, Link>, +) -> Result<(), ()> { + use bumpalo::Bump; + use rayon::prelude::*; + + unsafe { + if link == T::funty(0) { + return Ok(()); + } + let link_base_part = this.get_base_part(link); + + if link_base_part > base { + par_each_usages_core(this, base, this.get_left_or_default(link), buf)?; + } else if link_base_part < base { + par_each_usages_core(this, base, this.get_right_or_default(link), buf)?; + } else { + // Base part equals `base`: record this link, then visit both children + buf.push(this.get_link_value(link)); + + // Fetch both children; only their presence is tested below, not their sizes + let left_link = this.get_left_or_default(link); + let right_link = this.get_right_or_default(link); + + // Fork only when both children differ from T::funty(0) + if left_link != T::funty(0) && right_link != T::funty(0) { + // Each side collects into its own vec backed by its own fresh Bump + let left_bump = Bump::new(); + let right_bump = Bump::new(); + let mut left_buf = bumpalo::collections::Vec::new_in(&left_bump); + let mut right_buf = bumpalo::collections::Vec::new_in(&right_bump); + + // Run both sides through rayon::join. NOTE(review): join needs Send closures and these capture bump-backed vecs; confirm this builds with the rayon feature enabled + let (left_result, right_result) = rayon::join( + || par_each_usages_core(this, base, left_link, &mut left_buf), + || par_each_usages_core(this, base, right_link, &mut right_buf), + ); + + // Propagate a failure from either side, left checked first + left_result?; + right_result?; + + // Append the left results and then the right results after the current link + buf.extend(left_buf.iter().copied()); + buf.extend(right_buf.iter().copied()); + } else { + // At least one child equals T::funty(0) here, so recurse directly into buf + par_each_usages_core(this, base, left_link, buf)?; + par_each_usages_core(this, base, right_link, buf)?; + } + } + } + 
Ok(()) +} + impl LinksTree for LinksSourcesRecursionlessSizeBalancedTree { fn count_usages(&self, link: T) -> T { unsafe { @@ -184,6 +243,11 @@ impl LinksTree for LinksSourcesRecursionlessSizeBalancedTree each_usages_core(self, root, self.get_tree_root(), handler) } + #[cfg(feature = "rayon")] + fn par_each_usages(&self, root: T, buf: &mut bumpalo::collections::Vec<'_, Link>) -> Result<(), ()> { + par_each_usages_core(self, root, self.get_tree_root(), buf) + } + fn detach(&mut self, root: &mut T, index: T) { unsafe { NoRecurSzbTree::detach(self, root as *mut _, index) } } diff --git a/doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs b/doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs index 2867d20..ac7ca77 100644 --- a/doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs +++ b/doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs @@ -126,6 +126,65 @@ fn each_usages_core) -> Flow + ?Sized>( Flow::Continue } +#[cfg(feature = "rayon")] +fn par_each_usages_core( + this: &LinksTargetsRecursionlessSizeBalancedTree, + base: T, + link: T, + buf: &mut bumpalo::collections::Vec<'_, Link>, +) -> Result<(), ()> { + use bumpalo::Bump; + use rayon::prelude::*; + + if link == T::funty(0) { + return Ok(()); + } + unsafe { + let link_base_part = this.get_base_part(link); + + if link_base_part > base { + par_each_usages_core(this, base, this.get_left_or_default(link), buf)?; + } else if link_base_part < base { + par_each_usages_core(this, base, this.get_right_or_default(link), buf)?; + } else { + // Base part equals `base`: record this link, then visit both children + buf.push(this.get_link_value(link)); + + // Fetch both children; only their presence is tested below, not their sizes + let left_link = this.get_left_or_default(link); + let right_link = this.get_right_or_default(link); + + // Fork only when both children differ from T::funty(0) + if left_link != T::funty(0) && right_link != T::funty(0) { + // Each side collects into its own vec backed by its own fresh Bump + let 
left_bump = Bump::new(); + let right_bump = Bump::new(); + let mut left_buf = bumpalo::collections::Vec::new_in(&left_bump); + let mut right_buf = bumpalo::collections::Vec::new_in(&right_bump); + + // Run both sides through rayon::join. NOTE(review): join needs Send closures and these capture bump-backed vecs; confirm this builds with the rayon feature enabled + let (left_result, right_result) = rayon::join( + || par_each_usages_core(this, base, left_link, &mut left_buf), + || par_each_usages_core(this, base, right_link, &mut right_buf), + ); + + // Propagate a failure from either side, left checked first + left_result?; + right_result?; + + // Append the left results and then the right results after the current link + buf.extend(left_buf.iter().copied()); + buf.extend(right_buf.iter().copied()); + } else { + // At least one child equals T::funty(0) here, so recurse directly into buf + par_each_usages_core(this, base, left_link, buf)?; + par_each_usages_core(this, base, right_link, buf)?; + } + } + } + Ok(()) +} + impl LinksTree for LinksTargetsRecursionlessSizeBalancedTree { fn count_usages(&self, link: T) -> T { unsafe { @@ -184,6 +243,11 @@ impl LinksTree for LinksTargetsRecursionlessSizeBalancedTree each_usages_core(self, root, self.get_tree_root(), handler) } + #[cfg(feature = "rayon")] + fn par_each_usages(&self, root: T, buf: &mut bumpalo::collections::Vec<'_, Link>) -> Result<(), ()> { + par_each_usages_core(self, root, self.get_tree_root(), buf) + } + fn detach(&mut self, root: &mut T, index: T) { unsafe { NoRecurSzbTree::detach(self, root as *mut _, index) } } diff --git a/doublets/tests/parallel.rs b/doublets/tests/parallel.rs new file mode 100644 index 0000000..39d1c45 --- /dev/null +++ b/doublets/tests/parallel.rs @@ -0,0 +1,141 @@ +#[cfg(feature = "rayon")] +use doublets::{split, unit, Doublets, DoubletsExt, Error, Link, Links}; +#[cfg(feature = "rayon")] +use mem::Global; +#[cfg(feature = "rayon")] +use std::collections::HashSet; +#[cfg(feature = "rayon")] +use rayon::prelude::*; + +#[cfg(feature = "rayon")] +#[test] +fn unit_par_iter() -> Result<(), Error> { + let mut store = unit::Store::::new(Global::new())?; + + let a = store.create_point()?; + let b = store.create_point()?; + 
store.create_link(a, b)?; + + let par_results: HashSet<_> = store.par_iter().collect(); + let seq_results: HashSet<_> = store.iter().collect(); + + assert_eq!(par_results, seq_results); + assert_eq!( + par_results, + vec![Link::new(1, 1, 1), Link::new(2, 2, 2), Link::new(3, 1, 2),] + .into_iter() + .collect() + ); + + Ok(()) +} + +#[cfg(feature = "rayon")] +#[test] +fn unit_par_each_iter() -> Result<(), Error> { + let mut store = unit::Store::::new(Global::new())?; + + store.create_link(1, 1)?; + store.create_link(2, 1)?; + store.create_link(3, 1)?; + store.create_link(4, 2)?; + store.create_link(5, 3)?; + + let any = store.constants().any; + + // Test parallel vs sequential consistency for target queries + let par_results: HashSet<_> = store.par_each_iter([any, any, 1]).collect(); + let seq_results: HashSet<_> = store.each_iter([any, any, 1]).collect(); + assert_eq!(par_results, seq_results); + + // Test parallel vs sequential consistency for source queries + let par_results: HashSet<_> = store.par_each_iter([any, 2, any]).collect(); + let seq_results: HashSet<_> = store.each_iter([any, 2, any]).collect(); + assert_eq!(par_results, seq_results); + + // Test parallel vs sequential consistency for all links + let par_results: HashSet<_> = store.par_each_iter([any, any, any]).collect(); + let seq_results: HashSet<_> = store.each_iter([any, any, any]).collect(); + assert_eq!(par_results, seq_results); + + Ok(()) +} + +#[cfg(feature = "rayon")] +#[test] +fn split_par_iter() -> Result<(), Error> { + let mut store = split::Store::::new(Global::new(), Global::new())?; + + let a = store.create_point()?; + let b = store.create_point()?; + store.create_link(a, b)?; + + let par_results: HashSet<_> = store.par_iter().collect(); + let seq_results: HashSet<_> = store.iter().collect(); + + assert_eq!(par_results, seq_results); + assert_eq!( + par_results, + vec![Link::new(1, 1, 1), Link::new(2, 2, 2), Link::new(3, 1, 2),] + .into_iter() + .collect() + ); + + Ok(()) +} + 
+#[cfg(feature = "rayon")] +#[test] +fn split_par_each_iter() -> Result<(), Error> { + let mut store = split::Store::::new(Global::new(), Global::new())?; + + store.create_link(1, 1)?; + store.create_link(2, 1)?; + store.create_link(3, 1)?; + store.create_link(4, 2)?; + store.create_link(5, 3)?; + + let any = store.constants().any; + + // Test parallel vs sequential consistency for target queries + let par_results: HashSet<_> = store.par_each_iter([any, any, 1]).collect(); + let seq_results: HashSet<_> = store.each_iter([any, any, 1]).collect(); + assert_eq!(par_results, seq_results); + + // Test parallel vs sequential consistency for source queries + let par_results: HashSet<_> = store.par_each_iter([any, 2, any]).collect(); + let seq_results: HashSet<_> = store.each_iter([any, 2, any]).collect(); + assert_eq!(par_results, seq_results); + + // Test parallel vs sequential consistency for all links + let par_results: HashSet<_> = store.par_each_iter([any, any, any]).collect(); + let seq_results: HashSet<_> = store.each_iter([any, any, any]).collect(); + assert_eq!(par_results, seq_results); + + Ok(()) +} + +#[cfg(feature = "rayon")] +#[test] +fn parallel_performance_test() -> Result<(), Error> { + let mut store = unit::Store::::new(Global::new())?; + + // Build a larger dataset; this test only compares results and measures no timing + for i in 1..=100 { + store.create_link(i, i)?; + if i > 1 { + store.create_link(i, i - 1)?; + } + } + + let any = store.constants().any; + + // Just ensure both methods return the same results + let par_results: HashSet<_> = store.par_each_iter([any, any, any]).collect(); + let seq_results: HashSet<_> = store.each_iter([any, any, any]).collect(); + + assert_eq!(par_results, seq_results); + assert!(par_results.len() > 100); // Should have many links + + Ok(()) +} \ No newline at end of file