Skip to content

Commit

Permalink
feat: pass cas_controller to cas-benchmark scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
Samika Kashyap committed Jun 13, 2024
1 parent 7c65867 commit c123662
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ FROM exec as operator

COPY --from=builder /home/builder/keramik/keramik-operator /usr/bin

ENTRYPOINT ["/usr/bin/keramik-operator"]
ENTRYPOINT ["/usr/bin/keramik-operator"]
36 changes: 35 additions & 1 deletion operator/src/simulation/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,9 @@ async fn reconcile_(
throttle_requests: spec.throttle_requests,
success_request_target: spec.success_request_target,
log_level: spec.log_level.clone(),
anchor_wait_time: spec.anchor_wait_time.clone(),
anchor_wait_time: spec.anchor_wait_time,
cas_network: spec.cas_network.clone(),
cas_controller: spec.cas_controller.clone(),
};

apply_manager(cx.clone(), &ns, simulation.clone(), manager_config).await?;
Expand Down Expand Up @@ -833,4 +834,37 @@ mod tests {
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}

#[tokio::test]
#[traced_test]
async fn reconcile_cas_controller() {
let mock_rpc_client = MockIpfsRpcClientTest::new();
let (testctx, api_handle) = Context::test(mock_rpc_client);
let fakeserver = ApiServerVerifier::new(api_handle);
let simulation = Simulation::test().with_spec(SimulationSpec {
cas_controller: Some("test-cas-controller".to_owned()),
..Default::default()
});
let mut stub = Stub::default();
stub.manager_job.patch(expect![[r#"
--- original
+++ modified
@@ -87,6 +87,10 @@
"name": "ceramic-admin"
}
}
+ },
+ {
+ "name": "SIMULATE_CAS_CONTROLLER",
+ "value": "test-cas-controller"
}
],
"image": "public.ecr.aws/r5b3e0r5/3box/keramik-runner:latest",
"#]]);
let mocksrv = stub.run(fakeserver);
reconcile(Arc::new(simulation), testctx)
.await
.expect("reconciler");
timeout_after_1s(mocksrv).await;
}
}
8 changes: 8 additions & 0 deletions operator/src/simulation/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct ManagerConfig {
pub log_level: Option<String>,
pub anchor_wait_time: Option<u32>,
pub cas_network: Option<String>,
pub cas_controller: Option<String>,
}

