Skip to content

Commit

Permalink
feat: attempt to fetch operation status in database upon relayer restart (#5182)
Browse files Browse the repository at this point in the history

### Description

- Add logic to retrieve message status from db before defaulting to
`PendingOperationStatus::FirstPrepareAttempt`.
- Add e2e tests to restart relayer and check message statuses were
successfully read from db

### Drive-by changes

 - add `impl Default` to MerkleTreeBuilder to satisfy `clippy`

### Related issues

- Fixes #5060

### Backward compatibility

Yes

### Testing

- Add e2e tests to restart relayer and check message statuses were
successfully read from db

---------

Co-authored-by: Daniel Savu <[email protected]>
  • Loading branch information
kamiyaa and daniel-savu authored Jan 29, 2025
1 parent 878bb88 commit f250b19
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 151 deletions.
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod msg;

mod merkle_tree;
mod msg;
mod processor;
mod prover;
mod relayer;
Expand Down
6 changes: 6 additions & 0 deletions rust/main/agents/relayer/src/merkle_tree/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ impl Display for MerkleTreeBuilder {
}
}

impl Default for MerkleTreeBuilder {
fn default() -> Self {
Self::new()
}
}

/// MerkleTreeBuilder errors
#[derive(Debug, thiserror::Error)]
pub enum MerkleTreeBuilderError {
Expand Down
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ pub(crate) mod gas_payment;
pub(crate) mod metadata;
pub(crate) mod op_queue;
pub(crate) mod op_submitter;
pub(crate) mod pending_message;
pub(crate) mod processor;

pub mod pending_message;

pub use gas_payment::GAS_EXPENDITURE_LOG_MESSAGE;
10 changes: 2 additions & 8 deletions rust/main/agents/relayer/src/msg/op_submitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,8 @@ async fn receive_task(
// make sure things are getting wired up correctly; if this works in testing it
// should also be valid in production.
debug_assert_eq!(*op.destination_domain(), domain);
let status = op.retrieve_status_from_db().unwrap_or_else(|| {
trace!(
?op,
"No status found for message, defaulting to FirstPrepareAttempt"
);
PendingOperationStatus::FirstPrepareAttempt
});
prepare_queue.push(op, Some(status)).await;
let op_status = op.status();
prepare_queue.push(op, Some(op_status)).await;
}
}

Expand Down
64 changes: 46 additions & 18 deletions rust/main/agents/relayer/src/msg/pending_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use hyperlane_core::{
};
use prometheus::{IntCounter, IntGauge};
use serde::Serialize;
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level};

use super::{
gas_payment::{GasPaymentEnforcer, GasPolicyStatus},
Expand All @@ -36,6 +36,8 @@ pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) {
Duration::from_secs(60 * 10)
};

pub const RETRIEVED_MESSAGE_LOG: &str = "Message status retrieved from db";

