Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 48 additions & 19 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,19 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
self.assert_not_persistent_calling_transient(reader, task_id, /* cell_id */ None);

let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id, TaskDataCategory::All);
let (mut task, reader_task) = if self.should_track_dependencies()
&& !matches!(options.tracking, ReadTracking::Untracked)
&& let Some(reader_id) = reader
&& reader_id != task_id
{
// Having a task_pair here is not optimal, but otherwise this would lead to a race
// condition. See below.
// TODO(sokra): solve that in a more performant way.
let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::All);
(task, Some(reader))
} else {
(ctx.task(task_id, TaskDataCategory::All), None)
};

fn listen_to_done_event<B: BackingStorage>(
this: &TurboTasksBackendInner<B>,
Expand Down Expand Up @@ -710,18 +722,22 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
)))
}
};
if self.should_track_dependencies()
&& let Some(reader) = reader
if let Some(mut reader_task) = reader_task
&& options.tracking.should_track(result.is_err())
&& (!task.is_immutable() || cfg!(feature = "verify_immutable"))
{
let reader = reader.unwrap();
let _ = task.add(CachedDataItem::OutputDependent {
task: reader,
value: (),
});
drop(task);

let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
// Note: We use `task_pair` earlier to lock the task and its reader at the same
// time. If we didn't and just locked the reader here, an invalidation could occur
// between grabbing the locks. If that happened, and if the task is "outdated" or
// doesn't have the dependency edge yet, the invalidation would be lost.

if reader_task
.remove(&CachedDataItemKey::OutdatedOutputDependency { target: task_id })
.is_none()
Expand All @@ -735,6 +751,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

return result;
}
drop(reader_task);

