From 834bbd70f85a5c53b32bbc3c36bc55f95da3759e Mon Sep 17 00:00:00 2001
From: Roman Zeyde <me@romanzey.de>
Date: Fri, 13 Aug 2021 20:34:40 +0300
Subject: [PATCH] Add blockchain.outpoint.subscribe RPC

---
 src/electrum.rs |  55 +++++++++++++++++--
 src/status.rs   | 137 +++++++++++++++++++++++++++++++++++++++++++++++-
 src/tracker.rs  |  10 +++-
 3 files changed, 196 insertions(+), 6 deletions(-)

diff --git a/src/electrum.rs b/src/electrum.rs
index fed999454..b9d8d57de 100644
--- a/src/electrum.rs
+++ b/src/electrum.rs
@@ -2,7 +2,7 @@ use anyhow::{bail, Context, Result};
 use bitcoin::{
     consensus::{deserialize, encode::serialize_hex},
     hashes::hex::FromHex,
-    BlockHash, Txid,
+    BlockHash, OutPoint, Txid,
 };
 use crossbeam_channel::Receiver;
 use rayon::prelude::*;
@@ -21,7 +21,7 @@ use crate::{
     merkle::Proof,
     metrics::{self, Histogram, Metrics},
     signals::Signal,
-    status::ScriptHashStatus,
+    status::{OutPointStatus, ScriptHashStatus},
     tracker::Tracker,
     types::ScriptHash,
 };
@@ -36,6 +36,7 @@ const UNSUBSCRIBED_QUERY_MESSAGE: &str = "your wallet uses less efficient method
 pub struct Client {
     tip: Option<BlockHash>,
     scripthashes: HashMap<ScriptHash, ScriptHashStatus>,
+    outpoints: HashMap<OutPoint, OutPointStatus>,
 }
 
 #[derive(Deserialize)]
@@ -185,7 +186,25 @@ impl Rpc {
                 }
             })
             .collect::<Result<Vec<Value>>>()
-            .context("failed to update status")?;
+            .context("failed to update scripthash status")?;
+
+        notifications.extend(
+            client
+                .outpoints
+                .par_iter_mut()
+                .filter_map(|(outpoint, status)| -> Option<Result<Value>> {
+                    match self.tracker.update_outpoint_status(status, &self.daemon) {
+                        Ok(true) => Some(Ok(notification(
+                            "blockchain.outpoint.subscribe",
+                            &[json!([outpoint.txid, outpoint.vout]), json!(status)],
+                        ))),
+                        Ok(false) => None, // outpoint status is the same
+                        Err(e) => Some(Err(e)),
+                    }
+                })
+                .collect::<Result<Vec<Value>>>()
+                .context("failed to update scripthash status")?,
+        );
 
         if let Some(old_tip) = client.tip {
             let new_tip = self.tracker.chain().tip();
@@ -350,6 +369,28 @@ impl Rpc {
         })
     }
 
