From c80481501b3b65261a6e48e6127698e6cb58d7ae Mon Sep 17 00:00:00 2001
From: Andrea Campi
Date: Mon, 3 Feb 2025 04:19:44 -0800
Subject: [PATCH] Log SCS request stats to ODS3

Summary: This is a simple, direct translation of the same stats we had in ODS.
With ODS3 we should be able to add more, possibly most of what we have in
Scuba; but that will come in a later diff.

Reviewed By: clara-9

Differential Revision: D68956752

fbshipit-source-id: 9f4ed45776368eba9820d7ba09ec39044ec09ef4
---
 eden/mononoke/scs/scs_methods/Cargo.toml           |   1 +
 .../scs_methods/src/source_control_impl.rs         | 191 ++++++++++++++----
 2 files changed, 158 insertions(+), 34 deletions(-)

diff --git a/eden/mononoke/scs/scs_methods/Cargo.toml b/eden/mononoke/scs/scs_methods/Cargo.toml
index 4bd40bc49c374..399f093bd000b 100644
--- a/eden/mononoke/scs/scs_methods/Cargo.toml
+++ b/eden/mononoke/scs/scs_methods/Cargo.toml
@@ -37,6 +37,7 @@ gix-hash = "0.15.1"
 hooks = { version = "0.1.0", path = "../../hooks" }
 itertools = "0.14.0"
 justknobs = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
+lazy_static = "1.4"
 maplit = "1.0"
 megarepo_api = { version = "0.1.0", path = "../../megarepo_api" }
 megarepo_config = { version = "0.1.0", path = "../../megarepo_api/megarepo_config" }
diff --git a/eden/mononoke/scs/scs_methods/src/source_control_impl.rs b/eden/mononoke/scs/scs_methods/src/source_control_impl.rs
index f121c2bf1abdc..4688598697ae2 100644
--- a/eden/mononoke/scs/scs_methods/src/source_control_impl.rs
+++ b/eden/mononoke/scs/scs_methods/src/source_control_impl.rs
@@ -39,6 +39,8 @@ use futures_stats::TimedTryStreamExt;
 use futures_stats::TryStreamStats;
 use futures_watchdog::WatchdogExt;
 use identity::Identity;
+#[cfg(fbcode_build)]
+use lazy_static::lazy_static;
 use login_objects_thrift::EnvironmentType;
 use megarepo_api::MegarepoApi;
 use memory::MemoryStats;
@@ -78,6 +80,16 @@ use source_control_services::SourceControlService;
 use srserver::RequestContext;
 use stats::prelude::*;
 use time_ext::DurationExt;
+#[cfg(fbcode_build)]
+use MononokeScsRequest_ods3::Instrument_MononokeScsRequest;
+#[cfg(fbcode_build)]
+use MononokeScsRequest_ods3_types::MononokeScsRequest;
+#[cfg(fbcode_build)]
+use MononokeScsRequest_ods3_types::SCSRequestEvent;
+#[cfg(fbcode_build)]
+use MononokeScsRequest_ods3_types::SCSRequestOutcome as Outcome;
+#[cfg(fbcode_build)]
+use MononokeScsRequest_ods3_types::SCSRequestType;

 use crate::from_request::FromRequest;
 use crate::scuba_params::AddScubaParams;
@@ -92,6 +104,12 @@ const FORWARDED_OTHER_CATS_HEADER: &str = "scm_forwarded_other_cats";
 const PER_REQUEST_READ_QPS: usize = 4000;
 const PER_REQUEST_WRITE_QPS: usize = 4000;

