From 23fa3107f81d23e9b34e910369a93d9340c8630c Mon Sep 17 00:00:00 2001 From: David Estes Date: Tue, 9 Jan 2024 14:10:31 -0700 Subject: [PATCH 1/4] fix: adjust meaning of user count and capture worker error/tx logs Now, the user value defines how many users to execute for each node. Previously, it was the total users and needed to be evenly divisible by the number of nodes. --- runner/src/simulate.rs | 48 ++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs index add9e7fb..a2dced46 100644 --- a/runner/src/simulate.rs +++ b/runner/src/simulate.rs @@ -3,7 +3,7 @@ use std::{ sync::{Arc, Mutex}, }; -use anyhow::{anyhow, bail, Result}; +use anyhow::{anyhow, Result}; use clap::{Args, ValueEnum}; use goose::{config::GooseConfiguration, prelude::GooseMetrics, GooseAttack}; use keramik_common::peer_info::Peer; @@ -57,6 +57,7 @@ pub struct Opts { pub struct Topology { pub target_worker: usize, pub total_workers: usize, + pub users: usize, pub nonce: u64, } @@ -126,23 +127,34 @@ impl Scenario { pub async fn simulate(opts: Opts) -> Result<()> { let mut metrics = Metrics::init(&opts)?; - let peers: Vec = parse_peers_info(opts.peers) + let peers: Vec = parse_peers_info(&opts.peers) .await? .into_iter() .filter(|peer| matches!(peer, Peer::Ceramic(_))) .collect(); - if opts.manager && opts.users % peers.len() != 0 { - bail!("number of users {} must be a multiple of the number of peers {}, this ensures we can deterministically identifiy each user", opts.users, peers.len()) - } - // We assume exactly one worker per peer. - // This allows us to be deterministic in how each user operates. 
+ // use user value as number of users per worker, rather than total users that must be evenly divided across all workers let topo = Topology { target_worker: opts.target_peer, total_workers: peers.len(), + users: opts.users * peers.len(), nonce: opts.nonce, }; + let config = if opts.manager { + manager_config(&topo, opts.run_time) + } else { + worker_config( + opts.scenario.target_addr( + peers + .get(opts.target_peer) + .ok_or_else(|| anyhow!("target peer too large, not enough peers"))?, + )?, + opts.throttle_requests + .or_else(|| opts.scenario.throttle_requests()), + ) + }; + let scenario = match opts.scenario { Scenario::IpfsRpc => ipfs_block_fetch::scenario(topo)?, Scenario::CeramicSimple => ceramic::simple::scenario().await?, @@ -159,19 +171,6 @@ pub async fn simulate(opts: Opts) -> Result<()> { .await? } }; - let config = if opts.manager { - manager_config(peers.len(), opts.users, opts.run_time) - } else { - worker_config( - opts.scenario.target_addr( - peers - .get(opts.target_peer) - .ok_or_else(|| anyhow!("target peer too large, not enough peers"))?, - )?, - opts.throttle_requests - .or_else(|| opts.scenario.throttle_requests()), - ) - }; let goose_metrics = match GooseAttack::initialize_with_config(config)? 
.register_scenario(scenario) @@ -190,13 +189,13 @@ pub async fn simulate(opts: Opts) -> Result<()> { Ok(()) } -fn manager_config(count: usize, users: usize, run_time: String) -> GooseConfiguration { +fn manager_config(topo: &Topology, run_time: String) -> GooseConfiguration { let mut config = GooseConfiguration::default(); config.log_level = 2; - config.users = Some(users); + config.users = Some(topo.users); config.manager = true; config.manager_bind_port = 5115; - config.expect_workers = Some(count); + config.expect_workers = Some(topo.total_workers); config.startup_time = "10s".to_owned(); config.run_time = run_time; config @@ -206,6 +205,9 @@ fn worker_config(target_peer_addr: String, throttle_requests: Option) -> config.scenario_log = "scenario.log".to_owned(); config.transaction_log = "transaction.log".to_owned(); config.request_log = "request.log".to_owned(); + config.transaction_log = "transaction.log".to_owned(); + config.scenario_log = "scenario.log".to_owned(); + config.error_log = "error.log".to_owned(); config.log_level = 2; config.worker = true; config.host = target_peer_addr; From 367b93862899146cda239ce2c31ef2b43c31c3d2 Mon Sep 17 00:00:00 2001 From: David Estes Date: Tue, 9 Jan 2024 14:11:12 -0700 Subject: [PATCH 2/4] feat: delete long wait times between scenario tx --- runner/src/scenario/ceramic/model_reuse.rs | 2 -- runner/src/scenario/ceramic/new_streams.rs | 3 +-- runner/src/scenario/ceramic/query.rs | 4 +--- runner/src/scenario/ceramic/simple.rs | 4 +--- runner/src/scenario/ceramic/write_only.rs | 3 +-- runner/src/scenario/ipfs_block_fetch.rs | 2 -- 6 files changed, 4 insertions(+), 14 deletions(-) diff --git a/runner/src/scenario/ceramic/model_reuse.rs b/runner/src/scenario/ceramic/model_reuse.rs index 34b0f935..300bb27f 100644 --- a/runner/src/scenario/ceramic/model_reuse.rs +++ b/runner/src/scenario/ceramic/model_reuse.rs @@ -37,8 +37,6 @@ pub async fn scenario() -> Result { let get_instance_tx = 
transaction!(get_instance).set_name("get_instance"); Ok(scenario!("CeramicModelReuseScenario") - // After each transactions runs, sleep randomly from 1 to 5 seconds. - .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? .register_transaction(test_start) .register_transaction(create_instance_tx) .register_transaction(get_instance_tx)) diff --git a/runner/src/scenario/ceramic/new_streams.rs b/runner/src/scenario/ceramic/new_streams.rs index c25588df..ac7c7e81 100644 --- a/runner/src/scenario/ceramic/new_streams.rs +++ b/runner/src/scenario/ceramic/new_streams.rs @@ -1,7 +1,7 @@ use crate::goose_try; use ceramic_http_client::CeramicHttpClient; use goose::prelude::*; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use crate::scenario::ceramic::util::goose_error; use crate::scenario::ceramic::{ @@ -27,7 +27,6 @@ pub async fn scenario() -> Result { transaction!(instantiate_large_model).set_name("instantiate_large_model"); Ok(scenario!("CeramicNewStreams") - .set_wait_time(Duration::from_millis(10), Duration::from_millis(100))? .register_transaction(test_start) .register_transaction(instantiate_small_model) .register_transaction(instantiate_large_model)) diff --git a/runner/src/scenario/ceramic/query.rs b/runner/src/scenario/ceramic/query.rs index 4e1f1eda..586b2d3a 100644 --- a/runner/src/scenario/ceramic/query.rs +++ b/runner/src/scenario/ceramic/query.rs @@ -9,7 +9,7 @@ use ceramic_http_client::{ }; use goose::prelude::*; use std::collections::HashMap; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tracing::instrument; #[derive(Clone)] @@ -64,8 +64,6 @@ pub async fn scenario() -> Result { transaction!(query_models_post_update).set_name("post_update_query_models"); Ok(scenario!("CeramicQueryScenario") - // After each transactions runs, sleep randomly from 1 to 5 seconds. - .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? 
.register_transaction(test_start) .register_transaction(pre_query_models) .register_transaction(update_models) diff --git a/runner/src/scenario/ceramic/simple.rs b/runner/src/scenario/ceramic/simple.rs index 343c87bb..f057485f 100644 --- a/runner/src/scenario/ceramic/simple.rs +++ b/runner/src/scenario/ceramic/simple.rs @@ -9,7 +9,7 @@ use ceramic_http_client::ceramic_event::StreamId; use ceramic_http_client::CeramicHttpClient; use ceramic_http_client::{ModelAccountRelation, ModelDefinition}; use goose::prelude::*; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use tracing::instrument; pub(crate) struct LoadTestUserData { @@ -72,8 +72,6 @@ pub async fn scenario() -> Result { let get_large_model = transaction!(get_large_model).set_name("get_large_model"); Ok(scenario!("CeramicSimpleScenario") - // After each transactions runs, sleep randomly from 1 to 5 seconds. - .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? .register_transaction(test_start) .register_transaction(update_small_model) .register_transaction(get_small_model) diff --git a/runner/src/scenario/ceramic/write_only.rs b/runner/src/scenario/ceramic/write_only.rs index 6f429b74..c9ee37dc 100644 --- a/runner/src/scenario/ceramic/write_only.rs +++ b/runner/src/scenario/ceramic/write_only.rs @@ -1,6 +1,6 @@ use ceramic_http_client::CeramicHttpClient; use goose::prelude::*; -use std::{sync::Arc, time::Duration}; +use std::sync::Arc; use crate::scenario::ceramic::simple::{setup, update_large_model, update_small_model}; use crate::scenario::ceramic::util::goose_error; @@ -23,7 +23,6 @@ pub async fn scenario() -> Result { let update_large_model = transaction!(update_large_model).set_name("update_large_model"); Ok(scenario!("CeramicWriteOnly") - .set_wait_time(Duration::from_millis(9000), Duration::from_millis(11000))? 
.register_transaction(setup) .register_transaction(update_small_model) .register_transaction(update_large_model)) diff --git a/runner/src/scenario/ipfs_block_fetch.rs b/runner/src/scenario/ipfs_block_fetch.rs index fbc6fdb2..4d57e397 100644 --- a/runner/src/scenario/ipfs_block_fetch.rs +++ b/runner/src/scenario/ipfs_block_fetch.rs @@ -27,8 +27,6 @@ pub fn scenario(topo: Topology) -> Result { .set_on_stop(); Ok(scenario!("IpfsRpc") - // After each transactions runs, sleep randomly from 1 to 5 seconds. - .set_wait_time(Duration::from_secs(1), Duration::from_secs(5))? // This transaction only runs one time when the user first starts. .register_transaction(put) // These next two transactions run repeatedly as long as the load test is running. From 35867a62d29aa22c2030042f4c7222a0a7ebe9ad Mon Sep 17 00:00:00 2001 From: David Estes Date: Wed, 10 Jan 2024 10:03:56 -0700 Subject: [PATCH 3/4] feat: adjust runner users default and comment about impl --- runner/src/simulate.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs index a2dced46..eea1adcb 100644 --- a/runner/src/simulate.rs +++ b/runner/src/simulate.rs @@ -35,8 +35,14 @@ pub struct Opts { #[arg(long, env = "SIMULATE_PEERS_PATH")] peers: PathBuf, - /// Number of users to simulate - #[arg(long, default_value_t = 100, env = "SIMULATE_USERS")] + /// Number of users to simulate on each node. The total number of users + /// running the test scenario will be this value * N nodes. + /// + /// Implementation details: A user corresponds to a tokio task responsible + /// for making requests. They should have low memory overhead, so you can + /// create many users and then use `throttle_requests` to constrain the overall + /// throughput on the node (specifically the HTTP requests made). 
+ #[arg(long, default_value_t = 4, env = "SIMULATE_USERS")] users: usize, /// Duration of the simulation From b80c8671ba73efe238b637e716a6c69812ca99d4 Mon Sep 17 00:00:00 2001 From: David Estes Date: Thu, 11 Jan 2024 10:04:46 -0700 Subject: [PATCH 4/4] fix: setting log paths twice --- runner/src/simulate.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/runner/src/simulate.rs b/runner/src/simulate.rs index eea1adcb..bbe44566 100644 --- a/runner/src/simulate.rs +++ b/runner/src/simulate.rs @@ -211,8 +211,6 @@ fn worker_config(target_peer_addr: String, throttle_requests: Option) -> config.scenario_log = "scenario.log".to_owned(); config.transaction_log = "transaction.log".to_owned(); config.request_log = "request.log".to_owned(); - config.transaction_log = "transaction.log".to_owned(); - config.scenario_log = "scenario.log".to_owned(); config.error_log = "error.log".to_owned(); config.log_level = 2; config.worker = true;