Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(turbo-tasks): Port task statistics to the new backend #75303

Open
wants to merge 2 commits into
base: canary
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 16 additions & 22 deletions crates/napi/src/next_api/project.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc, thread, time::Duration};
use std::{io::Write, path::PathBuf, sync::Arc, thread, time::Duration};

use anyhow::{anyhow, bail, Context, Result};
use napi::{
Expand Down Expand Up @@ -382,27 +382,21 @@
persistent_caching,
memory_limit,
)?;
if !persistent_caching {
use std::io::Write;
let stats_path = std::env::var_os("NEXT_TURBOPACK_TASK_STATISTICS");
if let Some(stats_path) = stats_path {
let Some(backend) = turbo_tasks.memory_backend() else {
return Err(anyhow!("task statistics require a memory backend").into());
};
let task_stats = backend.task_statistics().enable().clone();
exit.on_exit(async move {
tokio::task::spawn_blocking(move || {
let mut file = std::fs::File::create(&stats_path)
.with_context(|| format!("failed to create or open {stats_path:?}"))?;
serde_json::to_writer(&file, &task_stats)
.context("failed to serialize or write task statistics")?;
file.flush().context("failed to flush file")
})
.await
.unwrap()
.unwrap();
});
}
let stats_path = std::env::var_os("NEXT_TURBOPACK_TASK_STATISTICS");
if let Some(stats_path) = stats_path {
let task_stats = turbo_tasks.task_statistics().enable().clone();
exit.on_exit(async move {
tokio::task::spawn_blocking(move || {
let mut file = std::fs::File::create(&stats_path)
.with_context(|| format!("failed to create or open {stats_path:?}"))?;
serde_json::to_writer(&file, &task_stats)
.context("failed to serialize or write task statistics")?;
file.flush().context("failed to flush file")
})
.await
.unwrap()
.unwrap();
});
}
let options: ProjectOptions = options.into();
let container = turbo_tasks
Expand Down Expand Up @@ -1041,7 +1035,7 @@
}
}

/// Subscribes to lifecycle events of the compilation.

Check warning on line 1038 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::Start`

Check warning on line 1038 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::End`

