From e12e00685c828aa6c25158c4f4590aed311fc9f8 Mon Sep 17 00:00:00 2001 From: Bohdan Khorolets Date: Thu, 23 Nov 2023 12:03:02 +0200 Subject: [PATCH 1/2] refactor(high-level): Don't require &mut Block to improve the API for developers --- examples/actions.rs | 2 +- examples/nft_indexer.rs | 2 +- examples/with_context.rs | 2 +- examples/with_context_parent_tx_cache.rs | 2 +- lake-context-derive/src/lib.rs | 2 +- lake-framework/src/lib.rs | 6 +- lake-framework/src/types.rs | 6 +- lake-parent-transaction-cache/src/lib.rs | 2 +- lake-primitives/src/types/block.rs | 162 +++++++++-------------- 9 files changed, 78 insertions(+), 108 deletions(-) diff --git a/examples/actions.rs b/examples/actions.rs index 227ca39..c017359 100644 --- a/examples/actions.rs +++ b/examples/actions.rs @@ -20,7 +20,7 @@ fn main() -> anyhow::Result<()> { } async fn print_function_calls_to_my_account( - mut block: near_lake_primitives::block::Block, + block: near_lake_primitives::block::Block, ) -> anyhow::Result<()> { let block_height = block.block_height(); let actions: Vec<&near_lake_primitives::actions::FunctionCall> = block diff --git a/examples/nft_indexer.rs b/examples/nft_indexer.rs index 7b8b45a..048787e 100644 --- a/examples/nft_indexer.rs +++ b/examples/nft_indexer.rs @@ -24,7 +24,7 @@ fn main() -> anyhow::Result<()> { Ok(()) } -async fn handle_block(mut block: near_lake_primitives::block::Block) -> anyhow::Result<()> { +async fn handle_block(block: near_lake_primitives::block::Block) -> anyhow::Result<()> { // Indexing lines START let nfts: Vec = block .events() // fetching all the events that occurred in the block diff --git a/examples/with_context.rs b/examples/with_context.rs index 9de6150..9aae36b 100644 --- a/examples/with_context.rs +++ b/examples/with_context.rs @@ -51,7 +51,7 @@ fn main() -> anyhow::Result<()> { } async fn print_function_calls_to_my_account( - mut block: near_lake_primitives::block::Block, + block: near_lake_primitives::block::Block, ctx: &FileContext, ) -> anyhow::Result<()> { let block_height = block.block_height(); diff --git a/examples/with_context_parent_tx_cache.rs b/examples/with_context_parent_tx_cache.rs index add12a7..8c90188 100644 --- a/examples/with_context_parent_tx_cache.rs +++ b/examples/with_context_parent_tx_cache.rs @@ -33,7 +33,7 @@ fn main() -> anyhow::Result<()> { } async fn print_function_call_tx_hash( - mut block: near_lake_primitives::block::Block, + block: near_lake_primitives::block::Block, ctx: &ParentTransactionCache, ) -> anyhow::Result<()> { // Cache has been updated before this function is called. diff --git a/lake-context-derive/src/lib.rs b/lake-context-derive/src/lib.rs index 57da168..0587ef2 100644 --- a/lake-context-derive/src/lib.rs +++ b/lake-context-derive/src/lib.rs @@ -65,7 +65,7 @@ pub fn lake_context_derive(input: TokenStream) -> TokenStream { let expanded = quote! { // The generated impl. impl near_lake_framework::LakeContextExt for #name { - fn execute_before_run(&self, block: &mut near_lake_primitives::block::Block) { + fn execute_before_run(&self, block: &near_lake_primitives::block::Block) { #( #calls_before_run )* } diff --git a/lake-framework/src/lib.rs b/lake-framework/src/lib.rs index c9025d8..834a79e 100644 --- a/lake-framework/src/lib.rs +++ b/lake-framework/src/lib.rs @@ -68,9 +68,9 @@ impl types::Lake { // concurrency 1 let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream) .map(|streamer_message| async { - let mut block: near_lake_primitives::block::Block = streamer_message.into(); + let block: near_lake_primitives::block::Block = streamer_message.into(); - context.execute_before_run(&mut block); + context.execute_before_run(&block); let user_indexer_function_execution_result = f(block, context).await; @@ -116,7 +116,7 @@ impl types::Lake { struct EmptyContext {} impl LakeContextExt for EmptyContext { - fn execute_before_run(&self, _block: &mut near_lake_primitives::block::Block) {} + fn execute_before_run(&self, _block: &near_lake_primitives::block::Block) {} fn execute_after_run(&self) {} } diff --git a/lake-framework/src/types.rs b/lake-framework/src/types.rs index 51352c9..c0bb273 100644 --- a/lake-framework/src/types.rs +++ b/lake-framework/src/types.rs @@ -179,7 +179,7 @@ pub enum LakeError { /// struct PrinterContext; /// /// impl LakeContextExt for PrinterContext { -/// fn execute_before_run(&self, block: &mut near_lake_primitives::block::Block) { +/// fn execute_before_run(&self, block: &near_lake_primitives::block::Block) { /// println!("Processing block {}", block.header().height()); /// } /// fn execute_after_run(&self) {} @@ -201,7 +201,7 @@ pub enum LakeError { /// // We need our context to do nothing before and after the indexing process. /// // The only purpose is to provide the database connection pool to the indexing process. /// impl LakeContextExt for ApplicationDataContext { -/// fn execute_before_run(&self, block: &mut near_lake_primitives::block::Block) {} +/// fn execute_before_run(&self, block: &near_lake_primitives::block::Block) {} /// fn execute_after_run(&self) {} /// } /// @@ -319,7 +319,7 @@ pub enum LakeError { /// And we didn't need to implement them in our `ApplicationDataContext` struct because `LakeContext` derive macro did it for us automatically. pub trait LakeContextExt { /// This method will be called before the indexing process is started. - fn execute_before_run(&self, block: &mut near_lake_primitives::block::Block); + fn execute_before_run(&self, block: &near_lake_primitives::block::Block); /// This method will be called after the indexing process is finished. fn execute_after_run(&self); } diff --git a/lake-parent-transaction-cache/src/lib.rs b/lake-parent-transaction-cache/src/lib.rs index 8002228..8bdc270 100644 --- a/lake-parent-transaction-cache/src/lib.rs +++ b/lake-parent-transaction-cache/src/lib.rs @@ -62,7 +62,7 @@ impl LakeContextExt for ParentTransactionCache { /// The process to scan the [near_lake_primitives::Block](near_lake_framework::near_lake_primitives::block::Block) and update the cache /// with the new transactions and first expected receipts. /// The cache is used to find the parent transaction hash for a given receipt id. - fn execute_before_run(&self, block: &mut Block) { + fn execute_before_run(&self, block: &Block) { // Fill up the cache with new transactions and first expected receipts // We will try to skip the transactions related to the accounts we're not watching for. // Based on `accounts_id` diff --git a/lake-primitives/src/types/block.rs b/lake-primitives/src/types/block.rs index 0fede21..f75d535 100644 --- a/lake-primitives/src/types/block.rs +++ b/lake-primitives/src/types/block.rs @@ -75,16 +75,7 @@ impl Block { /// A reminder that `receipt_execution_outcomes` has a type [near_indexer_primitives::IndexerExecutionOutcomeWithReceipt] which is an /// ephemeral structure from `near-indexer-primitives` that hold a [near_primitives::views::ExecutionOutcomeView] /// along with the corresponding [near_primitives::views::ReceiptView]. - pub fn receipts(&mut self) -> impl Iterator { - if self.executed_receipts.is_empty() { - self.executed_receipts = self - .streamer_message - .shards - .iter() - .flat_map(|shard| shard.receipt_execution_outcomes.iter()) - .map(Into::into) - .collect(); - } + pub fn receipts(&self) -> impl Iterator { self.executed_receipts.iter() } @@ -92,23 +83,7 @@ impl Block { /// /// [Receipts](crate::receipts::Receipt) included on the chain but not executed yet are called "postponed", /// they are represented by the same structure [Receipt](crate::receipts::Receipt). - pub fn postponed_receipts(&mut self) -> impl Iterator { - if self.postponed_receipts.is_empty() { - let executed_receipts_ids: Vec<_> = self - .receipts() - .map(|receipt| receipt.receipt_id()) - .collect(); - self.postponed_receipts = self - .streamer_message - .shards - .iter() - .filter_map(|shard| shard.chunk.as_ref().map(|chunk| chunk.receipts.iter())) - .flatten() - // exclude receipts that are already executed - .filter(|receipt| !executed_receipts_ids.contains(&receipt.receipt_id)) - .map(Into::into) - .collect(); - } + pub fn postponed_receipts(&self) -> impl Iterator { self.postponed_receipts.iter() } @@ -118,61 +93,22 @@ impl Block { /// the action chain has begun. Other indexer developers care about it because of the habits /// from other blockchains like Ethereum where a transaction is a main asset. In case of NEAR /// [Receipts](crate::receipts::Receipt) are more important. - pub fn transactions(&mut self) -> impl Iterator { - if self.transactions.is_empty() { - self.transactions = self - .streamer_message - .shards - .iter() - .filter_map(|shard| shard.chunk.as_ref().map(|chunk| chunk.transactions.iter())) - .flatten() - .map(TryInto::try_into) - .filter_map(|transactions| transactions.ok()) - .collect(); - } + pub fn transactions(&self) -> impl Iterator { self.transactions.iter() } - /// Internal method to build the cache of actions on demand - fn actions_from_streamer_message(&self) -> Vec { - self.streamer_message() - .shards - .iter() - .flat_map(|shard| shard.receipt_execution_outcomes.iter()) - .filter_map(|receipt_execution_outcome| { - actions::Action::try_vec_from_receipt_view(&receipt_execution_outcome.receipt).ok() - }) - .flatten() - .collect() - } - /// Returns an iterator of the [Actions](crate::actions::Action) executed in the [Block] - pub fn actions(&mut self) -> impl Iterator { - if self.actions.is_empty() { - self.build_actions_cache(); - } + pub fn actions(&self) -> impl Iterator { self.actions.iter() } /// Returns an iterator of the [Events](crate::events::Event) emitted in the [Block] - pub fn events(&mut self) -> impl Iterator { - if self.events.is_empty() { - self.build_events_hashmap(); - } + pub fn events(&self) -> impl Iterator { self.events.values().flatten() } /// Returns an iterator of the [StateChanges](crate::state_changes::StateChange) happened in the [Block] - pub fn state_changes(&mut self) -> impl Iterator { - if self.state_changes.is_empty() { - self.state_changes = self - .streamer_message - .shards - .iter() - .flat_map(|shard| shard.state_changes.iter()) - .map(Into::into) - .collect(); - } + pub fn state_changes(&self) -> impl Iterator { self.state_changes.iter() } @@ -180,7 +116,7 @@ impl Block { /// /// **Heads up!** This methods searches for the actions in the current [Block] only. pub fn actions_by_receipt_id<'a>( - &'a mut self, + &'a self, receipt_id: &'a super::ReceiptId, ) -> impl Iterator + 'a { self.actions() @@ -188,10 +124,7 @@ impl Block { } /// Helper to get all the [Events](crate::events::Event) emitted by the specific [Receipt](crate::receipts::Receipt) - pub fn events_by_receipt_id(&mut self, receipt_id: &super::ReceiptId) -> Vec { - if self.events.is_empty() { - self.build_events_hashmap(); - } + pub fn events_by_receipt_id(&self, receipt_id: &super::ReceiptId) -> Vec { if let Some(events) = self.events.get(receipt_id) { events.to_vec() } else { @@ -201,7 +134,7 @@ impl Block { /// Helper to get all the [Events](crate::events::Event) emitted by the specific contract ([AccountId](crate::near_indexer_primitives::types::AccountId)) pub fn events_by_contract_id<'a>( - &'a mut self, + &'a self, account_id: &'a crate::near_indexer_primitives::types::AccountId, ) -> impl Iterator + 'a { self.events() @@ -209,37 +142,73 @@ impl Block { } /// Helper to get a specific [Receipt](crate::receipts::Receipt) by the [ReceiptId](crate::types::ReceiptId) - pub fn receipt_by_id(&mut self, receipt_id: &super::ReceiptId) -> Option<&receipts::Receipt> { + pub fn receipt_by_id(&self, receipt_id: &super::ReceiptId) -> Option<&receipts::Receipt> { self.receipts() .find(|receipt| &receipt.receipt_id() == receipt_id) } } -impl Block { - // Internal method to build the cache of actions on demand - fn build_actions_cache(&mut self) { - self.actions = self.actions_from_streamer_message().to_vec(); - } +impl From for Block { + fn from(streamer_message: StreamerMessage) -> Self { + let executed_receipts: Vec = streamer_message + .shards + .iter() + .flat_map(|shard| shard.receipt_execution_outcomes.iter()) + .map(Into::into) + .collect(); + let postponed_receipts = streamer_message + .shards + .iter() + .filter_map(|shard| shard.chunk.as_ref().map(|chunk| chunk.receipts.iter())) + .flatten() + // exclude receipts that are already executed + .filter(|receipt| { + !executed_receipts + .iter() + .any(|executed_receipt| executed_receipt.receipt_id() == receipt.receipt_id) + }) + .map(Into::into) + .collect(); + + let transactions: Vec = streamer_message + .shards + .iter() + .filter_map(|shard| shard.chunk.as_ref().map(|chunk| chunk.transactions.iter())) + .flatten() + .map(TryInto::try_into) + .filter_map(|transactions| transactions.ok()) + .collect(); + + let actions: Vec = streamer_message + .shards + .iter() + .flat_map(|shard| shard.receipt_execution_outcomes.iter()) + .filter_map(|receipt_execution_outcome| { + actions::Action::try_vec_from_receipt_view(&receipt_execution_outcome.receipt).ok() + }) + .flatten() + .collect(); - // Internal method to build the cache of events on demand - fn build_events_hashmap(&mut self) { - self.events = self - .receipts() + let events: HashMap> = executed_receipts + .iter() .map(|receipt| (receipt.receipt_id(), receipt.events())) .collect(); - } -} -impl From for Block { - fn from(streamer_message: StreamerMessage) -> Self { + let state_changes: Vec = streamer_message + .shards + .iter() + .flat_map(|shard| shard.state_changes.iter()) + .map(Into::into) + .collect(); + Self { + executed_receipts, + postponed_receipts, + transactions, + actions, + events, + state_changes, streamer_message, - executed_receipts: vec![], - postponed_receipts: vec![], - transactions: vec![], - actions: vec![], - events: HashMap::new(), - state_changes: vec![], } } } @@ -263,6 +232,7 @@ pub struct BlockHeader { latest_protocol_version: u32, random_value: CryptoHash, chunks_included: u64, + // TODO: replace with the corresponding Lake Primitives type eventually validator_proposals: Vec, } From d17ba6a760c3c142c97bfac01aa25afd295dabc1 Mon Sep 17 00:00:00 2001 From: Bohdan Khorolets Date: Fri, 8 Dec 2023 13:49:26 +0200 Subject: [PATCH 2/2] refactor: Don't store events data on the `Block` (process on demand instead) --- examples/nft_indexer.rs | 2 ++ lake-context-derive/README.md | 2 +- lake-framework/Cargo.toml | 4 +-- lake-framework/README.md | 2 +- lake-parent-transaction-cache/README.md | 2 +- lake-primitives/src/types/block.rs | 33 +++++++++++++------------ 6 files changed, 24 insertions(+), 21 deletions(-) diff --git a/examples/nft_indexer.rs b/examples/nft_indexer.rs index 048787e..f04cf18 100644 --- a/examples/nft_indexer.rs +++ b/examples/nft_indexer.rs @@ -28,6 +28,8 @@ async fn handle_block(block: near_lake_primitives::block::Block) -> anyhow::Resu // Indexing lines START let nfts: Vec = block .events() // fetching all the events that occurred in the block + .values() + .flatten() .filter(|event| event.standard() == "nep171") .filter(|event| event.event() == "nft_mint") // filter them by "nft_mint" event only .filter_map(|event| parse_event(event)) diff --git a/lake-context-derive/README.md b/lake-context-derive/README.md index 57bab04..684d598 100644 --- a/lake-context-derive/README.md +++ b/lake-context-derive/README.md @@ -35,7 +35,7 @@ This will simplify your indexer function signature. It now needs only the contex ```ignore async fn handle_block( - mut block: Block, + block: Block, ctx: &MyContext, ) -> anyhow::Result<()> { // body diff --git a/lake-framework/Cargo.toml b/lake-framework/Cargo.toml index 75dc44d..137bfbb 100644 --- a/lake-framework/Cargo.toml +++ b/lake-framework/Cargo.toml @@ -22,8 +22,8 @@ tokio = { version = "1.1", features = ["sync", "time", "rt-multi-thread"] } tokio-stream = { version = "0.1" } tracing = "0.1.13" -near-lake-primitives = { path = "../lake-primitives", version = "0.8.0-beta.2" } -near-lake-context-derive = { path = "../lake-context-derive", version = "0.8.0-beta.2" } +near-lake-primitives = { path = "../lake-primitives", version = "0.8.0-beta.3" } +near-lake-context-derive = { path = "../lake-context-derive", version = "0.8.0-beta.3" } [dev-dependencies] aws-smithy-http = "0.60.0" diff --git a/lake-framework/README.md b/lake-framework/README.md index 489f62c..7f28c77 100644 --- a/lake-framework/README.md +++ b/lake-framework/README.md @@ -91,7 +91,7 @@ fn main() -> anyhow::Result<()> { } async fn print_function_call_tx_hash( - mut block: near_lake_primitives::block::Block, + block: near_lake_primitives::block::Block, ctx: &ParentTransactionCache, ) -> anyhow::Result<()> { // Cache has been updated before this function is called. diff --git a/lake-parent-transaction-cache/README.md b/lake-parent-transaction-cache/README.md index 1e59fea..515bd07 100644 --- a/lake-parent-transaction-cache/README.md +++ b/lake-parent-transaction-cache/README.md @@ -24,7 +24,7 @@ LakeBuilder::default() # } async fn handle_block( - mut block: Block, + block: Block, ctx: &ParentTransactionCache, ) -> anyhow::Result<()> { for action in block.actions() { diff --git a/lake-primitives/src/types/block.rs b/lake-primitives/src/types/block.rs index f75d535..dd52828 100644 --- a/lake-primitives/src/types/block.rs +++ b/lake-primitives/src/types/block.rs @@ -30,7 +30,6 @@ pub struct Block { postponed_receipts: Vec, transactions: Vec, actions: Vec, - events: HashMap>, state_changes: Vec, } @@ -102,9 +101,12 @@ impl Block { self.actions.iter() } - /// Returns an iterator of the [Events](crate::events::Event) emitted in the [Block] - pub fn events(&self) -> impl Iterator { - self.events.values().flatten() + /// Returns a Vec of [Events](crate::events::Event) emitted in the [Block] + pub fn events(&self) -> HashMap> { + self.executed_receipts + .iter() + .map(|receipt| (receipt.receipt_id(), receipt.events())) + .collect() } /// Returns an iterator of the [StateChanges](crate::state_changes::StateChange) happened in the [Block] @@ -125,7 +127,7 @@ impl Block { /// Helper to get all the [Events](crate::events::Event) emitted by the specific [Receipt](crate::receipts::Receipt) pub fn events_by_receipt_id(&self, receipt_id: &super::ReceiptId) -> Vec { - if let Some(events) = self.events.get(receipt_id) { + if let Some(events) = self.events().get(receipt_id) { events.to_vec() } else { vec![] @@ -133,12 +135,17 @@ impl Block { } /// Helper to get all the [Events](crate::events::Event) emitted by the specific contract ([AccountId](crate::near_indexer_primitives::types::AccountId)) - pub fn events_by_contract_id<'a>( - &'a self, - account_id: &'a crate::near_indexer_primitives::types::AccountId, - ) -> impl Iterator + 'a { + pub fn events_by_contract_id( + &self, + account_id: &crate::near_indexer_primitives::types::AccountId, + ) -> Vec { + let account_id_clone = account_id.clone(); // Clone the account_id self.events() - .filter(move |event| event.is_emitted_by_contract(&account_id.clone())) + .values() + .flatten() + .filter(|event| event.is_emitted_by_contract(&account_id_clone)) + .map(Clone::clone) + .collect() } /// Helper to get a specific [Receipt](crate::receipts::Receipt) by the [ReceiptId](crate::types::ReceiptId) @@ -189,11 +196,6 @@ impl From for Block { .flatten() .collect(); - let events: HashMap> = executed_receipts - .iter() - .map(|receipt| (receipt.receipt_id(), receipt.events())) - .collect(); - let state_changes: Vec = streamer_message .shards .iter() @@ -206,7 +208,6 @@ impl From for Block { postponed_receipts, transactions, actions, - events, state_changes, streamer_message, }