Skip to content

Commit

Permalink
feat: update recon sync test to create random car files
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 29, 2024
1 parent fccfc89 commit 82f3f06
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 71 deletions.
27 changes: 26 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ anyhow = "1"
async-trait = "0.1"
clap = { version = "4", features = ["derive", "env"] }
console-subscriber = "0.2"
ceramic-core = { git = "https://github.com/ceramicnetwork/rust-ceramic.git", branch = "main"}
ceramic-core = { git = "https://github.com/ceramicnetwork/rust-ceramic.git", branch = "main" }
iroh-car = { git = "https://github.com/ceramicnetwork/rust-ceramic.git", branch = "main" }
env_logger = "0.10.0"
expect-patch = { path = "./expect-patch/" }
keramik-common = { path = "./common/", default-features = false }
Expand Down
4 changes: 3 additions & 1 deletion runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ edition = "2021"
anyhow.workspace = true
async-trait.workspace = true
ceramic-core.workspace = true
iroh-car.workspace = true
ceramic-http-client = { git = "https://github.com/3box/ceramic-http-client-rs.git", branch = "main", default-features = false }
#ceramic-http-client = { path = "../../ceramic-http-client-rs", default-features = false }
clap.workspace = true
Expand All @@ -31,4 +32,5 @@ tracing-subscriber.workspace = true
multibase.workspace = true

[dev-dependencies]
test-log = "0.2"
test-log = "0.2"

79 changes: 27 additions & 52 deletions runner/src/scenario/ceramic/recon_sync.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
use crate::scenario::ceramic::model_reuse::{get_model_id, set_model_id};
use crate::scenario::ceramic::models;
use crate::scenario::ceramic::util::setup_model;
use crate::scenario::get_redis_client;
use ceramic_core::{Cid, EventId};
use ceramic_http_client::ceramic_event::StreamId;
use ceramic_http_client::{CeramicHttpClient, ModelAccountRelation, ModelDefinition};
use ceramic_http_client::ceramic_event::{StreamId, StreamIdType};
use goose::prelude::*;
use libipld::cid;
use multihash::{Code, MultihashDigest};
use rand::rngs::ThreadRng;
use rand::Rng;
use reqwest::Url;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::{sync::Arc, time::Duration};
use tracing::{info, instrument};

use super::util::goose_error;
use super::{CeramicClient, Credentials};

const MODEL_ID_KEY: &str = "event_id_sync_model_id";
pub(crate) const CREATE_EVENT_TX_NAME: &str = "create_new_event";
// goose stores the HTTP method + transaction name as the request name
Expand All @@ -43,25 +36,11 @@ struct ReconLoadTestUserData {
with_data: bool,
}