+    fn outpoint_subscribe(&self, client: &mut Client, (txid, vout): (Txid, u32)) -> Result<Value> {
+        let outpoint = OutPoint::new(txid, vout);
+        Ok(match client.outpoints.entry(outpoint) {
+            Entry::Occupied(e) => json!(e.get()),
+            Entry::Vacant(e) => {
+                let outpoint = OutPoint::new(txid, vout);
+                let mut status = OutPointStatus::new(outpoint);
+                self.tracker
+                    .update_outpoint_status(&mut status, &self.daemon)?;
+                json!(e.insert(status))
+            }
+        })
+    }
+
+    fn outpoint_unsubscribe(
+        &self,
+        client: &mut Client,
+        (txid, vout): (Txid, u32),
+    ) -> Result<Value> {
+        Ok(json!(client.outpoints.remove(&OutPoint::new(txid, vout))))
+    }
+
     fn new_status(&self, scripthash: ScriptHash) -> Result<ScriptHashStatus> {
         let mut status = ScriptHashStatus::new(scripthash);
         self.tracker
@@ -548,6 +589,8 @@ impl Rpc {
                 Params::Features => self.features(),
                 Params::HeadersSubscribe => self.headers_subscribe(client),
                 Params::MempoolFeeHistogram => self.get_fee_histogram(),
+                Params::OutPointSubscribe(args) => self.outpoint_subscribe(client, *args),
+                Params::OutPointUnsubscribe(args) => self.outpoint_unsubscribe(client, *args),
                 Params::PeersSubscribe => Ok(json!([])),
                 Params::Ping => Ok(Value::Null),
                 Params::RelayFee => self.relayfee(),
@@ -572,12 +615,13 @@ enum Params {
     Banner,
     BlockHeader((usize,)),
     BlockHeaders((usize, usize)),
-    TransactionBroadcast((String,)),
     Donation,
     EstimateFee((u16,)),
     Features,
     HeadersSubscribe,
     MempoolFeeHistogram,
+    OutPointSubscribe((Txid, u32)), // TODO: support spk_hint
+    OutPointUnsubscribe((Txid, u32)),
     PeersSubscribe,
     Ping,
     RelayFee,
@@ -586,6 +630,7 @@ enum Params {
     ScriptHashListUnspent((ScriptHash,)),
     ScriptHashSubscribe((ScriptHash,)),
     ScriptHashUnsubscribe((ScriptHash,)),
+    TransactionBroadcast((String,)),
     TransactionGet(TxGetArgs),
     TransactionGetMerkle((Txid, usize)),
     TransactionFromPosition((usize, usize, bool)),
@@ -599,6 +644,8 @@ impl Params {
             "blockchain.block.headers" => Params::BlockHeaders(convert(params)?),
             "blockchain.estimatefee" => Params::EstimateFee(convert(params)?),
             "blockchain.headers.subscribe" => Params::HeadersSubscribe,
+            "blockchain.outpoint.subscribe" => Params::OutPointSubscribe(convert(params)?),
+            "blockchain.outpoint.unsubscribe" => Params::OutPointUnsubscribe(convert(params)?),
             "blockchain.relayfee" => Params::RelayFee,
             "blockchain.scripthash.get_balance" => Params::ScriptHashGetBalance(convert(params)?),
             "blockchain.scripthash.get_history" => Params::ScriptHashGetHistory(convert(params)?),
diff --git a/src/status.rs b/src/status.rs
index dd71be660..15c133bdd 100644
--- a/src/status.rs
+++ b/src/status.rs
@@ -4,7 +4,7 @@ use bitcoin::{
     Amount, Block, BlockHash, OutPoint, SignedAmount, Transaction, Txid,
 };
 use rayon::prelude::*;
-use serde::ser::{Serialize, Serializer};
+use serde::ser::{Serialize, SerializeMap, Serializer};
 
 use std::collections::{BTreeMap, HashMap, HashSet};
 use std::convert::TryFrom;
@@ -48,12 +48,26 @@ impl TxEntry {
 // Confirmation height of a transaction or its mempool state:
 // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-history
 // https://electrumx-spesmilo.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-get-mempool
+#[derive(Copy, Clone, Eq, PartialEq)]
 enum Height {
     Confirmed { height: usize },
     Unconfirmed { has_unconfirmed_inputs: bool },
 }
 
 impl Height {
+    fn from_blockhash(blockhash: BlockHash, chain: &Chain) -> Self {
+        let height = chain
+            .get_block_height(&blockhash)
+            .expect("missing block in chain");
+        Self::Confirmed { height }
+    }
+
+    fn unconfirmed(e: &crate::mempool::Entry) -> Self {
+        Self::Unconfirmed {
+            has_unconfirmed_inputs: e.has_unconfirmed_inputs,
+        }
+    }
+
     fn as_i64(&self) -> i64 {
         match self {
             Self::Confirmed { height } => i64::try_from(*height).unwrap(),
@@ -538,6 +552,127 @@ fn filter_block_txs<T: Send>(
         .into_iter()
 }
 
+pub(crate) struct OutPointStatus {
+    outpoint: OutPoint,
+    funding: Option<Height>,
+    spending: Option<(Txid, Height)>,
+    tip: BlockHash,
+}
+
+impl Serialize for OutPointStatus {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut map = serializer.serialize_map(None)?;
+        if let Some(funding) = &self.funding {
+            map.serialize_entry("height", &funding)?;
+        }
+        if let Some((txid, height)) = &self.spending {
+            map.serialize_entry("spender_txhash", &txid)?;
+            map.serialize_entry("spender_height", &height)?;
+        }
+        map.end()
+    }
+}
+
+impl OutPointStatus {
+    pub(crate) fn new(outpoint: OutPoint) -> Self {
+        Self {
+            outpoint,
+            funding: None,
+            spending: None,
+            tip: BlockHash::all_zeros(),
+        }
+    }
+
+    pub(crate) fn sync(
+        &mut self,
+        index: &Index,
+        mempool: &Mempool,
+        daemon: &Daemon,
+    ) -> Result<bool> {
+        let funding = self.sync_funding(index, daemon, mempool)?;
+        let spending = self.sync_spending(index, daemon, mempool)?;
+        let same_status = (self.funding == funding) && (self.spending == spending);
+        self.funding = funding;
+        self.spending = spending;
+        self.tip = index.chain().tip();
+        Ok(!same_status)
+    }
+
+    /// Return true iff current tip became unconfirmed
+    fn is_reorg(&self, chain: &Chain) -> bool {
+        chain.get_block_height(&self.tip).is_none()
+    }
+
+    fn sync_funding(
+        &self,
+        index: &Index,
+        daemon: &Daemon,
+        mempool: &Mempool,
+    ) -> Result<Option<Height>> {
+        let chain = index.chain();
+        if !self.is_reorg(chain) {
+            if let Some(Height::Confirmed { .. }) = &self.funding {
+                return Ok(self.funding);
+            }
+        }
+        let mut confirmed = None;
+        daemon.for_blocks(
+            index.filter_by_txid(self.outpoint.txid),
+            |blockhash, block| {
+                if confirmed.is_none() {
+                    for tx in block.txdata {
+                        let txid = tx.txid();
+                        let output_len = u32::try_from(tx.output.len()).unwrap();
+                        if self.outpoint.txid == txid && self.outpoint.vout < output_len {
+                            confirmed = Some(Height::from_blockhash(blockhash, chain));
+                            return;
+                        }
+                    }
+                }
+            },
+        )?;
+        Ok(confirmed.or_else(|| mempool.get(&self.outpoint.txid).map(Height::unconfirmed)))
+    }
+
+    fn sync_spending(
+        &self,
+        index: &Index,
+        daemon: &Daemon,
+        mempool: &Mempool,
+    ) -> Result<Option<(Txid, Height)>> {
+        let chain = index.chain();
+        if !self.is_reorg(chain) {
+            if let Some((_, Height::Confirmed { .. })) = &self.spending {
+                return Ok(self.spending);
+            }
+        }
+        let spending_blockhashes = index.filter_by_spending(self.outpoint);
+        let mut confirmed = None;
+        daemon.for_blocks(spending_blockhashes, |blockhash, block| {
+            for tx in block.txdata {
+                for txi in &tx.input {
+                    if txi.previous_output == self.outpoint {
+                        // TODO: there should be only one spending input
+                        assert!(confirmed.is_none(), "double spend of {}", self.outpoint);
+                        confirmed = Some((tx.txid(), Height::from_blockhash(blockhash, chain)));
+                        return;
+                    }
+                }
+            }
+        })?;
+        Ok(confirmed.or_else(|| {
+            let entries = mempool.filter_by_spending(&self.outpoint);
+            assert!(entries.len() <= 1, "double spend of {}", self.outpoint);
+            entries
+                .first()
+                .map(|entry| (entry.txid, Height::unconfirmed(entry)))
+        }))
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::HistoryEntry;
diff --git a/src/tracker.rs b/src/tracker.rs
index 57c4b14d3..3508e1c6a 100644
--- a/src/tracker.rs
+++ b/src/tracker.rs
@@ -11,7 +11,7 @@ use crate::{
     mempool::{FeeHistogram, Mempool},
     metrics::Metrics,
     signals::ExitFlag,
-    status::{Balance, ScriptHashStatus, UnspentEntry},
+    status::{Balance, OutPointStatus, ScriptHashStatus, UnspentEntry},
 };
 
 /// Electrum protocol subscriptions' tracker
@@ -114,4 +114,12 @@ impl Tracker {
         })?;
         Ok(result)
     }
+
+    pub(crate) fn update_outpoint_status(
+        &self,
+        status: &mut OutPointStatus,
+        daemon: &Daemon,
+    ) -> Result<bool> {
+        status.sync(&self.index, &self.mempool, daemon)
+    }
 }