- update readme with new filtered_trx_by_events module
- adding filtered_trx_by_events substreams module
Eduard-Voiculescu committed Jun 13, 2024
1 parent da2de45 commit 85c5459
Showing 3 changed files with 141 additions and 78 deletions.
22 changes: 17 additions & 5 deletions injective-common/README.md
@@ -2,8 +2,6 @@

The Injective Foundational modules are Substreams modules extracting common data from the Injective blockchain.



## Modules

### all_events
@@ -14,7 +12,11 @@ The `all_events` module extracts only the events and provides them, along with t

The `filtered_events` module allows a reduction of the `all_events` output, keeping only the events that match the requested type.

Use with the parameter, e.g.:

```bash
substreams run [...] -p filtered_events="message || injective.peggy.v1.EventDepositClaim"
```

### all_transactions (work in progress)

@@ -23,11 +25,21 @@ The `all_transactions` module extracts all the transactions from the Injective b
Some message types are parsed from their "Any" type into the corresponding type of an enum. See ./proto/cosmos/v1/transactions.proto for the types that are supported.
The other types will still be shown as the protobuf "Any" type.

### filtered_trx_by_events

The `filtered_trx_by_events` module allows a reduction of the `all_transactions` output, keeping only the transactions that contain at least one event matching the requested type. The module returns the entire transaction, so events that do not match the filter parameters may still appear in the output.

Use with the parameter, e.g.:

```bash
substreams run [...] -p filtered_trx_by_events="message || injective.peggy.v1.EventDepositClaim"
```
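
A fuller invocation might look like the sketch below; the endpoint placeholder and block range are illustrative rather than taken from this repository, and any Substreams-enabled Injective endpoint can be substituted:

```bash
# Sketch only: <injective-endpoint> and the block range are placeholders.
substreams run -e <injective-endpoint>:443 substreams.yaml filtered_trx_by_events \
  -p filtered_trx_by_events="injective.peggy.v1.EventDepositClaim" \
  --start-block <start-block> --stop-block +100
```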

## Getting Started

### Gather protobuf definitions in generated-buf-build.binpb

The required protobuf modules are referenced in `buf.yaml`.
You need a (free) API token to access https://buf.build and resolve the dependencies into a single file, generated-buf-build.binpb.
That file is then used to generate the Rust protobuf bindings or to bundle the definitions into the .spkg (it is referenced in substreams.yaml).
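
As a rough sketch of that flow with the `buf` CLI (assuming it is run from the directory containing `buf.yaml`; the login prompt asks for your buf.build user name and API token):

```bash
# Authenticate against buf.build with your API token (interactive prompt).
buf registry login buf.build

# Resolve the modules referenced in buf.yaml into a single binary descriptor
# file; depending on the setup, --as-file-descriptor-set may also be needed.
buf build -o generated-buf-build.binpb
```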

175 changes: 104 additions & 71 deletions injective-common/src/lib.rs
@@ -1,6 +1,7 @@
mod pb;

use core::panic;
use std::collections::HashMap;

use crate::pb::cosmos::authz::v1beta1::MsgExec;
use crate::pb::cosmos::bank::v1beta1::MsgMultiSend;
@@ -102,6 +103,109 @@ pub fn all_transactions(block: Block) -> Result<TransactionList, Error> {
})
}

#[substreams::handlers::map]
pub fn all_events(block: Block) -> Result<EventList, Error> {
// Mutable list to add the output of the Substreams
let mut events: Vec<Event> = Vec::new();

if block.txs.len() != block.tx_results.len() {
return Err(anyhow!("Transaction list and result list do not match"));
}

for (i, tx_result) in block.tx_results.into_iter().enumerate() {
let tx_hash = compute_tx_hash(block.txs.get(i).unwrap());

let block_events: Vec<Event> = tx_result
.events
.into_iter()
.map(|event| {
return Event {
event: Some(event),
transaction_hash: tx_hash.clone(),
};
})
.collect();

events.extend(block_events);
}

Ok(EventList {
events: events,
clock: Some(Clock {
id: hex::encode(block.hash),
number: block.height as u64,
timestamp: block.time,
}),
})
}

#[substreams::handlers::map]
fn index_events(events: EventList) -> Result<Keys, Error> {
let mut keys = Keys::default();

events.events.into_iter().for_each(|e| {
if let Some(ev) = e.event {
keys.keys.push(ev.r#type);
}
});

Ok(keys)
}

#[substreams::handlers::map]
fn filtered_events(query: String, events: EventList) -> Result<EventList, Error> {
let filtered: Vec<Event> = events
.events
.into_iter()
.filter(|e| {
if let Some(ev) = &e.event {
let mut keys = Vec::new();
keys.push(ev.r#type.clone());
matches_keys_in_parsed_expr(&keys, &query).expect("matching events from query")
} else {
false
}
})
.collect();

Ok(EventList {
events: filtered,
clock: events.clock,
})
}

#[substreams::handlers::map]
fn filtered_trx_by_events(query: String, events: EventList, trxs: TransactionList) -> Result<TransactionList, Error> {
// First pass: record the hash of every transaction that has at least one event matching the query.
let mut transactions: HashMap<String, bool> = HashMap::new();

events
.events
.into_iter()
.filter(|e| {
if let Some(ev) = &e.event {
let mut keys = Vec::new();
keys.push(ev.r#type.clone());
matches_keys_in_parsed_expr(&keys, &query).expect("matching events from query")
} else {
false
}
})
.for_each(|e: Event| {
transactions.insert(e.transaction_hash, true);
});

// Second pass: keep only the transactions whose hash was recorded above.
let transactions: Vec<Transaction> = trxs
.transactions
.into_iter()
.filter(|t| transactions.contains_key(&t.hash))
.collect();

Ok(TransactionList {
transactions: transactions,
clock: trxs.clock,
})
}

fn extract_messages(messages: Vec<Any>) -> Vec<Message> {
return messages
.iter()
@@ -261,74 +365,3 @@ fn compute_tx_hash(tx_as_bytes: &[u8]) -> String {
let tx_hash = hasher.finalize();
return hex::encode(tx_hash);
}

22 changes: 20 additions & 2 deletions injective-common/substreams.yaml
@@ -2,7 +2,7 @@
specVersion: v0.1.0
package:
name: injective_common
version: v0.1.0
version: v0.1.1

network: cosmos

@@ -57,5 +57,23 @@ modules:
`filtered_events` reads from `all_events` and applies a filter on the event types, only outputting the events that match the filter.
The only operator that you should need with this filter is the logical or `||`, because each event can only match one type.
- name: filtered_trx_by_events
kind: map
blockFilter:
module: index_events
query:
params: true
inputs:
- params: string
- map: all_events
- map: all_transactions
output:
type: proto:sf.substreams.cosmos.v1.TransactionList
doc: |
`filtered_trx_by_events` reads from `all_events` and `all_transactions` and applies a filter on the event types,
only outputting the transactions that match the filter. The only operator that you should need with this filter
is the logical or `||`, because each event can only match one type.
params:
filtered_events: "message"
filtered_trx_by_events: "message"
