Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Remove transaction delays to improve throughput and change meaning of scenario runner users key #127

Merged
merged 4 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions runner/src/scenario/ceramic/model_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
let get_instance_tx = transaction!(get_instance).set_name("get_instance");

Ok(scenario!("CeramicModelReuseScenario")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
.register_transaction(test_start)
.register_transaction(create_instance_tx)
.register_transaction(get_instance_tx))
Expand Down
3 changes: 1 addition & 2 deletions runner/src/scenario/ceramic/new_streams.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::goose_try;
use ceramic_http_client::CeramicHttpClient;
use goose::prelude::*;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use crate::scenario::ceramic::util::goose_error;
use crate::scenario::ceramic::{
Expand All @@ -27,7 +27,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
transaction!(instantiate_large_model).set_name("instantiate_large_model");

Ok(scenario!("CeramicNewStreams")
.set_wait_time(Duration::from_millis(10), Duration::from_millis(100))?
.register_transaction(test_start)
.register_transaction(instantiate_small_model)
.register_transaction(instantiate_large_model))
Expand Down
4 changes: 1 addition & 3 deletions runner/src/scenario/ceramic/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ceramic_http_client::{
};
use goose::prelude::*;
use std::collections::HashMap;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use tracing::instrument;

#[derive(Clone)]
Expand Down Expand Up @@ -64,8 +64,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
transaction!(query_models_post_update).set_name("post_update_query_models");

Ok(scenario!("CeramicQueryScenario")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
.register_transaction(test_start)
.register_transaction(pre_query_models)
.register_transaction(update_models)
Expand Down
4 changes: 1 addition & 3 deletions runner/src/scenario/ceramic/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use ceramic_http_client::ceramic_event::StreamId;
use ceramic_http_client::CeramicHttpClient;
use ceramic_http_client::{ModelAccountRelation, ModelDefinition};
use goose::prelude::*;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use tracing::instrument;

pub(crate) struct LoadTestUserData {
Expand Down Expand Up @@ -72,8 +72,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
let get_large_model = transaction!(get_large_model).set_name("get_large_model");

Ok(scenario!("CeramicSimpleScenario")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
.register_transaction(test_start)
.register_transaction(update_small_model)
.register_transaction(get_small_model)
Expand Down
3 changes: 1 addition & 2 deletions runner/src/scenario/ceramic/write_only.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use ceramic_http_client::CeramicHttpClient;
use goose::prelude::*;
use std::{sync::Arc, time::Duration};
use std::sync::Arc;

use crate::scenario::ceramic::simple::{setup, update_large_model, update_small_model};
use crate::scenario::ceramic::util::goose_error;
Expand All @@ -23,7 +23,6 @@ pub async fn scenario() -> Result<Scenario, GooseError> {
let update_large_model = transaction!(update_large_model).set_name("update_large_model");

Ok(scenario!("CeramicWriteOnly")
.set_wait_time(Duration::from_millis(9000), Duration::from_millis(11000))?
.register_transaction(setup)
.register_transaction(update_small_model)
.register_transaction(update_large_model))
Expand Down
2 changes: 0 additions & 2 deletions runner/src/scenario/ipfs_block_fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub fn scenario(topo: Topology) -> Result<Scenario> {
.set_on_stop();

Ok(scenario!("IpfsRpc")
// After each transactions runs, sleep randomly from 1 to 5 seconds.
.set_wait_time(Duration::from_secs(1), Duration::from_secs(5))?
// This transaction only runs one time when the user first starts.
.register_transaction(put)
// These next two transactions run repeatedly as long as the load test is running.
Expand Down
58 changes: 33 additions & 25 deletions runner/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
sync::{Arc, Mutex},
};

use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, Result};
use clap::{Args, ValueEnum};
use goose::{config::GooseConfiguration, prelude::GooseMetrics, GooseAttack};
use keramik_common::peer_info::Peer;
Expand Down Expand Up @@ -35,8 +35,14 @@ pub struct Opts {
#[arg(long, env = "SIMULATE_PEERS_PATH")]
peers: PathBuf,

/// Number of users to simulate
#[arg(long, default_value_t = 100, env = "SIMULATE_USERS")]
/// Number of users to simulate on each node. The total number of users
/// running the test scenario will be this value * N nodes.
///
/// Implmentation details: A user corresponds to a tokio task responsible
/// for making requests. They should have low memory overhead, so you can
/// create many users and then use `throttle_requests` to constrain the overall
/// throughput on the node (specifically the HTTP requests made).
#[arg(long, default_value_t = 4, env = "SIMULATE_USERS")]
users: usize,

/// Duration of the simulation
Expand All @@ -57,6 +63,7 @@ pub struct Opts {
pub struct Topology {
pub target_worker: usize,
pub total_workers: usize,
pub users: usize,
pub nonce: u64,
}

Expand Down Expand Up @@ -126,23 +133,34 @@ impl Scenario {
pub async fn simulate(opts: Opts) -> Result<()> {
let mut metrics = Metrics::init(&opts)?;

let peers: Vec<Peer> = parse_peers_info(opts.peers)
let peers: Vec<Peer> = parse_peers_info(&opts.peers)
.await?
.into_iter()
.filter(|peer| matches!(peer, Peer::Ceramic(_)))
.collect();

if opts.manager && opts.users % peers.len() != 0 {
bail!("number of users {} must be a multiple of the number of peers {}, this ensures we can deterministically identifiy each user", opts.users, peers.len())
}
// We assume exactly one worker per peer.
// This allows us to be deterministic in how each user operates.
// use user value as number of users per worker, rather than total users that must be evenly divided across all workers
let topo = Topology {
target_worker: opts.target_peer,
total_workers: peers.len(),
users: opts.users * peers.len(),
nonce: opts.nonce,
};

let config = if opts.manager {
manager_config(&topo, opts.run_time)
} else {
worker_config(
opts.scenario.target_addr(
peers
.get(opts.target_peer)
.ok_or_else(|| anyhow!("target peer too large, not enough peers"))?,
)?,
opts.throttle_requests
.or_else(|| opts.scenario.throttle_requests()),
)
};

let scenario = match opts.scenario {
Scenario::IpfsRpc => ipfs_block_fetch::scenario(topo)?,
Scenario::CeramicSimple => ceramic::simple::scenario().await?,
Expand All @@ -159,19 +177,6 @@ pub async fn simulate(opts: Opts) -> Result<()> {
.await?
}
};
let config = if opts.manager {
manager_config(peers.len(), opts.users, opts.run_time)
} else {
worker_config(
opts.scenario.target_addr(
peers
.get(opts.target_peer)
.ok_or_else(|| anyhow!("target peer too large, not enough peers"))?,
)?,
opts.throttle_requests
.or_else(|| opts.scenario.throttle_requests()),
)
};

let goose_metrics = match GooseAttack::initialize_with_config(config)?
.register_scenario(scenario)
Expand All @@ -190,13 +195,13 @@ pub async fn simulate(opts: Opts) -> Result<()> {
Ok(())
}

fn manager_config(count: usize, users: usize, run_time: String) -> GooseConfiguration {
fn manager_config(topo: &Topology, run_time: String) -> GooseConfiguration {
let mut config = GooseConfiguration::default();
config.log_level = 2;
config.users = Some(users);
config.users = Some(topo.users);
config.manager = true;
config.manager_bind_port = 5115;
config.expect_workers = Some(count);
config.expect_workers = Some(topo.total_workers);
config.startup_time = "10s".to_owned();
config.run_time = run_time;
config
Expand All @@ -206,6 +211,9 @@ fn worker_config(target_peer_addr: String, throttle_requests: Option<usize>) ->
config.scenario_log = "scenario.log".to_owned();
config.transaction_log = "transaction.log".to_owned();
config.request_log = "request.log".to_owned();
config.transaction_log = "transaction.log".to_owned();
config.scenario_log = "scenario.log".to_owned();
dav1do marked this conversation as resolved.
Show resolved Hide resolved
config.error_log = "error.log".to_owned();
config.log_level = 2;
config.worker = true;
config.host = target_peer_addr;
Expand Down