Skip to content

Commit

Permalink
[Plugin] Block Storage for Files
Browse files Browse the repository at this point in the history
See #10
  • Loading branch information
canewsin committed Mar 29, 2022
1 parent 2fbde65 commit 1cc6408
Show file tree
Hide file tree
Showing 7 changed files with 107 additions and 32 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ authors = ["canewsin <[email protected]>"]
edition = "2021"

[features]
default = ["userio"]
default = ["userio", "blockstorage"]
userio = []
blockstorage = []

[dependencies]
async-recursion = "1.0.0"
Expand Down
2 changes: 1 addition & 1 deletion src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ pub async fn site_need_file(site: &mut Site, inner_path: String) -> Result<(), E
if !download {
println!("Inner Path Not Exists in content.json");
} else {
let result = site.need_file(inner_path.clone(), None).await;
let result = site.need_file(inner_path.clone(), None, None).await;
if let Err(e) = &result {
println!("Error : {:?}", e);
} else {
Expand Down
7 changes: 7 additions & 0 deletions src/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct Environment {
// pub homepage: String,
pub lang: String,
// pub dist: String,
pub use_block_storage: bool,
}

fn get_matches() -> ArgMatches {
Expand Down Expand Up @@ -216,6 +217,10 @@ fn get_matches() -> ArgMatches {
// .long("broadcast_port")
// .default_value("1544")
// .help("Port to broadcast local discovery messages"),
Arg::new("USE_BLOCK_STORAGE")
.long("use_block_storage")
.short('b')
.help("Use Block Storage for Files instead of Normal Site Storage"),
])
.subcommands(sub_commands)
.get_matches()
Expand Down Expand Up @@ -249,6 +254,7 @@ pub fn get_env(matches: &ArgMatches) -> Result<Environment, Error> {
} else {
10000 + rand::random::<u16>() % 10000
};
let use_block_storage = matches.is_present("USE_BLOCK_STORAGE");
// let ui_ip = matches.value_of("UI_IP").unwrap();
// let ui_port: usize = matches.value_of("UI_PORT").unwrap().parse()?;
// let broadcast_port: usize = matches.value_of("BROADCAST_PORT").unwrap().parse()?;
Expand All @@ -266,6 +272,7 @@ pub fn get_env(matches: &ArgMatches) -> Result<Environment, Error> {
// homepage: String::from(matches.value_of("HOMEPAGE").unwrap()),
lang: String::from(matches.value_of("LANGUAGE").unwrap()),
// dist: String::from(matches.value_of("DIST_TYPE").unwrap()),
use_block_storage,
};
Ok(env)
}
Expand Down
95 changes: 65 additions & 30 deletions src/io/site.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ use log::*;
use serde_bytes::ByteBuf;
use tokio::{
fs::{self, File},
io::{AsyncReadExt, AsyncWriteExt},
io::AsyncWriteExt,
};

use zerucontent::Content;
use zerucontent::{Content, File as ZFile};

use crate::{
core::{error::*, io::*, peer::*, site::*},
discovery::tracker::IpPort,
environment::ENV,
io::utils::check_file_integrity,
plugins::BlockStorage,
protocol::{api::Request, Protocol},
};

Expand Down Expand Up @@ -52,50 +53,76 @@ impl Site {
&self,
inner_path: String,
peer: &mut Peer,
) -> Result<bool, Error> {
let path = &self.site_path().join(&inner_path);
) -> Result<ByteBuf, Error> {
let message = Protocol::new(peer.connection_mut().unwrap())
.get_file(self.address(), inner_path)
.await?;
let parent = path.parent().unwrap();
if !parent.is_dir() {
fs::create_dir_all(parent).await?;
.get_file(self.address(), inner_path.clone())
.await;
if let Err(e) = &message {
Err(format!("Error Downloading File from Peer, Error : {:?}", e)
.as_str()
.into())
} else {
Ok(message.unwrap().body)
}
let mut file = File::create(path).await?;
file.write_all(&message.body).await?;
Ok(true)
}

pub async fn need_file(&self, inner_path: String, _peer: Option<Peer>) -> Result<bool, Error> {
self.download_file(inner_path, _peer).await
pub async fn need_file(
&self,
inner_path: String,
file: Option<ZFile>,
_peer: Option<Peer>,
) -> Result<bool, Error> {
self.download_file(inner_path, file, _peer).await
}

async fn download_file(&self, inner_path: String, _peer: Option<Peer>) -> Result<bool, Error> {
let path = &self.site_path().join(&inner_path);
async fn download_file(
&self,
inner_path: String,
file: Option<ZFile>,
_peer: Option<Peer>,
) -> Result<bool, Error> {
let (parent, path) = if let Some(file) = file {
if cfg!(feature = "blockstorage") && Self::use_block_storage() {
let file_path = BlockStorage::get_block_file_path(self, &file.sha512);
let parent = BlockStorage::get_block_storage_path(self);
(parent, file_path)
} else {
let path = self.site_path().join(&inner_path);
(path.parent().unwrap().into(), path)
}
} else {
let path = self.site_path().join(&inner_path);
(path.parent().unwrap().into(), path)
};
if !parent.is_dir() {
fs::create_dir_all(parent).await?;
}
if path.is_file() {
let mut file = File::open(path).await?;
let mut buf = Vec::new();
file.read_to_end(&mut buf).await?;
//TODO! Verify file integrity here.
return Ok(true);
}
//TODO!: Download from multiple peers
let mut peer = self.peers.values().next().unwrap().clone();
Self::download_file_from_peer(self, inner_path, &mut peer).await
let bytes = Self::download_file_from_peer(self, inner_path, &mut peer).await?;
let mut file = File::create(path).await?;
file.write_all(&bytes).await?;

Ok(true)
}

async fn download_site_files(&self) -> Result<(), Error> {
let files = self.content().unwrap().files;
let mut tasks = Vec::new();
let mut inner_paths = Vec::new();
for (inner_path, _file) in files {
for (inner_path, file) in files {
inner_paths.push(inner_path.clone());
let task = self.download_file(inner_path, None);
let task = self.download_file(inner_path, Some(file), None);
tasks.push(task);
}
let includes = self.content().unwrap().includes;
for (inner_path, _file) in includes {
inner_paths.push(inner_path.clone());
let task = self.download_file(inner_path, None);
let task = self.download_file(inner_path, None, None);
tasks.push(task);
}
//TODO!: Other client may not have an up-to-date site files
Expand All @@ -107,7 +134,7 @@ impl Site {
continue;
}
user_data_files.push(inner_path.clone());
let task = self.download_file(inner_path, None);
let task = self.download_file(inner_path, None, None);
tasks.push(task);
}
let mut res = join_all(tasks).await;
Expand All @@ -133,10 +160,12 @@ impl Site {
let path = Path::new(&content.inner_path);
if let Some(parent) = path.parent() {
let files_inner = content.files.clone();
for (path, _file) in files_inner {
files.push(
self.download_file(parent.join(path).to_str().unwrap().to_owned(), None),
);
for (path, file) in files_inner {
files.push(self.download_file(
parent.join(path).to_str().unwrap().to_owned(),
Some(file),
None,
));
}
}
});
Expand Down Expand Up @@ -164,7 +193,13 @@ impl Site {
let mut tasks = Vec::new();
for (inner_path, file) in files {
let hash = file.sha512.clone();
let task = check_file_integrity(self.site_path(), inner_path, hash);
let (site_path, inner_path) = if cfg!(feature = "blockstorage") {
let path = BlockStorage::get_block_storage_path(self);
(path, hash.clone())
} else {
(self.site_path(), inner_path)
};
let task = check_file_integrity(site_path, inner_path, hash);
tasks.push(task);
}
//TODO!: Verify includes, user data files
Expand Down Expand Up @@ -246,7 +281,7 @@ impl SiteIO for Site {
}
let content_exists = self.content_path().is_file();
if !content_exists {
Self::download_file(self, "content.json".into(), None).await?;
Self::download_file(self, "content.json".into(), None, None).await?;
}
let verified = self.load_content().await?;
if verified {
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub mod discovery;
pub mod environment;
pub mod io;
pub mod net;
pub mod plugins;
pub mod protocol;
pub mod utils;

Expand Down
28 changes: 28 additions & 0 deletions src/plugins/blockstorage.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::{
core::{io::SiteIO, site::Site},
environment::ENV,
};
use std::path::PathBuf;

pub trait BlockStorage: SiteIO {
fn use_block_storage() -> bool;

fn get_block_storage_path(&self) -> PathBuf;

fn get_block_file_path(&self, block_id: &str) -> PathBuf;
}

#[cfg(feature = "blockstorage")]
impl BlockStorage for Site {
fn use_block_storage() -> bool {
ENV.use_block_storage
}

fn get_block_storage_path(&self) -> PathBuf {
self.data_path.join("blockstorage")
}

fn get_block_file_path(&self, block_id: &str) -> PathBuf {
self.get_block_storage_path().join(block_id)
}
}
3 changes: 3 additions & 0 deletions src/plugins/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod blockstorage;

pub use blockstorage::BlockStorage;

0 comments on commit 1cc6408

Please sign in to comment.