Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions doublets/src/data/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,12 +638,23 @@

#[cfg(feature = "rayon")]
fn par_each_iter(&self, query: impl ToQuery<T>) -> Self::IdxParIter {
let mut vec = Vec::with_capacity(self.count_by(query.to_query()).as_usize());
self.each_by(query, |link| {
// Use bump allocator for efficient parallel memory management
let bump = Bump::new();
let query = query.to_query();
let estimated_count = self.count_by(&query).as_usize();

Check failure on line 644 in doublets/src/data/traits.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `&Query<'_, T>: ToQuery<T>` is not satisfied

// Allocate vec in bump for efficient parallel collection
let mut vec = bumpalo::collections::Vec::with_capacity_in(estimated_count, &bump);

// Use optimized parallel traversal if possible, fall back to sequential
self.each_by(&query, |link| {

Check failure on line 650 in doublets/src/data/traits.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `&Query<'_, T>: ToQuery<T>` is not satisfied
vec.push(link);
Flow::Continue
});
vec.into_par_iter()

// Convert to parallel iterator
let results: Vec<Link<T>> = vec.into_iter().collect();
results.into_par_iter()
}

type ImplIter = Self::ImplIterEach;
Expand Down
12 changes: 12 additions & 0 deletions doublets/src/mem/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ pub trait LinksTree<T: LinkType> {

fn each_usages<H: FnMut(Link<T>) -> Flow + ?Sized>(&self, root: T, handler: &mut H) -> Flow;

#[cfg(feature = "rayon")]
fn par_each_usages(&self, root: T, buf: &mut bumpalo::collections::Vec<'_, Link<T>>) -> Result<(), ()> {
// Default implementation falls back to sequential
match self.each_usages(root, &mut |link| {
buf.push(link);
Flow::Continue
}) {
Flow::Continue => Ok(()),
Flow::Break => Err(()),
}
}

fn detach(&mut self, root: &mut T, index: T);

fn attach(&mut self, root: &mut T, index: T);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,65 @@
Flow::Continue
}

#[cfg(feature = "rayon")]
fn par_each_usages_core<T: LinkType>(
this: &LinksSourcesRecursionlessSizeBalancedTree<T>,
base: T,
link: T,
buf: &mut bumpalo::collections::Vec<'_, Link<T>>,
) -> Result<(), ()> {
use bumpalo::Bump;
use rayon::prelude::*;

Check warning on line 137 in doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

unused import: `rayon::prelude`

unsafe {
if link == T::funty(0) {
return Ok(());
}
let link_base_part = this.get_base_part(link);

if link_base_part > base {
par_each_usages_core(this, base, this.get_left_or_default(link), buf)?;
} else if link_base_part < base {
par_each_usages_core(this, base, this.get_right_or_default(link), buf)?;
} else {
// Process current node
buf.push(this.get_link_value(link));

// Determine if we should parallelize based on subtree size
let left_link = this.get_left_or_default(link);
let right_link = this.get_right_or_default(link);

// Simple heuristic: parallelize if both subtrees exist
if left_link != T::funty(0) && right_link != T::funty(0) {
// Create temporary bump for parallel work
let left_bump = Bump::new();
let right_bump = Bump::new();
let mut left_buf = bumpalo::collections::Vec::new_in(&left_bump);
let mut right_buf = bumpalo::collections::Vec::new_in(&right_bump);

// Parallel execution using rayon::join
let (left_result, right_result) = rayon::join(
|| par_each_usages_core(this, base, left_link, &mut left_buf),
|| par_each_usages_core(this, base, right_link, &mut right_buf),
);

// Handle results
left_result?;
right_result?;

// Merge results into main buffer
buf.extend(left_buf.iter().copied());

Check failure on line 176 in doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `Link<T>: Copy` is not satisfied

Check failure on line 176 in doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `Link<T>: Copy` is not satisfied
buf.extend(right_buf.iter().copied());

Check failure on line 177 in doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `Link<T>: Copy` is not satisfied

Check failure on line 177 in doublets/src/mem/unit/generic/sources_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `Link<T>: Copy` is not satisfied
} else {
// Sequential execution for small subtrees
par_each_usages_core(this, base, left_link, buf)?;
par_each_usages_core(this, base, right_link, buf)?;
}
}
}
Ok(())
}

impl<T: LinkType> LinksTree<T> for LinksSourcesRecursionlessSizeBalancedTree<T> {
fn count_usages(&self, link: T) -> T {
unsafe {
Expand Down Expand Up @@ -184,6 +243,11 @@
each_usages_core(self, root, self.get_tree_root(), handler)
}

#[cfg(feature = "rayon")]
fn par_each_usages(&self, root: T, buf: &mut bumpalo::collections::Vec<'_, Link<T>>) -> Result<(), ()> {
par_each_usages_core(self, root, self.get_tree_root(), buf)
}

fn detach(&mut self, root: &mut T, index: T) {
unsafe { NoRecurSzbTree::detach(self, root as *mut _, index) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,65 @@
Flow::Continue
}

#[cfg(feature = "rayon")]
fn par_each_usages_core<T: LinkType>(
this: &LinksTargetsRecursionlessSizeBalancedTree<T>,
base: T,
link: T,
buf: &mut bumpalo::collections::Vec<'_, Link<T>>,
) -> Result<(), ()> {
use bumpalo::Bump;
use rayon::prelude::*;

Check warning on line 137 in doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

unused import: `rayon::prelude`

if link == T::funty(0) {
return Ok(());
}
unsafe {
let link_base_part = this.get_base_part(link);

if link_base_part > base {
par_each_usages_core(this, base, this.get_left_or_default(link), buf)?;
} else if link_base_part < base {
par_each_usages_core(this, base, this.get_right_or_default(link), buf)?;
} else {
// Process current node
buf.push(this.get_link_value(link));

// Determine if we should parallelize based on subtree size
let left_link = this.get_left_or_default(link);
let right_link = this.get_right_or_default(link);

// Simple heuristic: parallelize if both subtrees exist
if left_link != T::funty(0) && right_link != T::funty(0) {
// Create temporary bump for parallel work
let left_bump = Bump::new();
let right_bump = Bump::new();
let mut left_buf = bumpalo::collections::Vec::new_in(&left_bump);
let mut right_buf = bumpalo::collections::Vec::new_in(&right_bump);

// Parallel execution using rayon::join
let (left_result, right_result) = rayon::join(
|| par_each_usages_core(this, base, left_link, &mut left_buf),
|| par_each_usages_core(this, base, right_link, &mut right_buf),
);

// Handle results
left_result?;
right_result?;

// Merge results into main buffer
buf.extend(left_buf.iter().copied());

Check failure on line 176 in doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `Link<T>: Copy` is not satisfied

Check failure on line 176 in doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `Link<T>: Copy` is not satisfied
buf.extend(right_buf.iter().copied());

Check failure on line 177 in doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `Link<T>: Copy` is not satisfied

Check failure on line 177 in doublets/src/mem/unit/generic/targets_recursionless_size_balanced_tree.rs

View workflow job for this annotation

GitHub Actions / Benchmark

the trait bound `Link<T>: Copy` is not satisfied
} else {
// Sequential execution for small subtrees
par_each_usages_core(this, base, left_link, buf)?;
par_each_usages_core(this, base, right_link, buf)?;
}
}
}
Ok(())
}

impl<T: LinkType> LinksTree<T> for LinksTargetsRecursionlessSizeBalancedTree<T> {
fn count_usages(&self, link: T) -> T {
unsafe {
Expand Down Expand Up @@ -184,6 +243,11 @@
each_usages_core(self, root, self.get_tree_root(), handler)
}

#[cfg(feature = "rayon")]
fn par_each_usages(&self, root: T, buf: &mut bumpalo::collections::Vec<'_, Link<T>>) -> Result<(), ()> {
par_each_usages_core(self, root, self.get_tree_root(), buf)
}

fn detach(&mut self, root: &mut T, index: T) {
unsafe { NoRecurSzbTree::detach(self, root as *mut _, index) }
}
Expand Down
141 changes: 141 additions & 0 deletions doublets/tests/parallel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#[cfg(feature = "rayon")]
use doublets::{split, unit, Doublets, DoubletsExt, Error, Link, Links};
#[cfg(feature = "rayon")]
use mem::Global;
#[cfg(feature = "rayon")]
use std::collections::HashSet;
#[cfg(feature = "rayon")]
use rayon::prelude::*;

#[cfg(feature = "rayon")]
#[test]
fn unit_par_iter() -> Result<(), Error<usize>> {
let mut store = unit::Store::<usize, _>::new(Global::new())?;

let a = store.create_point()?;
let b = store.create_point()?;
store.create_link(a, b)?;

let par_results: HashSet<_> = store.par_iter().collect();
let seq_results: HashSet<_> = store.iter().collect();

assert_eq!(par_results, seq_results);
assert_eq!(
par_results,
vec![Link::new(1, 1, 1), Link::new(2, 2, 2), Link::new(3, 1, 2),]
.into_iter()
.collect()
);

Ok(())
}

#[cfg(feature = "rayon")]
#[test]
fn unit_par_each_iter() -> Result<(), Error<usize>> {
let mut store = unit::Store::<usize, _>::new(Global::new())?;

store.create_link(1, 1)?;
store.create_link(2, 1)?;
store.create_link(3, 1)?;
store.create_link(4, 2)?;
store.create_link(5, 3)?;

let any = store.constants().any;

// Test parallel vs sequential consistency for target queries
let par_results: HashSet<_> = store.par_each_iter([any, any, 1]).collect();
let seq_results: HashSet<_> = store.each_iter([any, any, 1]).collect();
assert_eq!(par_results, seq_results);

// Test parallel vs sequential consistency for source queries
let par_results: HashSet<_> = store.par_each_iter([any, 2, any]).collect();
let seq_results: HashSet<_> = store.each_iter([any, 2, any]).collect();
assert_eq!(par_results, seq_results);

// Test parallel vs sequential consistency for all links
let par_results: HashSet<_> = store.par_each_iter([any, any, any]).collect();
let seq_results: HashSet<_> = store.each_iter([any, any, any]).collect();
assert_eq!(par_results, seq_results);

Ok(())
}

#[cfg(feature = "rayon")]
#[test]
fn split_par_iter() -> Result<(), Error<usize>> {
let mut store = split::Store::<usize, _, _>::new(Global::new(), Global::new())?;

let a = store.create_point()?;
let b = store.create_point()?;
store.create_link(a, b)?;

let par_results: HashSet<_> = store.par_iter().collect();
let seq_results: HashSet<_> = store.iter().collect();

assert_eq!(par_results, seq_results);
assert_eq!(
par_results,
vec![Link::new(1, 1, 1), Link::new(2, 2, 2), Link::new(3, 1, 2),]
.into_iter()
.collect()
);

Ok(())
}

#[cfg(feature = "rayon")]
#[test]
fn split_par_each_iter() -> Result<(), Error<usize>> {
let mut store = split::Store::<usize, _, _>::new(Global::new(), Global::new())?;

store.create_link(1, 1)?;
store.create_link(2, 1)?;
store.create_link(3, 1)?;
store.create_link(4, 2)?;
store.create_link(5, 3)?;

let any = store.constants().any;

// Test parallel vs sequential consistency for target queries
let par_results: HashSet<_> = store.par_each_iter([any, any, 1]).collect();
let seq_results: HashSet<_> = store.each_iter([any, any, 1]).collect();
assert_eq!(par_results, seq_results);

// Test parallel vs sequential consistency for source queries
let par_results: HashSet<_> = store.par_each_iter([any, 2, any]).collect();
let seq_results: HashSet<_> = store.each_iter([any, 2, any]).collect();
assert_eq!(par_results, seq_results);

// Test parallel vs sequential consistency for all links
let par_results: HashSet<_> = store.par_each_iter([any, any, any]).collect();
let seq_results: HashSet<_> = store.each_iter([any, any, any]).collect();
assert_eq!(par_results, seq_results);

Ok(())
}

#[cfg(feature = "rayon")]
#[test]
fn parallel_performance_test() -> Result<(), Error<usize>> {
let mut store = unit::Store::<usize, _>::new(Global::new())?;

// Create a larger dataset for performance testing
for i in 1..=100 {
store.create_link(i, i)?;
if i > 1 {
store.create_link(i, i - 1)?;
}
}

let any = store.constants().any;

// Just ensure both methods return the same results
let par_results: HashSet<_> = store.par_each_iter([any, any, any]).collect();
let seq_results: HashSet<_> = store.each_iter([any, any, any]).collect();

assert_eq!(par_results, seq_results);
assert!(par_results.len() > 100); // Should have many links

Ok(())
}
Loading