diff --git a/src/liquidity.rs b/src/liquidity.rs index 57b92f612..92a4da617 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -17,6 +17,10 @@ use lightning_invoice::{Bolt11Invoice, InvoiceBuilder, RoutingFees}; use lightning_liquidity::events::Event; use lightning_liquidity::lsps0::ser::RequestId; use lightning_liquidity::lsps1::client::LSPS1ClientConfig; +use lightning_liquidity::lsps1::event::LSPS1ClientEvent; +use lightning_liquidity::lsps1::msgs::{ + ChannelInfo, LSPS1Options, OrderId, OrderParameters, PaymentInfo, +}; use lightning_liquidity::lsps2::client::LSPS2ClientConfig; use lightning_liquidity::lsps2::event::LSPS2ClientEvent; use lightning_liquidity::lsps2::msgs::OpeningFeeParams; @@ -35,11 +39,18 @@ use std::time::Duration; const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; +const LSPS1_DEFAULT_REQUIRED_CHANNEL_CONF: u16 = 0; + struct LSPS1Service { node_id: PublicKey, address: SocketAddress, token: Option, client_config: LSPS1ClientConfig, + pending_opening_params_requests: + Mutex>>, + pending_create_order_requests: Mutex>>, + pending_check_order_status_requests: + Mutex>>, } struct LSPS2Service { @@ -90,7 +101,18 @@ where ) -> &mut Self { // TODO: allow to set max_channel_fees_msat let client_config = LSPS1ClientConfig { max_channel_fees_msat: None }; - self.lsps1_service = Some(LSPS1Service { node_id, address, token, client_config }); + let pending_opening_params_requests = Mutex::new(HashMap::new()); + let pending_create_order_requests = Mutex::new(HashMap::new()); + let pending_check_order_status_requests = Mutex::new(HashMap::new()); + self.lsps1_service = Some(LSPS1Service { + node_id, + address, + token, + client_config, + pending_opening_params_requests, + pending_create_order_requests, + pending_check_order_status_requests, + }); self } @@ -174,6 +196,175 @@ where pub(crate) async fn handle_next_event(&self) { match self.liquidity_manager.next_event_async().await { + Event::LSPS1Client(LSPS1ClientEvent::SupportedOptionsReady { + request_id, + counterparty_node_id, + supported_options, + }) => { + if let Some(lsps1_service) = self.lsps1_service.as_ref() { + if counterparty_node_id != lsps1_service.node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + if let Some(sender) = lsps1_service + .pending_opening_params_requests + .lock() + .unwrap() + .remove(&request_id) + { + let response = LSPS1OpeningParamsResponse { supported_options }; + + match sender.send(response) { + Ok(()) => (), + Err(e) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!( + self.logger, + "Received unexpected LSPS1Client::SupportedOptionsReady event!" + ); + } + }, + Event::LSPS1Client(LSPS1ClientEvent::OrderCreated { + request_id, + counterparty_node_id, + order_id, + order, + payment, + channel, + }) => { + if let Some(lsps1_service) = self.lsps1_service.as_ref() { + if counterparty_node_id != lsps1_service.node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + if let Some(sender) = lsps1_service + .pending_create_order_requests + .lock() + .unwrap() + .remove(&request_id) + { + let response = LSPS1OrderStatus { + order_id, + order_params: order, + payment_options: payment, + channel_state: channel, + }; + + match sender.send(response) { + Ok(()) => (), + Err(e) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!(self.logger, "Received unexpected LSPS1Client::OrderCreated event!"); + } + }, + Event::LSPS1Client(LSPS1ClientEvent::OrderStatus { + request_id, + counterparty_node_id, + order_id, + order, + payment, + channel, + }) => { + if let Some(lsps1_service) = self.lsps1_service.as_ref() { + if counterparty_node_id != lsps1_service.node_id { + debug_assert!( + false, + "Received response from unexpected LSP counterparty. This should never happen." + ); + log_error!( + self.logger, + "Received response from unexpected LSP counterparty. This should never happen." + ); + return; + } + + if let Some(sender) = lsps1_service + .pending_check_order_status_requests + .lock() + .unwrap() + .remove(&request_id) + { + let response = LSPS1OrderStatus { + order_id, + order_params: order, + payment_options: payment, + channel_state: channel, + }; + + match sender.send(response) { + Ok(()) => (), + Err(e) => { + log_error!( + self.logger, + "Failed to handle response from liquidity service: {:?}", + e + ); + }, + } + } else { + debug_assert!( + false, + "Received response from liquidity service for unknown request." + ); + log_error!( + self.logger, + "Received response from liquidity service for unknown request." + ); + } + } else { + log_error!(self.logger, "Received unexpected LSPS1Client::OrderStatus event!"); + } + }, Event::LSPS2Client(LSPS2ClientEvent::OpeningParametersReady { request_id, counterparty_node_id, @@ -282,6 +473,166 @@ where } } + pub(crate) async fn lsps1_request_opening_params( + &self, + ) -> Result { + let lsps1_service = self.lsps1_service.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + + let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS1 liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let (request_sender, request_receiver) = oneshot::channel(); + { + let mut pending_opening_params_requests_lock = + lsps1_service.pending_opening_params_requests.lock().unwrap(); + let request_id = client_handler.request_supported_options(lsps1_service.node_id); + pending_opening_params_requests_lock.insert(request_id, request_sender); + } + + tokio::time::timeout(Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), request_receiver) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); + Error::LiquidityRequestFailed + }) + } + + pub(crate) async fn lsps1_request_channel( + &self, lsp_balance_sat: u64, client_balance_sat: u64, channel_expiry_blocks: u32, + announce_channel: bool, refund_address: bitcoin::Address, + ) -> Result { + let lsps1_service = self.lsps1_service.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS1 liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let lsp_limits = self.lsps1_request_opening_params().await?.supported_options; + let channel_size_sat = lsp_balance_sat + client_balance_sat; + + if channel_size_sat < lsp_limits.min_channel_balance_sat + || channel_size_sat > lsp_limits.max_channel_balance_sat + { + log_error!( + self.logger, + "Requested channel size doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", + lsp_limits.min_channel_balance_sat, + lsp_limits.max_channel_balance_sat + ); + return Err(Error::LiquidityRequestFailed); + } + + if lsp_balance_sat < lsp_limits.min_initial_lsp_balance_sat + || lsp_balance_sat > lsp_limits.max_initial_lsp_balance_sat + { + log_error!( + self.logger, + "Requested LSP-side balance doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", + lsp_limits.min_initial_lsp_balance_sat, + lsp_limits.max_initial_lsp_balance_sat + ); + return Err(Error::LiquidityRequestFailed); + } + + if client_balance_sat < lsp_limits.min_initial_client_balance_sat + || client_balance_sat > lsp_limits.max_initial_client_balance_sat + { + log_error!( + self.logger, + "Requested client-side balance doesn't meet the LSP-provided limits (min: {}sat, max: {}sat).", + lsp_limits.min_initial_client_balance_sat, + lsp_limits.max_initial_client_balance_sat + ); + return Err(Error::LiquidityRequestFailed); + } + + let order_params = OrderParameters { + lsp_balance_sat, + client_balance_sat, + required_channel_confirmations: LSPS1_DEFAULT_REQUIRED_CHANNEL_CONF, + funding_confirms_within_blocks: lsp_limits.min_funding_confirms_within_blocks, + channel_expiry_blocks, + token: lsps1_service.token.clone(), + announce_channel, + }; + + let (request_sender, request_receiver) = oneshot::channel(); + { + let mut pending_create_order_requests_lock = + lsps1_service.pending_create_order_requests.lock().unwrap(); + let request_id = client_handler.create_order( + &lsps1_service.node_id, + order_params.clone(), + Some(refund_address), + ); + pending_create_order_requests_lock.insert(request_id, request_sender); + } + + let response = tokio::time::timeout( + Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), + request_receiver, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); + Error::LiquidityRequestFailed + })?; + + if response.order_params != order_params { + log_error!( + self.logger, + "Aborting LSPS1 request as LSP-provided parameters don't match our order." + ); + return Err(Error::LiquidityRequestFailed); + } + + Ok(response) + } + + pub(crate) async fn lsps1_check_order_status( + &self, order_id: OrderId, + ) -> Result { + let lsps1_service = self.lsps1_service.as_ref().ok_or(Error::LiquiditySourceUnavailable)?; + let client_handler = self.liquidity_manager.lsps1_client_handler().ok_or_else(|| { + log_error!(self.logger, "LSPS1 liquidity client was not configured.",); + Error::LiquiditySourceUnavailable + })?; + + let (request_sender, request_receiver) = oneshot::channel(); + { + let mut pending_check_order_status_requests_lock = + lsps1_service.pending_check_order_status_requests.lock().unwrap(); + let request_id = client_handler.check_order_status(&lsps1_service.node_id, order_id); + pending_check_order_status_requests_lock.insert(request_id, request_sender); + } + + let response = tokio::time::timeout( + Duration::from_secs(LIQUIDITY_REQUEST_TIMEOUT_SECS), + request_receiver, + ) + .await + .map_err(|e| { + log_error!(self.logger, "Liquidity request timed out: {}", e); + Error::LiquidityRequestFailed + })? + .map_err(|e| { + log_error!(self.logger, "Failed to handle response from liquidity service: {}", e); + Error::LiquidityRequestFailed + })?; + + Ok(response) + } + pub(crate) async fn lsps2_receive_to_jit_channel( &self, amount_msat: u64, description: &str, expiry_secs: u32, max_total_lsp_fee_limit_msat: Option, @@ -515,6 +866,24 @@ where } } +#[derive(Debug, Clone)] +pub(crate) struct LSPS1OpeningParamsResponse { + supported_options: LSPS1Options, +} + +/// Represents the status of an LSPS1 channel request. +#[derive(Debug, Clone)] +pub struct LSPS1OrderStatus { + /// The id of the channel order. + pub order_id: OrderId, + /// The parameters of channel order. + pub order_params: OrderParameters, + /// Contains details about how to pay for the order. + pub payment_options: PaymentInfo, + /// Contains information about the channel state. + pub channel_state: Option, +} + #[derive(Debug, Clone)] pub(crate) struct LSPS2FeeResponse { opening_fee_params_menu: Vec,