From 1cc6408c553ada71484709350039efbb85f6d693 Mon Sep 17 00:00:00 2001 From: canewsin Date: Wed, 30 Mar 2022 04:19:08 +0530 Subject: [PATCH] [Plugin] Block Storage for Files See https://github.com/canewsin/zeronet-rs/issues/10 --- Cargo.toml | 3 +- src/common.rs | 2 +- src/environment.rs | 7 +++ src/io/site.rs | 95 +++++++++++++++++++++++++------------ src/main.rs | 1 + src/plugins/blockstorage.rs | 28 +++++++++++ src/plugins/mod.rs | 3 ++ 7 files changed, 107 insertions(+), 32 deletions(-) create mode 100644 src/plugins/blockstorage.rs create mode 100644 src/plugins/mod.rs diff --git a/Cargo.toml b/Cargo.toml index 325b63d..afecd54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,9 @@ authors = ["canewsin "] edition = "2021" [features] -default = ["userio"] +default = ["userio", "blockstorage"] userio = [] +blockstorage = [] [dependencies] async-recursion = "1.0.0" diff --git a/src/common.rs b/src/common.rs index e2d66f6..859f02d 100644 --- a/src/common.rs +++ b/src/common.rs @@ -149,7 +149,7 @@ pub async fn site_need_file(site: &mut Site, inner_path: String) -> Result<(), E if !download { println!("Inner Path Not Exists in content.json"); } else { - let result = site.need_file(inner_path.clone(), None).await; + let result = site.need_file(inner_path.clone(), None, None).await; if let Err(e) = &result { println!("Error : {:?}", e); } else { diff --git a/src/environment.rs b/src/environment.rs index 0d875b8..01c3a42 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -71,6 +71,7 @@ pub struct Environment { // pub homepage: String, pub lang: String, // pub dist: String, + pub use_block_storage: bool, } fn get_matches() -> ArgMatches { @@ -216,6 +217,10 @@ fn get_matches() -> ArgMatches { // .long("broadcast_port") // .default_value("1544") // .help("Port to broadcast local discovery messages"), + Arg::new("USE_BLOCK_STORAGE") + .long("use_block_storage") + .short('b') + .help("Use Block Storage for Files instead of Normal Site Storage"), ]) .subcommands(sub_commands) .get_matches() @@ -249,6 +254,7 @@ pub fn get_env(matches: &ArgMatches) -> Result { } else { 10000 + rand::random::() % 10000 }; + let use_block_storage = matches.is_present("USE_BLOCK_STORAGE"); // let ui_ip = matches.value_of("UI_IP").unwrap(); // let ui_port: usize = matches.value_of("UI_PORT").unwrap().parse()?; // let broadcast_port: usize = matches.value_of("BROADCAST_PORT").unwrap().parse()?; @@ -266,6 +272,7 @@ pub fn get_env(matches: &ArgMatches) -> Result { // homepage: String::from(matches.value_of("HOMEPAGE").unwrap()), lang: String::from(matches.value_of("LANGUAGE").unwrap()), // dist: String::from(matches.value_of("DIST_TYPE").unwrap()), + use_block_storage, }; Ok(env) } diff --git a/src/io/site.rs b/src/io/site.rs index c1f3938..e6a3338 100644 --- a/src/io/site.rs +++ b/src/io/site.rs @@ -8,16 +8,17 @@ use log::*; use serde_bytes::ByteBuf; use tokio::{ fs::{self, File}, - io::{AsyncReadExt, AsyncWriteExt}, + io::AsyncWriteExt, }; -use zerucontent::Content; +use zerucontent::{Content, File as ZFile}; use crate::{ core::{error::*, io::*, peer::*, site::*}, discovery::tracker::IpPort, environment::ENV, io::utils::check_file_integrity, + plugins::BlockStorage, protocol::{api::Request, Protocol}, }; @@ -52,50 +53,76 @@ impl Site { &self, inner_path: String, peer: &mut Peer, - ) -> Result { - let path = &self.site_path().join(&inner_path); + ) -> Result { let message = Protocol::new(peer.connection_mut().unwrap()) - .get_file(self.address(), inner_path) - .await?; - let parent = path.parent().unwrap(); - if !parent.is_dir() { - fs::create_dir_all(parent).await?; + .get_file(self.address(), inner_path.clone()) + .await; + if let Err(e) = &message { + Err(format!("Error Downloading File from Peer, Error : {:?}", e) + .as_str() + .into()) + } else { + Ok(message.unwrap().body) } - let mut file = File::create(path).await?; - file.write_all(&message.body).await?; - Ok(true) } - pub async fn need_file(&self, inner_path: String, _peer: Option) -> Result { - self.download_file(inner_path, _peer).await + pub async fn need_file( + &self, + inner_path: String, + file: Option, + _peer: Option, + ) -> Result { + self.download_file(inner_path, file, _peer).await } - async fn download_file(&self, inner_path: String, _peer: Option) -> Result { - let path = &self.site_path().join(&inner_path); + async fn download_file( + &self, + inner_path: String, + file: Option, + _peer: Option, + ) -> Result { + let (parent, path) = if let Some(file) = file { + if cfg!(feature = "blockstorage") && Self::use_block_storage() { + let file_path = BlockStorage::get_block_file_path(self, &file.sha512); + let parent = BlockStorage::get_block_storage_path(self); + (parent, file_path) + } else { + let path = self.site_path().join(&inner_path); + (path.parent().unwrap().into(), path) + } + } else { + let path = self.site_path().join(&inner_path); + (path.parent().unwrap().into(), path) + }; + if !parent.is_dir() { + fs::create_dir_all(parent).await?; + } if path.is_file() { - let mut file = File::open(path).await?; - let mut buf = Vec::new(); - file.read_to_end(&mut buf).await?; + //TODO! Verify file integrity here. return Ok(true); } //TODO!: Download from multiple peers let mut peer = self.peers.values().next().unwrap().clone(); - Self::download_file_from_peer(self, inner_path, &mut peer).await + let bytes = Self::download_file_from_peer(self, inner_path, &mut peer).await?; + let mut file = File::create(path).await?; + file.write_all(&bytes).await?; + + Ok(true) } async fn download_site_files(&self) -> Result<(), Error> { let files = self.content().unwrap().files; let mut tasks = Vec::new(); let mut inner_paths = Vec::new(); - for (inner_path, _file) in files { + for (inner_path, file) in files { inner_paths.push(inner_path.clone()); - let task = self.download_file(inner_path, None); + let task = self.download_file(inner_path, Some(file), None); tasks.push(task); } let includes = self.content().unwrap().includes; for (inner_path, _file) in includes { inner_paths.push(inner_path.clone()); - let task = self.download_file(inner_path, None); + let task = self.download_file(inner_path, None, None); tasks.push(task); } //TODO!: Other client may not have an up-to-date site files @@ -107,7 +134,7 @@ impl Site { continue; } user_data_files.push(inner_path.clone()); - let task = self.download_file(inner_path, None); + let task = self.download_file(inner_path, None, None); tasks.push(task); } let mut res = join_all(tasks).await; @@ -133,10 +160,12 @@ impl Site { let path = Path::new(&content.inner_path); if let Some(parent) = path.parent() { let files_inner = content.files.clone(); - for (path, _file) in files_inner { - files.push( - self.download_file(parent.join(path).to_str().unwrap().to_owned(), None), - ); + for (path, file) in files_inner { + files.push(self.download_file( + parent.join(path).to_str().unwrap().to_owned(), + Some(file), + None, + )); } } }); @@ -164,7 +193,13 @@ impl Site { let mut tasks = Vec::new(); for (inner_path, file) in files { let hash = file.sha512.clone(); - let task = check_file_integrity(self.site_path(), inner_path, hash); + let (site_path, inner_path) = if cfg!(feature = "blockstorage") { + let path = BlockStorage::get_block_storage_path(self); + (path, hash.clone()) + } else { + (self.site_path(), inner_path) + }; + let task = check_file_integrity(site_path, inner_path, hash); tasks.push(task); } //TODO!: Verify includes, user data files @@ -246,7 +281,7 @@ impl SiteIO for Site { } let content_exists = self.content_path().is_file(); if !content_exists { - Self::download_file(self, "content.json".into(), None).await?; + Self::download_file(self, "content.json".into(), None, None).await?; } let verified = self.load_content().await?; if verified { diff --git a/src/main.rs b/src/main.rs index 79ab1bd..c82c9aa 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ pub mod discovery; pub mod environment; pub mod io; pub mod net; +pub mod plugins; pub mod protocol; pub mod utils; diff --git a/src/plugins/blockstorage.rs b/src/plugins/blockstorage.rs new file mode 100644 index 0000000..e7a6827 --- /dev/null +++ b/src/plugins/blockstorage.rs @@ -0,0 +1,28 @@ +use crate::{ + core::{io::SiteIO, site::Site}, + environment::ENV, +}; +use std::path::PathBuf; + +pub trait BlockStorage: SiteIO { + fn use_block_storage() -> bool; + + fn get_block_storage_path(&self) -> PathBuf; + + fn get_block_file_path(&self, block_id: &str) -> PathBuf; +} + +#[cfg(feature = "blockstorage")] +impl BlockStorage for Site { + fn use_block_storage() -> bool { + ENV.use_block_storage + } + + fn get_block_storage_path(&self) -> PathBuf { + self.data_path.join("blockstorage") + } + + fn get_block_file_path(&self, block_id: &str) -> PathBuf { + self.get_block_storage_path().join(block_id) + } +} diff --git a/src/plugins/mod.rs b/src/plugins/mod.rs new file mode 100644 index 0000000..4523549 --- /dev/null +++ b/src/plugins/mod.rs @@ -0,0 +1,3 @@ +mod blockstorage; + +pub use blockstorage::BlockStorage;