Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prevent multiple block proposal evals #5453

Merged
merged 28 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3b2726e
feat: prevent multiple block proposal evals
hstove Nov 12, 2024
4178fb6
fix: comments, cleanup
hstove Nov 12, 2024
a9c7794
Merge remote-tracking branch 'origin/develop' into feat/retry-pending…
hstove Dec 18, 2024
014f44b
feat: integration test for retry pending block validations
hstove Dec 18, 2024
5384045
Merge branch 'develop' into feat/retry-pending-block-proposals
hstove Dec 18, 2024
0dc1524
fix: move logic for removing/retrying pending block responses
hstove Dec 18, 2024
5f8f9eb
Merge remote-tracking branch 'origin/feat/retry-pending-block-proposa…
hstove Dec 18, 2024
949088f
when marking block as global accepted/rejected, remove pending valida…
hstove Dec 18, 2024
90b6fb3
fix: dont remove pending validation in tests
hstove Dec 18, 2024
63ae626
Merge remote-tracking branch 'origin/develop' into feat/retry-pending…
hstove Dec 19, 2024
2e30240
fix: don't hold mutex while sleeping in test injection
hstove Dec 19, 2024
e522058
feat: use TestFlag for validation delay
hstove Dec 19, 2024
b3f9c35
Merge branch 'develop' into feat/retry-pending-block-proposals
hstove Dec 20, 2024
cf345bb
fix: bump sister block timeout
hstove Dec 20, 2024
77ef010
fix: bump timeout in locally_rejected_blocks_overridden_by_global_acc…
hstove Dec 21, 2024
0c90997
Merge remote-tracking branch 'origin/develop' into feat/retry-pending…
hstove Jan 8, 2025
ae7c822
fix: delete and return pending row in one statement
hstove Jan 8, 2025
5f020df
Merge branch 'develop' into feat/retry-pending-block-proposals
hstove Jan 9, 2025
d667a4e
chore: add explicit ASC order in index
hstove Jan 13, 2025
91c38d0
Merge remote-tracking branch 'origin/develop' into feat/retry-pending…
hstove Jan 13, 2025
b897380
fix: remove unused import post-merge
hstove Jan 13, 2025
8a072d1
fix: crc feedback
hstove Jan 13, 2025
2fd4e78
feat: test for handling pending block proposal at tenure change
hstove Jan 13, 2025
fe20e24
crc: comment around `mark_block_globally_accepted`
hstove Jan 13, 2025
beb6fba
crc: add test to bitcoin-tests
hstove Jan 14, 2025
3123738
crc: remove unused function
hstove Jan 14, 2025
d22b6e0
Merge branch 'develop' into feat/retry-pending-block-proposals
hstove Jan 14, 2025
4d44a6d
chore: changelog
hstove Jan 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions libsigner/src/v0/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,14 @@ impl BlockResponse {
) -> Self {
Self::Rejected(BlockRejection::new(hash, reject_code, private_key, mainnet))
}

/// The signer signature hash for the block response
pub fn signer_signature_hash(&self) -> Sha512Trunc256Sum {
match self {
BlockResponse::Accepted(accepted) => accepted.signer_signature_hash,
BlockResponse::Rejected(rejection) => rejection.signer_signature_hash,
}
}
}

