Commit

feat: Remove transaction delays to improve throughput and change meaning of scenario runner users key (#127)

* fix: adjust runner user count to mean users/worker and adjust default value
* feat: capture worker error logs
* feat: delete long wait times between scenario tx
dav1do committed Jan 17, 2024
1 parent 3705719 commit 6586747
Showing 7 changed files with 35 additions and 39 deletions.
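
Note on the users semantics: with this change, SIMULATE_USERS / --users means users per worker (the runner starts one worker per Ceramic peer), and the manager is configured with users times peer-count total users instead of requiring a total that divides evenly across peers. A minimal sketch of that arithmetic follows; the helper name, the peer count of 8, and the main wrapper are illustrative and not part of the commit.

// Hypothetical helper; the committed code computes this inline as
// `users: opts.users * peers.len()` in runner/src/simulate.rs.
fn total_simulated_users(users_per_worker: usize, ceramic_peers: usize) -> usize {
    users_per_worker * ceramic_peers
}

fn main() {
    // The new default of 4 users per worker across, say, 8 Ceramic peers yields
    // 32 simulated users; the Goose manager then expects 8 workers and 32 users.
    assert_eq!(total_simulated_users(4, 8), 32);
}
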
2 changes: 0 additions & 2 deletions runner/src/scenario/ceramic/model_reuse.rs
@@ -37,8 +37,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
let get_instance_tx = transaction!(get_instance).set_name("get_instance");

Ok(scenario!("CeramicModelReuseScenario")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
.register_transaction(test_start)
.register_transaction(create_instance_tx)
.register_transaction(get_instance_tx))
3 changes: 1 addition & 2 deletions runner/src/scenario/ceramic/new_streams.rs
@@ -1,7 +1,7 @@
use crate::goose_try;
use ceramic_http_client::CeramicHttpClient;
use goose::prelude::*;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use crate::scenario::ceramic::util::goose_error;
use crate::scenario::ceramic::{
@@ -27,7 +27,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
transaction!(instantiate_large_model).set_name("instantiate_large_model");

Ok(scenario!("CeramicNewStreams")
.set_wait_time(Duration::from_millis(10), Duration::from_millis(100))?
.register_transaction(test_start)
.register_transaction(instantiate_small_model)
.register_transaction(instantiate_large_model))
4 changes: 1 addition & 3 deletions runner/src/scenario/ceramic/query.rs
@@ -9,7 +9,7 @@ use ceramic_http_client::{
};
use goose::prelude::*;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use tracing::instrument;

#[derive(Clone)]
@@ -64,8 +64,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
transaction!(query_models_post_update).set_name("post_update_query_models");

Ok(scenario!("CeramicQueryScenario")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
.register_transaction(test_start)
.register_transaction(pre_query_models)
.register_transaction(update_models)
4 changes: 1 addition & 3 deletions runner/src/scenario/ceramic/simple.rs
@@ -9,7 +9,7 @@ use ceramic_http_client::ceramic_event::StreamId;
use ceramic_http_client::CeramicHttpClient;
use ceramic_http_client::{ModelAccountRelation, ModelDefinition};
use goose::prelude::*;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use tracing::instrument;

pub(crate) struct LoadTestUserData {
@@ -72,8 +72,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
let get_large_model = transaction!(get_large_model).set_name("get_large_model");

Ok(scenario!("CeramicSimpleScenario")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
.register_transaction(test_start)
.register_transaction(update_small_model)
.register_transaction(get_small_model)
3 changes: 1 addition & 2 deletions runner/src/scenario/ceramic/write_only.rs
@@ -1,6 +1,6 @@
use ceramic_http_client::CeramicHttpClient;
use goose::prelude::*;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use crate::scenario::ceramic::simple::{setup, update_large_model, update_small_model};
use crate::scenario::ceramic::util::goose_error;
@@ -23,7 +23,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
let update_large_model = transaction!(update_large_model).set_name("update_large_model");

Ok(scenario!("CeramicWriteOnly")
.set_wait_time(Duration::from_millis(9000), Duration::from_millis(11000))?
.register_transaction(setup)
.register_transaction(update_small_model)
.register_transaction(update_large_model))
2 changes: 0 additions & 2 deletions runner/src/scenario/ipfs_block_fetch.rs
@@ -27,8 +27,6 @@ pub fn scenario(topo: Topology) -> Result<Scenario> {
.set_on_stop();

Ok(scenario!("IpfsRpc")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
// This transaction only runs one time when the user first starts.
.register_transaction(put)
// These next two transactions run repeatedly as long as the load test is running.
56 changes: 31 additions & 25 deletions runner/src/simulate.rs
@@ -3,7 +3,7 @@ use std::{
sync::{Arc, Mutex},
};

use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, Result};
use clap::{Args, ValueEnum};
use goose::{config::GooseConfiguration, prelude::GooseMetrics, GooseAttack};
use keramik_common::peer_info::Peer;
@@ -35,8 +35,14 @@ pub struct Opts {
#[arg(long, env = "SIMULATE_PEERS_PATH")]
peers: PathBuf,

/// Number of users to simulate
#[arg(long, default_value_t = 100, env = "SIMULATE_USERS")]
/// Number of users to simulate on each node. The total number of users
/// running the test scenario will be this value * N nodes.
///
/// Implementation details: A user corresponds to a tokio task responsible
/// for making requests. They should have low memory overhead, so you can
/// create many users and then use `throttle_requests` to constrain the overall
/// throughput on the node (specifically the HTTP requests made).
#[arg(long, default_value_t = 4, env = "SIMULATE_USERS")]
users: usize,

/// Duration of the simulation
Expand All @@ -57,6 +63,7 @@ pub struct Opts {
pub struct Topology {
pub target_worker: usize,
pub total_workers: usize,
pub users: usize,
pub nonce: u64,
}

@@ -126,23 +133,34 @@ impl Scenario {
pub async fn simulate(opts: Opts) -> Result<()> {
let mut metrics = Metrics::init(&opts)?;

let peers: Vec<Peer> = parse_peers_info(opts.peers)
let peers: Vec<Peer> = parse_peers_info(&opts.peers)
.await?
.into_iter()
.filter(|peer| matches!(peer, Peer::Ceramic(_)))
.collect();

if opts.manager && opts.users % peers.len() != 0 {
bail!("number of users {} must be a multiple of the number of peers {}, this ensures we can deterministically identifiy each user", opts.users, peers.len())
}
// We assume exactly one worker per peer.
// This allows us to be deterministic in how each user operates.
// Use the users value as the number of users per worker, rather than a total that must be evenly divided across all workers.
let topo = Topology {
target_worker: opts.target_peer,
total_workers: peers.len(),
users: opts.users * peers.len(),
nonce: opts.nonce,
};

let config = if opts.manager {
manager_config(&topo, opts.run_time)
} else {
worker_config(
opts.scenario.target_addr(
peers
.get(opts.target_peer)
.ok_or_else(|| anyhow!("target peer too large, not enough peers"))?,
)?,
opts.throttle_requests
.or_else(|| opts.scenario.throttle_requests()),
)
};

let scenario = match opts.scenario {
Scenario::IpfsRpc => ipfs_block_fetch::scenario(topo)?,
Scenario::CeramicSimple => ceramic::simple::scenario().await?,
@@ -159,19 +177,6 @@ pub async fn simulate(opts: Opts) -> Result<()> {
.await?
}
};
let config = if opts.manager {
manager_config(peers.len(), opts.users, opts.run_time)
} else {
worker_config(
opts.scenario.target_addr(
peers
.get(opts.target_peer)
.ok_or_else(|| anyhow!("target peer too large, not enough peers"))?,
)?,
opts.throttle_requests
.or_else(|| opts.scenario.throttle_requests()),
)
};

let goose_metrics = match GooseAttack::initialize_with_config(config)?
.register_scenario(scenario)
@@ -190,13 +195,13 @@ pub async fn simulate(opts: Opts) -> Result<()> {
Ok(())
}

fn manager_config(count: usize, users: usize, run_time: String) -> GooseConfiguration {
fn manager_config(topo: &Topology, run_time: String) -> GooseConfiguration {
let mut config = GooseConfiguration::default();
config.log_level = 2;
config.users = Some(users);
config.users = Some(topo.users);
config.manager = true;
config.manager_bind_port = 5115;
config.expect_workers = Some(count);
config.expect_workers = Some(topo.total_workers);
config.startup_time = "10s".to_owned();
config.run_time = run_time;
config
@@ -206,6 +211,7 @@ fn worker_config(target_peer_addr: String, throttle_requests: Option<usize>) ->
config.scenario_log = "scenario.log".to_owned();
config.transaction_log = "transaction.log".to_owned();
config.request_log = "request.log".to_owned();
config.error_log = "error.log".to_owned();
config.log_level = 2;
config.worker = true;
config.host = target_peer_addr;
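
With the 1-to-5-second (and 9-to-11-second) per-transaction sleeps removed, each Goose user now runs its transactions back-to-back, so overall load is bounded only by the optional request throttle handed to the workers. Below is a hedged sketch of that path, shaped like worker_config in runner/src/simulate.rs but not the committed implementation; the standalone function, peer URL, and 100 req/s figure are illustrative.

use goose::config::GooseConfiguration;

// Illustrative sketch: shows how a worker could cap its request rate once
// per-transaction wait times are gone.
fn throttled_worker(target_peer_addr: String, throttle_requests: Option<usize>) -> GooseConfiguration {
    let mut config = GooseConfiguration::default();
    config.worker = true;
    config.host = target_peer_addr;
    if let Some(max_rps) = throttle_requests {
        // Caps this worker's requests per second; without it, users issue
        // requests as fast as responses arrive.
        config.throttle_requests = max_rps;
    }
    config
}

fn main() {
    let config = throttled_worker("http://ceramic-0:7007".to_owned(), Some(100));
    assert_eq!(config.throttle_requests, 100);
}
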
