Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New p2p design #2838

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
544 changes: 283 additions & 261 deletions Cargo.lock

Large diffs are not rendered by default.

47 changes: 25 additions & 22 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,47 +23,50 @@ rust-version = "1.81"
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "4e4565bee4" }

# Third party dependencies used by one or more of our crates
anyhow = "1.0.94"
async-channel = "2.3"
async-stream = "0.3.6"
async-trait = "0.1.83"
axum = "0.7.7"
axum-extra = "0.9.4"
axum = "0.7.9"
axum-extra = "0.9.6"
base64 = "0.22.1"
blake3 = "1.5.4"
bytes = "1.7.1" # Update blocked by hyper
chrono = "0.4.38"
blake3 = "1.5.5"
bytes = "1.9.0"
chrono = "0.4.39"
ed25519-dalek = "2.1"
flume = "0.11.0"
futures = "0.3.31"
futures-concurrency = "7.6"
futures-concurrency = "7.6.2"
globset = "0.4.15"
http = "1.1"
hyper = "1.5"
image = "0.25.4"
http = "1.2.0"
hyper = "1.5.2"
image = "0.25.5"
iroh = "0.29.0"
itertools = "0.13.0"
lending-stream = "1.0"
libc = "0.2.159"
libc = "0.2.169"
mimalloc = "0.1.43"
normpath = "1.3"
pin-project-lite = "0.2.14"
pin-project-lite = "0.2.15"
quic-rpc = "0.17.3"
rand = "0.9.0-alpha.2"
regex = "1.11"
reqwest = { version = "0.12.8", default-features = false }
regex = "1.11.1"
reqwest = { version = "0.12.9", default-features = false }
rmp = "0.8.14"
rmp-serde = "1.3"
rmpv = { version = "1.3", features = ["with-serde"] }
serde = "1.0"
serde_json = "1.0"
serde = "1.0.216"
serde_json = "1.0.133"
specta = "=2.0.0-rc.20"
strum = "0.26"
strum_macros = "0.26"
tempfile = "3.13"
thiserror = "1.0"
tokio = "1.40"
tokio-stream = "0.1.16"
tokio-util = "0.7.12"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tempfile = "3.14.0"
thiserror = "2.0.8"
tokio = "1.42.0"
tokio-stream = "0.1.17"
tokio-util = "0.7.13"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
tracing-test = "0.2.5"
uhlc = "0.8.0" # Must follow version used by specta
uuid = "1.10" # Must follow version used by specta
Expand Down
8 changes: 4 additions & 4 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ sd-ffmpeg = { path = "../crates/ffmpeg", optional = true }
sd-file-ext = { path = "../crates/file-ext" }
sd-images = { path = "../crates/images", features = ["rspc", "serde", "specta"] }
sd-media-metadata = { path = "../crates/media-metadata" }
sd-p2p = { path = "../crates/p2p", features = ["specta"] }
sd-p2p-block = { path = "../crates/p2p/crates/block" }
sd-p2p-proto = { path = "../crates/p2p/crates/proto" }
sd-p2p-tunnel = { path = "../crates/p2p/crates/tunnel" }
sd-old-p2p = { path = "../crates/old-p2p", features = ["specta"] }
sd-old-p2p-block = { path = "../crates/old-p2p/crates/block" }
sd-old-p2p-proto = { path = "../crates/old-p2p/crates/proto" }
sd-old-p2p-tunnel = { path = "../crates/old-p2p/crates/tunnel" }
sd-prisma = { path = "../crates/prisma" }
sd-sync = { path = "../crates/sync" }
sd-task-system = { path = "../crates/task-system" }
Expand Down
12 changes: 6 additions & 6 deletions core/crates/cloud-services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ sd-prisma = { path = "../../../crates/prisma" }
sd-utils = { path = "../../../crates/utils" }

# Workspace dependencies
anyhow = { workspace = true }
async-stream = { workspace = true }
base64 = { workspace = true }
blake3 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
flume = { workspace = true }
futures = { workspace = true }
futures-concurrency = { workspace = true }
iroh = { workspace = true, features = ["discovery-local-network"] }
quic-rpc = { workspace = true, features = ["iroh-transport", "quinn-transport"] }
rmp-serde = { workspace = true }
rspc = { workspace = true }
serde = { workspace = true, features = ["derive"] }
Expand All @@ -37,12 +40,9 @@ uuid = { workspace = true, features = ["serde"] }
zeroize = { workspace = true }

