diff --git a/eden/mononoke/commit_rewriting/bookmarks_validator/Cargo.toml b/eden/mononoke/commit_rewriting/bookmarks_validator/Cargo.toml
index 37df569690467..5d70844242814 100644
--- a/eden/mononoke/commit_rewriting/bookmarks_validator/Cargo.toml
+++ b/eden/mononoke/commit_rewriting/bookmarks_validator/Cargo.toml
@@ -12,8 +12,8 @@ anyhow = "1.0.95"
 async-trait = "0.1.71"
 blobstore_factory = { version = "0.1.0", path = "../../blobstore/factory" }
 bookmarks = { version = "0.1.0", path = "../../bookmarks" }
-cmdlib = { version = "0.1.0", path = "../../cmdlib" }
-cmdlib_x_repo = { version = "0.1.0", path = "../../cmdlib/x_repo" }
+clap = { version = "4.5.20", features = ["derive", "env", "string", "unicode", "wrap_help"] }
+cmdlib_cross_repo = { version = "0.1.0", path = "../../cmdlib/cross_repo" }
 context = { version = "0.1.0", path = "../../server/context" }
 cross_repo_sync = { version = "0.1.0", path = "../cross_repo_sync" }
 environment = { version = "0.1.0", path = "../../cmdlib/environment" }
@@ -22,10 +22,10 @@ fbinit = { version = "0.2.0", git = "https://github.com/facebookexperimental/rus
 futures = { version = "0.3.30", features = ["async-await", "compat"] }
 justknobs = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
 metadata = { version = "0.1.0", path = "../../server/metadata" }
+mononoke_app = { version = "0.1.0", path = "../../cmdlib/mononoke_app" }
 mononoke_types = { version = "0.1.0", path = "../../mononoke_types" }
 pushredirect = { version = "0.1.0", path = "../../megarepo_api/pushredirect" }
 sapling-clientinfo = { version = "0.1.0", path = "../../../scm/lib/clientinfo" }
-scuba_ext = { version = "0.1.0", path = "../../common/scuba_ext" }
 sharding_ext = { version = "0.1.0", path = "../../cmdlib/sharding_ext" }
 slog = { version = "2.7", features = ["max_level_trace", "nested-values"] }
 stats = { version = "0.1.0", git = "https://github.com/facebookexperimental/rust-shed.git", branch = "main" }
diff --git a/eden/mononoke/commit_rewriting/bookmarks_validator/src/main.rs b/eden/mononoke/commit_rewriting/bookmarks_validator/src/main.rs
index 10854968804de..2b26ecf175f0d 100644
--- a/eden/mononoke/commit_rewriting/bookmarks_validator/src/main.rs
+++ b/eden/mononoke/commit_rewriting/bookmarks_validator/src/main.rs
@@ -8,23 +8,20 @@
 use std::sync::atomic::AtomicBool;
 use std::sync::atomic::Ordering;
 use std::sync::Arc;
-use std::sync::OnceLock;
 use std::time::Duration;
 
 use anyhow::bail;
-use anyhow::format_err;
 use anyhow::Context;
 use anyhow::Error;
+use anyhow::Result;
 use async_trait::async_trait;
 use blobstore_factory::MetadataSqlFactory;
 use bookmarks::BookmarkKey;
 use bookmarks::Freshness;
+use clap::Parser;
 use clientinfo::ClientEntryPoint;
 use clientinfo::ClientInfo;
-use cmdlib::args;
-use cmdlib::args::MononokeMatches;
-use cmdlib::helpers;
-use cmdlib_x_repo::create_commit_syncers_from_matches;
+use cmdlib_cross_repo::create_commit_syncers_from_app;
 use context::CoreContext;
 use context::SessionContainer;
 use cross_repo_sync::find_bookmark_diff;
@@ -34,21 +31,25 @@ use cross_repo_sync::CommitSyncer;
 use cross_repo_sync::Repo as CrossRepo;
 use cross_repo_sync::Syncers;
 use environment::MononokeEnvironment;
+use executor_lib::args::ShardedExecutorArgs;
 use executor_lib::RepoShardedProcess;
 use executor_lib::RepoShardedProcessExecutor;
-use executor_lib::ShardedProcessExecutor;
 use fbinit::FacebookInit;
 use futures::future;
 use futures::TryStreamExt;
 use metadata::Metadata;
+use mononoke_app::args::OptSourceAndTargetRepoArgs;
+use mononoke_app::args::SourceAndTargetRepoArgs;
+use mononoke_app::monitoring::AliveService;
+use mononoke_app::monitoring::MonitoringAppExtension;
+use mononoke_app::MononokeApp;
+use mononoke_app::MononokeAppBuilder;
 use mononoke_types::ChangesetId;
 use pushredirect::PushRedirectionConfig;
 use pushredirect::SqlPushRedirectionConfigBuilder;
-use scuba_ext::MononokeScubaSampleBuilder;
 use sharding_ext::RepoShard;
 use slog::error;
 use slog::info;
-use slog::Logger;
 use stats::prelude::*;
 
 define_stats! {
@@ -59,38 +60,36 @@
     ),
 }
 
