Skip to content

Commit

Permalink
feat(optimistic_block): pass optimistic block to producers over T1 (#…
Browse files Browse the repository at this point in the history
…12888)

Pass the optimistic block to all producers over the T1 network.
  • Loading branch information
VanBarbascu authored Feb 13, 2025
1 parent a425867 commit 8b3fc09
Show file tree
Hide file tree
Showing 19 changed files with 143 additions and 21 deletions.
3 changes: 2 additions & 1 deletion chain/client/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub fn client_sender_for_network(
announce_account: view_client_addr.into_sender(),
chunk_endorsement: client_addr.clone().into_sender(),
epoch_sync_request: client_addr.clone().into_sender(),
epoch_sync_response: client_addr.into_sender(),
epoch_sync_response: client_addr.clone().into_sender(),
optimistic_block_receiver: client_addr.into_sender(),
}
}
6 changes: 4 additions & 2 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,9 +1031,11 @@ impl Client {
res
}

#[allow(unused)]
pub fn receive_optimistic_block(&mut self, block: OptimisticBlock) {
/// Check optimistic block and start processing if is valid.
pub fn receive_optimistic_block(&mut self, block: OptimisticBlock, _peer_id: PeerId) {
let _span = debug_span!(target: "client", "receive_optimistic_block").entered();
// TODO(#10584): Validate the optimistic block.
// TODO(#10584): Discard the block if it is not from the the block producer.
self.chain.optimistic_block_chunks.add_block(block);
self.maybe_process_optimistic_block();
}
Expand Down
19 changes: 14 additions & 5 deletions chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ use near_client_primitives::types::{
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::EpochManagerAdapter;
use near_network::client::{
BlockApproval, BlockHeadersResponse, BlockResponse, ChunkEndorsementMessage, ProcessTxRequest,
ProcessTxResponse, RecvChallenge, SetNetworkInfo, StateResponseReceived,
BlockApproval, BlockHeadersResponse, BlockResponse, ChunkEndorsementMessage,
OptimisticBlockMessage, ProcessTxRequest, ProcessTxResponse, RecvChallenge, SetNetworkInfo,
StateResponseReceived,
};
use near_network::types::ReasonForBan;
use near_network::types::{
Expand Down Expand Up @@ -503,6 +504,15 @@ impl Handler<ProcessTxRequest> for ClientActorInner {
}
}

impl Handler<OptimisticBlockMessage> for ClientActorInner {
fn handle(&mut self, msg: OptimisticBlockMessage) {
let OptimisticBlockMessage { optimistic_block, from_peer } = msg;
debug!(target: "client", block_height = optimistic_block.inner.block_height, prev_block_hash = ?optimistic_block.inner.prev_block_hash, ?from_peer, "OptimisticBlockMessage");

self.client.receive_optimistic_block(optimistic_block, from_peer);
}
}

impl Handler<BlockResponse> for ClientActorInner {
fn handle(&mut self, msg: BlockResponse) {
let BlockResponse { block, peer_id, was_requested } = msg;
Expand Down Expand Up @@ -1385,11 +1395,10 @@ impl ClientActorInner {
return Ok(());
};

/* TODO(#10584): If we produced the optimistic block, send it out before we save it.
// If we produced the optimistic block, send it out before we save it.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::OptimisticBlock { optimistic_block: block.clone() },
NetworkRequests::OptimisticBlock { optimistic_block: optimistic_block.clone() },
));
*/

// We’ve produced the optimistic block, mark it as done so we don't produce it again.
self.client.save_optimistic_block(&optimistic_block);
Expand Down
16 changes: 14 additions & 2 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig};
use near_epoch_manager::EpochManagerAdapter;
use near_network::client::{
AnnounceAccountRequest, BlockApproval, BlockHeadersRequest, BlockHeadersResponse, BlockRequest,
BlockResponse, ChunkEndorsementMessage, SetNetworkInfo, StateRequestHeader, StateRequestPart,
StateResponseReceived,
BlockResponse, ChunkEndorsementMessage, OptimisticBlockMessage, SetNetworkInfo,
StateRequestHeader, StateRequestPart, StateResponseReceived,
};
use near_network::shards_manager::ShardsManagerRequestFromNetwork;
use near_network::state_witness::{
Expand Down Expand Up @@ -511,6 +511,18 @@ fn process_peer_manager_message_default(

hash_to_height.write().unwrap().insert(*block.header().hash(), block.header().height());
}
NetworkRequests::OptimisticBlock { optimistic_block } => {
// TODO(#10584): maybe go through an adapter to facilitate testing.
for actor_handles in connectors {
actor_handles.client_actor.do_send(
OptimisticBlockMessage {
optimistic_block: optimistic_block.clone(),
from_peer: PeerInfo::random().id,
}
.with_span_context(),
);
}
}
NetworkRequests::PartialEncodedChunkRequest { target, request, .. } => {
send_chunks(
connectors,
Expand Down
9 changes: 9 additions & 0 deletions chain/network/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::errors::InvalidTxError;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::{AccountId, EpochId, ShardId};
Expand Down Expand Up @@ -138,6 +139,13 @@ pub struct EpochSyncResponseMessage {
pub proof: CompressedEpochSyncProof,
}

#[derive(actix::Message, Debug, Clone, PartialEq, Eq)]
#[rtype(result = "()")]
pub struct OptimisticBlockMessage {
pub optimistic_block: OptimisticBlock,
pub from_peer: PeerId,
}

#[derive(Clone, MultiSend, MultiSenderFrom, MultiSendMessage)]
#[multi_send_message_derive(Debug)]
#[multi_send_input_derive(Debug, Clone, PartialEq, Eq)]
Expand All @@ -160,4 +168,5 @@ pub struct ClientSenderForNetwork {
pub chunk_endorsement: AsyncSender<ChunkEndorsementMessage, ()>,
pub epoch_sync_request: Sender<EpochSyncRequestMessage>,
pub epoch_sync_response: Sender<EpochSyncResponseMessage>,
pub optimistic_block_receiver: Sender<OptimisticBlockMessage>,
}
2 changes: 2 additions & 0 deletions chain/network/src/network_protocol/borsh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use near_primitives::block::{Block, BlockHeader, GenesisId};
use near_primitives::challenge::Challenge;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::ShardId;
use near_schema_checker_lib::ProtocolSchema;
Expand Down Expand Up @@ -170,6 +171,7 @@ pub(super) enum PeerMessage {

EpochSyncRequest,
EpochSyncResponse(CompressedEpochSyncProof),
OptimisticBlock(OptimisticBlock),
}
#[cfg(target_arch = "x86_64")] // Non-x86_64 doesn't match this requirement yet but it's not bad as it's not production-ready
const _: () = assert!(std::mem::size_of::<PeerMessage>() <= 1500, "PeerMessage > 1500 bytes");
2 changes: 2 additions & 0 deletions chain/network/src/network_protocol/borsh_conv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl TryFrom<&net::PeerMessage> for mem::PeerMessage {
net::PeerMessage::BlockHeaders(bhs) => mem::PeerMessage::BlockHeaders(bhs),
net::PeerMessage::BlockRequest(bh) => mem::PeerMessage::BlockRequest(bh),
net::PeerMessage::Block(b) => mem::PeerMessage::Block(b),
net::PeerMessage::OptimisticBlock(ob) => mem::PeerMessage::OptimisticBlock(ob),
net::PeerMessage::Transaction(t) => mem::PeerMessage::Transaction(t),
net::PeerMessage::Routed(r) => mem::PeerMessage::Routed(Box::new(RoutedMessageV2 {
msg: *r,
Expand Down Expand Up @@ -246,6 +247,7 @@ impl From<&mem::PeerMessage> for net::PeerMessage {
mem::PeerMessage::BlockHeaders(bhs) => net::PeerMessage::BlockHeaders(bhs),
mem::PeerMessage::BlockRequest(bh) => net::PeerMessage::BlockRequest(bh),
mem::PeerMessage::Block(b) => net::PeerMessage::Block(b),
mem::PeerMessage::OptimisticBlock(ob) => net::PeerMessage::OptimisticBlock(ob),
mem::PeerMessage::Transaction(t) => net::PeerMessage::Transaction(t),
mem::PeerMessage::Routed(r) => net::PeerMessage::Routed(Box::new(r.msg.clone())),
mem::PeerMessage::Disconnect(_) => net::PeerMessage::Disconnect,
Expand Down
2 changes: 2 additions & 0 deletions chain/network/src/network_protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::hash::CryptoHash;
use near_primitives::merkle::combine_hash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::sharding::{
ChunkHash, PartialEncodedChunk, PartialEncodedChunkPart, ReceiptProof, ShardChunkHeader,
};
Expand Down Expand Up @@ -433,6 +434,7 @@ pub enum PeerMessage {

BlockRequest(CryptoHash),
Block(Block),
OptimisticBlock(OptimisticBlock),

Transaction(SignedTransaction),
Routed(Box<RoutedMessageV2>),
Expand Down
8 changes: 8 additions & 0 deletions chain/network/src/network_protocol/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,12 @@ message EpochSyncResponse {
bytes compressed_proof = 1;
}

message OptimisticBlock {
bytes inner = 1;
Signature signature = 2;
CryptoHash hash = 3;
}

// PeerMessage is a wrapper of all message types exchanged between NEAR nodes.
// The wire format of a single message M consists of len(M)+4 bytes:
// <len(M)> : 4 bytes : little endian uint32
Expand Down Expand Up @@ -508,5 +514,7 @@ message PeerMessage {

EpochSyncRequest epoch_sync_request = 34;
EpochSyncResponse epoch_sync_response = 35;

OptimisticBlock optimistic_block = 36;
}
}
41 changes: 41 additions & 0 deletions chain/network/src/network_protocol/proto_conv/peer_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use borsh::BorshDeserialize as _;
use near_async::time::error::ComponentRange;
use near_primitives::block::{Block, BlockHeader};
use near_primitives::challenge::Challenge;
use near_primitives::optimistic_block::{OptimisticBlock, OptimisticBlockInner};
use near_primitives::transaction::SignedTransaction;
use near_primitives::utils::compression::CompressedData;
use protobuf::MessageField as MF;
Expand Down Expand Up @@ -202,6 +203,40 @@ impl TryFrom<&proto::SnapshotHostInfo> for SnapshotHostInfo {

//////////////////////////////////////////

#[derive(thiserror::Error, Debug)]
pub enum ParseOptimisticBlockError {
#[error("inner")]
Inner(std::io::Error),
#[error("sync_hash {0}")]
Hash(ParseRequiredError<ParseCryptoHashError>),
#[error("signature {0}")]
Signature(ParseRequiredError<ParseSignatureError>),
}

impl From<&OptimisticBlock> for proto::OptimisticBlock {
fn from(ob: &OptimisticBlock) -> Self {
Self {
inner: borsh::to_vec(&ob.inner).unwrap(),
signature: MF::some((&ob.signature).into()),
hash: MF::some((&ob.hash).into()),
..Default::default()
}
}
}

impl TryFrom<&proto::OptimisticBlock> for OptimisticBlock {
type Error = ParseOptimisticBlockError;
fn try_from(p_ob: &proto::OptimisticBlock) -> Result<Self, Self::Error> {
Ok(Self {
inner: OptimisticBlockInner::try_from_slice(&p_ob.inner).map_err(Self::Error::Inner)?,
signature: try_from_required(&p_ob.signature).map_err(Self::Error::Signature)?,
hash: try_from_required(&p_ob.hash).map_err(Self::Error::Hash)?,
})
}
}

//////////////////////////////////////////

#[derive(thiserror::Error, Debug)]
pub enum ParseSyncSnapshotHostsError {
#[error("hosts {0}")]
Expand Down Expand Up @@ -293,6 +328,7 @@ impl From<&PeerMessage> for proto::PeerMessage {
block: MF::some(b.into()),
..Default::default()
}),
PeerMessage::OptimisticBlock(ob) => ProtoMT::OptimisticBlock(ob.into()),
PeerMessage::Transaction(t) => ProtoMT::Transaction(proto::SignedTransaction {
borsh: borsh::to_vec(&t).unwrap(),
..Default::default()
Expand Down Expand Up @@ -397,6 +433,8 @@ pub enum ParsePeerMessageError {
StateResponse(ParseRequiredError<ParseStateInfoError>),
#[error("sync_snapshot_hosts: {0}")]
SyncSnapshotHosts(ParseSyncSnapshotHostsError),
#[error("optimistic_block: {0}")]
OptimisticBlock(ParseOptimisticBlockError),
}

impl TryFrom<&proto::PeerMessage> for PeerMessage {
Expand Down Expand Up @@ -467,6 +505,9 @@ impl TryFrom<&proto::PeerMessage> for PeerMessage {
ProtoMT::BlockResponse(br) => PeerMessage::Block(
try_from_required(&br.block).map_err(Self::Error::BlockResponse)?,
),
ProtoMT::OptimisticBlock(ob) => {
PeerMessage::OptimisticBlock(ob.try_into().map_err(Self::Error::OptimisticBlock)?)
}
ProtoMT::Transaction(t) => PeerMessage::Transaction(
SignedTransaction::try_from_slice(&t.borsh).map_err(Self::Error::Transaction)?,
),
Expand Down
10 changes: 8 additions & 2 deletions chain/network/src/peer/peer_actor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::accounts_data::AccountDataError;
use crate::client::{
AnnounceAccountRequest, BlockHeadersRequest, BlockHeadersResponse, BlockRequest, BlockResponse,
EpochSyncRequestMessage, EpochSyncResponseMessage, ProcessTxRequest, RecvChallenge,
StateRequestHeader, StateRequestPart, StateResponseReceived,
EpochSyncRequestMessage, EpochSyncResponseMessage, OptimisticBlockMessage, ProcessTxRequest,
RecvChallenge, StateRequestHeader, StateRequestPart, StateResponseReceived,
};
use crate::concurrency::atomic_cell::AtomicCell;
use crate::concurrency::demux;
Expand Down Expand Up @@ -1127,6 +1127,12 @@ impl PeerActor {
.send(EpochSyncResponseMessage { from_peer: peer_id, proof });
None
}
PeerMessage::OptimisticBlock(ob) => {
network_state
.client
.send(OptimisticBlockMessage { from_peer: peer_id, optimistic_block: ob });
None
}
msg => {
tracing::error!(target: "network", "Peer received unexpected type: {:?}", msg);
None
Expand Down
1 change: 1 addition & 0 deletions chain/network/src/peer_manager/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ impl tcp::Tier {
PeerMessage::VersionedStateResponse(_) => {
self == tcp::Tier::T2 || self == tcp::Tier::T3
}
PeerMessage::OptimisticBlock(..) => true,
PeerMessage::Routed(msg) => self.is_allowed_routed(&msg.body),
PeerMessage::SyncRoutingTable(..)
| PeerMessage::DistanceVector(..)
Expand Down
8 changes: 8 additions & 0 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,14 @@ impl PeerManagerActor {
self.state.tier2.broadcast_message(Arc::new(PeerMessage::Block(block)));
NetworkResponses::NoResponse
}
NetworkRequests::OptimisticBlock { optimistic_block } => {
// TODO(#10584): send this message to all the producers.
// Maybe we just need to send this to the next producers.
self.state
.tier1
.broadcast_message(Arc::new(PeerMessage::OptimisticBlock(optimistic_block)));
NetworkResponses::NoResponse
}
NetworkRequests::Approval { approval_message } => {
self.state.send_message_to_account(
&self.clock,
Expand Down
2 changes: 2 additions & 0 deletions chain/network/src/rate_limits/messages_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ pub enum RateLimitedPeerMessageKey {
ContractCodeResponse,
PartialEncodedContractDeploys,
EpochSyncRequest,
OptimisticBlock,
}

/// Given a `PeerMessage` returns a tuple containing the `RateLimitedPeerMessageKey`
Expand All @@ -206,6 +207,7 @@ fn get_key_and_token_cost(message: &PeerMessage) -> Option<(RateLimitedPeerMessa
PeerMessage::BlockHeaders(_) => Some((BlockHeaders, 1)),
PeerMessage::BlockRequest(_) => Some((BlockRequest, 1)),
PeerMessage::Block(_) => Some((Block, 1)),
PeerMessage::OptimisticBlock(_) => Some((OptimisticBlock, 1)),
PeerMessage::Transaction(_) => Some((Transaction, 1)),
PeerMessage::Routed(msg) => match msg.body {
RoutedMessageBody::BlockApproval(_) => Some((BlockApproval, 1)),
Expand Down
19 changes: 17 additions & 2 deletions chain/network/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::sync::{Arc, Mutex};

use crate::client::{
BlockApproval, BlockHeadersRequest, BlockHeadersResponse, BlockRequest, BlockResponse,
ChunkEndorsementMessage, EpochSyncRequestMessage, EpochSyncResponseMessage, ProcessTxRequest,
ProcessTxResponse,
ChunkEndorsementMessage, EpochSyncRequestMessage, EpochSyncResponseMessage,
OptimisticBlockMessage, ProcessTxRequest, ProcessTxResponse,
};
use crate::shards_manager::ShardsManagerRequestFromNetwork;
use crate::state_witness::{
Expand Down Expand Up @@ -37,6 +37,7 @@ pub struct ClientSenderForTestLoopNetwork {
pub chunk_endorsement: AsyncSender<ChunkEndorsementMessage, ()>,
pub epoch_sync_request: Sender<EpochSyncRequestMessage>,
pub epoch_sync_response: Sender<EpochSyncResponseMessage>,
pub optimistic_block_receiver: Sender<OptimisticBlockMessage>,
}

#[derive(Clone, MultiSend, MultiSenderFrom)]
Expand Down Expand Up @@ -242,6 +243,20 @@ fn network_message_to_client_handler(
}
None
}
NetworkRequests::OptimisticBlock { optimistic_block } => {
let my_peer_id = shared_state.account_to_peer_id.get(&my_account_id).unwrap();
for account_id in shared_state.accounts() {
if account_id != &my_account_id {
let _ = shared_state.senders_for_account(account_id).client_sender.send(
OptimisticBlockMessage {
optimistic_block: optimistic_block.clone(),
from_peer: my_peer_id.clone(),
},
);
}
}
None
}
NetworkRequests::Approval { approval_message } => {
assert_ne!(
approval_message.target, my_account_id,
Expand Down
3 changes: 3 additions & 0 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use near_primitives::challenge::Challenge;
use near_primitives::epoch_sync::CompressedEpochSyncProof;
use near_primitives::hash::CryptoHash;
use near_primitives::network::{AnnounceAccount, PeerId};
use near_primitives::optimistic_block::OptimisticBlock;
use near_primitives::sharding::PartialEncodedChunkWithArcReceipts;
use near_primitives::stateless_validation::chunk_endorsement::ChunkEndorsement;
use near_primitives::stateless_validation::contract_distribution::{
Expand Down Expand Up @@ -240,6 +241,8 @@ impl From<NetworkResponses> for PeerManagerMessageResponse {
pub enum NetworkRequests {
/// Sends block, either when block was just produced or when requested.
Block { block: Block },
/// Sends optimistic block as soon as the production window for the height starts.
OptimisticBlock { optimistic_block: OptimisticBlock },
/// Sends approval.
Approval { approval_message: ApprovalMessage },
/// Request block with given hash from given peer.
Expand Down
Loading

0 comments on commit 8b3fc09

Please sign in to comment.