impl StacksMessageCodec for BlockResponse {
Expand Down
78 changes: 75 additions & 3 deletions stacks-signer/src/signerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ static CREATE_INDEXES_3: &str = r#"
CREATE INDEX IF NOT EXISTS block_rejection_signer_addrs_on_block_signature_hash ON block_rejection_signer_addrs(signer_signature_hash);
"#;

static CREATE_INDEXES_4: &str = r#"
CREATE INDEX IF NOT EXISTS block_validations_pending_on_added_time ON block_validations_pending(added_time);
"#;

static CREATE_SIGNER_STATE_TABLE: &str = "
CREATE TABLE IF NOT EXISTS signer_states (
reward_cycle INTEGER PRIMARY KEY,
Expand Down Expand Up @@ -369,6 +373,14 @@ CREATE TABLE IF NOT EXISTS block_rejection_signer_addrs (
PRIMARY KEY (signer_addr)
) STRICT;"#;

static CREATE_BLOCK_VALIDATION_PENDING_TABLE: &str = r#"
CREATE TABLE IF NOT EXISTS block_validations_pending (
signer_signature_hash TEXT NOT NULL,
-- the time at which the block was added to the pending table
added_time INTEGER NOT NULL,
PRIMARY KEY (signer_signature_hash)
) STRICT;"#;

static SCHEMA_1: &[&str] = &[
DROP_SCHEMA_0,
CREATE_DB_CONFIG,
Expand Down Expand Up @@ -405,9 +417,15 @@ static SCHEMA_3: &[&str] = &[
"INSERT INTO db_config (version) VALUES (3);",
];

static SCHEMA_4: &[&str] = &[
CREATE_BLOCK_VALIDATION_PENDING_TABLE,
CREATE_INDEXES_4,
"INSERT OR REPLACE INTO db_config (version) VALUES (4);",
];

impl SignerDb {
/// The current schema version used in this build of the signer binary.
pub const SCHEMA_VERSION: u32 = 3;
pub const SCHEMA_VERSION: u32 = 4;

/// Create a new `SignerState` instance.
/// This will create a new SQLite database at the given path
Expand All @@ -427,7 +445,7 @@ impl SignerDb {
return Ok(0);
}
let result = conn
.query_row("SELECT version FROM db_config LIMIT 1", [], |row| {
.query_row("SELECT MAX(version) FROM db_config LIMIT 1", [], |row| {
row.get(0)
})
.optional();
Expand Down Expand Up @@ -479,6 +497,20 @@ impl SignerDb {
Ok(())
}

/// Migrate from schema 3 to schema 4
fn schema_4_migration(tx: &Transaction) -> Result<(), DBError> {
if Self::get_schema_version(tx)? >= 4 {
// no migration necessary
return Ok(());
}

for statement in SCHEMA_4.iter() {
tx.execute_batch(statement)?;
}

Ok(())
}

/// Either instantiate a new database, or migrate an existing one
/// If the detected version of the existing database is 0 (i.e., a pre-migration
/// logic DB, the DB will be dropped).
Expand All @@ -490,7 +522,8 @@ impl SignerDb {
0 => Self::schema_1_migration(&sql_tx)?,
1 => Self::schema_2_migration(&sql_tx)?,
2 => Self::schema_3_migration(&sql_tx)?,
3 => break,
3 => Self::schema_4_migration(&sql_tx)?,
4 => break,
x => return Err(DBError::Other(format!(
"Database schema is newer than supported by this binary. Expected version = {}, Database version = {x}",
Self::SCHEMA_VERSION,
Expand Down Expand Up @@ -809,6 +842,45 @@ impl SignerDb {
BlockState::try_from(state.as_str()).map_err(|_| DBError::Corruption)?,
))
}

/// Get a pending block validation, sorted by the time at which it was added to the pending table.
/// If found, remove it from the pending table.
pub fn get_pending_block_validation(&self) -> Result<Option<Sha512Trunc256Sum>, DBError> {
hstove marked this conversation as resolved.
Show resolved Hide resolved
let qry =
"SELECT signer_signature_hash FROM block_validations_pending ORDER BY added_time ASC";
let sighash_opt: Option<String> = query_row(&self.db, qry, params![])?;
if let Some(sighash) = sighash_opt {
let sighash = Sha512Trunc256Sum::from_hex(&sighash).map_err(|_| DBError::Corruption)?;
self.remove_pending_block_validation(&sighash)?;
return Ok(Some(sighash));
}
Ok(None)
}

/// Remove a pending block validation
pub fn remove_pending_block_validation(
&self,
sighash: &Sha512Trunc256Sum,
) -> Result<(), DBError> {
self.db.execute(
"DELETE FROM block_validations_pending WHERE signer_signature_hash = ?1",
params![sighash.to_string()],
)?;
Ok(())
}

/// Insert a pending block validation
pub fn insert_pending_block_validation(
&self,
sighash: &Sha512Trunc256Sum,
ts: u64,
) -> Result<(), DBError> {
self.db.execute(
"INSERT INTO block_validations_pending (signer_signature_hash, added_time) VALUES (?1, ?2)",
params![sighash.to_string(), u64_to_sql(ts)?],
)?;
Ok(())
}
}

fn try_deserialize<T>(s: Option<String>) -> Result<Option<T>, DBError>
Expand Down
111 changes: 80 additions & 31 deletions stacks-signer/src/v0/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::{Duration, Instant};

use blockstack_lib::chainstate::nakamoto::{NakamotoBlock, NakamotoBlockHeader};
use blockstack_lib::net::api::postblock_proposal::{
BlockValidateOk, BlockValidateReject, BlockValidateResponse,
BlockValidateOk, BlockValidateReject, BlockValidateResponse, TOO_MANY_REQUESTS_STATUS,
};
use clarity::types::chainstate::StacksPrivateKey;
use clarity::types::{PrivateKey, StacksEpochId};
Expand All @@ -33,11 +33,12 @@ use libsigner::{BlockProposal, SignerEvent};
use slog::{slog_debug, slog_error, slog_info, slog_warn};
use stacks_common::types::chainstate::StacksAddress;
use stacks_common::util::get_epoch_time_secs;
use stacks_common::util::hash::Sha512Trunc256Sum;
use stacks_common::util::secp256k1::MessageSignature;
use stacks_common::{debug, error, info, warn};

use crate::chainstate::{ProposalEvalConfig, SortitionsView};
use crate::client::{SignerSlotID, StackerDB, StacksClient};
use crate::client::{ClientError, SignerSlotID, StackerDB, StacksClient};
use crate::config::SignerConfig;
use crate::runloop::SignerResult;
use crate::signerdb::{BlockInfo, BlockState, SignerDb};
Expand Down Expand Up @@ -90,7 +91,7 @@ pub struct Signer {
/// marking a submitted block as invalid
pub block_proposal_validation_timeout: Duration,
/// The current submitted block proposal and its submission time
pub submitted_block_proposal: Option<(BlockProposal, Instant)>,
pub submitted_block_proposal: Option<(Sha512Trunc256Sum, Instant)>,
hstove marked this conversation as resolved.
Show resolved Hide resolved
}

impl std::fmt::Display for Signer {
Expand Down Expand Up @@ -476,15 +477,8 @@ impl Signer {
"block_height" => block_proposal.block.header.chain_length,
"burn_height" => block_proposal.burn_height,
);
match stacks_client.submit_block_for_validation(block_info.block.clone()) {
Ok(_) => {
self.submitted_block_proposal =
Some((block_proposal.clone(), Instant::now()));
}
Err(e) => {
warn!("{self}: Failed to submit block for validation: {e:?}");
}
};

self.submit_block_for_validation(stacks_client, &block_proposal.block);
} else {
// Still store the block but log we can't submit it for validation. We may receive enough signatures/rejections
// from other signers to push the proposed block into a global rejection/acceptance regardless of our participation.
Expand Down Expand Up @@ -513,8 +507,39 @@ impl Signer {
BlockResponse::Rejected(block_rejection) => {
self.handle_block_rejection(block_rejection);
}
};

// Remove this block validation from the pending table
let signer_sig_hash = block_response.signer_signature_hash();
self.signer_db
.remove_pending_block_validation(&signer_sig_hash)
hstove marked this conversation as resolved.
Show resolved Hide resolved
.unwrap_or_else(|e| warn!("{self}: Failed to remove pending block validation: {e:?}"));

// Check if there is a pending block validation that we need to submit to the node
match self.signer_db.get_pending_block_validation() {
Ok(Some(signer_sig_hash)) => {
info!("{self}: Found a pending block validation: {signer_sig_hash:?}");
match self
.signer_db
.block_lookup(self.reward_cycle, &signer_sig_hash)
{
Ok(Some(block_info)) => {
self.submit_block_for_validation(stacks_client, &block_info.block);
}
Ok(None) => {
// This should never happen
error!(
"{self}: Pending block validation not found in DB: {signer_sig_hash:?}"
);
}
Err(e) => error!("{self}: Failed to get block info: {e:?}"),
}
}
Ok(None) => {}
Err(e) => warn!("{self}: Failed to get pending block validation: {e:?}"),
}
}

/// Handle the block validate ok response. Returns our block response if we have one
fn handle_block_validate_ok(
&mut self,
Expand All @@ -525,10 +550,7 @@ impl Signer {
let signer_signature_hash = block_validate_ok.signer_signature_hash;
if self
.submitted_block_proposal
.as_ref()
.map(|(proposal, _)| {
proposal.block.header.signer_signature_hash() == signer_signature_hash
})
.map(|(proposal_hash, _)| proposal_hash == signer_signature_hash)
.unwrap_or(false)
{
self.submitted_block_proposal = None;
Expand Down Expand Up @@ -584,10 +606,7 @@ impl Signer {
let signer_signature_hash = block_validate_reject.signer_signature_hash;
if self
.submitted_block_proposal
.as_ref()
.map(|(proposal, _)| {
proposal.block.header.signer_signature_hash() == signer_signature_hash
})
.map(|(proposal_hash, _)| proposal_hash == signer_signature_hash)
.unwrap_or(false)
{
self.submitted_block_proposal = None;
Expand Down Expand Up @@ -670,20 +689,21 @@ impl Signer {
/// Check the current tracked submitted block proposal to see if it has timed out.
/// Broadcasts a rejection and marks the block locally rejected if it has.
fn check_submitted_block_proposal(&mut self) {
let Some((block_proposal, block_submission)) = self.submitted_block_proposal.take() else {
let Some((proposal_signer_sighash, block_submission)) =
self.submitted_block_proposal.take()
else {
// Nothing to check.
return;
};
if block_submission.elapsed() < self.block_proposal_validation_timeout {
// Not expired yet. Put it back!
self.submitted_block_proposal = Some((block_proposal, block_submission));
self.submitted_block_proposal = Some((proposal_signer_sighash, block_submission));
return;
}
let signature_sighash = block_proposal.block.header.signer_signature_hash();
// For mutability reasons, we need to take the block_info out of the map and add it back after processing
let mut block_info = match self
.signer_db
.block_lookup(self.reward_cycle, &signature_sighash)
.block_lookup(self.reward_cycle, &proposal_signer_sighash)
{
Ok(Some(block_info)) => {
if block_info.state == BlockState::GloballyRejected
Expand All @@ -698,8 +718,7 @@ impl Signer {
// This is weird. If this is reached, its probably an error in code logic or the db was flushed.
// Why are we tracking a block submission for a block we have never seen / stored before.
error!("{self}: tracking an unknown block validation submission.";
"signer_sighash" => %signature_sighash,
"block_id" => %block_proposal.block.block_id(),
"signer_sighash" => %proposal_signer_sighash,
);
return;
}
Expand All @@ -712,11 +731,10 @@ impl Signer {
// Reject it so we aren't holding up the network because of our inaction.
warn!(
"{self}: Failed to receive block validation response within {} ms. Rejecting block.", self.block_proposal_validation_timeout.as_millis();
"signer_sighash" => %signature_sighash,
"block_id" => %block_proposal.block.block_id(),
"signer_sighash" => %proposal_signer_sighash,
);
let rejection = BlockResponse::rejected(
block_proposal.block.header.signer_signature_hash(),
proposal_signer_sighash,
RejectCode::ConnectivityIssues,
&self.private_key,
self.mainnet,
Expand Down Expand Up @@ -851,7 +869,7 @@ impl Signer {
if self
.submitted_block_proposal
.as_ref()
.map(|(proposal, _)| &proposal.block.header.signer_signature_hash() == block_hash)
.map(|(proposal_signer_sighash, _)| proposal_signer_sighash == block_hash)
.unwrap_or(false)
{
// Consensus reached! No longer bother tracking its validation submission to the node as we are too late to participate in the decision anyway.
Expand Down Expand Up @@ -1002,7 +1020,7 @@ impl Signer {
if self
.submitted_block_proposal
.as_ref()
.map(|(proposal, _)| &proposal.block.header.signer_signature_hash() == block_hash)
.map(|(proposal_hash, _)| proposal_hash == block_hash)
.unwrap_or(false)
{
// Consensus reached! No longer bother tracking its validation submission to the node as we are too late to participate in the decision anyway.
Expand Down Expand Up @@ -1046,6 +1064,37 @@ impl Signer {
}
}

/// Submit a block for validation, and mark it as pending if the node
/// is busy with a previous request.
fn submit_block_for_validation(&mut self, stacks_client: &StacksClient, block: &NakamotoBlock) {
let signer_signature_hash = block.header.signer_signature_hash();
match stacks_client.submit_block_for_validation(block.clone()) {
Ok(_) => {
self.submitted_block_proposal = Some((signer_signature_hash, Instant::now()));
}
Err(ClientError::RequestFailure(status)) => {
if status.as_u16() == TOO_MANY_REQUESTS_STATUS {
info!("{self}: Received 429 from stacks node for block validation request. Inserting pending block validation...";
"signer_signature_hash" => %signer_signature_hash,
);
self.signer_db
.insert_pending_block_validation(
&signer_signature_hash,
get_epoch_time_secs(),
)
.unwrap_or_else(|e| {
warn!("{self}: Failed to insert pending block validation: {e:?}")
});
} else {
warn!("{self}: Received non-429 status from stacks node: {status}");
}
}
Err(e) => {
warn!("{self}: Failed to submit block for validation: {e:?}");
}
}
}

#[cfg(any(test, feature = "testing"))]
fn test_skip_block_broadcast(&self, block: &NakamotoBlock) -> bool {
if *TEST_SKIP_BLOCK_BROADCAST.lock().unwrap() == Some(true) {
Expand Down
6 changes: 4 additions & 2 deletions stackslib/src/net/api/postblock_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ define_u8_enum![ValidateRejectCode {
NoSuchTenure = 6
}];

pub static TOO_MANY_REQUESTS_STATUS: u16 = 429;

impl TryFrom<u8> for ValidateRejectCode {
type Error = CodecError;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Expand Down Expand Up @@ -687,7 +689,7 @@ impl RPCRequestHandler for RPCBlockProposalRequestHandler {
let res = node.with_node_state(|network, sortdb, chainstate, _mempool, rpc_args| {
if network.is_proposal_thread_running() {
return Err((
429,
TOO_MANY_REQUESTS_STATUS,
NetError::SendError("Proposal currently being evaluated".into()),
));
}
Expand All @@ -708,7 +710,7 @@ impl RPCRequestHandler for RPCBlockProposalRequestHandler {
.spawn_validation_thread(sortdb, chainstate, receiver)
.map_err(|_e| {
(
429,
TOO_MANY_REQUESTS_STATUS,
NetError::SendError(
"IO error while spawning proposal callback thread".into(),
),
Expand Down
2 changes: 1 addition & 1 deletion testnet/stacks-node/src/tests/epoch_25.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use stacks_common::types::chainstate::StacksPrivateKey;

use crate::config::InitialBalance;
use crate::tests::bitcoin_regtest::BitcoinCoreController;
use crate::tests::nakamoto_integrations::{next_block_and, wait_for};
use crate::tests::nakamoto_integrations::wait_for;
use crate::tests::neon_integrations::{
get_account, get_chain_info, neon_integration_test_conf, next_block_and_wait, submit_tx,
test_observer, wait_for_runloop,
Expand Down