diff --git a/runner/src/scenario/ceramic/model_reuse.rs b/runner/src/scenario/ceramic/model_reuse.rs index 34b0f935..300bb27f 100644 --- a/runner/src/scenario/ceramic/model_reuse.rs +++ b/runner/src/scenario/ceramic/model_reuse.rs @@ -37,8 +37,6 @@ pub async fn scenario() -> Result { let get_instance_tx = transaction!(get_instance).set_name("get_instance"); Ok(scenario!("CeramicModelReuseScenario") - // After each transactions runs, sleep randomly from 1 to 5 seconds. - .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? .register_transaction(test_start) .register_transaction(create_instance_tx) .register_transaction(get_instance_tx)) diff --git a/runner/src/scenario/ceramic/new_streams.rs b/runner/src/scenario/ceramic/new_streams.rs index c25588df..ac7c7e81 100644 --- a/runner/src/scenario/ceramic/new_streams.rs +++ b/runner/src/scenario/ceramic/new_streams.rs @@ -1,7 +1,7 @@ use crate::goose_try; use ceramic_http_client::CeramicHttpClient; use goose::prelude::*; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use crate::scenario::ceramic::util::goose_error; use crate::scenario::ceramic::{ @@ -27,7 +27,6 @@ pub async fn scenario() -> Result { transaction!(instantiate_large_model).set_name("instantiate_large_model"); Ok(scenario!("CeramicNewStreams") - .set_wait_time(Duration::from_millis(10), Duration::from_millis(100))? 
.register_transaction(test_start) .register_transaction(instantiate_small_model) .register_transaction(instantiate_large_model)) diff --git a/runner/src/scenario/ceramic/query.rs b/runner/src/scenario/ceramic/query.rs index 4e1f1eda..586b2d3a 100644 --- a/runner/src/scenario/ceramic/query.rs +++ b/runner/src/scenario/ceramic/query.rs @@ -9,7 +9,7 @@ use ceramic_http_client::{ }; use goose::prelude::*; use std::collections::HashMap; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tracing::instrument; #[derive(Clone)] @@ -64,8 +64,6 @@ pub async fn scenario() -> Result { transaction!(query_models_post_update).set_name("post_update_query_models"); Ok(scenario!("CeramicQueryScenario") - // After each transactions runs, sleep randomly from 1 to 5 seconds. - .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? .register_transaction(test_start) .register_transaction(pre_query_models) .register_transaction(update_models) diff --git a/runner/src/scenario/ceramic/simple.rs b/runner/src/scenario/ceramic/simple.rs index 343c87bb..f057485f 100644 --- a/runner/src/scenario/ceramic/simple.rs +++ b/runner/src/scenario/ceramic/simple.rs @@ -9,7 +9,7 @@ use ceramic_http_client::ceramic_event::StreamId; use ceramic_http_client::CeramicHttpClient; use ceramic_http_client::{ModelAccountRelation, ModelDefinition}; use goose::prelude::*; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tracing::instrument; pub(crate) struct LoadTestUserData { @@ -72,8 +72,6 @@ pub async fn scenario() -> Result { let get_large_model = transaction!(get_large_model).set_name("get_large_model"); Ok(scenario!("CeramicSimpleScenario") - // After each transactions runs, sleep randomly from 1 to 5 seconds. - .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? 
.register_transaction(test_start) .register_transaction(update_small_model) .register_transaction(get_small_model) diff --git a/runner/src/scenario/ceramic/write_only.rs b/runner/src/scenario/ceramic/write_only.rs index 6f429b74..c9ee37dc 100644 --- a/runner/src/scenario/ceramic/write_only.rs +++ b/runner/src/scenario/ceramic/write_only.rs @@ -1,6 +1,6 @@ use ceramic_http_client::CeramicHttpClient; use goose::prelude::*; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use crate::scenario::ceramic::simple::{setup, update_large_model, update_small_model}; use crate::scenario::ceramic::util::goose_error; @@ -23,7 +23,6 @@ pub async fn scenario() -> Result { let update_large_model = transaction!(update_large_model).set_name("update_large_model"); Ok(scenario!("CeramicWriteOnly") - .set_wait_time(Duration::from_millis(9000), Duration::from_millis(11000))? .register_transaction(setup) .register_transaction(update_small_model) .register_transaction(update_large_model)) diff --git a/runner/src/scenario/ipfs_block_fetch.rs b/runner/src/scenario/ipfs_block_fetch.rs index fbc6fdb2..4d57e397 100644 --- a/runner/src/scenario/ipfs_block_fetch.rs +++ b/runner/src/scenario/ipfs_block_fetch.rs @@ -27,8 +27,6 @@ pub fn scenario(topo: Topology) -> Result { .set_on_stop(); Ok(scenario!("IpfsRpc") - // After each transactions runs, sleep randomly from 1 to 5 seconds. - .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? // This transaction only runs one time when the user first starts. .register_transaction(put) // These next two transactions run repeatedly as long as the load test is running. 
diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs index add9e7fb..bbe44566 100644 --- a/runner/src/simulate.rs +++ b/runner/src/simulate.rs @@ -3,7 +3,7 @@ use std::{ sync::{Arc, Mutex}, }; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, Result}; use clap::{Args, ValueEnum}; use goose::{config::GooseConfiguration, prelude::GooseMetrics, GooseAttack}; use keramik_common::peer_info::Peer; @@ -35,8 +35,14 @@ pub struct Opts { #[arg(long, env = "SIMULATE_PEERS_PATH")] peers: PathBuf, - /// Number of users to simulate - #[arg(long, default_value_t = 100, env = "SIMULATE_USERS")] + /// Number of users to simulate on each node. The total number of users + /// running the test scenario will be this value * N nodes. + /// + /// Implementation details: A user corresponds to a tokio task responsible + /// for making requests. They should have low memory overhead, so you can + /// create many users and then use `throttle_requests` to constrain the overall + /// throughput on the node (specifically the HTTP requests made). + #[arg(long, default_value_t = 4, env = "SIMULATE_USERS")] users: usize, /// Duration of the simulation @@ -57,6 +63,7 @@ pub struct Opts { pub struct Topology { pub target_worker: usize, pub total_workers: usize, + pub users: usize, pub nonce: u64, } @@ -126,23 +133,34 @@ impl Scenario { pub async fn simulate(opts: Opts) -> Result<()> { let mut metrics = Metrics::init(&opts)?; - let peers: Vec = parse_peers_info(opts.peers) + let peers: Vec = parse_peers_info(&opts.peers) .await? .into_iter() .filter(|peer| matches!(peer, Peer::Ceramic(_))) .collect(); - if opts.manager && opts.users % peers.len() != 0 { - bail!("number of users {} must be a multiple of the number of peers {}, this ensures we can deterministically identifiy each user", opts.users, peers.len()) - } - // We assume exactly one worker per peer. - // This allows us to be deterministic in how each user operates.
+ // Use the users value as the number of users per worker, rather than as a total that must be divided evenly across all workers let topo = Topology { target_worker: opts.target_peer, total_workers: peers.len(), + users: opts.users * peers.len(), nonce: opts.nonce, }; + let config = if opts.manager { + manager_config(&topo, opts.run_time) + } else { + worker_config( + opts.scenario.target_addr( + peers + .get(opts.target_peer) + .ok_or_else(|| anyhow!("target peer too large, not enough peers"))?, + )?, + opts.throttle_requests + .or_else(|| opts.scenario.throttle_requests()), + ) + }; + let scenario = match opts.scenario { Scenario::IpfsRpc => ipfs_block_fetch::scenario(topo)?, Scenario::CeramicSimple => ceramic::simple::scenario().await?, @@ -159,19 +177,6 @@ pub async fn simulate(opts: Opts) -> Result<()> { .await? } }; - let config = if opts.manager { - manager_config(peers.len(), opts.users, opts.run_time) - } else { - worker_config( - opts.scenario.target_addr( - peers - .get(opts.target_peer) - .ok_or_else(|| anyhow!("target peer too large, not enough peers"))?, - )?, - opts.throttle_requests - .or_else(|| opts.scenario.throttle_requests()), - ) - }; let goose_metrics = match GooseAttack::initialize_with_config(config)?
.register_scenario(scenario) @@ -190,13 +195,13 @@ pub async fn simulate(opts: Opts) -> Result<()> { Ok(()) } -fn manager_config(count: usize, users: usize, run_time: String) -> GooseConfiguration { +fn manager_config(topo: &Topology, run_time: String) -> GooseConfiguration { let mut config = GooseConfiguration::default(); config.log_level = 2; - config.users = Some(users); + config.users = Some(topo.users); config.manager = true; config.manager_bind_port = 5115; - config.expect_workers = Some(count); + config.expect_workers = Some(topo.total_workers); config.startup_time = "10s".to_owned(); config.run_time = run_time; config @@ -206,6 +211,7 @@ fn worker_config(target_peer_addr: String, throttle_requests: Option) -> config.scenario_log = "scenario.log".to_owned(); config.transaction_log = "transaction.log".to_owned(); config.request_log = "request.log".to_owned(); + config.error_log = "error.log".to_owned(); config.log_level = 2; config.worker = true; config.host = target_peer_addr;