Check warning on line 1038 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::End`

Check warning on line 1038 in crates/napi/src/next_api/project.rs

View workflow job for this annotation

GitHub Actions / rustdoc check / build

public documentation for `project_update_info_subscribe` links to private item `UpdateMessage::Start`
///
/// Emits an [UpdateMessage::Start] event when any computation starts.
/// Emits an [UpdateMessage::End] event when there was no computation for the
Expand Down
15 changes: 8 additions & 7 deletions crates/napi/src/next_api/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use napi::{
};
use serde::Serialize;
use turbo_tasks::{
trace::TraceRawVcs, OperationVc, ReadRef, TaskId, TryJoinIterExt, TurboTasks, UpdateInfo, Vc,
task_statistics::TaskStatisticsApi, trace::TraceRawVcs, OperationVc, ReadRef, TaskId,
TryJoinIterExt, TurboTasks, TurboTasksApi, UpdateInfo, Vc,
};
use turbo_tasks_backend::{
default_backing_storage, noop_backing_storage, DefaultBackingStorage, NoopBackingStorage,
Expand Down Expand Up @@ -108,17 +109,17 @@ impl NextTurboTasks {
}
}

pub fn memory_backend(&self) -> Option<&turbo_tasks_memory::MemoryBackend> {
pub async fn stop_and_wait(&self) {
match self {
NextTurboTasks::Memory(_) => None,
NextTurboTasks::PersistentCaching(_) => None,
NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.stop_and_wait().await,
NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.stop_and_wait().await,
}
}

pub async fn stop_and_wait(&self) {
pub fn task_statistics(&self) -> &TaskStatisticsApi {
match self {
NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.stop_and_wait().await,
NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.stop_and_wait().await,
NextTurboTasks::Memory(turbo_tasks) => turbo_tasks.task_statistics(),
NextTurboTasks::PersistentCaching(turbo_tasks) => turbo_tasks.task_statistics(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions turbopack/crates/turbo-tasks-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ turbo-tasks-testing = { workspace = true }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio"] }
regex = { workspace = true }
serde_json = { workspace = true }

[build-dependencies]
turbo-tasks-build = { workspace = true }
Expand Down
25 changes: 22 additions & 3 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use turbo_tasks::{
},
event::{Event, EventListener},
registry,
task_statistics::TaskStatisticsApi,
util::IdFactoryWithReuse,
CellId, FunctionId, RawVc, ReadConsistency, SessionId, TaskId, TraitTypeId,
TurboTasksBackendApi, ValueTypeId, TRANSIENT_TASK_BIT,
Expand Down Expand Up @@ -178,6 +179,8 @@ struct TurboTasksBackendInner<B: BackingStorage> {
idle_start_event: Event,
idle_end_event: Event,

task_statistics: TaskStatisticsApi,

backing_storage: B,
}

Expand Down Expand Up @@ -222,6 +225,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
stopping_event: Event::new(|| "TurboTasksBackend::stopping_event".to_string()),
idle_start_event: Event::new(|| "TurboTasksBackend::idle_start_event".to_string()),
idle_end_event: Event::new(|| "TurboTasksBackend::idle_end_event".to_string()),
task_statistics: TaskStatisticsApi::default(),
backing_storage,
}
}
Expand Down Expand Up @@ -339,6 +343,16 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
fn should_track_children(&self) -> bool {
self.options.children_tracking
}

fn track_cache_hit(&self, task_type: &CachedTaskType) {
self.task_statistics
.map(|stats| stats.increment_cache_hit(task_type.fn_type));
}

fn track_cache_miss(&self, task_type: &CachedTaskType) {
self.task_statistics
.map(|stats| stats.increment_cache_miss(task_type.fn_type));
}
}

pub(crate) struct OperationGuard<'a, B: BackingStorage> {
Expand Down Expand Up @@ -813,10 +827,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
) -> TaskId {
if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
self.track_cache_hit(&task_type);
self.connect_child(parent_task, task_id, turbo_tasks);
return task_id;
}

self.track_cache_miss(&task_type);
let tx = self
.should_restore()
.then(|| self.backing_storage.start_read_transaction())
Expand Down Expand Up @@ -871,24 +887,23 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
);
}
if let Some(task_id) = self.task_cache.lookup_forward(&task_type) {
// Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
self.track_cache_hit(&task_type);
self.connect_child(parent_task, task_id, turbo_tasks);
return task_id;
}

self.track_cache_miss(&task_type);
let task_type = Arc::new(task_type);
let task_id = self.transient_task_id_factory.get();
if let Err(existing_task_id) = self.task_cache.try_insert(task_type, task_id) {
// Safety: We just created the id and failed to insert it.
unsafe {
self.transient_task_id_factory.reuse(task_id);
}
// Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
self.connect_child(parent_task, existing_task_id, turbo_tasks);
return existing_task_id;
}

// Safety: `tx` is a valid transaction from `self.backend.backing_storage`.
self.connect_child(parent_task, task_id, turbo_tasks);

task_id
Expand Down Expand Up @@ -1962,6 +1977,10 @@ impl<B: BackingStorage> Backend for TurboTasksBackend<B> {
fn dispose_root_task(&self, task_id: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
self.0.dispose_root_task(task_id, turbo_tasks);
}

fn task_statistics(&self) -> &TaskStatisticsApi {
&self.0.task_statistics
}
}

// from https://github.com/tokio-rs/tokio/blob/29cd6ec1ec6f90a7ee1ad641c03e0e00badbcb0e/tokio/src/time/instant.rs#L57-L63
Expand Down
2 changes: 0 additions & 2 deletions turbopack/crates/turbo-tasks-memory/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,5 @@ mod map_guard;
mod memory_backend;
mod output;
mod task;
mod task_statistics;

pub use memory_backend::MemoryBackend;
pub use task_statistics::{TaskStatistics, TaskStatisticsApi};
14 changes: 7 additions & 7 deletions turbopack/crates/turbo-tasks-memory/src/memory_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use turbo_tasks::{
TransientTaskType, TypedCellContent,
},
event::EventListener,
task_statistics::TaskStatisticsApi,
util::{IdFactoryWithReuse, NoMoveVec},
CellId, FunctionId, RawVc, ReadConsistency, TaskId, TaskIdSet, TraitTypeId,
TurboTasksBackendApi, Unused, ValueTypeId, TRANSIENT_TASK_BIT,
Expand All @@ -36,7 +37,6 @@ use crate::{
},
output::Output,
task::{ReadCellError, Task, TaskType},
task_statistics::TaskStatisticsApi,
};

fn prehash_task_type(task_type: CachedTaskType) -> PreHashed<CachedTaskType> {
Expand Down Expand Up @@ -333,16 +333,12 @@ impl MemoryBackend {
}
}

pub fn task_statistics(&self) -> &TaskStatisticsApi {
&self.task_statistics
}

fn track_cache_hit(&self, task_type: &PreHashed<CachedTaskType>) {
fn track_cache_hit(&self, task_type: &CachedTaskType) {
self.task_statistics()
.map(|stats| stats.increment_cache_hit(task_type.fn_type));
}

fn track_cache_miss(&self, task_type: &PreHashed<CachedTaskType>) {
fn track_cache_miss(&self, task_type: &CachedTaskType) {
self.task_statistics()
.map(|stats| stats.increment_cache_miss(task_type.fn_type));
}
Expand Down Expand Up @@ -777,6 +773,10 @@ impl Backend for MemoryBackend {
fn dispose_root_task(&self, task: TaskId, turbo_tasks: &dyn TurboTasksBackendApi<Self>) {
Task::unset_root(task, self, turbo_tasks);
}

fn task_statistics(&self) -> &TaskStatisticsApi {
&self.task_statistics
}
}

pub(crate) enum Job {
Expand Down
Loading
Loading