# External dependencies
anyhow = "1.0.86"
dashmap = "6.1.0"
iroh = { version = "0.29.0", features = ["discovery-local-network"] }
paste = "=1.0.15"
quic-rpc = { version = "0.17.1", features = ["iroh-transport", "quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.12" }
dashmap = "6.1.0"
paste = "=1.0.15"
quinn = { package = "iroh-quinn", version = "0.12" }
# Using whatever version of reqwest that reqwest-middleware uses, just putting here to enable some features
reqwest = { version = "0.12", features = ["json", "native-tls-vendored", "stream"] }
reqwest-middleware = { version = "0.4", features = ["json"] }
Expand Down
20 changes: 11 additions & 9 deletions core/crates/cloud-services/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
use crate::p2p::{NotifyUser, UserResponse};

use sd_cloud_schema::{Client, Request, Response, ServicesALPN};
use sd_cloud_schema::{Client, Service, ServicesALPN};

use std::{net::SocketAddr, sync::Arc, time::Duration};

use futures::Stream;
use iroh::relay::RelayUrl;
use quic_rpc::{transport::quinn::QuinnConnector, RpcClient, RpcMessage};
use quic_rpc::{client::QuinnConnector, RpcClient};
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
use reqwest::{IntoUrl, Url};
use reqwest_middleware::{reqwest, ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
// use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::sync::{Mutex, RwLock};
use tracing::warn;

use super::{
error::Error, key_manager::KeyManager, p2p::CloudP2P, token_refresher::TokenRefresher,
};

pub type CloudServicesClient = Client<QuinnConnector<Service>>;

#[derive(Debug, Default, Clone)]
enum ClientState<In: RpcMessage, Out: RpcMessage> {
enum ClientState {
#[default]
NotConnected,
Connected(Client<QuinnConnector<In, Out>>),
Connected(CloudServicesClient),
}

/// Cloud services are a optional feature that allows you to interact with the cloud services
Expand All @@ -35,7 +37,7 @@ enum ClientState<In: RpcMessage, Out: RpcMessage> {
/// that core can always operate without the cloud services.
#[derive(Debug)]
pub struct CloudServices {
client_state: Arc<RwLock<ClientState<Response, Request>>>,
client_state: Arc<RwLock<ClientState>>,
get_cloud_api_address: Url,
http_client: ClientWithMiddleware,
domain_name: String,
Expand Down Expand Up @@ -158,7 +160,7 @@ impl CloudServices {
http_client: &ClientWithMiddleware,
get_cloud_api_address: Url,
domain_name: String,
) -> Result<Client<QuinnConnector<Response, Request>>, Error> {
) -> Result<CloudServicesClient, Error> {
let cloud_api_address = http_client
.get(get_cloud_api_address)
.send()
Expand Down Expand Up @@ -257,7 +259,7 @@ impl CloudServices {
.map_err(Error::FailedToCreateEndpoint)?;
endpoint.set_default_client_config(client_config);

Ok(Client::new(RpcClient::new(QuinnConnector::new(
Ok(Client::new(RpcClient::new(QuinnConnector::<Service>::new(
endpoint,
cloud_api_address,
domain_name,
Expand All @@ -269,7 +271,7 @@ impl CloudServices {
/// If the client is not connected, it will try to connect to the cloud services.
/// Available routes documented in
/// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema).
pub async fn client(&self) -> Result<Client<QuinnConnector<Response, Request>>, Error> {
pub async fn client(&self) -> Result<CloudServicesClient, Error> {
if let ClientState::Connected(client) = { &*self.client_state.read().await } {
return Ok(client.clone());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tracing::{debug, instrument, trace, Level};

#[derive(thiserror::Error, Debug, Serialize, Deserialize, Type, Clone)]
pub enum NonCriticalMediaDataExtractorError {
#[error("failed to extract media data from <file='{}'>: {1}", .0.display())]
#[error("failed to extract media data from <file='{path}'>: {1}", path = .0.display())]
FailedToExtractImageMediaData(PathBuf, String),
#[error("file path missing object id: <file_path_id='{0}'>")]
FilePathMissingObjectId(file_path::id::Type),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,17 @@ pub enum NonCriticalThumbnailerError {
MissingCasId(file_path::id::Type),
#[error("failed to extract isolated file path data from file path <id='{0}'>: {1}")]
FailedToExtractIsolatedFilePathData(file_path::id::Type, String),
#[error("failed to generate video file thumbnail <path='{}'>: {1}", .0.display())]
#[error("failed to generate video file thumbnail <path='{path}'>: {1}", path = .0.display())]
VideoThumbnailGenerationFailed(PathBuf, String),
#[error("failed to format image <path='{}'>: {1}", .0.display())]
#[error("failed to format image <path='{path}'>: {1}", path = .0.display())]
FormatImage(PathBuf, String),
#[error("failed to encode webp image <path='{}'>: {1}", .0.display())]
#[error("failed to encode webp image <path='{path}'>: {1}", path = .0.display())]
WebPEncoding(PathBuf, String),
#[error("processing thread panicked while generating thumbnail from <path='{}'>: {1}", .0.display())]
#[error("processing thread panicked while generating thumbnail from <path='{path}'>: {1}", path = .0.display())]
PanicWhileGeneratingThumbnail(PathBuf, String),
#[error("failed to create shard directory for thumbnail: {0}")]
CreateShardDirectory(String),
#[error("failed to save thumbnail <path='{}'>: {1}", .0.display())]
#[error("failed to save thumbnail <path='{path}'>: {1}", path = .0.display())]
SaveThumbnail(PathBuf, String),
#[error("task timed out: {0}")]
TaskTimeout(TaskId),
Expand Down
70 changes: 70 additions & 0 deletions core/crates/p2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
[package]
edition = "2021"
name = "sd-core-p2p"
version = "0.1.0"

[lints.rust]
# Warns
deprecated = "warn"
rust_2018_idioms = { level = "warn", priority = -1 }
trivial_casts = "warn"
trivial_numeric_casts = "warn"
unused_allocation = "warn"
unused_qualifications = "warn"
# Forbids
deprecated_in_future = "forbid"

[lints.clippy]
# Warns
all = { level = "warn", priority = -1 }
cast_lossless = "warn"
cast_possible_truncation = "warn"
cast_possible_wrap = "warn"
cast_precision_loss = "warn"
cast_sign_loss = "warn"
complexity = { level = "warn", priority = -1 }
correctness = { level = "warn", priority = -1 }
dbg_macro = "warn"
deprecated_cfg_attr = "warn"
nursery = { level = "warn", priority = -1 }
pedantic = { level = "warn", priority = -1 }
perf = { level = "warn", priority = -1 }
separated_literal_suffix = "warn"
style = { level = "warn", priority = -1 }
suspicious = { level = "warn", priority = -1 }
unnecessary_cast = "warn"
unwrap_used = "warn"
# Allows
missing_errors_doc = "allow"
module_name_repetitions = "allow"

[dependencies]
# Core Spacedrive Sub-crates
sd-core-cloud-services = { path = "../cloud-services" }

# Spacedrive Sub-crates
sd-cloud-schema = { workspace = true }
sd-crypto = { path = "../../../crates/crypto" }


# Workspace dependencies
anyhow = { workspace = true }
async-stream = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
futures-concurrency = { workspace = true }
iroh = { workspace = true, features = ["discovery-local-network"] }
quic-rpc = { workspace = true, features = ["iroh-transport"] }
serde = { workspace = true, features = ["derive"] }
specta = { workspace = true, features = ["chrono", "uuid"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true }
tracing = { workspace = true }

# External dependencies
derive_more = { version = "1.0", features = ["display", "from", "try_into"] }
nested_enum_utils = "0.1"
postcard = { version = "1.1", features = ["use-std"] }
quic-rpc-derive = "0.17"
url = "2.5"
24 changes: 24 additions & 0 deletions core/crates/p2p/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::io;

#[derive(Debug, thiserror::Error)]
pub enum Error {
// Network setup errors
#[error("Setup iroh endpoint: {0}")]
SetupEndpoint(anyhow::Error),
#[error("Setup iroh listener: {0}")]
SetupListener(io::Error),
#[error("Initialize LocalSwarmDiscovery: {0}")]
LocalSwarmDiscoveryInit(anyhow::Error),
#[error("Initialize DhtDiscovery: {0}")]
DhtDiscoveryInit(anyhow::Error),

// Known hosts loading errors
#[error("Serialize known devices: {0}")]
SerializeKnownDevices(postcard::Error),
#[error("Deserialize known devices: {0}")]
DeserializeKnownDevices(postcard::Error),
#[error("Load known devices from file: {0}")]
LoadKnownDevices(io::Error),
#[error("Save known devices to file: {0}")]
SaveKnownDevices(io::Error),
}
Loading
Loading