+#[cfg(fbcode_build)]
+lazy_static! {
+    static ref SCS_REQUEST_STATS_INSTRUMENT: Instrument_MononokeScsRequest =
+        Instrument_MononokeScsRequest::new();
+}
+
 define_stats! {
     prefix = "mononoke.scs_server";
     total_request_start: timeseries(Rate, Sum),
@@ -186,7 +204,7 @@ impl SourceControlServiceImpl {
         req_ctxt: &RequestContext,
         specifier: Option<&dyn SpecifierExt>,
         params: &dyn AddScubaParams,
-    ) -> Result<(CoreContext, String), scs_errors::ServiceError> {
+    ) -> Result<(CoreContext, String, Option<String>), scs_errors::ServiceError> {
         let session = self.create_session(req_ctxt).await?;
         let identities = session.metadata().identities();
         let mut scuba = self.create_scuba(name, req_ctxt, specifier, params, identities)?;
@@ -197,7 +215,13 @@ impl SourceControlServiceImpl {
         scuba.add("session_uuid", session_uuid.clone());
         let ctx =
             session.new_context_with_scribe(self.logger.clone(), scuba, self.scribe.clone());
-        Ok((ctx, session_uuid))
+
+        let repo_name = if let Some(specifier) = specifier {
+            specifier.scuba_reponame()
+        } else {
+            None
+        };
+        Ok((ctx, session_uuid, repo_name))
     }

     /// Create and configure a scuba sample builder for a request.
@@ -669,7 +693,10 @@ fn add_request_end_memory_stats(
 fn log_result<T: AddScubaResponse>(
     ctx: CoreContext,
     tag: &'static str,
+    request_type: SCSRequestType,
+    event: SCSRequestEvent,
     method: &str,
+    repo: Option<String>,
     stats: &FutureStats,
     result: &Result<T, scs_errors::ServiceError>,
     start_mem_stats: Option<&MemoryStats>,
@@ -678,21 +705,38 @@ fn log_result(

     add_request_end_memory_stats(&mut scuba, method, start_mem_stats);

-    let (status, error, invalid_request, internal_failure, overloaded) = match result {
+    let (status, outcome, error, invalid_request, internal_failure, overloaded) = match result {
         Ok(response) => {
             response.add_scuba_response(&mut scuba);
-            ("SUCCESS", None, 0, 0, 0)
+            ("SUCCESS", Outcome::Success, None, 0, 0, 0)
         }
         Err(err) => {
             let (status, desc) = err.status_and_description();
             match status {
-                Status::RequestError => ("REQUEST_ERROR", Some(desc), 1, 0, 0),
-                Status::InternalError => ("INTERNAL_ERROR", Some(desc), 0, 1, 0),
-                Status::OverloadError => ("OVERLOAD_ERROR", Some(desc), 0, 0, 1),
-                Status::PollError => ("POLL_ERROR", Some(desc), 0, 1, 0),
+                Status::RequestError => {
+                    ("REQUEST_ERROR", Outcome::RequestError, Some(desc), 1, 0, 0)
+                }
+                Status::InternalError => (
+                    "INTERNAL_ERROR",
+                    Outcome::InternalError,
+                    Some(desc),
+                    0,
+                    1,
+                    0,
+                ),
+                Status::OverloadError => (
+                    "OVERLOAD_ERROR",
+                    Outcome::OverloadError,
+                    Some(desc),
+                    0,
+                    0,
+                    1,
+                ),
+                Status::PollError => ("POLL_ERROR", Outcome::PollError, Some(desc), 0, 1, 0),
             }
         }
     };
+
     if let Ok(true) = justknobs::eval("scm/mononoke:scs_alert_on_methods", None, Some(method)) {
         STATS::total_method_requests.add_value(1, (method.to_string(),));
         if status == "INTERNAL_ERROR" {
@@ -728,28 +772,57 @@ fn log_result(
         scuba.add("error", error.as_str());
     }
     scuba.log_with_msg(tag, None);
+
+    #[cfg(fbcode_build)]
+    SCS_REQUEST_STATS_INSTRUMENT.observe(MononokeScsRequest {
+        method: Some(method.to_string()),
+        repo,
+        request_type: Some(request_type),
+        event: Some(event),
+        outcome: Some(outcome),
+        requests: Some(1.0),
+        duration_ms: Some(stats.completion_time.as_millis_unchecked() as f64),
+        ..Default::default()
+    });
 }

 fn log_stream_chunk<T: AddScubaResponse>(
     ctx: CoreContext,
     method: &str,
+    repo: Option<String>,
     result: &Result<T, scs_errors::ServiceError>,
     count: u64,
 ) {
     let mut scuba = ctx.scuba().clone();

-    let (status, error, invalid_request, internal_failure, overloaded) = match result {
+    let (status, outcome, error, invalid_request, internal_failure, overloaded) = match result {
         Ok(response) => {
             response.add_scuba_response(&mut scuba);
-            ("SUCCESS", None, 0, 0, 0)
+            ("SUCCESS", Outcome::Success, None, 0, 0, 0)
         }
         Err(err) => {
             let (status, desc) = err.status_and_description();
             match status {
-                Status::RequestError => ("REQUEST_ERROR", Some(desc), 1, 0, 0),
-                Status::InternalError => ("INTERNAL_ERROR", Some(desc), 0, 1, 0),
-                Status::OverloadError => ("OVERLOAD_ERROR", Some(desc), 0, 0, 1),
-                Status::PollError => ("POLL_ERROR", Some(desc), 0, 0, 1),
+                Status::RequestError => {
+                    ("REQUEST_ERROR", Outcome::RequestError, Some(desc), 1, 0, 0)
+                }
+                Status::InternalError => (
+                    "INTERNAL_ERROR",
+                    Outcome::InternalError,
+                    Some(desc),
+                    0,
+                    1,
+                    0,
+                ),
+                Status::OverloadError => (
+                    "OVERLOAD_ERROR",
+                    Outcome::OverloadError,
+                    Some(desc),
+                    0,
+                    0,
+                    1,
+                ),
+                Status::PollError => ("POLL_ERROR", Outcome::PollError, Some(desc), 0, 0, 1),
             }
         }
     };
@@ -780,11 +853,23 @@ fn log_stream_chunk(
         scuba.sampled(sampling_rate);
     }
     scuba.log_with_msg("Request stream chunk", None);
+
+    #[cfg(fbcode_build)]
+    SCS_REQUEST_STATS_INSTRUMENT.observe(MononokeScsRequest {
+        method: Some(method.to_string()),
+        repo,
+        request_type: Some(SCSRequestType::Stream),
+        event: Some(SCSRequestEvent::StreamChunk),
+        outcome: Some(outcome),
+        requests: Some(1.0),
+        ..Default::default()
+    });
 }

 fn log_stream_complete(
     ctx: CoreContext,
     method: &str,
+    repo: Option<String>,
     initial_future_stats: &FutureStats,
     stream_stats: &TryStreamStats,
     mb_status_and_description: Option<(Status, String)>,
@@ -794,15 +879,31 @@ fn log_stream_complete(

     add_request_end_memory_stats(&mut scuba, method, start_mem_stats);

-    let (status, error, invalid_request, internal_failure, overloaded) =
+    let (status, outcome, error, invalid_request, internal_failure, overloaded) =
         match mb_status_and_description {
             Some((status, desc)) => match status {
-                Status::RequestError => ("REQUEST_ERROR", Some(desc), 1, 0, 0),
-                Status::InternalError => ("INTERNAL_ERROR", Some(desc), 0, 1, 0),
-                Status::OverloadError => ("OVERLOAD_ERROR", Some(desc), 0, 0, 1),
-                Status::PollError => ("POLL_ERROR", Some(desc), 0, 1, 0),
+                Status::RequestError => {
+                    ("REQUEST_ERROR", Outcome::RequestError, Some(desc), 1, 0, 0)
+                }
+                Status::InternalError => (
+                    "INTERNAL_ERROR",
+                    Outcome::InternalError,
+                    Some(desc),
+                    0,
+                    1,
+                    0,
+                ),
+                Status::OverloadError => (
+                    "OVERLOAD_ERROR",
+                    Outcome::OverloadError,
+                    Some(desc),
+                    0,
+                    0,
+                    1,
+                ),
+                Status::PollError => ("POLL_ERROR", Outcome::PollError, Some(desc), 0, 1, 0),
             },
-            None => ("SUCCESS", None, 0, 0, 0),
+            None => ("SUCCESS", Outcome::Success, None, 0, 0, 0),
         };

     if let Ok(true) = justknobs::eval("scm/mononoke:scs_alert_on_methods", None, Some(method)) {
@@ -868,11 +969,24 @@ fn log_stream_complete(
         scuba.add("error", error.as_str());
     }
     scuba.log_with_msg("Request complete", None);
+
+    #[cfg(fbcode_build)]
+    SCS_REQUEST_STATS_INSTRUMENT.observe(MononokeScsRequest {
+        method: Some(method.to_string()),
+        repo,
+        request_type: Some(SCSRequestType::Stream),
+        event: Some(SCSRequestEvent::StreamComplete),
+        outcome: Some(outcome),
+        requests: Some(1.0),
+        duration_ms: Some(initial_future_stats.completion_time.as_millis_unchecked() as f64),
+        ..Default::default()
+    });
 }

 fn log_cancelled(
     ctx: &CoreContext,
     method: &str,
+    repo: Option<String>,
     stats: &FutureStats,
     start_mem_stats: Option<&MemoryStats>,
 ) {
@@ -887,6 +1001,16 @@ fn log_cancelled(
     scuba.add_future_stats(stats);
     scuba.add("status", "CANCELLED");
     scuba.log_with_msg("Request cancelled", None);
+
+    #[cfg(fbcode_build)]
+    SCS_REQUEST_STATS_INSTRUMENT.observe(MononokeScsRequest {
+        method: Some(method.to_string()),
+        repo,
+        request_type: Some(SCSRequestType::Stream),
+        event: Some(SCSRequestEvent::Cancelled),
+        requests: Some(1.0),
+        ..Default::default()
+    });
 }

 fn check_memory_usage(
@@ -992,7 +1116,7 @@ macro_rules! impl_thrift_methods {
                 let fut = async move {
                     let svc = self.0.clone();
                     let watchdog_max_poll = self.0.watchdog_max_poll;
-                    let (ctx, session_uuid) = create_ctx!(svc, $method_name, req_ctxt, $( $param_name ),*).await?;
+                    let (ctx, session_uuid, repo_name) = create_ctx!(svc, $method_name, req_ctxt, $( $param_name ),*).await?;
                     let handler = {
                         cloned!(ctx);
                         async move {
@@ -1007,9 +1131,9 @@ macro_rules! impl_thrift_methods {
                             .with_max_poll(watchdog_max_poll).await
                     }
                     .timed()
-                    .on_cancel_with_data(|stats| log_cancelled(&ctx, stringify!($method_name), &stats, start_mem_stats.as_ref()))
+                    .on_cancel_with_data(|stats| log_cancelled(&ctx, stringify!($method_name), repo_name.clone(), &stats, start_mem_stats.as_ref()))
                     .await;
-                    log_result(ctx, "Request complete", stringify!($method_name), &stats, &res, start_mem_stats.as_ref());
+                    log_result(ctx, "Request complete", SCSRequestType::Normal, SCSRequestEvent::Complete, stringify!($method_name), repo_name, &stats, &res, start_mem_stats.as_ref());
                     res.map_err(Into::into)
                 }
             };
@@ -1045,9 +1169,9 @@ macro_rules! impl_thrift_stream_methods {
            {
                 let fut = async move {
                     let svc = self.0.clone();
-                    let (ctx, session_uuid) = create_ctx!(svc, $method_name, req_ctxt, $( $param_name ),*).await?;
+                    let (ctx, session_uuid, repo_name) = create_ctx!(svc, $method_name, req_ctxt, $( $param_name ),*).await?;
                     let handler = {
-                        cloned!(ctx);
+                        cloned!(ctx, repo_name);
                         async move {
                             let start_mem_stats = log_start(&ctx, stringify!($method_name));
                             STATS::total_request_start.add_value(1);
@@ -1060,31 +1184,30 @@ macro_rules! impl_thrift_stream_methods {
                             .with_max_poll(50).await
                     }
                     .timed()
-                    .on_cancel_with_data(|stats| log_cancelled(&ctx, stringify!($method_name), &stats, start_mem_stats.as_ref()))
+                    .on_cancel_with_data(|stats| log_cancelled(&ctx, stringify!($method_name), repo_name.clone(), &stats, start_mem_stats.as_ref()))
                     .await;
                     if res.is_ok() {
-                        log_result(ctx.clone(), "Request stream started", stringify!($method_name), &stats, &res, start_mem_stats.as_ref());
-                    }
-                    else {
-                        log_result(ctx.clone(), "Request complete", stringify!($method_name), &stats, &res, start_mem_stats.as_ref());
+                        log_result(ctx.clone(), "Request stream started", SCSRequestType::Stream, SCSRequestEvent::StreamStarted, stringify!($method_name), repo_name.clone(), &stats, &res, start_mem_stats.as_ref());
+                    } else {
+                        log_result(ctx.clone(), "Request complete", SCSRequestType::Stream, SCSRequestEvent::Complete, stringify!($method_name), repo_name.clone(), &stats, &res, start_mem_stats.as_ref());
                     }
                     let first_error = Arc::new(OnceLock::new());
                     let chunk_counter = AtomicU64::new(0);
                     res.map_err(Into::into).map(move |(res, stream)| {
                         let stream = stream.inspect({
-                            cloned!(ctx, first_error);
+                            cloned!(ctx, repo_name, first_error);
                             move |res| {
                                 let count = chunk_counter.fetch_add(1, Ordering::Relaxed);
-                                log_stream_chunk(ctx.clone(), stringify!($method_name), &res, count);
+                                log_stream_chunk(ctx.clone(), stringify!($method_name), repo_name.clone(), &res, count);
                                 if let Err(err) = res {
                                     let (status, desc) = err.status_and_description();
                                     let _ = first_error.set((status, desc));
                                 }
                             }
                         }).boxed().try_timed({
-                            cloned!(ctx, first_error);
+                            cloned!(ctx, repo_name, first_error);
                             move |stream_stats| {
-                                log_stream_complete(ctx, stringify!($method_name), &stats, &stream_stats, first_error.get().cloned() ,start_mem_stats.as_ref());
+                                log_stream_complete(ctx, stringify!($method_name), repo_name, &stats, &stream_stats, first_error.get().cloned() ,start_mem_stats.as_ref());
                             }
                         }).map_err(Into::into).boxed();
                         (res, stream)