28 changes: 27 additions & 1 deletion turbopack/crates/turbo-tasks/src/parallel.rs
@@ -6,7 +6,7 @@

use crate::{
scope::scope_and_block,
util::{good_chunk_size, into_chunks},
util::{Chunk, good_chunk_size, into_chunks},
};

struct Chunked {
@@ -231,6 +231,32 @@ where
.collect()
}

pub fn map_collect_chunked_owned<'l, Item, PerItemResult, Result>(
items: Vec<Item>,
f: impl Fn(Chunk<Item>) -> PerItemResult + Send + Sync,
) -> Result
where
Item: Send + Sync,
PerItemResult: Send + Sync + 'l,
Result: FromIterator<PerItemResult>,
{
let Some(Chunked {
chunk_size,
chunk_count,
}) = get_chunked(items.len())
else {
let len = items.len();
return Result::from_iter(into_chunks(items, len).map(f));
};
let f = &f;
scope_and_block(chunk_count, |scope| {
for chunk in into_chunks(items, chunk_size) {
scope.spawn(move || f(chunk))
}
})
.collect()
}

#[cfg(test)]
mod tests {
use std::{
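The new helper parallelizes over owned chunks of the input instead of over individual items: each closure invocation receives one whole chunk, and the per-chunk results are collected into any FromIterator target. A minimal usage sketch (hypothetical caller, not part of this PR), assuming Chunk<Item> iterates the chunk's owned items, as the call site in merged_modules.rs suggests:

use turbo_tasks::parallel::map_collect_chunked_owned;

// Hypothetical example: sum a Vec<u64> in parallel, one partial sum per chunk.
fn chunked_sums(items: Vec<u64>) -> Vec<u64> {
    // Each closure call gets one owned chunk; for small inputs the fallback path
    // in map_collect_chunked_owned processes everything as a single chunk inline.
    map_collect_chunked_owned::<_, _, Vec<u64>>(items, |chunk| chunk.sum::<u64>())
}

The turbofish mirrors the call in merged_modules.rs: typically only the collected result type needs to be named, while the item and per-chunk result types are inferred.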
201 changes: 115 additions & 86 deletions turbopack/crates/turbopack-core/src/module_graph/merged_modules.rs
@@ -279,8 +279,9 @@ pub async fn compute_merged_modules(module_graph: Vc<ModuleGraph>) -> Result<Vc<

{
struct ChunkGroupResult {
chunk_group_idx: usize,
lists: Vec<Vec<ResolvedVc<Box<dyn MergeableModule>>>>,
first_chunk_group_idx: usize,
#[allow(clippy::type_complexity)]
list_lists: Vec<Vec<Vec<ResolvedVc<Box<dyn MergeableModule>>>>>,
lists_reverse_indices:
FxIndexMap<ResolvedVc<Box<dyn MergeableModule>>, FxIndexSet<ListOccurrence>>,
#[allow(clippy::type_complexity)]
@@ -289,26 +290,13 @@ pub async fn compute_merged_modules(module_graph: Vc<ModuleGraph>) -> Result<Vc<
FxIndexSet<ResolvedVc<Box<dyn Module>>>,
>,
}
let span = tracing::info_span!("map chunk groups").entered();

let result = turbo_tasks::parallel::map_collect_owned::<_, _, Result<Vec<_>>>(
let result = turbo_tasks::parallel::map_collect_chunked_owned::<_, _, Result<Vec<_>>>(
// TODO without collect
chunk_group_info.chunk_groups.iter().enumerate().collect(),
|(chunk_group_idx, chunk_group)| {
// A partition of all modules in the chunk into several execution traces
// (orderings), stored in the top-level lists and referenced here by
// index.
let mut chunk_lists: FxHashMap<&RoaringBitmapWrapper, usize> =
FxHashMap::with_capacity_and_hasher(
module_merged_groups.len() / chunk_group_info.chunk_groups.len(),
Default::default(),
);

// This is necessary to have the correct order with cycles: a `a -> b -> a`
// graph would otherwise be visited as `b->a`, `a->b`,
// leading to the list `a, b` which is not execution order.
let mut visited = FxHashSet::default();

let mut lists = vec![];
|chunk| {
let mut list_lists = vec![];
let mut lists_reverse_indices: FxIndexMap<
ResolvedVc<Box<dyn MergeableModule>>,
FxIndexSet<ListOccurrence>,
@@ -319,92 +307,133 @@ pub async fn compute_merged_modules(module_graph: Vc<ModuleGraph>) -> Result<Vc<
FxIndexSet<ResolvedVc<Box<dyn Module>>>,
> = FxIndexMap::default();

module_graph.traverse_edges_from_entries_dfs(
chunk_group.entries(),
&mut (),
|parent_info, node, _| {
if parent_info.is_none_or(|(_, r)| r.chunking_type.is_parallel())
&& visited.insert(node.module)
{
Ok(GraphTraversalAction::Continue)
} else {
Ok(GraphTraversalAction::Exclude)
}
},
|parent_info, node, _| {
let module = node.module;
let bitmap = module_merged_groups
.get(&module)
.context("every module should have a bitmap at this point")?;

if mergeable.contains(&module) {
let mergeable_module =
ResolvedVc::try_downcast::<Box<dyn MergeableModule>>(module)
let mut chunk = chunk.peekable();
let first_chunk_group_idx = chunk.peek().unwrap().0;

for (chunk_group_idx, chunk_group) in chunk {
let mut lists = vec![];

// A partition of all modules in the chunk into several execution traces
// (orderings), stored in the top-level lists and referenced here by
// index.
let mut chunk_lists: FxHashMap<&RoaringBitmapWrapper, usize> =
FxHashMap::with_capacity_and_hasher(
module_merged_groups.len() / chunk_group_info.chunk_groups.len(),
Default::default(),
);

// This is necessary to have the correct order with cycles: a `a -> b -> a`
// graph would otherwise be visited as `b->a`, `a->b`,
// leading to the list `a, b` which is not execution order.
let mut visited = FxHashSet::default();

module_graph.traverse_edges_from_entries_dfs(
chunk_group.entries(),
&mut (),
|parent_info, node, _| {
if parent_info.is_none_or(|(_, r)| r.chunking_type.is_parallel())
&& visited.insert(node.module)
{
Ok(GraphTraversalAction::Continue)
} else {
Ok(GraphTraversalAction::Exclude)
}
},
|parent_info, node, _| {
let module = node.module;
let bitmap = module_merged_groups
.get(&module)
.context("every module should have a bitmap at this point")?;

if mergeable.contains(&module) {
let mergeable_module =
ResolvedVc::try_downcast::<Box<dyn MergeableModule>>(
module,
)
.unwrap();
match chunk_lists.entry(bitmap) {
Entry::Vacant(e) => {
// New list, insert the module
let idx = lists.len();
e.insert(idx);
lists.push(vec![mergeable_module]);
lists_reverse_indices
.entry(mergeable_module)
.or_default()
.insert(ListOccurrence {
chunk_group: chunk_group_idx,
list: idx,
entry: 0,
});
}
Entry::Occupied(e) => {
let list_idx = *e.get();
let list = &mut lists[list_idx];
list.push(mergeable_module);
lists_reverse_indices
.entry(mergeable_module)
.or_default()
.insert(ListOccurrence {
chunk_group: chunk_group_idx,
list: list_idx,
entry: list.len() - 1,
});
match chunk_lists.entry(bitmap) {
Entry::Vacant(e) => {
// New list, insert the module
let idx = lists.len();
e.insert(idx);
lists.push(vec![mergeable_module]);
lists_reverse_indices
.entry(mergeable_module)
.or_default()
.insert(ListOccurrence {
chunk_group: chunk_group_idx,
list: idx,
entry: 0,
});
}
Entry::Occupied(e) => {
let list_idx = *e.get();
let list = &mut lists[list_idx];
list.push(mergeable_module);
lists_reverse_indices
.entry(mergeable_module)
.or_default()
.insert(ListOccurrence {
chunk_group: chunk_group_idx,
list: list_idx,
entry: list.len() - 1,
});
}
}
}
}

if let Some((parent, _)) = parent_info {
let same_bitmap = module_merged_groups.get(&parent.module).unwrap()
== module_merged_groups.get(&module).unwrap();
if let Some((parent, _)) = parent_info {
let same_bitmap =
module_merged_groups.get(&parent.module).unwrap()
== module_merged_groups.get(&module).unwrap();

if same_bitmap {
intra_group_references_rev
.entry(module)
.or_default()
.insert(parent.module);
if same_bitmap {
intra_group_references_rev
.entry(module)
.or_default()
.insert(parent.module);
}
}
}
Ok(())
},
)?;
Ok(())
},
)?;

list_lists.push(lists);
}
Ok(ChunkGroupResult {
chunk_group_idx,
lists,
first_chunk_group_idx,
list_lists,
lists_reverse_indices,
intra_group_references_rev,
})
},
)?;

lists = vec![Default::default(); result.len() + 1];
drop(span);
let _span = tracing::info_span!("merging chunk group lists").entered();

lists_reverse_indices
.reserve_exact(result.iter().map(|r| r.lists_reverse_indices.len()).sum());
intra_group_references_rev.reserve_exact(
result
.iter()
.map(|r| r.intra_group_references_rev.len())
.sum(),
);

lists = vec![Default::default(); chunk_group_info.chunk_groups.len() + 1];
LISTS_COMMON_IDX = result.len();
for ChunkGroupResult {
chunk_group_idx,
lists: result_lists,
first_chunk_group_idx,
list_lists: result_lists,
lists_reverse_indices: result_lists_reverse_indices,
intra_group_references_rev: result_intra_group_references_rev,
} in result
{
lists[chunk_group_idx] = result_lists;
lists.splice(
first_chunk_group_idx..(first_chunk_group_idx + result_lists.len()),
result_lists,
);
for (module, occurrences) in result_lists_reverse_indices {
lists_reverse_indices
.entry(module)
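Because the chunk groups inside one chunk are processed consecutively, each worker only needs to report the index of its first chunk group; the merge step then splices the worker's per-chunk-group lists back into the global lists vector as one contiguous range. A simplified, self-contained sketch of that splice pattern (hypothetical element type u32 standing in for the PR's module lists; the extra trailing slot mirrors the PR's `+ 1` and is assumed to be reserved for shared lists):

// Sketch of the merge step: results come back one per chunk of chunk groups,
// each tagged with the index of the first chunk group that the chunk covered.
fn merge_chunked_results(
    chunk_group_count: usize,
    results: Vec<(usize, Vec<Vec<Vec<u32>>>)>,
) -> Vec<Vec<Vec<u32>>> {
    // One slot per chunk group, plus one trailing slot (kept for parity with the PR).
    let mut lists: Vec<Vec<Vec<u32>>> = vec![Vec::new(); chunk_group_count + 1];
    for (first_chunk_group_idx, list_lists) in results {
        // The chunk covered consecutive chunk groups, so its output replaces a
        // contiguous range starting at first_chunk_group_idx.
        lists.splice(
            first_chunk_group_idx..(first_chunk_group_idx + list_lists.len()),
            list_lists,
        );
    }
    lists
}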