Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(optimistic_block): pass optimistic block to producers over T1 #12888

Merged
merged 12 commits into from
Feb 13, 2025
3 changes: 2 additions & 1 deletion chain/client/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub fn client_sender_for_network(
announce_account: view_client_addr.into_sender(),
chunk_endorsement: client_addr.clone().into_sender(),
epoch_sync_request: client_addr.clone().into_sender(),
epoch_sync_response: client_addr.into_sender(),
epoch_sync_response: client_addr.clone().into_sender(),
optimistic_block_receiver: client_addr.into_sender(),
}
}
6 changes: 4 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,9 +1031,11 @@ impl Client {
res
}

#[allow(unused)]
pub fn receive_optimistic_block(&mut self, block: OptimisticBlock) {
/// Check optimistic block and start processing if is valid.
pub fn receive_optimistic_block(&mut self, block: OptimisticBlock, _peer_id: PeerId) {
let _span = debug_span!(target: "client", "receive_optimistic_block").entered();
// TODO(#10584): Validate the optimistic block.
// TODO(#10584): Discard the block if it is not from the the block producer.
self.chain.optimistic_block_chunks.add_block(block);
self.maybe_process_optimistic_block();
}
Expand Down
19 changes: 14 additions & 5 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ use near_client_primitives::types::{
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_network::client::{
BlockApproval, BlockHeadersResponse, BlockResponse, ChunkEndorsementMessage, ProcessTxRequest,
ProcessTxResponse, RecvChallenge, SetNetworkInfo, StateResponseReceived,
BlockApproval, BlockHeadersResponse, BlockResponse, ChunkEndorsementMessage,
OptimisticBlockMessage, ProcessTxRequest, ProcessTxResponse, RecvChallenge, SetNetworkInfo,
StateResponseReceived,
};
use near_network::types::ReasonForBan;
use near_network::types::{
Expand Down Expand Up @@ -503,6 +504,15 @@ impl Handler<ProcessTxRequest> for ClientActorInner {
}
}

impl Handler<OptimisticBlockMessage> for ClientActorInner {
fn handle(&mut self, msg: OptimisticBlockMessage) {
let OptimisticBlockMessage { optimistic_block, from_peer } = msg;
debug!(target: "client", block_height = optimistic_block.inner.block_height, prev_block_hash = ?optimistic_block.inner.prev_block_hash, ?from_peer, "OptimisticBlockMessage");

self.client.receive_optimistic_block(optimistic_block, from_peer);
}
}

impl Handler<BlockResponse> for ClientActorInner {
fn handle(&mut self, msg: BlockResponse) {
let BlockResponse { block, peer_id, was_requested } = msg;
Expand Down Expand Up @@ -1385,11 +1395,10 @@ impl ClientActorInner {
return Ok(());
};

/* TODO(#10584): If we produced the optimistic block, send it out before we save it.
// If we produced the optimistic block, send it out before we save it.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::OptimisticBlock { optimistic_block: block.clone() },
NetworkRequests::OptimisticBlock { optimistic_block: optimistic_block.clone() },
));
*/

// We’ve produced the optimistic block, mark it as done so we don't produce it again.
self.client.save_optimistic_block(&optimistic_block);
Expand Down
16 changes: 14 additions & 2 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManagerAdapter;
use near_network::client::{
AnnounceAccountRequest, BlockApproval, BlockHeadersRequest, BlockHeadersResponse, BlockRequest,
BlockResponse, ChunkEndorsementMessage, SetNetworkInfo, StateRequestHeader, StateRequestPart,
StateResponseReceived,
BlockResponse, ChunkEndorsementMessage, OptimisticBlockMessage, SetNetworkInfo,
StateRequestHeader, StateRequestPart, StateResponseReceived,
};
use near_network::shards_manager::ShardsManagerRequestFromNetwork;
use near_network::state_witness::{
Expand Down Expand Up @@ -511,6 +511,18 @@ fn process_peer_manager_message_default(

hash_to_height.write().unwrap().insert(*block.header().hash(), block.header().height());
}
NetworkRequests::OptimisticBlock { optimistic_block } => {
// TODO(#10584): maybe go through an adapter to facilitate testing.
for actor_handles in connectors {
actor_handles.client_actor.do_send(
OptimisticBlockMessage {
optimistic_block: optimistic_block.clone(),
from_peer: PeerInfo::random().id,
}
.with_span_context(),
);
}
}
NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => {
send_chunks(
connectors,
Expand Down
9 changes: 9 additions & 0 deletions chain/network/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, EpochId, ShardId};
Expand Down Expand Up @@ -138,6 +139,13 @@ pub struct EpochSyncResponseMessage {
pub proof: CompressedEpochSyncProof,
}

#[derive(actix::Message, Debug, Clone, PartialEq, Eq)]
#[rtype(result = "()")]
pub struct OptimisticBlockMessage {
pub optimistic_block: OptimisticBlock,
pub from_peer: PeerId,
}

#[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)]
#[multi_send_message_derive(Debug)]
#[multi_send_input_derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -160,4 +168,5 @@ pub struct ClientSenderForNetwork {
pub chunk_endorsement: AsyncSender<ChunkEndorsementMessage, ()>,
pub epoch_sync_request: Sender<EpochSyncRequestMessage>,
pub epoch_sync_response: Sender<EpochSyncResponseMessage>,
pub optimistic_block_receiver: Sender<OptimisticBlockMessage>,
}
2 changes: 2 additions & 0 deletions chain/network/src/network_protocol/borsh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use near_primitives::block::{Block, BlockHeader, GenesisId};
use near_primitives::challenge::Challenge;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::ShardId;
use near_schema_checker_lib::ProtocolSchema;
Expand Down Expand Up @@ -170,6 +171,7 @@ pub(super) enum PeerMessage {

EpochSyncRequest,
EpochSyncResponse(CompressedEpochSyncProof),
OptimisticBlock(OptimisticBlock),
}
#[cfg(target_arch = "x86_64")] // Non-x86_64 doesn't match this requirement yet but it's not bad as it's not production-ready
const _: () = assert!(std::mem::size_of::<PeerMessage>() <= 1500, "PeerMessage > 1500 bytes");
2 changes: 2 additions & 0 deletions chain/network/src/network_protocol/borsh_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl TryFrom<&net::PeerMessage> for mem::PeerMessage {
net::PeerMessage::BlockHeaders(bhs) => mem::PeerMessage::BlockHeaders(bhs),
net::PeerMessage::BlockRequest(bh) => mem::PeerMessage::BlockRequest(bh),
net::PeerMessage::Block(b) => mem::PeerMessage::Block(b),
net::PeerMessage::OptimisticBlock(ob) => mem::PeerMessage::OptimisticBlock(ob),
net::PeerMessage::Transaction(t) => mem::PeerMessage::Transaction(t),
net::PeerMessage::Routed(r) => mem::PeerMessage::Routed(Box::new(RoutedMessageV2 {
msg: *r,
Expand Down Expand Up @@ -246,6 +247,7 @@ impl From<&mem::PeerMessage> for net::PeerMessage {
mem::PeerMessage::BlockHeaders(bhs) => net::PeerMessage::BlockHeaders(bhs),
mem::PeerMessage::BlockRequest(bh) => net::PeerMessage::BlockRequest(bh),
mem::PeerMessage::Block(b) => net::PeerMessage::Block(b),
mem::PeerMessage::OptimisticBlock(ob) => net::PeerMessage::OptimisticBlock(ob),
mem::PeerMessage::Transaction(t) => net::PeerMessage::Transaction(t),
mem::PeerMessage::Routed(r) => net::PeerMessage::Routed(Box::new(r.msg.clone())),
mem::PeerMessage::Disconnect(_) => net::PeerMessage::Disconnect,
Expand Down
2 changes: 2 additions & 0 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::combine_hash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::sharding::{
ChunkHash, PartialEncodedChunk, PartialEncodedChunkPart, ReceiptProof, ShardChunkHeader,
};
Expand Down Expand Up @@ -433,6 +434,7 @@ pub enum PeerMessage {

BlockRequest(CryptoHash),
Block(Block),
OptimisticBlock(OptimisticBlock),

Transaction(SignedTransaction),
Routed(Box<RoutedMessageV2>),
Expand Down
8 changes: 8 additions & 0 deletions chain/network/src/network_protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ message EpochSyncResponse {
bytes compressed_proof = 1;
}

message OptimisticBlock {
bytes inner = 1;
Signature signature = 2;
CryptoHash hash = 3;
}

// PeerMessage is a wrapper of all message types exchanged between NEAR nodes.
// The wire format of a single message M consists of len(M)+4 bytes:
// <len(M)> : 4 bytes : little endian uint32
Expand Down Expand Up @@ -508,5 +514,7 @@ message PeerMessage {

EpochSyncRequest epoch_sync_request = 34;
EpochSyncResponse epoch_sync_response = 35;

OptimisticBlock optimistic_block = 36;
}
}
41 changes: 41 additions & 0 deletions chain/network/src/network_protocol/proto_conv/peer_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use borsh::BorshDeserialize as _;
use near_async::time::error::ComponentRange;
use near_primitives::block::{Block, BlockHeader};
use near_primitives::challenge::Challenge;
use near_primitives::optimistic_block::{OptimisticBlock, OptimisticBlockInner};
use near_primitives::transaction::SignedTransaction;
use near_primitives::utils::compression::CompressedData;
use protobuf::MessageField as MF;
Expand Down Expand Up @@ -202,6 +203,40 @@ impl TryFrom<&proto::SnapshotHostInfo> for SnapshotHostInfo {

//////////////////////////////////////////

#[derive(thiserror::Error, Debug)]
pub enum ParseOptimisticBlockError {
#[error("inner")]
Inner(std::io::Error),
#[error("sync_hash {0}")]
Hash(ParseRequiredError<ParseCryptoHashError>),
#[error("signature {0}")]
Signature(ParseRequiredError<ParseSignatureError>),
}

impl From<&OptimisticBlock> for proto::OptimisticBlock {
fn from(ob: &OptimisticBlock) -> Self {
Self {
inner: borsh::to_vec(&ob.inner).unwrap(),
signature: MF::some((&ob.signature).into()),
hash: MF::some((&ob.hash).into()),
..Default::default()
}
}
}

impl TryFrom<&proto::OptimisticBlock> for OptimisticBlock {
type Error = ParseOptimisticBlockError;
fn try_from(p_ob: &proto::OptimisticBlock) -> Result<Self, Self::Error> {
Ok(Self {
inner: OptimisticBlockInner::try_from_slice(&p_ob.inner).map_err(Self::Error::Inner)?,
signature: try_from_required(&p_ob.signature).map_err(Self::Error::Signature)?,
hash: try_from_required(&p_ob.hash).map_err(Self::Error::Hash)?,
})
}
}

//////////////////////////////////////////

#[derive(thiserror::Error, Debug)]
pub enum ParseSyncSnapshotHostsError {
#[error("hosts {0}")]
Expand Down Expand Up @@ -293,6 +328,7 @@ impl From<&PeerMessage> for proto::PeerMessage {
block: MF::some(b.into()),
..Default::default()
}),
PeerMessage::OptimisticBlock(ob) => ProtoMT::OptimisticBlock(ob.into()),
PeerMessage::Transaction(t) => ProtoMT::Transaction(proto::SignedTransaction {
borsh: borsh::to_vec(&t).unwrap(),
..Default::default()
Expand Down Expand Up @@ -397,6 +433,8 @@ pub enum ParsePeerMessageError {
StateResponse(ParseRequiredError<ParseStateInfoError>),
#[error("sync_snapshot_hosts: {0}")]
SyncSnapshotHosts(ParseSyncSnapshotHostsError),
#[error("optimistic_block: {0}")]
OptimisticBlock(ParseOptimisticBlockError),
}

impl TryFrom<&proto::PeerMessage> for PeerMessage {
Expand Down Expand Up @@ -467,6 +505,9 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage {
ProtoMT::BlockResponse(br) => PeerMessage::Block(
try_from_required(&br.block).map_err(Self::Error::BlockResponse)?,
),
ProtoMT::OptimisticBlock(ob) => {
PeerMessage::OptimisticBlock(ob.try_into().map_err(Self::Error::OptimisticBlock)?)
}
ProtoMT::Transaction(t) => PeerMessage::Transaction(
SignedTransaction::try_from_slice(&t.borsh).map_err(Self::Error::Transaction)?,
),
Expand Down
10 changes: 8 additions & 2 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::accounts_data::AccountDataError;
use crate::client::{
AnnounceAccountRequest, BlockHeadersRequest, BlockHeadersResponse, BlockRequest, BlockResponse,
EpochSyncRequestMessage, EpochSyncResponseMessage, ProcessTxRequest, RecvChallenge,
StateRequestHeader, StateRequestPart, StateResponseReceived,
EpochSyncRequestMessage, EpochSyncResponseMessage, OptimisticBlockMessage, ProcessTxRequest,
RecvChallenge, StateRequestHeader, StateRequestPart, StateResponseReceived,
};
use crate::concurrency::atomic_cell::AtomicCell;
use crate::concurrency::demux;
Expand Down Expand Up @@ -1127,6 +1127,12 @@ impl PeerActor {
.send(EpochSyncResponseMessage { from_peer: peer_id, proof });
None
}
PeerMessage::OptimisticBlock(ob) => {
network_state
.client
.send(OptimisticBlockMessage { from_peer: peer_id, optimistic_block: ob });
None
}
msg => {
tracing::error!(target: "network", "Peer received unexpected type: {:?}", msg);
None
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/peer_manager/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl tcp::Tier {
PeerMessage::VersionedStateResponse(_) => {
self == tcp::Tier::T2 || self == tcp::Tier::T3
}
PeerMessage::OptimisticBlock(..) => true,
PeerMessage::Routed(msg) => self.is_allowed_routed(&msg.body),
PeerMessage::SyncRoutingTable(..)
| PeerMessage::DistanceVector(..)
Expand Down
8 changes: 8 additions & 0 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,14 @@ impl PeerManagerActor {
self.state.tier2.broadcast_message(Arc::new(PeerMessage::Block(block)));
NetworkResponses::NoResponse
}
NetworkRequests::OptimisticBlock { optimistic_block } => {
// TODO(#10584): send this message to all the producers.
// Maybe we just need to send this to the next producers.
Comment on lines +787 to +788
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't tier1 the same as "all validators"? Yeah we need to send it to all chunk producers in this epoch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question here was between thw whole of T1 or just next N blocks worth of producers. 99 vs 8(shards) * N(blocks). But we can go with all for now.

self.state
.tier1
.broadcast_message(Arc::new(PeerMessage::OptimisticBlock(optimistic_block)));
NetworkResponses::NoResponse
}
NetworkRequests::Approval { approval_message } => {
self.state.send_message_to_account(
&self.clock,
Expand Down
2 changes: 2 additions & 0 deletions chain/network/src/rate_limits/messages_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ pub enum RateLimitedPeerMessageKey {
ContractCodeResponse,
PartialEncodedContractDeploys,
EpochSyncRequest,
OptimisticBlock,
}

/// Given a `PeerMessage` returns a tuple containing the `RateLimitedPeerMessageKey`
Expand All @@ -206,6 +207,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa
PeerMessage::BlockHeaders(_) => Some((BlockHeaders, 1)),
PeerMessage::BlockRequest(_) => Some((BlockRequest, 1)),
PeerMessage::Block(_) => Some((Block, 1)),
PeerMessage::OptimisticBlock(_) => Some((OptimisticBlock, 1)),
PeerMessage::Transaction(_) => Some((Transaction, 1)),
PeerMessage::Routed(msg) => match msg.body {
RoutedMessageBody::BlockApproval(_) => Some((BlockApproval, 1)),
Expand Down
19 changes: 17 additions & 2 deletions chain/network/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::sync::{Arc, Mutex};

use crate::client::{
BlockApproval, BlockHeadersRequest, BlockHeadersResponse, BlockRequest, BlockResponse,
ChunkEndorsementMessage, EpochSyncRequestMessage, EpochSyncResponseMessage, ProcessTxRequest,
ProcessTxResponse,
ChunkEndorsementMessage, EpochSyncRequestMessage, EpochSyncResponseMessage,
OptimisticBlockMessage, ProcessTxRequest, ProcessTxResponse,
};
use crate::shards_manager::ShardsManagerRequestFromNetwork;
use crate::state_witness::{
Expand Down Expand Up @@ -37,6 +37,7 @@ pub struct ClientSenderForTestLoopNetwork {
pub chunk_endorsement: AsyncSender<ChunkEndorsementMessage, ()>,
pub epoch_sync_request: Sender<EpochSyncRequestMessage>,
pub epoch_sync_response: Sender<EpochSyncResponseMessage>,
pub optimistic_block_receiver: Sender<OptimisticBlockMessage>,
}

#[derive(Clone, MultiSend, MultiSenderFrom)]
Expand Down Expand Up @@ -242,6 +243,20 @@ fn network_message_to_client_handler(
}
None
}
NetworkRequests::OptimisticBlock { optimistic_block } => {
let my_peer_id = shared_state.account_to_peer_id.get(&my_account_id).unwrap();
for account_id in shared_state.accounts() {
if account_id != &my_account_id {
let _ = shared_state.senders_for_account(account_id).client_sender.send(
OptimisticBlockMessage {
optimistic_block: optimistic_block.clone(),
from_peer: my_peer_id.clone(),
},
);
}
}
None
}
NetworkRequests::Approval { approval_message } => {
assert_ne!(
approval_message.target, my_account_id,
Expand Down
3 changes: 3 additions & 0 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use near_primitives::challenge::Challenge;
use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::sharding::PartialEncodedChunkWithArcReceipts;
use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement;
use near_primitives::stateless_validation::contract_distribution::{
Expand Down Expand Up @@ -240,6 +241,8 @@ impl From<NetworkResponses> for PeerManagerMessageResponse {
pub enum NetworkRequests {
/// Sends block, either when block was just produced or when requested.
Block { block: Block },
/// Sends optimistic block as soon as the production window for the height starts.
OptimisticBlock { optimistic_block: OptimisticBlock },
/// Sends approval.
Approval { approval_message: ApprovalMessage },
/// Request block with given hash from given peer.
Expand Down
Loading
Loading