feat: check metric values before starting scenario (and a bit of refactoring)

Introduces a new `Opts -> ScenarioState` pattern so the state can collect metrics and verify scenario behavior.
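A minimal sketch of the `Opts -> ScenarioState` pattern described above, with the structs trimmed to a few illustrative fields (the real types in the diff below also carry peers, topology, and scenario configuration):

struct Opts {
    manager: bool,
    run_time: String,
}

struct ScenarioState {
    manager: bool,
    run_time: String,
    // Baseline metric values sampled before the scenario starts; compared
    // against post-run values when validating the scenario.
    before_metrics: Option<Vec<u64>>,
}

impl ScenarioState {
    // Fallible constructor: consume Opts and build the long-lived state the
    // runner threads through the whole scenario lifecycle.
    async fn try_from_opts(opts: Opts) -> anyhow::Result<Self> {
        Ok(Self {
            manager: opts.manager,
            run_time: opts.run_time,
            before_metrics: None,
        })
    }

    // Sample counters before the run so success can be judged on the delta
    // rather than on absolute values.
    async fn collect_before_metrics(&mut self) -> anyhow::Result<()> {
        self.before_metrics = Some(Vec::new());
        Ok(())
    }
}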
dav1do committed Jan 17, 2024
1 parent 49a672e commit efdecdf
Showing 1 changed file with 111 additions and 59 deletions.
170 changes: 111 additions & 59 deletions runner/src/simulate.rs
@@ -19,6 +19,7 @@ use crate::{

// FIXME: is it worth attaching metrics to the peer info?
const IPFS_SERVICE_METRICS_PORT: u32 = 9465;
const EVENT_SYNC_METRIC_NAME: &str = "recon_key_insert_count_total";

/// Options to Simulate command
#[derive(Args, Debug)]
@@ -139,13 +140,16 @@ pub struct ScenarioState {
pub manager: bool,
pub scenario: Scenario,
pub target_request_rate: Option<usize>,
before_metrics: Option<Vec<u64>>,
run_time: String,
throttle_requests: Option<usize>,
}

impl ScenarioState {
async fn new_from_opts(opts: &Opts) -> Result<Self> {
async fn try_from_opts(opts: Opts) -> Result<Self> {
// We assume exactly one worker per peer.
// This allows us to be deterministic in how each user operates.
tracing::warn!(?opts, "opts");
tracing::debug!(?opts, "building state from opts");
let peers: Vec<Peer> = parse_peers_info(opts.peers.clone())
.await?
.into_iter()
@@ -166,9 +170,28 @@ impl ScenarioState {
manager: opts.manager,
scenario: opts.scenario,
target_request_rate: opts.target_request_rate,
before_metrics: None,
run_time: opts.run_time,
throttle_requests: opts.throttle_requests,
})
}

async fn build_goose_scenario(&mut self) -> Result<goose::prelude::Scenario> {
let scenario = match self.scenario {
Scenario::IpfsRpc => ipfs_block_fetch::scenario(self.topo)?,
Scenario::CeramicSimple => ceramic::simple::scenario().await?,
Scenario::CeramicWriteOnly => ceramic::write_only::scenario().await?,
Scenario::CeramicNewStreams => ceramic::new_streams::scenario().await?,
Scenario::CeramicQuery => ceramic::query::scenario().await?,
Scenario::CeramicModelReuse => ceramic::model_reuse::scenario().await?,
Scenario::EventIdSync => {
ceramic::event_id_sync::event_id_sync_scenario(self.ipfs_peer_addr()).await?
}
};
self.collect_before_metrics().await?;
Ok(scenario)
}

fn target_peer_addr(&self) -> Result<String> {
self.scenario.target_addr(
self.peers
@@ -233,6 +256,36 @@ impl ScenarioState {
Ok(results)
}

async fn collect_before_metrics(&mut self) -> Result<()> {
if !self.manager {
Ok(())
} else {
match self.scenario {
Scenario::IpfsRpc
| Scenario::CeramicSimple
| Scenario::CeramicWriteOnly
| Scenario::CeramicNewStreams
| Scenario::CeramicQuery
| Scenario::CeramicModelReuse => Ok(()),
Scenario::EventIdSync => {
let peers = self
.get_peers_counter_metric(EVENT_SYNC_METRIC_NAME, IPFS_SERVICE_METRICS_PORT)
.await?;
let res: Vec<u64> = peers.iter().filter_map(|v| *v).collect();
if res.len() != peers.len() {
bail!(
"Failed to collect metrics for all peers before scenario {:?}: {:?}",
self.scenario,
peers
)
}
self.before_metrics = Some(res);
Ok(())
}
}
}
}
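// Illustration (assumption, not part of this commit's diff): the baseline
// captured above is subtracted from the post-run counter during validation,
// so the check measures only work done during the run:
//   rate = (current - before) / run_time_seconds
// e.g. a peer whose insert counter goes from 1_000 to 181_000 over a 600s run
// is credited with 300 events/s, regardless of its starting count.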

/// For now, most scenarios are successful if they complete without error; only EventIdSync has an additional success criterion.
/// This does not return a Result so that we always proceed with cleanup, even if we fail to validate the scenario.
async fn validate_scenario_success(&self, metrics: &GooseMetrics) -> CommandResult {
Expand All @@ -254,7 +307,7 @@ impl ScenarioState {
// trait, but we don't yet have a use case, and might need to use transactions, or multiple requests, or something
// entirely different. Anyway, to avoid generalizing the exception we keep it simple.
let default_rate = 300;
let metric_name = "recon_key_insert_count_total";
let metric_name = EVENT_SYNC_METRIC_NAME;
let req_name = ceramic::event_id_sync::CREATE_EVENT_REQ_NAME;

let peer_req_cnts = match self
@@ -280,15 +333,29 @@
let run_time_seconds = metrics.duration;
let create_rps = metric.success_count as f64 / run_time_seconds as f64;

let before_metrics = match self
.before_metrics
.as_ref()
.ok_or_else(|| {
anyhow!(
"failed to get before metrics for scenario {}",
self.scenario.name()
)
})
.map_err(CommandResult::Failure)
{
Ok(v) => v,
Err(e) => return e,
};

// For now, assume writer and all peers must meet the threshold rate
let mut errors = peer_req_cnts
.into_iter()
let mut errors = peer_req_cnts.into_iter().zip(before_metrics.into_iter())
.enumerate()
.flat_map(|(idx, c)| {
if let Some(c) = c {
let rps = c as f64 / run_time_seconds as f64;
.flat_map(|(idx, (current, before))| {
if let Some(c) = current {
let rps = (c - *before) as f64 / run_time_seconds as f64;
if rps < threshold {
warn!(?c, ?run_time_seconds, ?threshold, %rps, "rps less than threshold");
warn!(current=%c, %before, %run_time_seconds, %threshold, %rps, "rps less than threshold");
Some(format!(
"Peer {} RPS less than threshold: {} < {}",
idx, rps, threshold
@@ -330,31 +397,47 @@
}
}
}

fn goose_config(&self) -> Result<GooseConfiguration> {
let config = if self.manager {
let mut config = GooseConfiguration::default();
config.log_level = 2;
config.users = Some(self.topo.users);
config.manager = true;
config.manager_bind_port = 5115;
config.expect_workers = Some(self.topo.total_workers);
config.startup_time = "10s".to_owned();
config.run_time = self.run_time.clone();
config
} else {
let mut config = GooseConfiguration::default();
config.scenario_log = "scenario.log".to_owned();
config.transaction_log = "transaction.log".to_owned();
config.request_log = "request.log".to_owned();
config.error_log = "error.log".to_owned();
config.log_level = 2;
config.worker = true;
config.host = self.target_peer_addr()?;
// We are leveraging k8s dns search path so we do not have to specify the fully qualified
// domain name explicitly.
config.manager_host = "manager.goose".to_owned();
config.manager_port = 5115;
if let Some(throttle_requests) = self.throttle_requests {
config.throttle_requests = throttle_requests
}
config
};
Ok(config)
}
}

#[tracing::instrument]
pub async fn simulate(opts: Opts) -> Result<CommandResult> {
let mut metrics = Metrics::init(&opts)?;

let state = ScenarioState::new_from_opts(&opts).await?;

let scenario = match state.scenario {
Scenario::IpfsRpc => ipfs_block_fetch::scenario(state.topo)?,
Scenario::CeramicSimple => ceramic::simple::scenario().await?,
Scenario::CeramicWriteOnly => ceramic::write_only::scenario().await?,
Scenario::CeramicNewStreams => ceramic::new_streams::scenario().await?,
Scenario::CeramicQuery => ceramic::query::scenario().await?,
Scenario::CeramicModelReuse => ceramic::model_reuse::scenario().await?,
Scenario::EventIdSync => {
ceramic::event_id_sync::event_id_sync_scenario(state.ipfs_peer_addr()).await?
}
};

let config = if opts.manager {
manager_config(&state.topo, opts.run_time)
} else {
worker_config(state.target_peer_addr()?, opts.throttle_requests)
};
let mut state = ScenarioState::try_from_opts(opts).await?;
let scenario = state.build_goose_scenario().await?;
let config: GooseConfiguration = state.goose_config()?;

let goose_metrics = match GooseAttack::initialize_with_config(config)?
.register_scenario(scenario)
@@ -374,37 +457,6 @@ pub async fn simulate(opts: Opts) -> Result<CommandResult> {
Ok(success)
}

fn manager_config(topo: &Topology, run_time: String) -> GooseConfiguration {
tracing::warn!("manager config: {:#?}", topo);
let mut config = GooseConfiguration::default();
config.log_level = 2;
config.users = Some(topo.users);
config.manager = true;
config.manager_bind_port = 5115;
config.expect_workers = Some(topo.total_workers);
config.startup_time = "10s".to_owned();
config.run_time = run_time;
config
}
fn worker_config(target_peer_addr: String, throttle_requests: Option<usize>) -> GooseConfiguration {
let mut config = GooseConfiguration::default();
config.scenario_log = "scenario.log".to_owned();
config.transaction_log = "transaction.log".to_owned();
config.request_log = "request.log".to_owned();
config.error_log = "error.log".to_owned();
config.log_level = 2;
config.worker = true;
config.host = target_peer_addr;
// We are leveraging k8s dns search path so we do not have to specify the fully qualified
// domain name explicitly.
config.manager_host = "manager.goose".to_owned();
config.manager_port = 5115;
if let Some(throttle_requests) = throttle_requests {
config.throttle_requests = throttle_requests
}
config
}

struct Metrics {
inner: Arc<Mutex<MetricsInner>>,
}