/// The message context contains the links needed to submit a message. Each
/// instance is for a unique origin -> destination pairing.
pub struct MessageContext {
Expand Down Expand Up @@ -510,27 +512,53 @@ impl PendingMessage {
ctx: Arc<MessageContext>,
app_context: Option<String>,
) -> Self {
let mut pm = Self::new(
message,
ctx,
// Since we don't persist the message status for now, assume it's the first attempt
PendingOperationStatus::FirstPrepareAttempt,
app_context,
);
match pm
.ctx
// Attempt to fetch status about message from database
let message_status = match ctx.origin_db.retrieve_status_by_message_id(&message.id()) {
Ok(Some(status)) => {
// This event is used for E2E tests to ensure message statuses
// are being properly loaded from the db
tracing::event!(
if cfg!(feature = "test-utils") {
Level::DEBUG
} else {
Level::TRACE
},
?status,
id=?message.id(),
RETRIEVED_MESSAGE_LOG,
);
status
}
_ => {
tracing::event!(
if cfg!(feature = "test-utils") {
Level::DEBUG
} else {
Level::TRACE
},
"Message status not found in db"
);
PendingOperationStatus::FirstPrepareAttempt
}
};

let num_retries = match ctx
.origin_db
.retrieve_pending_message_retry_count_by_message_id(&pm.message.id())
.retrieve_pending_message_retry_count_by_message_id(&message.id())
{
Ok(Some(num_retries)) => {
let next_attempt_after = PendingMessage::calculate_msg_backoff(num_retries)
.map(|dur| Instant::now() + dur);
pm.num_retries = num_retries;
pm.next_attempt_after = next_attempt_after;
}
Ok(Some(num_retries)) => num_retries,
r => {
trace!(message_id = ?pm.message.id(), result = ?r, "Failed to read retry count from HyperlaneDB for message.")
trace!(message_id = ?message.id(), result = ?r, "Failed to read retry count from HyperlaneDB for message.");
0
}
};

let mut pm = Self::new(message, ctx, message_status, app_context);
if num_retries > 0 {
let next_attempt_after =
PendingMessage::calculate_msg_backoff(num_retries).map(|dur| Instant::now() + dur);
pm.num_retries = num_retries;
pm.next_attempt_after = next_attempt_after;
}
pm
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,52 +90,70 @@ pub fn termination_invariants_met(
const TX_ID_INDEXING_LOG_MESSAGE: &str = "Found log(s) for tx id";

let relayer_logfile = File::open(log_file_path)?;
let invariant_logs = &[
STORING_NEW_MESSAGE_LOG_MESSAGE,
LOOKING_FOR_EVENTS_LOG_MESSAGE,
GAS_EXPENDITURE_LOG_MESSAGE,
HYPER_INCOMING_BODY_LOG_MESSAGE,
TX_ID_INDEXING_LOG_MESSAGE,

let storing_new_msg_line_filter = vec![STORING_NEW_MESSAGE_LOG_MESSAGE];
let looking_for_events_line_filter = vec![LOOKING_FOR_EVENTS_LOG_MESSAGE];
let gas_expenditure_line_filter = vec![GAS_EXPENDITURE_LOG_MESSAGE];
let hyper_incoming_body_line_filter = vec![HYPER_INCOMING_BODY_LOG_MESSAGE];
let tx_id_indexing_line_filter = vec![TX_ID_INDEXING_LOG_MESSAGE];
let invariant_logs = vec![
storing_new_msg_line_filter.clone(),
looking_for_events_line_filter.clone(),
gas_expenditure_line_filter.clone(),
hyper_incoming_body_line_filter.clone(),
tx_id_indexing_line_filter.clone(),
];
let log_counts = get_matching_lines(&relayer_logfile, invariant_logs);

// Zero insertion messages don't reach `submit` stage where gas is spent, so we only expect these logs for the other messages.
// TODO: Sometimes we find more logs than expected. This may either mean that gas is deducted twice for the same message due to a bug,
// or that submitting the message transaction fails for some messages. Figure out which is the case and convert this check to
// strict equality.
// EDIT: Having had a quick look, it seems like there are some legitimate reverts happening in the confirm step
// (`Transaction attempting to process message either reverted or was reorged`)
// in which case more gas expenditure logs than messages are expected.
let gas_expenditure_log_count = log_counts.get(GAS_EXPENDITURE_LOG_MESSAGE).unwrap();
let gas_expenditure_log_count = *log_counts
.get(&gas_expenditure_line_filter)
.expect("Failed to get gas expenditure log count");
assert!(
gas_expenditure_log_count >= &total_messages_expected,
gas_expenditure_log_count >= total_messages_expected,
"Didn't record gas payment for all delivered messages. Got {} gas payment logs, expected at least {}",
gas_expenditure_log_count,
total_messages_expected
);
// These tests check that we fixed https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3915, where some logs would not show up

let storing_new_msg_log_count = *log_counts
.get(&storing_new_msg_line_filter)
.expect("Failed to get storing new msg log count");
assert!(
log_counts.get(STORING_NEW_MESSAGE_LOG_MESSAGE).unwrap() > &0,
storing_new_msg_log_count > 0,
"Didn't find any logs about storing messages in db"
);
let looking_for_events_log_count = *log_counts
.get(&looking_for_events_line_filter)
.expect("Failed to get looking for events log count");
assert!(
log_counts.get(LOOKING_FOR_EVENTS_LOG_MESSAGE).unwrap() > &0,
looking_for_events_log_count > 0,
"Didn't find any logs about looking for events in index range"
);
let total_tx_id_log_count = log_counts.get(TX_ID_INDEXING_LOG_MESSAGE).unwrap();
let total_tx_id_log_count = *log_counts
.get(&tx_id_indexing_line_filter)
.expect("Failed to get tx id indexing log count");
assert!(
// there are 3 txid-indexed events:
// - relayer: merkle insertion and gas payment
// - scraper: gas payment
// some logs are emitted for multiple events, so requiring there to be at least
// `config.kathy_messages` logs is a reasonable approximation, since all three of these events
// are expected to be logged for each message.
*total_tx_id_log_count as u64 >= config.kathy_messages,
total_tx_id_log_count as u64 >= config.kathy_messages,
"Didn't find as many tx id logs as expected. Found {} and expected {}",
total_tx_id_log_count,
config.kathy_messages
);
assert!(
log_counts.get(HYPER_INCOMING_BODY_LOG_MESSAGE).is_none(),
log_counts.get(&hyper_incoming_body_line_filter).is_none(),
"Verbose logs not expected at the log level set in e2e"
);

Expand Down
Loading

0 comments on commit f250b19

Please sign in to comment.