let note = move || {
let reader_desc = reader.map(|r| self.get_task_desc_fn(r));
Expand Down Expand Up @@ -771,29 +788,28 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
) -> Result<Result<TypedCellContent, EventListener>> {
self.assert_not_persistent_calling_transient(reader, task_id, Some(cell));

fn add_cell_dependency<B: BackingStorage>(
backend: &TurboTasksBackendInner<B>,
fn add_cell_dependency(
task_id: TaskId,
mut task: impl TaskGuard,
reader: Option<TaskId>,
reader_task: Option<impl TaskGuard>,
cell: CellId,
task_id: TaskId,
ctx: &mut impl ExecuteContext<'_>,
) {
if backend.should_track_dependencies()
&& let Some(reader) = reader
// We never want to have a dependency on ourselves, otherwise we end up in a
// loop of re-executing the same task.
&& reader != task_id
if let Some(mut reader_task) = reader_task
&& (!task.is_immutable() || cfg!(feature = "verify_immutable"))
{
let _ = task.add(CachedDataItem::CellDependent {
cell,
task: reader,
task: reader.unwrap(),
value: (),
});
drop(task);

let mut reader_task = ctx.task(reader, TaskDataCategory::Data);
// Note: We use `task_pair` earlier to lock the task and its reader at the same
// time. If we didn't and just locked the reader here, an invalidation could occur
// between grabbing the locks. If that happened, and if the task is "outdated" or
// doesn't have the dependency edge yet, the invalidation would be lost.

let target = CellRef {
task: task_id,
cell,
Expand All @@ -808,7 +824,20 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
}

let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id, TaskDataCategory::Data);
let (mut task, reader_task) = if self.should_track_dependencies()
&& !matches!(options.tracking, ReadTracking::Untracked)
&& let Some(reader_id) = reader
&& reader_id != task_id
{
// Having a task_pair here is not optimal, but otherwise this would lead to a race
// condition. See below.
// TODO(sokra): solve that in a more performant way.
let (task, reader) = ctx.task_pair(task_id, reader_id, TaskDataCategory::Data);
(task, Some(reader))
} else {
(ctx.task(task_id, TaskDataCategory::Data), None)
};

let content = if options.final_read_hint {
remove!(task, CellData { cell })
} else if let Some(content) = get!(task, CellData { cell }) {
Expand All @@ -819,7 +848,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
};
if let Some(content) = content {
if options.tracking.should_track(false) {
add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
add_cell_dependency(task_id, task, reader, reader_task, cell);
}
return Ok(Ok(TypedCellContent(
cell.type_id,
Expand All @@ -846,7 +875,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
.copied();
let Some(max_id) = max_id else {
if options.tracking.should_track(true) {
add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
add_cell_dependency(task_id, task, reader, reader_task, cell);
}
bail!(
"Cell {cell:?} no longer exists in task {} (no cell of this type exists)",
Expand All @@ -855,7 +884,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
};
if cell.index >= max_id {
if options.tracking.should_track(true) {
add_cell_dependency(self, task, reader, cell, task_id, &mut ctx);
add_cell_dependency(task_id, task, reader, reader_task, cell);
}
bail!(
"Cell {cell:?} no longer exists in task {} (index out of bounds)",
Expand Down
17 changes: 10 additions & 7 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,21 @@ enum TransactionState<'a, 'tx, B: BackingStorage> {
}

pub trait ExecuteContext<'e>: Sized {
type TaskGuardImpl: TaskGuard + 'e;
fn child_context<'l, 'r>(&'r self) -> impl ChildExecuteContext<'l> + use<'e, 'l, Self>
where
'e: 'l;
fn session_id(&self) -> SessionId;
fn task(&mut self, task_id: TaskId, category: TaskDataCategory) -> impl TaskGuard + 'e;
fn task(&mut self, task_id: TaskId, category: TaskDataCategory) -> Self::TaskGuardImpl;
fn is_once_task(&self, task_id: TaskId) -> bool;
fn task_pair(
&mut self,
task_id1: TaskId,
task_id2: TaskId,
category: TaskDataCategory,
) -> (impl TaskGuard + 'e, impl TaskGuard + 'e);
) -> (Self::TaskGuardImpl, Self::TaskGuardImpl);
fn schedule(&mut self, task_id: TaskId);
fn schedule_task(&self, task: impl TaskGuard + '_);
fn schedule_task(&self, task: Self::TaskGuardImpl);
fn operation_suspend_point<T>(&mut self, op: &T)
where
T: Clone + Into<AnyOperation>;
Expand Down Expand Up @@ -162,6 +163,8 @@ impl<'e, 'tx, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, '
where
'tx: 'e,
{
type TaskGuardImpl = TaskGuardImpl<'e, B>;

fn child_context<'l, 'r>(&'r self) -> impl ChildExecuteContext<'l> + use<'e, 'tx, 'l, B>
where
'e: 'l,
Expand All @@ -176,7 +179,7 @@ where
self.backend.session_id()
}

fn task(&mut self, task_id: TaskId, category: TaskDataCategory) -> impl TaskGuard + 'e {
fn task(&mut self, task_id: TaskId, category: TaskDataCategory) -> Self::TaskGuardImpl {
let mut task = self.backend.storage.access_mut(task_id);
if !task.state().is_restored(category) {
if task_id.is_transient() {
Expand Down Expand Up @@ -224,7 +227,7 @@ where
task_id1: TaskId,
task_id2: TaskId,
category: TaskDataCategory,
) -> (impl TaskGuard + 'e, impl TaskGuard + 'e) {
) -> (Self::TaskGuardImpl, Self::TaskGuardImpl) {
let (mut task1, mut task2) = self.backend.storage.access_pair_mut(task_id1, task_id2);
let is_restored1 = task1.state().is_restored(category);
let is_restored2 = task2.state().is_restored(category);
Expand Down Expand Up @@ -277,7 +280,7 @@ where
self.schedule_task(task);
}

fn schedule_task(&self, mut task: impl TaskGuard + '_) {
fn schedule_task(&self, mut task: Self::TaskGuardImpl) {
if let Some(tasks_to_prefetch) = task.prefetch() {
self.turbo_tasks
.schedule_backend_background_job(TurboTasksBackendJob::Prefetch {
Expand Down Expand Up @@ -367,7 +370,7 @@ pub trait TaskGuard: Debug {
fn is_immutable(&self) -> bool;
}

struct TaskGuardImpl<'a, B: BackingStorage> {
pub struct TaskGuardImpl<'a, B: BackingStorage> {
task_id: TaskId,
task: StorageWriteGuard<'a>,
backend: &'a TurboTasksBackendInner<B>,
Expand Down
2 changes: 2 additions & 0 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1761,6 +1761,7 @@ impl CurrentCellRef {
self.current_task,
self.index,
ReadCellOptions {
// INVALIDATION: Reading our own cell must be untracked
tracking: ReadTracking::Untracked,
..Default::default()
},
Expand Down Expand Up @@ -1879,6 +1880,7 @@ impl CurrentCellRef {
self.current_task,
self.index,
ReadCellOptions {
// INVALIDATION: Reading our own cell must be untracked
tracking: ReadTracking::Untracked,
..Default::default()
},
Expand Down
Loading