diff --git a/rust/main/agents/relayer/src/lib.rs b/rust/main/agents/relayer/src/lib.rs
index 9a6e1e4147..082fd0b595 100644
--- a/rust/main/agents/relayer/src/lib.rs
+++ b/rust/main/agents/relayer/src/lib.rs
@@ -1,5 +1,6 @@
+pub mod msg;
+
 mod merkle_tree;
-mod msg;
 mod processor;
 mod prover;
 mod relayer;
diff --git a/rust/main/agents/relayer/src/merkle_tree/builder.rs b/rust/main/agents/relayer/src/merkle_tree/builder.rs
index 876200f96a..e8b5462b51 100644
--- a/rust/main/agents/relayer/src/merkle_tree/builder.rs
+++ b/rust/main/agents/relayer/src/merkle_tree/builder.rs
@@ -38,6 +38,12 @@ impl Display for MerkleTreeBuilder {
     }
 }
 
+impl Default for MerkleTreeBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 /// MerkleTreeBuilder errors
 #[derive(Debug, thiserror::Error)]
 pub enum MerkleTreeBuilderError {
diff --git a/rust/main/agents/relayer/src/msg/mod.rs b/rust/main/agents/relayer/src/msg/mod.rs
index e47015709c..2f13832719 100644
--- a/rust/main/agents/relayer/src/msg/mod.rs
+++ b/rust/main/agents/relayer/src/msg/mod.rs
@@ -32,7 +32,8 @@ pub(crate) mod gas_payment;
 pub(crate) mod metadata;
 pub(crate) mod op_queue;
 pub(crate) mod op_submitter;
-pub(crate) mod pending_message;
 pub(crate) mod processor;
 
+pub mod pending_message;
+
 pub use gas_payment::GAS_EXPENDITURE_LOG_MESSAGE;
diff --git a/rust/main/agents/relayer/src/msg/op_submitter.rs b/rust/main/agents/relayer/src/msg/op_submitter.rs
index 0c9b284a83..67524e80b0 100644
--- a/rust/main/agents/relayer/src/msg/op_submitter.rs
+++ b/rust/main/agents/relayer/src/msg/op_submitter.rs
@@ -230,14 +230,8 @@ async fn receive_task(
         // make sure things are getting wired up correctly; if this works in testing it
         // should also be valid in production.
         debug_assert_eq!(*op.destination_domain(), domain);
-        let status = op.retrieve_status_from_db().unwrap_or_else(|| {
-            trace!(
-                ?op,
-                "No status found for message, defaulting to FirstPrepareAttempt"
-            );
-            PendingOperationStatus::FirstPrepareAttempt
-        });
-        prepare_queue.push(op, Some(status)).await;
+        let op_status = op.status();
+        prepare_queue.push(op, Some(op_status)).await;
     }
 }
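The `receive_task` hunk above stops re-reading the status from the database: by the time an operation arrives over the channel, its status was already resolved when the operation was constructed (see the `pending_message.rs` changes below), so the in-memory value is authoritative. A minimal standalone sketch of that pattern, with stand-in types rather than the real `PendingOperation` trait objects:

```rust
// Stand-in types, not the actual hyperlane ones: the operation carries
// its status in memory, so the submitter can enqueue it without a
// second database read.
#[derive(Clone, Copy, Debug, PartialEq)]
enum OpStatus {
    FirstPrepareAttempt,
    Retry(u32),
}

struct PendingOp {
    status: OpStatus,
}

impl PendingOp {
    /// The status was resolved once, at construction time.
    fn status(&self) -> OpStatus {
        self.status
    }
}

fn main() {
    let op = PendingOp { status: OpStatus::Retry(3) };
    // Equivalent of `prepare_queue.push(op, Some(op_status)).await`:
    let op_status = op.status();
    assert_eq!(op_status, OpStatus::Retry(3));
    println!("enqueue with status {:?}", op_status);
}
```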
diff --git a/rust/main/agents/relayer/src/msg/pending_message.rs b/rust/main/agents/relayer/src/msg/pending_message.rs
index 92d279856d..0d8a090c78 100644
--- a/rust/main/agents/relayer/src/msg/pending_message.rs
+++ b/rust/main/agents/relayer/src/msg/pending_message.rs
@@ -21,7 +21,7 @@ use hyperlane_core::{
 };
 use prometheus::{IntCounter, IntGauge};
 use serde::Serialize;
-use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
+use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level};
 
 use super::{
     gas_payment::{GasPaymentEnforcer, GasPolicyStatus},
@@ -36,6 +36,8 @@ pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) {
     Duration::from_secs(60 * 10)
 };
 
+pub const RETRIEVED_MESSAGE_LOG: &str = "Message status retrieved from db";
+
 /// The message context contains the links needed to submit a message. Each
 /// instance is for a unique origin -> destination pairing.
 pub struct MessageContext {
@@ -510,27 +512,53 @@ impl PendingMessage {
         ctx: Arc<MessageContext>,
         app_context: Option<String>,
     ) -> Self {
-        let mut pm = Self::new(
-            message,
-            ctx,
-            // Since we don't persist the message status for now, assume it's the first attempt
-            PendingOperationStatus::FirstPrepareAttempt,
-            app_context,
-        );
-        match pm
-            .ctx
+        // Attempt to fetch the message status from the database
+        let message_status = match ctx.origin_db.retrieve_status_by_message_id(&message.id()) {
+            Ok(Some(status)) => {
+                // This event is used for E2E tests to ensure message statuses
+                // are being properly loaded from the db
+                tracing::event!(
+                    if cfg!(feature = "test-utils") {
+                        Level::DEBUG
+                    } else {
+                        Level::TRACE
+                    },
+                    ?status,
+                    id=?message.id(),
+                    RETRIEVED_MESSAGE_LOG,
+                );
+                status
+            }
+            _ => {
+                tracing::event!(
+                    if cfg!(feature = "test-utils") {
+                        Level::DEBUG
+                    } else {
+                        Level::TRACE
+                    },
+                    "Message status not found in db"
+                );
+                PendingOperationStatus::FirstPrepareAttempt
+            }
+        };
+
+        let num_retries = match ctx
             .origin_db
-            .retrieve_pending_message_retry_count_by_message_id(&pm.message.id())
+            .retrieve_pending_message_retry_count_by_message_id(&message.id())
         {
-            Ok(Some(num_retries)) => {
-                let next_attempt_after = PendingMessage::calculate_msg_backoff(num_retries)
-                    .map(|dur| Instant::now() + dur);
-                pm.num_retries = num_retries;
-                pm.next_attempt_after = next_attempt_after;
-            }
+            Ok(Some(num_retries)) => num_retries,
             r => {
-                trace!(message_id = ?pm.message.id(), result = ?r, "Failed to read retry count from HyperlaneDB for message.")
+                trace!(message_id = ?message.id(), result = ?r, "Failed to read retry count from HyperlaneDB for message.");
+                0
             }
-        }
+        };
+
+        let mut pm = Self::new(message, ctx, message_status, app_context);
+        if num_retries > 0 {
+            let next_attempt_after =
+                PendingMessage::calculate_msg_backoff(num_retries).map(|dur| Instant::now() + dur);
+            pm.num_retries = num_retries;
+            pm.next_attempt_after = next_attempt_after;
+        }
         pm
     }
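The `tracing::event!` calls above pick their level with `cfg!(feature = "test-utils")`. Because `cfg!` expands to a boolean literal at compile time, the `if`/`else` is a constant expression and each build gets a fixed level: DEBUG in test builds so the E2E harness can grep for `RETRIEVED_MESSAGE_LOG`, TRACE otherwise. A minimal sketch of the same trick, assuming the `tracing` and `tracing-subscriber` crates, with `debug_assertions` standing in for the `test-utils` feature:

```rust
use tracing::Level;

fn main() {
    // Log everything down to TRACE so the event is visible either way.
    tracing_subscriber::fmt().with_max_level(Level::TRACE).init();

    tracing::event!(
        // `cfg!` is resolved at compile time, so this is a constant level.
        if cfg!(debug_assertions) {
            Level::DEBUG
        } else {
            Level::TRACE
        },
        "Message status retrieved from db"
    );
}
```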
diff --git a/rust/main/utils/run-locally/src/invariants/termination_invariants.rs b/rust/main/utils/run-locally/src/invariants/termination_invariants.rs
index 50450210c6..65546f6762 100644
--- a/rust/main/utils/run-locally/src/invariants/termination_invariants.rs
+++ b/rust/main/utils/run-locally/src/invariants/termination_invariants.rs
@@ -90,14 +90,21 @@ pub fn termination_invariants_met(
     const TX_ID_INDEXING_LOG_MESSAGE: &str = "Found log(s) for tx id";
 
     let relayer_logfile = File::open(log_file_path)?;
-    let invariant_logs = &[
-        STORING_NEW_MESSAGE_LOG_MESSAGE,
-        LOOKING_FOR_EVENTS_LOG_MESSAGE,
-        GAS_EXPENDITURE_LOG_MESSAGE,
-        HYPER_INCOMING_BODY_LOG_MESSAGE,
-        TX_ID_INDEXING_LOG_MESSAGE,
+
+    let storing_new_msg_line_filter = vec![STORING_NEW_MESSAGE_LOG_MESSAGE];
+    let looking_for_events_line_filter = vec![LOOKING_FOR_EVENTS_LOG_MESSAGE];
+    let gas_expenditure_line_filter = vec![GAS_EXPENDITURE_LOG_MESSAGE];
+    let hyper_incoming_body_line_filter = vec![HYPER_INCOMING_BODY_LOG_MESSAGE];
+    let tx_id_indexing_line_filter = vec![TX_ID_INDEXING_LOG_MESSAGE];
+    let invariant_logs = vec![
+        storing_new_msg_line_filter.clone(),
+        looking_for_events_line_filter.clone(),
+        gas_expenditure_line_filter.clone(),
+        hyper_incoming_body_line_filter.clone(),
+        tx_id_indexing_line_filter.clone(),
     ];
     let log_counts = get_matching_lines(&relayer_logfile, invariant_logs);
+
     // Zero insertion messages don't reach `submit` stage where gas is spent, so we only expect these logs for the other messages.
     // TODO: Sometimes we find more logs than expected. This may either mean that gas is deducted twice for the same message due to a bug,
     // or that submitting the message transaction fails for some messages. Figure out which is the case and convert this check to
@@ -105,23 +112,34 @@ pub fn termination_invariants_met(
     // EDIT: Having had a quick look, it seems like there are some legitimate reverts happening in the confirm step
     // (`Transaction attempting to process message either reverted or was reorged`)
     // in which case more gas expenditure logs than messages are expected.
-    let gas_expenditure_log_count = log_counts.get(GAS_EXPENDITURE_LOG_MESSAGE).unwrap();
+    let gas_expenditure_log_count = *log_counts
+        .get(&gas_expenditure_line_filter)
+        .expect("Failed to get gas expenditure log count");
     assert!(
-        gas_expenditure_log_count >= &total_messages_expected,
+        gas_expenditure_log_count >= total_messages_expected,
         "Didn't record gas payment for all delivered messages. Got {} gas payment logs, expected at least {}",
         gas_expenditure_log_count,
         total_messages_expected
     );
 
     // These tests check that we fixed https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3915, where some logs would not show up
+
+    let storing_new_msg_log_count = *log_counts
+        .get(&storing_new_msg_line_filter)
+        .expect("Failed to get storing new msg log count");
     assert!(
-        log_counts.get(STORING_NEW_MESSAGE_LOG_MESSAGE).unwrap() > &0,
+        storing_new_msg_log_count > 0,
         "Didn't find any logs about storing messages in db"
     );
+    let looking_for_events_log_count = *log_counts
+        .get(&looking_for_events_line_filter)
+        .expect("Failed to get looking for events log count");
     assert!(
-        log_counts.get(LOOKING_FOR_EVENTS_LOG_MESSAGE).unwrap() > &0,
+        looking_for_events_log_count > 0,
         "Didn't find any logs about looking for events in index range"
     );
-    let total_tx_id_log_count = log_counts.get(TX_ID_INDEXING_LOG_MESSAGE).unwrap();
+    let total_tx_id_log_count = *log_counts
+        .get(&tx_id_indexing_line_filter)
+        .expect("Failed to get tx id indexing log count");
     assert!(
         // there are 3 txid-indexed events:
         // - relayer: merkle insertion and gas payment
@@ -129,13 +147,13 @@ pub fn termination_invariants_met(
         // some logs are emitted for multiple events, so requiring there to be at least
         // `config.kathy_messages` logs is a reasonable approximation, since all three of these events
        // are expected to be logged for each message.
-        *total_tx_id_log_count as u64 >= config.kathy_messages,
+        total_tx_id_log_count as u64 >= config.kathy_messages,
         "Didn't find as many tx id logs as expected. Found {} and expected {}",
         total_tx_id_log_count,
         config.kathy_messages
     );
 
     assert!(
-        log_counts.get(HYPER_INCOMING_BODY_LOG_MESSAGE).is_none(),
+        log_counts.get(&hyper_incoming_body_line_filter).is_none(),
         "Verbose logs not expected at the log level set in e2e"
     );
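With `get_matching_lines` now returning counts keyed by the filter itself, each invariant reads its count back with the same `Vec` it registered. A small self-contained sketch of the lookup pattern (the filter string and count are illustrative):

```rust
use std::collections::HashMap;

fn main() {
    // `vec!["Gas payment"]` stands in for `vec![GAS_EXPENDITURE_LOG_MESSAGE]`.
    let gas_expenditure_line_filter = vec!["Gas payment"];
    let mut log_counts: HashMap<Vec<&str>, u32> = HashMap::new();
    log_counts.insert(gas_expenditure_line_filter.clone(), 7);

    // The Vec used to build the query doubles as the lookup key.
    let gas_expenditure_log_count = *log_counts
        .get(&gas_expenditure_line_filter)
        .expect("Failed to get gas expenditure log count");
    assert_eq!(gas_expenditure_log_count, 7);
}
```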
Found {} and expected {}", total_tx_id_log_count, config.kathy_messages ); assert!( - log_counts.get(HYPER_INCOMING_BODY_LOG_MESSAGE).is_none(), + log_counts.get(&hyper_incoming_body_line_filter).is_none(), "Verbose logs not expected at the log level set in e2e" ); diff --git a/rust/main/utils/run-locally/src/main.rs b/rust/main/utils/run-locally/src/main.rs index cdfbc0c7b2..e19fa1564d 100644 --- a/rust/main/utils/run-locally/src/main.rs +++ b/rust/main/utils/run-locally/src/main.rs @@ -33,7 +33,9 @@ use logging::log; pub use metrics::fetch_metric; use once_cell::sync::Lazy; use program::Program; -use tempfile::tempdir; +use relayer::msg::pending_message::RETRIEVED_MESSAGE_LOG; +use tempfile::{tempdir, TempDir}; +use utils::get_matching_lines; use crate::{ config::Config, @@ -179,72 +181,8 @@ fn main() -> ExitCode { .map(|i| concat_path(&rocks_db_dir, format!("validator{i}"))) .collect::>(); - let common_agent_env = Program::default() - .env("RUST_BACKTRACE", "full") - .hyp_env("LOG_FORMAT", "compact") - .hyp_env("LOG_LEVEL", "debug") - .hyp_env("CHAINS_TEST1_INDEX_CHUNK", "1") - .hyp_env("CHAINS_TEST2_INDEX_CHUNK", "1") - .hyp_env("CHAINS_TEST3_INDEX_CHUNK", "1"); - - let multicall_address_string: String = format!("0x{}", hex::encode(MULTICALL_ADDRESS)); - - let relayer_env = common_agent_env - .clone() - .bin(concat_path(AGENT_BIN_PATH, "relayer")) - .hyp_env("CHAINS_TEST1_RPCCONSENSUSTYPE", "fallback") - .hyp_env( - "CHAINS_TEST2_CONNECTION_URLS", - "http://127.0.0.1:8545,http://127.0.0.1:8545,http://127.0.0.1:8545", - ) - .hyp_env( - "CHAINS_TEST1_BATCHCONTRACTADDRESS", - multicall_address_string.clone(), - ) - .hyp_env("CHAINS_TEST1_MAXBATCHSIZE", "5") - // by setting this as a quorum provider we will cause nonce errors when delivering to test2 - // because the message will be sent to the node 3 times. 
- .hyp_env("CHAINS_TEST2_RPCCONSENSUSTYPE", "quorum") - .hyp_env( - "CHAINS_TEST2_BATCHCONTRACTADDRESS", - multicall_address_string.clone(), - ) - .hyp_env("CHAINS_TEST2_MAXBATCHSIZE", "5") - .hyp_env("CHAINS_TEST3_CONNECTION_URL", "http://127.0.0.1:8545") - .hyp_env( - "CHAINS_TEST3_BATCHCONTRACTADDRESS", - multicall_address_string, - ) - .hyp_env("CHAINS_TEST3_MAXBATCHSIZE", "5") - .hyp_env("METRICSPORT", RELAYER_METRICS_PORT) - .hyp_env("DB", relayer_db.to_str().unwrap()) - .hyp_env("CHAINS_TEST1_SIGNER_KEY", RELAYER_KEYS[0]) - .hyp_env("CHAINS_TEST2_SIGNER_KEY", RELAYER_KEYS[1]) - .hyp_env("CHAINS_SEALEVELTEST1_SIGNER_KEY", RELAYER_KEYS[3]) - .hyp_env("CHAINS_SEALEVELTEST2_SIGNER_KEY", RELAYER_KEYS[4]) - .hyp_env("RELAYCHAINS", "invalidchain,otherinvalid") - .hyp_env("ALLOWLOCALCHECKPOINTSYNCERS", "true") - .hyp_env( - "GASPAYMENTENFORCEMENT", - r#"[{ - "type": "minimum", - "payment": "1" - }]"#, - ) - .arg( - "chains.test1.customRpcUrls", - "http://127.0.0.1:8545,http://127.0.0.1:8545,http://127.0.0.1:8545", - ) - // default is used for TEST3 - .arg("defaultSigner.key", RELAYER_KEYS[2]); - let relayer_env = if config.sealevel_enabled { - relayer_env.arg( - "relayChains", - "test1,test2,test3,sealeveltest1,sealeveltest2", - ) - } else { - relayer_env.arg("relayChains", "test1,test2,test3") - }; + let common_agent_env = create_common_agent(); + let relayer_env = create_relayer(&config, &rocks_db_dir); let base_validator_env = common_agent_env .clone() @@ -491,17 +429,19 @@ fn main() -> ExitCode { if !post_startup_invariants(&checkpoints_dirs) { log!("Failure: Post startup invariants are not met"); - return report_test_result(true); + return report_test_result(false); } else { log!("Success: Post startup invariants are met"); } - let mut failure_occurred = false; let starting_relayer_balance: f64 = agent_balance_sum(9092).unwrap(); - while !SHUTDOWN.load(Ordering::Relaxed) { - if config.ci_mode { - // for CI we have to look for the end condition. - if termination_invariants_met( + + // wait for CI invariants to pass + let mut test_passed = wait_for_condition( + &config, + loop_start, + || { + termination_invariants_met( &config, starting_relayer_balance, solana_paths @@ -510,49 +450,225 @@ fn main() -> ExitCode { .as_deref(), solana_config_path.as_deref(), ) - .unwrap_or(false) - { - // end condition reached successfully - break; - } else if (Instant::now() - loop_start).as_secs() > config.ci_mode_timeout { - // we ran out of time - log!("CI timeout reached before queues emptied"); - failure_occurred = true; - break; - } - } - - // verify long-running tasks are still running - for (name, (child, _)) in state.agents.iter_mut() { - if let Some(status) = child.try_wait().unwrap() { - if !status.success() { - log!( - "Child process {} exited unexpectedly, with code {}. Shutting down", - name, - status.code().unwrap() - ); - failure_occurred = true; - SHUTDOWN.store(true, Ordering::Relaxed); - break; - } - } - } + }, + || !SHUTDOWN.load(Ordering::Relaxed), + || long_running_processes_exited_check(&mut state), + ); - sleep(Duration::from_secs(5)); + if !test_passed { + log!("Failure occurred during E2E"); + return report_test_result(test_passed); } + + // Here we want to restart the relayer and validate + // its restart behaviour. + restart_relayer(&config, &mut state, &rocks_db_dir); + + // give relayer a chance to fully restart. 
+fn create_common_agent() -> Program {
+    Program::default()
+        .env("RUST_BACKTRACE", "full")
+        .hyp_env("LOG_FORMAT", "compact")
+        .hyp_env("LOG_LEVEL", "debug")
+        .hyp_env("CHAINS_TEST1_INDEX_CHUNK", "1")
+        .hyp_env("CHAINS_TEST2_INDEX_CHUNK", "1")
+        .hyp_env("CHAINS_TEST3_INDEX_CHUNK", "1")
+}
+
+fn create_relayer(config: &Config, rocks_db_dir: &TempDir) -> Program {
+    let relayer_db = concat_path(rocks_db_dir, "relayer");
+
+    let common_agent_env = create_common_agent();
+
+    let multicall_address_string: String = format!("0x{}", hex::encode(MULTICALL_ADDRESS));
+
+    let relayer_env = common_agent_env
+        .clone()
+        .bin(concat_path(AGENT_BIN_PATH, "relayer"))
+        .hyp_env("CHAINS_TEST1_RPCCONSENSUSTYPE", "fallback")
+        .hyp_env(
+            "CHAINS_TEST2_CONNECTION_URLS",
+            "http://127.0.0.1:8545,http://127.0.0.1:8545,http://127.0.0.1:8545",
+        )
+        .hyp_env(
+            "CHAINS_TEST1_BATCHCONTRACTADDRESS",
+            multicall_address_string.clone(),
+        )
+        .hyp_env("CHAINS_TEST1_MAXBATCHSIZE", "5")
+        // by setting this as a quorum provider we will cause nonce errors when delivering to test2
+        // because the message will be sent to the node 3 times
+        .hyp_env("CHAINS_TEST2_RPCCONSENSUSTYPE", "quorum")
+        .hyp_env(
+            "CHAINS_TEST2_BATCHCONTRACTADDRESS",
+            multicall_address_string.clone(),
+        )
+        .hyp_env("CHAINS_TEST2_MAXBATCHSIZE", "5")
+        .hyp_env("CHAINS_TEST3_CONNECTION_URL", "http://127.0.0.1:8545")
+        .hyp_env(
+            "CHAINS_TEST3_BATCHCONTRACTADDRESS",
+            multicall_address_string,
+        )
+        .hyp_env("CHAINS_TEST3_MAXBATCHSIZE", "5")
+        .hyp_env("METRICSPORT", RELAYER_METRICS_PORT)
+        .hyp_env("DB", relayer_db.to_str().unwrap())
+        .hyp_env("CHAINS_TEST1_SIGNER_KEY", RELAYER_KEYS[0])
+        .hyp_env("CHAINS_TEST2_SIGNER_KEY", RELAYER_KEYS[1])
+        .hyp_env("CHAINS_SEALEVELTEST1_SIGNER_KEY", RELAYER_KEYS[3])
+        .hyp_env("CHAINS_SEALEVELTEST2_SIGNER_KEY", RELAYER_KEYS[4])
+        .hyp_env("RELAYCHAINS", "invalidchain,otherinvalid")
+        .hyp_env("ALLOWLOCALCHECKPOINTSYNCERS", "true")
+        .hyp_env(
+            "GASPAYMENTENFORCEMENT",
+            r#"[{
+                "type": "minimum",
+                "payment": "1"
+            }]"#,
+        )
+        .arg(
+            "chains.test1.customRpcUrls",
+            "http://127.0.0.1:8545,http://127.0.0.1:8545,http://127.0.0.1:8545",
+        )
+        // default is used for TEST3
+        .arg("defaultSigner.key", RELAYER_KEYS[2]);
+    if config.sealevel_enabled {
+        relayer_env.arg(
+            "relayChains",
+            "test1,test2,test3,sealeveltest1,sealeveltest2",
+        )
+    } else {
+        relayer_env.arg("relayChains", "test1,test2,test3")
+    }
+}
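`create_relayer` exists so the exact same environment can be rebuilt when the relayer is respawned mid-test; deriving the `DB` path from `rocks_db_dir` on every call means the restarted process reopens the same RocksDB. A tiny sketch of the idea with `std::process::Command` (`my-agent` and the variables are placeholders):

```rust
use std::process::Command;

// Building the command from scratch on each call gives a restarted
// process exactly the configuration the original had.
fn create_agent_command(db_path: &str) -> Command {
    let mut cmd = Command::new("my-agent"); // placeholder binary name
    cmd.env("LOG_LEVEL", "debug").env("DB", db_path);
    cmd
}

fn main() {
    // First launch and a later restart share one source of truth.
    let first = create_agent_command("/tmp/agent-db");
    let restarted = create_agent_command("/tmp/agent-db");
    assert_eq!(first.get_program(), restarted.get_program());
}
```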
+ .hyp_env("CHAINS_TEST2_RPCCONSENSUSTYPE", "quorum") + .hyp_env( + "CHAINS_TEST2_BATCHCONTRACTADDRESS", + multicall_address_string.clone(), + ) + .hyp_env("CHAINS_TEST2_MAXBATCHSIZE", "5") + .hyp_env("CHAINS_TEST3_CONNECTION_URL", "http://127.0.0.1:8545") + .hyp_env( + "CHAINS_TEST3_BATCHCONTRACTADDRESS", + multicall_address_string, + ) + .hyp_env("CHAINS_TEST3_MAXBATCHSIZE", "5") + .hyp_env("METRICSPORT", RELAYER_METRICS_PORT) + .hyp_env("DB", relayer_db.to_str().unwrap()) + .hyp_env("CHAINS_TEST1_SIGNER_KEY", RELAYER_KEYS[0]) + .hyp_env("CHAINS_TEST2_SIGNER_KEY", RELAYER_KEYS[1]) + .hyp_env("CHAINS_SEALEVELTEST1_SIGNER_KEY", RELAYER_KEYS[3]) + .hyp_env("CHAINS_SEALEVELTEST2_SIGNER_KEY", RELAYER_KEYS[4]) + .hyp_env("RELAYCHAINS", "invalidchain,otherinvalid") + .hyp_env("ALLOWLOCALCHECKPOINTSYNCERS", "true") + .hyp_env( + "GASPAYMENTENFORCEMENT", + r#"[{ + "type": "minimum", + "payment": "1" + }]"#, + ) + .arg( + "chains.test1.customRpcUrls", + "http://127.0.0.1:8545,http://127.0.0.1:8545,http://127.0.0.1:8545", + ) + // default is used for TEST3 + .arg("defaultSigner.key", RELAYER_KEYS[2]); + if config.sealevel_enabled { + relayer_env.arg( + "relayChains", + "test1,test2,test3,sealeveltest1,sealeveltest2", + ) } else { + relayer_env.arg("relayChains", "test1,test2,test3") + } +} + +/// Kills relayer in State and respawns the relayer again +fn restart_relayer(config: &Config, state: &mut State, rocks_db_dir: &TempDir) { + log!("Stopping relayer..."); + let (child, _) = state.agents.get_mut("RLY").expect("No relayer agent found"); + child.kill().expect("Failed to stop relayer"); + + log!("Restarting relayer..."); + let relayer_env = create_relayer(config, rocks_db_dir); + state.push_agent(relayer_env.spawn("RLY", Some(&AGENT_LOGGING_DIR))); + log!("Restarted relayer..."); +} + +/// Check relayer restart behaviour is correct. +/// So far, we only check if undelivered messages' statuses +/// are correctly retrieved from the database +fn relayer_restart_invariants_met() -> eyre::Result { + let log_file_path = AGENT_LOGGING_DIR.join("RLY-output.log"); + let relayer_logfile = File::open(log_file_path).unwrap(); + + let line_filters = vec![RETRIEVED_MESSAGE_LOG, "CouldNotFetchMetadata"]; + + log!("Checking message statuses were retrieved from logs..."); + let matched_logs = get_matching_lines(&relayer_logfile, vec![line_filters.clone()]); + + let no_metadata_message_count = *matched_logs + .get(&line_filters) + .expect("Failed to get matched message count"); + // These messages are never inserted into the merkle tree. + // So these messages will never be deliverable and will always + // be in a CouldNotFetchMetadata state. + // When the relayer restarts, these messages' statuses should be + // retrieved from the database with CouldNotFetchMetadata status. 
+/// Check that the relayer's restart behaviour is correct.
+/// So far, we only check whether undelivered messages' statuses
+/// are correctly retrieved from the database.
+fn relayer_restart_invariants_met() -> eyre::Result<bool> {
+    let log_file_path = AGENT_LOGGING_DIR.join("RLY-output.log");
+    let relayer_logfile = File::open(log_file_path).unwrap();
+
+    let line_filters = vec![RETRIEVED_MESSAGE_LOG, "CouldNotFetchMetadata"];
+
+    log!("Checking message statuses were retrieved from logs...");
+    let matched_logs = get_matching_lines(&relayer_logfile, vec![line_filters.clone()]);
+
+    let no_metadata_message_count = *matched_logs
+        .get(&line_filters)
+        .expect("Failed to get matched message count");
+    // These messages are never inserted into the merkle tree.
+    // So these messages will never be deliverable and will always
+    // be in a CouldNotFetchMetadata state.
+    // When the relayer restarts, these messages' statuses should be
+    // retrieved from the database with CouldNotFetchMetadata status.
+    if no_metadata_message_count < ZERO_MERKLE_INSERTION_KATHY_MESSAGES {
+        log!(
+            "No metadata message count is {}, expected {}",
+            no_metadata_message_count,
+            ZERO_MERKLE_INSERTION_KATHY_MESSAGES
+        );
+        return Ok(false);
+    }
+    assert_eq!(
+        no_metadata_message_count,
+        ZERO_MERKLE_INSERTION_KATHY_MESSAGES
+    );
+    Ok(true)
+}
+
+fn wait_for_condition<F1, F2, F3>(
+    config: &Config,
+    start_time: Instant,
+    condition_fn: F1,
+    loop_invariant_fn: F2,
+    mut shutdown_criteria_fn: F3,
+) -> bool
+where
+    F1: Fn() -> eyre::Result<bool>,
+    F2: Fn() -> bool,
+    F3: FnMut() -> bool,
+{
+    let loop_check_interval = Duration::from_secs(5);
+    while loop_invariant_fn() {
+        sleep(loop_check_interval);
+        if !config.ci_mode {
+            continue;
+        }
+        if condition_fn().unwrap_or(false) {
+            // end condition reached successfully
+            break;
+        }
+        if check_ci_timed_out(config.ci_mode_timeout, start_time) {
+            // we ran out of time
+            log!("CI timeout reached before invariants were met");
+            return false;
+        }
+        if shutdown_criteria_fn() {
+            SHUTDOWN.store(true, Ordering::Relaxed);
+            return false;
+        }
+    }
+    true
+}
+
+/// check whether CI has timed out, based on the configured timeout
+fn check_ci_timed_out(timeout_secs: u64, start_time: Instant) -> bool {
+    (Instant::now() - start_time).as_secs() > timeout_secs
+}
+
+/// verify long-running tasks are still running
+fn long_running_processes_exited_check(state: &mut State) -> bool {
+    for (name, (child, _)) in state.agents.iter_mut() {
+        if let Some(status) = child.try_wait().unwrap() {
+            if !status.success() {
+                log!(
+                    "Child process {} exited unexpectedly, with code {}. Shutting down",
+                    name,
+                    status.code().unwrap()
+                );
+                return true;
+            }
+        }
+    }
+    false
+}
+
+fn report_test_result(passed: bool) -> ExitCode {
+    if passed {
         log!("E2E tests passed");
         ExitCode::SUCCESS
+    } else {
+        log!("E2E tests failed");
+        ExitCode::FAILURE
     }
 }
diff --git a/rust/main/utils/run-locally/src/utils.rs b/rust/main/utils/run-locally/src/utils.rs
index 5e5dd6a126..ebb5a70245 100644
--- a/rust/main/utils/run-locally/src/utils.rs
+++ b/rust/main/utils/run-locally/src/utils.rs
@@ -119,15 +119,28 @@ pub fn stop_child(child: &mut Child) {
     };
 }
 
-pub fn get_matching_lines(file: &File, search_strings: &[&str]) -> HashMap<String, u32> {
+/// Given a `Vec<Vec<&str>>`, count for each inner `Vec<&str>` how many
+/// lines in the file match all of its `&str` entries, and return the
+/// counts in a hashmap keyed by that `Vec<&str>`.
+pub fn get_matching_lines<'a>(
+    file: &File,
+    search_strings: Vec<Vec<&'a str>>,
+) -> HashMap<Vec<&'a str>, u32> {
     let reader = io::BufReader::new(file);
     let mut matches = HashMap::new();
     let mut lines = reader.lines();
     while let Some(Ok(line)) = lines.next() {
-        search_strings.iter().for_each(|search_string| {
-            if line.contains(search_string) {
-                let count = matches.entry(search_string.to_string()).or_insert(0);
+        search_strings.iter().for_each(|search_string_vec| {
+            if search_string_vec
+                .iter()
+                .map(|search_string| line.contains(search_string))
+                .all(|x| x)
+            {
+                let count = matches.entry(search_string_vec.clone()).or_insert(0);
                 *count += 1;
             }
         });
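For reference, the reworked matcher gives each filter AND semantics: a line is counted for a filter only if it contains every substring in that filter's `Vec`. A self-contained usage sketch over an in-memory string (the log lines are made up):

```rust
use std::collections::HashMap;

// Mirrors `get_matching_lines`, but over a &str so it runs standalone.
fn count_matching_lines<'a>(
    text: &str,
    filters: Vec<Vec<&'a str>>,
) -> HashMap<Vec<&'a str>, u32> {
    let mut matches = HashMap::new();
    for line in text.lines() {
        for filter in &filters {
            // Every substring in the filter must appear on the line.
            if filter.iter().all(|needle| line.contains(needle)) {
                *matches.entry(filter.clone()).or_insert(0) += 1;
            }
        }
    }
    matches
}

fn main() {
    let logs = "status=retrieved id=1\nstatus=retrieved CouldNotFetchMetadata id=2\n";
    let filter = vec!["status=retrieved", "CouldNotFetchMetadata"];
    let counts = count_matching_lines(logs, vec![filter.clone()]);
    // Only the second line contains both substrings.
    assert_eq!(counts.get(&filter), Some(&1));
}
```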