-const SM_SERVICE_SCOPE: &str = "global";
 const SM_CLEANUP_TIMEOUT_SECS: u64 = 120;
-const APP_NAME: &str = "megarepo_bookmarks_validator";
 
 type Repo = cross_repo_sync::ConcreteRepo;
 
+#[derive(Debug, Parser)]
+#[clap(about = "Tool to validate that small and large repo bookmarks are in sync")]
+pub struct BookmarkValidatorArgs {
+    #[clap(flatten)]
+    pub sharded_executor_args: ShardedExecutorArgs,
+    #[clap(flatten, next_help_heading = "CROSS REPO OPTIONS")]
+    pub repo_args: OptSourceAndTargetRepoArgs,
+}
+
 /// Struct representing the Bookmark Validate BP.
 pub struct BookmarkValidateProcess {
-    matches: Arc<MononokeMatches<'static>>,
-    fb: FacebookInit,
+    ctx: Arc<CoreContext>,
+    pub(crate) app: Arc<MononokeApp>,
 }
 
 impl BookmarkValidateProcess {
-    fn new(fb: FacebookInit) -> anyhow::Result<Self> {
-        let app_name = "Tool to validate that small and large repo bookmarks are in sync";
-        let app = args::MononokeAppBuilder::new(app_name)
-            .with_source_and_target_repos()
-            .with_dynamic_repos()
-            .with_fb303_args()
-            .build();
-        let (matches, _runtime) = app.get_matches(fb)?;
-        let matches = Arc::new(matches);
-        Ok(Self { matches, fb })
+    fn new(ctx: Arc<CoreContext>, app: Arc<MononokeApp>) -> Self {
+        Self { app, ctx }
     }
 }
 
 #[async_trait]
 impl RepoShardedProcess for BookmarkValidateProcess {
-    async fn setup(&self, repo: &RepoShard) -> anyhow::Result<Arc<dyn RepoShardedProcessExecutor>> {
-        let logger = self.matches.logger();
-        let env = self.matches.environment();
-        // For bookmark validator, two repos (i.e. source and target) are required as input
+    async fn setup(&self, repo: &RepoShard) -> Result<Arc<dyn RepoShardedProcessExecutor>> {
+        let logger = self.ctx.logger();
+
         let source_repo_name = repo.repo_name.clone();
         let target_repo_name = match repo.target_repo_name.clone() {
            Some(repo_name) => repo_name,
@@ -109,38 +108,18 @@ impl RepoShardedProcess for BookmarkValidateProcess {
             source_repo_name, target_repo_name,
         );
-        let ctx =
-            create_core_context(self.fb, logger.clone()).clone_with_repo_name(&repo.to_string());
-        let config_store = self.matches.config_store().clone();
-        let source_repo_id =
-            args::resolve_repo_by_name(&config_store, &self.matches, &source_repo_name)?.id;
-        let target_repo_id =
-            args::resolve_repo_by_name(&config_store, &self.matches, &target_repo_name)?.id;
-
-        let syncers = create_commit_syncers_from_matches::<Repo>(
-            &ctx,
-            &self.matches,
-            Some((source_repo_id, target_repo_id)),
-        )
-        .await?;
-        if syncers.large_to_small.get_large_repo().repo_identity().id() != source_repo_id {
-            let details = format!(
-                "Source repo must be a large repo!. Source repo: {}, Target repo: {}",
-                source_repo_name, target_repo_name
-            );
-            error!(logger, "{}", details);
-            bail!("{}", details);
-        }
+        let repo_args = SourceAndTargetRepoArgs::with_source_and_target_repo_name(
+            source_repo_name.clone(),
+            target_repo_name.clone(),
+        );
+
+        let executor =
+            BookmarkValidateProcessExecutor::new(self.ctx.clone(), self.app.clone(), repo_args)
+                .await?;
+
+        let details = format!(
             "Completed bookmark validate command setup from repo {} to repo {}",
-            source_repo_name, target_repo_name
-        );
-        let executor = BookmarkValidateProcessExecutor::new(
-            syncers,
-            ctx,
-            env.clone(),
-            source_repo_name,
-            target_repo_name,
+            &source_repo_name, &target_repo_name
         );
         info!(logger, "{}", details,);
         Ok(Arc::new(executor))
     }
@@ -150,8 +129,8 @@ impl RepoShardedProcess for BookmarkValidateProcess {
 /// Struct representing the execution of the Bookmark Validate
 /// BP over the context of a provided repos.
 pub struct BookmarkValidateProcessExecutor {
-    syncers: Syncers<Repo>,
-    ctx: CoreContext,
+    syncers: Syncers<Arc<Repo>>,
+    ctx: Arc<CoreContext>,
     env: Arc<MononokeEnvironment>,
     cancellation_requested: Arc<AtomicBool>,
     source_repo_name: String,
     target_repo_name: String,
 }
@@ -159,21 +138,46 @@ impl BookmarkValidateProcessExecutor {
-    fn new(
-        syncers: Syncers<Repo>,
-        ctx: CoreContext,
-        env: Arc<MononokeEnvironment>,
-        source_repo_name: String,
-        target_repo_name: String,
-    ) -> Self {
-        Self {
+    async fn new(
+        ctx: Arc<CoreContext>,
+        app: Arc<MononokeApp>,
+        repo_args: SourceAndTargetRepoArgs,
+    ) -> Result<Self> {
+        let env = app.environment().clone();
+        let logger = ctx.logger();
+
+        let source_repo: Arc<Repo> = app.open_repo_unredacted(&repo_args.source_repo).await?;
+        let target_repo: Arc<Repo> = app.open_repo_unredacted(&repo_args.target_repo).await?;
+
+        let syncers = create_commit_syncers_from_app(
+            &ctx,
+            app.as_ref(),
+            source_repo.clone(),
+            target_repo.clone(),
+        )
+        .await?;
+
+        let source_repo_id = source_repo.repo_identity().id();
+        let source_repo_name = source_repo.repo_identity().name();
+        let target_repo_name = target_repo.repo_identity().name();
+
+        if syncers.large_to_small.get_large_repo().repo_identity().id() != source_repo_id {
+            let details = format!(
+                "Source repo must be a large repo!. Source repo: {}, Target repo: {}",
+                &source_repo_name, &target_repo_name
+            );
+            error!(logger, "{}", details);
+            bail!("{}", details);
+        }
+
+        Ok(Self {
             syncers,
             ctx,
             env,
-            source_repo_name,
-            target_repo_name,
+            source_repo_name: source_repo_name.to_string(),
+            target_repo_name: target_repo_name.to_string(),
             cancellation_requested: Arc::new(AtomicBool::new(false)),
-        }
+        })
     }
 }
@@ -187,7 +191,7 @@ impl RepoShardedProcessExecutor for BookmarkValidateProcessExecutor {
             &self.target_repo_name,
         );
         loop_forever(
-            &self.ctx,
+            self.ctx.as_ref(),
             &self.env,
             self.syncers.clone(),
             Arc::clone(&self.cancellation_requested),
@@ -220,72 +224,65 @@ impl RepoShardedProcessExecutor for BookmarkValidateProcessExecutor {
     }
 }
 
-#[fbinit::main]
-fn main(fb: FacebookInit) -> Result<(), Error> {
-    let process = BookmarkValidateProcess::new(fb)?;
-    let logger = process.matches.logger().clone();
-    let matches = process.matches.clone();
-    let env = matches.environment();
-
-    match process.matches.value_of("sharded-service-name") {
-        Some(service_name) => {
-            // The service name needs to be 'static to satisfy SM contract
-            static SM_SERVICE_NAME: OnceLock<String> = OnceLock::new();
-            let mut executor = ShardedProcessExecutor::new(
-                process.fb,
-                process.matches.runtime().clone(),
-                &logger,
-                SM_SERVICE_NAME.get_or_init(|| service_name.to_string()),
-                SM_SERVICE_SCOPE,
-                SM_CLEANUP_TIMEOUT_SECS,
-                Arc::new(process),
-                true, // enable shard (repo) level healing
-            )?;
-            helpers::block_execute(
-                executor.block_and_execute(&logger, Arc::new(AtomicBool::new(false))),
-                fb,
-                &std::env::var("TW_JOB_NAME").unwrap_or_else(|_| APP_NAME.to_string()),
-                matches.logger(),
-                &matches,
-                cmdlib::monitoring::AliveService,
-            )
-        }
-        None => {
-            let runtime = matches.runtime();
-            let ctx = create_core_context(fb, logger.clone());
-            let config_store = matches.config_store();
-            let source_repo_id =
-                args::not_shardmanager_compatible::get_source_repo_id(config_store, &matches)?;
-            let syncers = runtime.block_on(create_commit_syncers_from_matches::<Repo>(
-                &ctx, &matches, None,
-            ))?;
-            if syncers.large_to_small.get_large_repo().repo_identity().id() != source_repo_id {
-                return Err(format_err!("Source repo must be a large repo!"));
-            }
-            helpers::block_execute(
-                loop_forever(&ctx, env, syncers, Arc::new(AtomicBool::new(false))),
-                fb,
-                APP_NAME,
-                &logger,
-                &matches,
-                cmdlib::monitoring::AliveService,
-            )
-        }
+async fn async_main(app: MononokeApp, ctx: CoreContext) -> Result<(), Error> {
+    let args: BookmarkValidatorArgs = app.args()?;
+    let ctx = Arc::new(ctx);
+    let app = Arc::new(app);
+    let repo_args = args.repo_args.clone();
+    let runtime = app.runtime().clone();
+
+    if let Some(mut executor) = args.sharded_executor_args.clone().build_executor(
+        app.fb,
+        runtime.clone(),
+        ctx.logger(),
+        || Arc::new(BookmarkValidateProcess::new(ctx.clone(), app.clone())),
+        true, // enable shard (repo) level healing
+        SM_CLEANUP_TIMEOUT_SECS,
+    )? {
+        executor
+            .block_and_execute(ctx.logger(), Arc::new(AtomicBool::new(false)))
+            .await
+    } else {
+        let repo_args = repo_args
+            .into_source_and_target_args()
+            .context("Source and Target repos must be provided when running in non-sharded mode")?;
+        let x_repo_process_executor =
+            BookmarkValidateProcessExecutor::new(ctx.clone(), app, repo_args).await?;
+        x_repo_process_executor.execute().await
     }
 }
 
-fn create_core_context(fb: FacebookInit, logger: Logger) -> CoreContext {
+#[fbinit::main]
+fn main(fb: FacebookInit) -> Result<()> {
+    let app: MononokeApp = MononokeAppBuilder::new(fb)
+        .with_app_extension(MonitoringAppExtension {})
+        .build::<BookmarkValidatorArgs>()?;
+
     let mut metadata = Metadata::default();
     metadata.add_client_info(ClientInfo::default_with_entry_point(
         ClientEntryPoint::MegarepoBookmarksValidator,
     ));
+
+    let mut scuba = app.environment().scuba_sample_builder.clone();
+    scuba.add_metadata(&metadata);
+
     let session_container = SessionContainer::builder(fb)
         .metadata(Arc::new(metadata))
         .build();
-    let scuba_sample = MononokeScubaSampleBuilder::with_discard();
-    session_container.new_context(logger, scuba_sample)
+    let ctx = session_container.new_context(app.logger().clone(), scuba);
+
+    info!(
+        ctx.logger(),
+        "Starting session with id {}",
+        ctx.metadata().session_id(),
+    );
+
+    app.run_with_monitoring_and_logging(
+        |app| async_main(app, ctx.clone()),
+        "bookmarks_validator",
+        AliveService,
+    )
 }
 
 async fn loop_forever(