Skip to content

Commit

Permalink
Log SCS request stats to ODS3
Browse files Browse the repository at this point in the history
Summary: This is a simple, direct translation of the same stats we had in ODS. With ODS3 we should be able to add more, possibly most of what we have in Scuba, but that will come in a later diff.

Reviewed By: clara-9

Differential Revision: D68956752

fbshipit-source-id: 9f4ed45776368eba9820d7ba09ec39044ec09ef4
  • Loading branch information
andreacampi authored and facebook-github-bot committed Feb 3, 2025
1 parent 27610e4 commit c804815
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 34 deletions.
1 change: 1 addition & 0 deletions eden/mononoke/scs/scs_methods/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ gix-hash = "0.15.1"
hooks = { version = "0.1.0", path = "../../hooks" }
itertools = "0.14.0"
justknobs = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
lazy_static = "1.4"
maplit = "1.0"
megarepo_api = { version = "0.1.0", path = "../../megarepo_api" }
megarepo_config = { version = "0.1.0", path = "../../megarepo_api/megarepo_config" }
Expand Down
191 changes: 157 additions & 34 deletions eden/mononoke/scs/scs_methods/src/source_control_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use futures_stats::TimedTryStreamExt;
use futures_stats::TryStreamStats;
use futures_watchdog::WatchdogExt;
use identity::Identity;
#[cfg(fbcode_build)]
use lazy_static::lazy_static;
use login_objects_thrift::EnvironmentType;
use megarepo_api::MegarepoApi;
use memory::MemoryStats;
Expand Down Expand Up @@ -78,6 +80,16 @@ use source_control_services::SourceControlService;
use srserver::RequestContext;
use stats::prelude::*;
use time_ext::DurationExt;
#[cfg(fbcode_build)]
use MononokeScsRequest_ods3::Instrument_MononokeScsRequest;
#[cfg(fbcode_build)]
use MononokeScsRequest_ods3_types::MononokeScsRequest;
#[cfg(fbcode_build)]
use MononokeScsRequest_ods3_types::SCSRequestEvent;
#[cfg(fbcode_build)]
use MononokeScsRequest_ods3_types::SCSRequestOutcome as Outcome;
#[cfg(fbcode_build)]
use MononokeScsRequest_ods3_types::SCSRequestType;

use crate::from_request::FromRequest;
use crate::scuba_params::AddScubaParams;
Expand All @@ -92,6 +104,12 @@ const FORWARDED_OTHER_CATS_HEADER: &str = "scm_forwarded_other_cats";
const PER_REQUEST_READ_QPS: usize = 4000;
const PER_REQUEST_WRITE_QPS: usize = 4000;

// Shared ODS3 instrument that the request-logging helpers in this file report
// to via `.observe(MononokeScsRequest { .. })`.
// Only compiled for fbcode builds; `lazy_static!` constructs it on first access.
#[cfg(fbcode_build)]
lazy_static! {
static ref SCS_REQUEST_STATS_INSTRUMENT: Instrument_MononokeScsRequest =
Instrument_MononokeScsRequest::new();
}

