Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Runtime trait and type #641

Merged
merged 2 commits into from
Apr 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion async-std/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ keywords = ["trillium", "framework", "async"]
categories = ["web-programming::http-server", "web-programming"]

[dependencies]
async-std = "1.12.0"
async-std = { version = "1.12.0", features = ["unstable"] }
futures-lite = "2.3.0"
log = "0.4.20"
trillium = { path = "../trillium", version = "0.2.19" }
trillium-http = { path = "../http", version = "0.3.16" }
Expand Down
16 changes: 5 additions & 11 deletions async-std/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::AsyncStdTransport;
use crate::{AsyncStdRuntime, AsyncStdTransport};
use async_std::net::TcpStream;
use std::{
future::Future,
io::{Error, ErrorKind, Result},
};
use std::io::{Error, ErrorKind, Result};
use trillium_server_common::{
url::{Host, Url},
Connector, Transport,
Expand Down Expand Up @@ -45,6 +42,7 @@ impl ClientConfig {

impl Connector for ClientConfig {
type Transport = AsyncStdTransport<TcpStream>;
type Runtime = AsyncStdRuntime;

async fn connect(&self, url: &Url) -> Result<Self::Transport> {
if url.scheme() != "http" {
Expand Down Expand Up @@ -80,11 +78,7 @@ impl Connector for ClientConfig {
Ok(tcp)
}

fn spawn<Fut: Future<Output = ()> + Send + 'static>(&self, fut: Fut) {
async_std::task::spawn(fut);
}

async fn delay(&self, duration: std::time::Duration) {
let _ = async_std::future::timeout(duration, std::future::pending::<()>()).await;
fn runtime(&self) -> Self::Runtime {
AsyncStdRuntime::default()
}
}
8 changes: 2 additions & 6 deletions async-std/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ async fn main() {
```
*/

use std::future::Future;

use trillium::Handler;
pub use trillium_server_common::{Binding, Swansong};

Expand Down Expand Up @@ -113,7 +111,5 @@ pub fn config() -> Config<()> {
Config::new()
}

/// spawn and detach a Future that returns ()
pub fn spawn<Fut: Future<Output = ()> + Send + 'static>(future: Fut) {
async_std::task::spawn(future);
}
mod runtime;
pub use runtime::AsyncStdRuntime;
86 changes: 86 additions & 0 deletions async-std/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use futures_lite::future::FutureExt;
use std::{future::Future, time::Duration};
use trillium_server_common::{DroppableFuture, Runtime, RuntimeTrait, Stream};

/// async-std runtime
#[derive(Clone, Copy, Default, Debug)]
pub struct AsyncStdRuntime(());

impl RuntimeTrait for AsyncStdRuntime {
fn spawn<Fut>(
&self,
fut: Fut,
) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let join_handle = async_std::task::spawn(fut);
DroppableFuture::new(async move { join_handle.catch_unwind().await.ok() })
}

async fn delay(&self, duration: Duration) {
async_std::task::sleep(duration).await
}

fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
async_std::stream::interval(period)
}

fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
async_std::task::block_on(fut)
}
}

impl AsyncStdRuntime {
/// Spawn a future on the runtime, returning a future that has detach-on-drop semantics
///
/// Spawned tasks conform to the following behavior:
///
/// * detach on drop: If the returned [`DroppableFuture`] is dropped immediately, the task will
/// continue to execute until completion.
///
/// * unwinding: If the spawned future panics, this must not propagate to the join
/// handle. Instead, the awaiting the join handle returns None in case of panic.
pub fn spawn<Fut>(
&self,
fut: Fut,
) -> DroppableFuture<impl Future<Output = Option<Fut::Output>> + Send + 'static>
where
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
let join_handle = async_std::task::spawn(fut);
DroppableFuture::new(async move { join_handle.catch_unwind().await.ok() })
}

/// Wake in this amount of wall time
pub async fn delay(&self, duration: Duration) {
async_std::task::sleep(duration).await
}

/// Returns a [`Stream`] that yields a `()` on the provided period
pub fn interval(&self, period: Duration) -> impl Stream<Item = ()> + Send + 'static {
async_std::stream::interval(period)
}

/// Runtime implementation hook for blocking on a top level future.
pub fn block_on<Fut: Future>(&self, fut: Fut) -> Fut::Output {
async_std::task::block_on(fut)
}

/// Race a future against the provided duration, returning None in case of timeout.
pub async fn timeout<Fut>(&self, duration: Duration, fut: Fut) -> Option<Fut::Output>
where
Fut: Future + Send,
Fut::Output: Send + 'static,
{
RuntimeTrait::timeout(self, duration, fut).await
}
}

impl From<AsyncStdRuntime> for Runtime {
fn from(value: AsyncStdRuntime) -> Self {
Runtime::new(value)
}
}
1 change: 0 additions & 1 deletion async-std/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ mod unix;
#[cfg(unix)]
pub use unix::AsyncStdServer;

#[cfg(not(unix))]
mod tcp;
#[cfg(not(unix))]
pub use tcp::AsyncStdServer;
Expand Down
16 changes: 6 additions & 10 deletions async-std/src/server/tcp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::AsyncStdTransport;
use crate::{AsyncStdRuntime, AsyncStdTransport};
use async_std::net::{TcpListener, TcpStream};
use async_std::task::{block_on, spawn};
use std::{convert::TryInto, env, future::Future, io::Result};
use std::{env, io::Result};
use trillium::Info;
use trillium_server_common::Server;

Expand All @@ -20,6 +19,7 @@ impl From<std::net::TcpListener> for AsyncStdServer {
}

impl Server for AsyncStdServer {
type Runtime = AsyncStdRuntime;
type Transport = AsyncStdTransport<TcpStream>;
const DESCRIPTION: &'static str = concat!(
" (",
Expand All @@ -34,18 +34,14 @@ impl Server for AsyncStdServer {
}

fn listener_from_tcp(tcp: std::net::TcpListener) -> Self {
Self(tcp.try_into().unwrap())
Self(tcp.into())
}

fn info(&self) -> Info {
self.0.local_addr().unwrap().into()
}

fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
spawn(fut);
}

fn block_on(fut: impl Future<Output = ()> + 'static) {
block_on(fut)
fn runtime() -> Self::Runtime {
AsyncStdRuntime::default()
}
}
15 changes: 6 additions & 9 deletions async-std/src/server/unix.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use crate::AsyncStdTransport;
use crate::{AsyncStdRuntime, AsyncStdTransport};
use async_std::{
net::{TcpListener, TcpStream},
os::unix::net::{UnixListener, UnixStream},
stream::StreamExt,
task::{block_on, spawn},
};
use std::{env, future::Future, io::Result};
use std::{env, io::Result};
use trillium::{log_error, Info};
use trillium_server_common::{
Binding::{self, *},
Expand Down Expand Up @@ -39,6 +38,8 @@ impl From<std::os::unix::net::UnixListener> for AsyncStdServer {

#[cfg(unix)]
impl Server for AsyncStdServer {
type Runtime = AsyncStdRuntime;

type Transport = Binding<AsyncStdTransport<TcpStream>, AsyncStdTransport<UnixStream>>;
const DESCRIPTION: &'static str = concat!(
" (",
Expand Down Expand Up @@ -94,12 +95,8 @@ impl Server for AsyncStdServer {
}
}

fn spawn(fut: impl Future<Output = ()> + Send + 'static) {
spawn(fut);
}

fn block_on(fut: impl Future<Output = ()> + 'static) {
block_on(fut);
fn runtime() -> Self::Runtime {
AsyncStdRuntime::default()
}

async fn clean_up(self) {
Expand Down
Loading
Loading