pub fn manager_job_spec(config: ManagerConfig) -> JobSpec {
Expand Down Expand Up @@ -147,6 +148,13 @@ pub fn manager_job_spec(config: ManagerConfig) -> JobSpec {
..Default::default()
})
}
if let Some(cas_controller) = config.cas_controller {
env_vars.push(EnvVar {
name: "SIMULATE_CAS_CONTROLLER".to_owned(),
value: Some(cas_controller.to_owned()),
..Default::default()
})
}
if let Some(log_level) = config.log_level {
env_vars.push(EnvVar {
name: "SIMULATE_LOG_LEVEL".to_owned(),
Expand Down
6 changes: 4 additions & 2 deletions operator/src/simulation/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ pub struct SimulationSpec {
/// which consumes RAM, and will disappear if there is no persistent volume when the pod exits.
/// Valid values: 'warn', 'info', 'debug', 'trace'. Defaults to None meaning no logging beyond RUST_LOG.
pub(crate) log_level: Option<String>,
/// Anchor wait time in seconds
/// Anchor wait time in seconds, use with ceramic-anchoring-benchmark scenario
pub anchor_wait_time: Option<u32>,
/// Network type to use for the simulation.
/// Network type to use for the simulation, use with cas-benchmark scenario
pub cas_network: Option<String>,
/// Controller DID for the simulation, use with cas-benchmark scenario
pub cas_controller: Option<String>,
}

impl Simulation {
Expand Down
21 changes: 15 additions & 6 deletions runner/src/scenario/ceramic/anchor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ pub async fn stream_tip_car(
pub async fn create_anchor_request_on_cas(
user: &mut GooseUser,
conn: Arc<tokio::sync::Mutex<MultiplexedConnection>>,
cas_network: Option<String>,
cas_controller: Option<String>,
) -> TransactionResult {
let cas_service_url = std::env::var("CAS_SERVICE_URL")
.unwrap_or_else(|_| "https://cas-dev-direct.3boxlabs.com".to_string());
let node_controller = std::env::var("node_controller")
.unwrap_or_else(|_| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string());
let cas_service_url =
cas_network.unwrap_or_else(|| "https://cas-dev-direct.3boxlabs.com".to_string());
let node_controller = cas_controller
.unwrap_or_else(|| "did:key:z6Mkh3pajt5brscshuDrCCber9nC9Ujpi7EcECveKtJPMEPo".to_string());
let (stream_id, genesis_cid, genesis_block) = crate::scenario::util::create_stream().unwrap();

let (root_cid, car_bytes) = stream_tip_car(
Expand Down Expand Up @@ -118,13 +120,20 @@ pub async fn create_anchor_request_on_cas(
Ok(())
}

pub async fn cas_benchmark() -> Result<Scenario, GooseError> {
pub async fn cas_benchmark(
cas_network: Option<String>,
cas_controller: Option<String>,
) -> Result<Scenario, GooseError> {
let redis_cli = get_redis_client().await.unwrap();
let multiplexed_conn = redis_cli.get_multiplexed_tokio_connection().await.unwrap();
let conn_mutex = Arc::new(Mutex::new(multiplexed_conn));
let create_anchor_request = Transaction::new(Arc::new(move |user| {
let conn_mutex_clone = conn_mutex.clone();
Box::pin(async move { create_anchor_request_on_cas(user, conn_mutex_clone).await })
let cas_network = cas_network.clone();
let cas_controller = cas_controller.clone();
Box::pin(async move {
create_anchor_request_on_cas(user, conn_mutex_clone, cas_network, cas_controller).await
})
}))
.set_name("create_anchor_request");

Expand Down
16 changes: 14 additions & 2 deletions runner/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ pub struct Opts {
// Pass ceramic network information to this
#[arg(long, env = "SIMULATE_CAS_NETWORK")]
cas_network: Option<String>,

#[arg(long, env = "SIMULATE_CAS_CONTROLLER")]
cas_controller: Option<String>,
}

#[derive(Debug, Clone, ValueEnum)]
Expand Down Expand Up @@ -131,6 +134,7 @@ pub struct Topology {
pub struct ScenarioOptions {
pub anchor_wait_time: Option<usize>,
pub cas_network: Option<String>,
pub cas_controller: Option<String>,
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -419,6 +423,7 @@ impl ScenarioState {
let scenario_opts: ScenarioOptions = ScenarioOptions {
anchor_wait_time: opts.anchor_wait_time.map(|t| t as usize),
cas_network: opts.cas_network,
cas_controller: opts.cas_controller,
};
Ok(Self {
topo,
Expand Down Expand Up @@ -453,7 +458,13 @@ impl ScenarioState {
}
Scenario::CeramicQuery => ceramic::query::scenario(self.scenario.into()).await?,
Scenario::ReconEventSync => recon_sync::event_sync_scenario().await?,
Scenario::CASBenchmark => ceramic::anchor::cas_benchmark().await?,
Scenario::CASBenchmark => {
ceramic::anchor::cas_benchmark(
self.scenario_opts.cas_network.clone(),
self.scenario_opts.cas_controller.clone(),
)
.await?
}
};
self.collect_before_metrics().await?;
Ok(scenario)
Expand Down Expand Up @@ -727,7 +738,7 @@ impl ScenarioState {
Duration::from_secs(self.scenario_opts.anchor_wait_time.unwrap_or_default() as u64);
sleep(wait_duration).await;
info!("Waiting for {} seconds", wait_duration.as_secs());
// Pick a peer at random
// Pick the first peer as the validator. It should have all the streams synced to it
let peer = self.peers.first().unwrap();

let ids = self.get_set_from_redis(ANCHOR_REQUEST_MIDS_KEY).await?;
Expand Down Expand Up @@ -1318,6 +1329,7 @@ mod test {
target_request_rate,
anchor_wait_time: None,
cas_network: None,
cas_controller: None,
}
}

Expand Down

0 comments on commit c123662

Please sign in to comment.