define_stats! {
prefix = "mononoke.scs_server";
total_request_start: timeseries(Rate, Sum),
Expand Down Expand Up @@ -186,7 +204,7 @@ impl SourceControlServiceImpl {
req_ctxt: &RequestContext,
specifier: Option<&dyn SpecifierExt>,
params: &dyn AddScubaParams,
) -> Result<(CoreContext, String), scs_errors::ServiceError> {
) -> Result<(CoreContext, String, Option<String>), scs_errors::ServiceError> {
let session = self.create_session(req_ctxt).await?;
let identities = session.metadata().identities();
let mut scuba = self.create_scuba(name, req_ctxt, specifier, params, identities)?;
Expand All @@ -197,7 +215,13 @@ impl SourceControlServiceImpl {
scuba.add("session_uuid", session_uuid.clone());

let ctx = session.new_context_with_scribe(self.logger.clone(), scuba, self.scribe.clone());
Ok((ctx, session_uuid))

let repo_name = if let Some(specifier) = specifier {
specifier.scuba_reponame()
} else {
None
};
Ok((ctx, session_uuid, repo_name))
}

/// Create and configure a scuba sample builder for a request.
Expand Down Expand Up @@ -669,7 +693,10 @@ fn add_request_end_memory_stats(
fn log_result<T: AddScubaResponse>(
ctx: CoreContext,
tag: &'static str,
request_type: SCSRequestType,
event: SCSRequestEvent,
method: &str,
repo: Option<String>,
stats: &FutureStats,
result: &Result<T, impl scs_errors::LoggableError>,
start_mem_stats: Option<&MemoryStats>,
Expand All @@ -678,21 +705,38 @@ fn log_result<T: AddScubaResponse>(

add_request_end_memory_stats(&mut scuba, method, start_mem_stats);

let (status, error, invalid_request, internal_failure, overloaded) = match result {
let (status, outcome, error, invalid_request, internal_failure, overloaded) = match result {
Ok(response) => {
response.add_scuba_response(&mut scuba);
("SUCCESS", None, 0, 0, 0)
("SUCCESS", Outcome::Success, None, 0, 0, 0)
}
Err(err) => {
let (status, desc) = err.status_and_description();
match status {
Status::RequestError => ("REQUEST_ERROR", Some(desc), 1, 0, 0),
Status::InternalError => ("INTERNAL_ERROR", Some(desc), 0, 1, 0),
Status::OverloadError => ("OVERLOAD_ERROR", Some(desc), 0, 0, 1),
Status::PollError => ("POLL_ERROR", Some(desc), 0, 1, 0),
Status::RequestError => {
("REQUEST_ERROR", Outcome::RequestError, Some(desc), 1, 0, 0)
}
Status::InternalError => (
"INTERNAL_ERROR",
Outcome::InternalError,
Some(desc),
0,
1,
0,
),
Status::OverloadError => (
"OVERLOAD_ERROR",
Outcome::OverloadError,
Some(desc),
0,
0,
1,
),
Status::PollError => ("POLL_ERROR", Outcome::PollError, Some(desc), 0, 1, 0),
}
}
};

if let Ok(true) = justknobs::eval("scm/mononoke:scs_alert_on_methods", None, Some(method)) {
STATS::total_method_requests.add_value(1, (method.to_string(),));
if status == "INTERNAL_ERROR" {
Expand Down Expand Up @@ -728,28 +772,57 @@ fn log_result<T: AddScubaResponse>(
scuba.add("error", error.as_str());
}
scuba.log_with_msg(tag, None);

#[cfg(fbcode_build)]
SCS_REQUEST_STATS_INSTRUMENT.observe(MononokeScsRequest {
method: Some(method.to_string()),
repo,
request_type: Some(request_type),
event: Some(event),
outcome: Some(outcome),
requests: Some(1.0),
duration_ms: Some(stats.completion_time.as_millis_unchecked() as f64),
..Default::default()
});
}

fn log_stream_chunk<T: AddScubaResponse>(
ctx: CoreContext,
method: &str,
repo: Option<String>,
result: &Result<T, impl scs_errors::LoggableError>,
count: u64,
) {
let mut scuba = ctx.scuba().clone();

let (status, error, invalid_request, internal_failure, overloaded) = match result {
let (status, outcome, error, invalid_request, internal_failure, overloaded) = match result {
Ok(response) => {
response.add_scuba_response(&mut scuba);
("SUCCESS", None, 0, 0, 0)
("SUCCESS", Outcome::Success, None, 0, 0, 0)
}
Err(err) => {
let (status, desc) = err.status_and_description();
match status {
Status::RequestError => ("REQUEST_ERROR", Some(desc), 1, 0, 0),
Status::InternalError => ("INTERNAL_ERROR", Some(desc), 0, 1, 0),
Status::OverloadError => ("OVERLOAD_ERROR", Some(desc), 0, 0, 1),
Status::PollError => ("POLL_ERROR", Some(desc), 0, 0, 1),
Status::RequestError => {
("REQUEST_ERROR", Outcome::RequestError, Some(desc), 1, 0, 0)
}
Status::InternalError => (
"INTERNAL_ERROR",
Outcome::InternalError,
Some(desc),
0,
1,
0,
),
Status::OverloadError => (
"OVERLOAD_ERROR",
Outcome::OverloadError,
Some(desc),
0,
0,
1,
),
Status::PollError => ("POLL_ERROR", Outcome::PollError, Some(desc), 0, 0, 1),
}
}
};
Expand Down Expand Up @@ -780,11 +853,23 @@ fn log_stream_chunk<T: AddScubaResponse>(
scuba.sampled(sampling_rate);
}
scuba.log_with_msg("Request stream chunk", None);

#[cfg(fbcode_build)]
SCS_REQUEST_STATS_INSTRUMENT.observe(MononokeScsRequest {
method: Some(method.to_string()),
repo,
request_type: Some(SCSRequestType::Stream),
event: Some(SCSRequestEvent::StreamChunk),
outcome: Some(outcome),
requests: Some(1.0),
..Default::default()
});
}

fn log_stream_complete(
ctx: CoreContext,
method: &str,
repo: Option<String>,
initial_future_stats: &FutureStats,
stream_stats: &TryStreamStats,
mb_status_and_description: Option<(Status, String)>,
Expand All @@ -794,15 +879,31 @@ fn log_stream_complete(

add_request_end_memory_stats(&mut scuba, method, start_mem_stats);

let (status, error, invalid_request, internal_failure, overloaded) =
let (status, outcome, error, invalid_request, internal_failure, overloaded) =
match mb_status_and_description {
Some((status, desc)) => match status {
Status::RequestError => ("REQUEST_ERROR", Some(desc), 1, 0, 0),
Status::InternalError => ("INTERNAL_ERROR", Some(desc), 0, 1, 0),
Status::OverloadError => ("OVERLOAD_ERROR", Some(desc), 0, 0, 1),
Status::PollError => ("POLL_ERROR", Some(desc), 0, 1, 0),
Status::RequestError => {
("REQUEST_ERROR", Outcome::RequestError, Some(desc), 1, 0, 0)
}
Status::InternalError => (
"INTERNAL_ERROR",
Outcome::InternalError,
Some(desc),
0,
1,
0,
),
Status::OverloadError => (
"OVERLOAD_ERROR",
Outcome::OverloadError,
Some(desc),
0,
0,
1,
),
Status::PollError => ("POLL_ERROR", Outcome::PollError, Some(desc), 0, 1, 0),
},
None => ("SUCCESS", None, 0, 0, 0),
None => ("SUCCESS", Outcome::Success, None, 0, 0, 0),
};

if let Ok(true) = justknobs::eval("scm/mononoke:scs_alert_on_methods", None, Some(method)) {
Expand Down Expand Up @@ -868,11 +969,24 @@ fn log_stream_complete(
scuba.add("error", error.as_str());
}
scuba.log_with_msg("Request complete", None);

#[cfg(fbcode_build)]
SCS_REQUEST_STATS_INSTRUMENT.observe(MononokeScsRequest {
method: Some(method.to_string()),
repo,
request_type: Some(SCSRequestType::Stream),
event: Some(SCSRequestEvent::StreamComplete),
outcome: Some(outcome),
requests: Some(1.0),
duration_ms: Some(initial_future_stats.completion_time.as_millis_unchecked() as f64),
..Default::default()
});
}

fn log_cancelled(
ctx: &CoreContext,
method: &str,
repo: Option<String>,
stats: &FutureStats,
start_mem_stats: Option<&MemoryStats>,
) {
Expand All @@ -887,6 +1001,16 @@ fn log_cancelled(
scuba.add_future_stats(stats);
scuba.add("status", "CANCELLED");
scuba.log_with_msg("Request cancelled", None);

#[cfg(fbcode_build)]
SCS_REQUEST_STATS_INSTRUMENT.observe(MononokeScsRequest {
method: Some(method.to_string()),
repo,
request_type: Some(SCSRequestType::Stream),
event: Some(SCSRequestEvent::Cancelled),
requests: Some(1.0),
..Default::default()
});
}

fn check_memory_usage(
Expand Down Expand Up @@ -992,7 +1116,7 @@ macro_rules! impl_thrift_methods {
let fut = async move {
let svc = self.0.clone();
let watchdog_max_poll = self.0.watchdog_max_poll;
let (ctx, session_uuid) = create_ctx!(svc, $method_name, req_ctxt, $( $param_name ),*).await?;
let (ctx, session_uuid, repo_name) = create_ctx!(svc, $method_name, req_ctxt, $( $param_name ),*).await?;
let handler = {
cloned!(ctx);
async move {
Expand All @@ -1007,9 +1131,9 @@ macro_rules! impl_thrift_methods {
.with_max_poll(watchdog_max_poll).await
}
.timed()
.on_cancel_with_data(|stats| log_cancelled(&ctx, stringify!($method_name), &stats, start_mem_stats.as_ref()))
.on_cancel_with_data(|stats| log_cancelled(&ctx, stringify!($method_name), repo_name.clone(), &stats, start_mem_stats.as_ref()))
.await;
log_result(ctx, "Request complete", stringify!($method_name), &stats, &res, start_mem_stats.as_ref());
log_result(ctx, "Request complete", SCSRequestType::Normal, SCSRequestEvent::Complete, stringify!($method_name), repo_name, &stats, &res, start_mem_stats.as_ref());
res.map_err(Into::into)
}
};
Expand Down Expand Up @@ -1045,9 +1169,9 @@ macro_rules! impl_thrift_stream_methods {
{
let fut = async move {
let svc = self.0.clone();
let (ctx, session_uuid) = create_ctx!(svc, $method_name, req_ctxt, $( $param_name ),*).await?;
let (ctx, session_uuid, repo_name) = create_ctx!(svc, $method_name, req_ctxt, $( $param_name ),*).await?;
let handler = {
cloned!(ctx);
cloned!(ctx, repo_name);
async move {
let start_mem_stats = log_start(&ctx, stringify!($method_name));
STATS::total_request_start.add_value(1);
Expand All @@ -1060,31 +1184,30 @@ macro_rules! impl_thrift_stream_methods {
.with_max_poll(50).await
}
.timed()
.on_cancel_with_data(|stats| log_cancelled(&ctx, stringify!($method_name), &stats, start_mem_stats.as_ref()))
.on_cancel_with_data(|stats| log_cancelled(&ctx, stringify!($method_name), repo_name.clone(), &stats, start_mem_stats.as_ref()))
.await;
if res.is_ok() {
log_result(ctx.clone(), "Request stream started", stringify!($method_name), &stats, &res, start_mem_stats.as_ref());
}
else {
log_result(ctx.clone(), "Request complete", stringify!($method_name), &stats, &res, start_mem_stats.as_ref());
log_result(ctx.clone(), "Request stream started", SCSRequestType::Stream, SCSRequestEvent::StreamStarted, stringify!($method_name), repo_name.clone(), &stats, &res, start_mem_stats.as_ref());
} else {
log_result(ctx.clone(), "Request complete", SCSRequestType::Stream, SCSRequestEvent::Complete, stringify!($method_name), repo_name.clone(), &stats, &res, start_mem_stats.as_ref());
}
let first_error = Arc::new(OnceLock::new());
let chunk_counter = AtomicU64::new(0);
res.map_err(Into::into).map(move |(res, stream)| {
let stream = stream.inspect({
cloned!(ctx, first_error);
cloned!(ctx, repo_name, first_error);
move |res| {
let count = chunk_counter.fetch_add(1, Ordering::Relaxed);
log_stream_chunk(ctx.clone(), stringify!($method_name), &res, count);
log_stream_chunk(ctx.clone(), stringify!($method_name), repo_name.clone(), &res, count);
if let Err(err) = res {
let (status, desc) = err.status_and_description();
let _ = first_error.set((status, desc));
}
}
}).boxed().try_timed({
cloned!(ctx, first_error);
cloned!(ctx, repo_name, first_error);
move |stream_stats| {
log_stream_complete(ctx, stringify!($method_name), &stats, &stream_stats, first_error.get().cloned() ,start_mem_stats.as_ref());
log_stream_complete(ctx, stringify!($method_name), repo_name, &stats, &stream_stats, first_error.get().cloned() ,start_mem_stats.as_ref());
}
}).map_err(Into::into).boxed();
(res, stream)
Expand Down

0 comments on commit c804815

Please sign in to comment.