async fn init_scenario(
ipfs_peer_addr: Option<String>,
with_data: bool,
) -> Result<Transaction, GooseError> {
let ipfs_addr: Url = ipfs_peer_addr
.map(|u| u.parse().unwrap())
.expect("missing ipfs peer address in event ID scenario");
let creds = Credentials::from_env().await.map_err(goose_error)?;
let cli = CeramicHttpClient::new(creds.signer);
async fn init_scenario(with_data: bool) -> Result<Transaction, GooseError> {
let redis_cli = get_redis_client().await?;

let test_start = Transaction::new(Arc::new(move |user| {
Box::pin(setup(
user,
cli.clone(),
redis_cli.clone(),
ipfs_addr.clone(),
with_data,
))
Box::pin(setup(user, redis_cli.clone(), with_data))
}))
.set_name("setup")
.set_on_start();
Expand All @@ -82,8 +61,8 @@ async fn log_results(_user: &mut GooseUser) -> TransactionResult {
Ok(())
}

pub async fn event_sync_scenario(ipfs_peer_addr: Option<String>) -> Result<Scenario, GooseError> {
let test_start = init_scenario(ipfs_peer_addr, true).await?;
pub async fn event_sync_scenario() -> Result<Scenario, GooseError> {
let test_start = init_scenario(true).await?;
let create_new_event = transaction!(create_new_event).set_name(CREATE_EVENT_TX_NAME);
let stop = transaction!(log_results)
.set_name("log_results")
Expand All @@ -95,10 +74,8 @@ pub async fn event_sync_scenario(ipfs_peer_addr: Option<String>) -> Result<Scena
}

// accept option as goose manager builds the scenario as well, but doesn't need any peers and won't run it so it will always be Some in execution
pub async fn event_key_sync_scenario(
ipfs_peer_addr: Option<String>,
) -> Result<Scenario, GooseError> {
let test_start = init_scenario(ipfs_peer_addr, false).await?;
pub async fn event_key_sync_scenario() -> Result<Scenario, GooseError> {
let test_start = init_scenario(false).await?;

let create_new_event = transaction!(create_new_event).set_name(CREATE_EVENT_TX_NAME);

Expand All @@ -112,31 +89,17 @@ pub async fn event_key_sync_scenario(
#[instrument(skip_all, fields(user.index = user.weighted_users_index), ret)]
async fn setup(
user: &mut GooseUser,
cli: CeramicClient,
redis_cli: redis::Client,
ipfs_peer_addr: Url,
with_data: bool,
) -> TransactionResult {
let mut conn = redis_cli.get_async_connection().await.unwrap();
let first = is_first_user();
let model_id = if should_request_events() && first {
info!("creating model for event ID sync test");
let small_model = match ModelDefinition::new::<models::SmallModel>(
"load_test_small_model",
ModelAccountRelation::List,
) {
Ok(model) => model,
Err(e) => {
tracing::error!("failed to create model: {}", e);
panic!("failed to create model: {}", e);
}
};
let model_id = match setup_model(user, &cli, small_model).await {
Ok(model_id) => model_id,
Err(e) => {
tracing::error!("failed to setup model: {:?}", e);
return Err(e);
}
// We only need a model ID we do not need it to be a real model.
let model_id = StreamId {
r#type: StreamIdType::Model,
cid: random_cid(),
};
set_model_id(&mut conn, &model_id, MODEL_ID_KEY).await;
model_id
Expand All @@ -152,7 +115,6 @@ async fn setup(
with_data,
};
user.set_session_data(user_data);
user.base_url = Some(ipfs_peer_addr); // Recon is only available on IPFS address right now

let request_builder = user
.get_request_builder(&GooseMethod::Get, &path)?
Expand Down Expand Up @@ -186,7 +148,7 @@ async fn create_new_event(user: &mut GooseUser) -> TransactionResult {
// eventId needs to be a multibase encoded string for the API to accept it
let event_id = format!("F{}", random_event_id(&user_data.model_id.to_string()));
let event_key_body = if user_data.with_data {
let payload = random_body_1kb_body();
let payload = random_car_1kb_body().await;
serde_json::json!({"eventId": event_id, "eventData": payload})
} else {
serde_json::json!({"eventId": event_id})
Expand Down Expand Up @@ -235,11 +197,24 @@ fn random_event_id(sort_value: &str) -> ceramic_core::EventId {
)
}

fn random_body_1kb_body() -> String {
fn random_block() -> (Cid, Vec<u8>) {
let mut rng = rand::thread_rng();
TOTAL_BYTES_GENERATED.fetch_add(1000, std::sync::atomic::Ordering::Relaxed);
let unique: [u8; 1000] = gen_rand_bytes(&mut rng);
multibase::encode(multibase::Base::Base36Lower, unique)

let hash = ::multihash::MultihashDigest::digest(&::multihash::Code::Sha2_256, &unique);
(Cid::new_v1(0x00, hash), unique.to_vec())
}

async fn random_car_1kb_body() -> String {
let mut bytes = Vec::with_capacity(1500);
let (cid, block) = random_block();
let roots = vec![cid];
let mut writer = iroh_car::CarWriter::new(iroh_car::CarHeader::V1(roots.into()), &mut bytes);
writer.write(cid, block).await.unwrap();
writer.finish().await.unwrap();

multibase::encode(multibase::Base::Base36Lower, bytes)
}

fn gen_rand_bytes<const SIZE: usize>(rng: &mut ThreadRng) -> [u8; SIZE] {
Expand Down
22 changes: 6 additions & 16 deletions runner/src/simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,14 @@ impl Scenario {

fn target_addr(&self, peer: &Peer) -> Result<String> {
match self {
Self::IpfsRpc => Ok(peer.ipfs_rpc_addr().to_owned()),
Self::IpfsRpc | Self::ReconEventSync | Self::ReconEventKeySync => {
Ok(peer.ipfs_rpc_addr().to_owned())
}
Self::CeramicSimple
| Self::CeramicWriteOnly
| Self::CeramicNewStreams
| Self::CeramicQuery
| Self::CeramicModelReuse
| Self::ReconEventSync
| Self::ReconEventKeySync => Ok(peer
| Self::CeramicModelReuse => Ok(peer
.ceramic_addr()
.ok_or_else(|| {
anyhow!(
Expand Down Expand Up @@ -248,12 +248,8 @@ impl ScenarioState {
Scenario::CeramicNewStreams => ceramic::new_streams::scenario().await?,
Scenario::CeramicQuery => ceramic::query::scenario().await?,
Scenario::CeramicModelReuse => ceramic::model_reuse::scenario().await?,
Scenario::ReconEventSync => {
ceramic::recon_sync::event_sync_scenario(self.ipfs_peer_addr()).await?
}
Scenario::ReconEventKeySync => {
ceramic::recon_sync::event_key_sync_scenario(self.ipfs_peer_addr()).await?
}
Scenario::ReconEventSync => ceramic::recon_sync::event_sync_scenario().await?,
Scenario::ReconEventKeySync => ceramic::recon_sync::event_key_sync_scenario().await?,
};
self.collect_before_metrics().await?;
Ok(scenario)
Expand All @@ -267,12 +263,6 @@ impl ScenarioState {
)
}

fn ipfs_peer_addr(&self) -> Option<String> {
self.peers
.get(self.topo.target_worker)
.map(|p| p.ipfs_rpc_addr().to_owned())
}

/// Returns the counter value (or None) for each peer in order of the peers list
async fn get_peers_counter_metric(
&self,
Expand Down

0 comments on commit 82f3f06

Please sign in to comment.