Skip to content

Commit

Permalink
fix subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
edouardparis committed Jan 29, 2025
1 parent 9242b06 commit 0987dfb
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 480 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion liana-gui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ chrono = "0.4.38"
# Used for managing internal bitcoind
base64 = "0.21"
bitcoin_hashes = "0.12"
reqwest = { version = "0.11", default-features=false, features = ["json", "rustls-tls"] }
reqwest = { version = "0.11", default-features=false, features = ["json", "rustls-tls", "stream"] }
rust-ini = "0.19.0"
rfd = "0.15.1"

Expand Down
5 changes: 2 additions & 3 deletions liana-gui/src/app/state/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,9 @@ impl ExportModal {
if let Some(path) = &self.path {
match &self.state {
ExportState::Started | ExportState::Progress(_) => {
Some(iced::subscription::unfold(
Some(iced::Subscription::run_with_id(
"transactions",
export::State::new(self.daemon.clone(), Box::new(path.to_path_buf())),
export::export_subscription,
export::export_subscription(self.daemon.clone(), path.to_path_buf()),
))
}
_ => None,
Expand Down
155 changes: 51 additions & 104 deletions liana-gui/src/download.rs
Original file line number Diff line number Diff line change
@@ -1,135 +1,82 @@
// This is based on https://github.com/iced-rs/iced/blob/master/examples/download_progress/src/download.rs
// with some modifications to store the downloaded bytes in `Progress::Finished` and `State::Downloading`
// and to keep track of any download errors.
use iced::subscription;
use iced::futures::{SinkExt, Stream, StreamExt};
use iced::stream::try_channel;
use iced::Subscription;

use std::hash::Hash;
use std::sync::Arc;

// Just a little utility function
pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>(
id: I,
url: T,
) -> iced::Subscription<(I, Progress)> {
subscription::unfold(id, State::Ready(url.to_string()), move |state| {
download(id, state)
) -> iced::Subscription<(I, Result<Progress, DownloadError>)> {
Subscription::run_with_id(
id,
download(url.to_string()).map(move |progress| (id, progress)),
)
}

fn download(url: String) -> impl Stream<Item = Result<Progress, DownloadError>> {
try_channel(1, move |mut output| async move {
let response = reqwest::get(&url).await?;
let total = response
.content_length()
.ok_or(DownloadError::NoContentLength)?;

let _ = output.send(Progress::Downloading(0.0)).await;

let mut byte_stream = response.bytes_stream();
let mut downloaded = 0;
let mut bytes = Vec::new();

while let Some(next_bytes) = byte_stream.next().await {
let chunk = next_bytes?;
downloaded += chunk.len();
bytes.append(&mut chunk.to_vec());

let _ = output
.send(Progress::Downloading(
100.0 * downloaded as f32 / total as f32,
))
.await;
}

let _ = output.send(Progress::Finished(bytes)).await;

Ok(())
})
}

#[derive(Debug, Hash, Clone)]
pub struct Download<I> {
id: I,
url: String,
#[derive(Debug, Clone)]
pub enum Progress {
Downloading(f32),
Finished(Vec<u8>),
}

/// Possible errors with download.
#[derive(PartialEq, Eq, Debug, Clone)]
#[derive(Debug, Clone)]
pub enum DownloadError {
UnknownContentLength,
RequestError(String),
RequestFailed(Arc<reqwest::Error>),
NoContentLength,
}

impl std::fmt::Display for DownloadError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::UnknownContentLength => {
Self::NoContentLength => {
write!(f, "Response has unknown content length.")
}
Self::RequestError(e) => {
Self::RequestFailed(e) => {
write!(f, "Request error: '{}'.", e)
}
}
}
}

async fn download<I: Copy>(id: I, state: State) -> ((I, Progress), State) {
match state {
State::Ready(url) => {
let response = reqwest::get(&url).await;

match response {
Ok(response) => {
if let Some(total) = response.content_length() {
(
(id, Progress::Started),
State::Downloading {
response,
total,
downloaded: 0,
bytes: Vec::new(),
},
)
} else {
(
(id, Progress::Errored(DownloadError::UnknownContentLength)),
State::Finished,
)
}
}
Err(e) => (
(
id,
Progress::Errored(DownloadError::RequestError(e.to_string())),
),
State::Finished,
),
}
}
State::Downloading {
mut response,
total,
downloaded,
mut bytes,
} => match response.chunk().await {
Ok(Some(chunk)) => {
let downloaded = downloaded + chunk.len() as u64;

let percentage = (downloaded as f32 / total as f32) * 100.0;

bytes.append(&mut chunk.to_vec());

(
(id, Progress::Advanced(percentage)),
State::Downloading {
response,
total,
downloaded,
bytes,
},
)
}
Ok(None) => ((id, Progress::Finished(bytes)), State::Finished),
Err(e) => (
(
id,
Progress::Errored(DownloadError::RequestError(e.to_string())),
),
State::Finished,
),
},
State::Finished => {
// We do not let the stream die, as it would start a
// new download repeatedly if the user is not careful
// in case of errors.
iced::futures::future::pending().await
}
impl From<reqwest::Error> for DownloadError {
fn from(error: reqwest::Error) -> Self {
DownloadError::RequestFailed(Arc::new(error))
}
}

#[derive(Debug, Clone)]
pub enum Progress {
Started,
Advanced(f32),
Finished(Vec<u8>),
Errored(DownloadError),
}

pub enum State {
Ready(String),
Downloading {
response: reqwest::Response,
total: u64,
downloaded: u64,
bytes: Vec<u8>,
},
Finished,
}
93 changes: 58 additions & 35 deletions liana-gui/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use tokio::{
time::sleep,
};

use iced::futures::{SinkExt, Stream};

use crate::{
app::view,
daemon::{
Expand Down Expand Up @@ -343,42 +345,63 @@ impl State {
}
}

pub async fn export_subscription(mut state: State) -> (ExportProgress, State) {
match state.state() {
Status::Init => {
state.start().await;
}
Status::Stopped => {
sleep(time::Duration::from_millis(1000)).await;
return (ExportProgress::None, state);
}
Status::Running => { /* continue */ }
}
let msg = state.receiver.try_recv();
let disconnected = match msg {
Ok(m) => return (m, state),
Err(e) => match e {
std::sync::mpsc::TryRecvError::Empty => false,
std::sync::mpsc::TryRecvError::Disconnected => true,
},
};

let handle = match state.handle.take() {
Some(h) => h,
None => return (ExportProgress::Error(Error::HandleLost), state),
};
{
let h = handle.lock().expect("should not fail");
if h.is_finished() {
return (ExportProgress::Finished, state);
} else if disconnected {
return (ExportProgress::Error(Error::ChannelLost), state);
pub fn export_subscription(
daemon: Arc<dyn Daemon + Sync + Send>,
path: PathBuf,
) -> impl Stream<Item = ExportProgress> {
iced::stream::channel(1, move |mut output| async move {
let mut state = State::new(daemon, Box::new(path));
loop {
match state.state() {
Status::Init => {
state.start().await;
}
Status::Stopped => {
sleep(time::Duration::from_millis(1000)).await;
let _ = output.send(ExportProgress::None).await;
}
Status::Running => { /* continue */ }
}
let msg = state.receiver.try_recv();
let disconnected = match msg {
Ok(m) => {
let _ = output.send(m).await;
continue;
}
Err(e) => match e {
std::sync::mpsc::TryRecvError::Empty => false,
std::sync::mpsc::TryRecvError::Disconnected => true,
},
};

let handle = match state.handle.take() {
Some(h) => h,
None => {
let _ = output.send(ExportProgress::Error(Error::HandleLost)).await;
continue;
}
};
let msg = {
let h = handle.lock().expect("should not fail");
if h.is_finished() {
Some(ExportProgress::Finished)
} else if disconnected {
Some(ExportProgress::Error(Error::ChannelLost))
} else {
None
}
};
if let Some(msg) = msg {
let _ = output.send(msg).await;
continue;
}
// => release handle lock
state.handle = Some(handle);

sleep(time::Duration::from_millis(100)).await;
let _ = output.send(ExportProgress::None).await;
}
} // => release handle lock
state.handle = Some(handle);

sleep(time::Duration::from_millis(100)).await;
(ExportProgress::None, state)
})
}

pub async fn get_path() -> Option<PathBuf> {
Expand Down
Loading

0 comments on commit 0987dfb

Please sign in to comment.