From 8bc8386dcf92c1ae1204112079d942ef06ca9187 Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Wed, 12 Jun 2024 16:17:55 -0500 Subject: [PATCH] Finish async API --- apps/src/common.rs | 8 ++- apps/src/recvfrom.rs | 5 +- dgram/src/lib.rs | 30 +++--------- dgram/src/syscalls.rs | 23 +++++---- dgram/src/tokio.rs | 110 ++++++++++++++++++++++++------------------ 5 files changed, 88 insertions(+), 88 deletions(-) diff --git a/apps/src/common.rs b/apps/src/common.rs index 8ac6b002dd..746e27a7b6 100644 --- a/apps/src/common.rs +++ b/apps/src/common.rs @@ -629,10 +629,8 @@ impl HttpConn for Http09Conn { s ); - // let body = - // std::fs::read(path.as_path()) - // .unwrap_or_else(|_| b"Not Found!\r\n".to_vec()); - let body = vec![0; 1_000_000]; + let body = std::fs::read(path.as_path()) + .unwrap_or_else(|_| b"Not Found!\r\n".to_vec()); info!( "{} sending response of size {} on stream {}", @@ -1094,7 +1092,7 @@ impl Http3Conn { match std::fs::read(file_path.as_path()) { Ok(data) => (200, data), - Err(_) => (404, vec![57; 1_000_000]), + Err(_) => (404, b"Not Found!".to_vec()), } }, diff --git a/apps/src/recvfrom.rs b/apps/src/recvfrom.rs index b46af74410..b59310b84a 100644 --- a/apps/src/recvfrom.rs +++ b/apps/src/recvfrom.rs @@ -39,7 +39,10 @@ pub fn recv_from( use dgram::RecvMsgSettings; use std::os::unix::io::AsRawFd; - let mut recvmsg_cmsg_settings = RecvMsgSettings::default(); + let mut recvmsg_cmsg_settings = RecvMsgSettings { + store_cmsgs: false, + cmsg_space: &mut vec![], + }; socket.try_io(|| { let fd = unsafe { std::os::fd::BorrowedFd::borrow_raw(socket.as_raw_fd()) }; diff --git a/dgram/src/lib.rs b/dgram/src/lib.rs index 416935c41f..4f26d7a923 100644 --- a/dgram/src/lib.rs +++ b/dgram/src/lib.rs @@ -31,12 +31,9 @@ pub struct SendMsgSettings { /// Settings for handling control messages when receiving data. #[cfg(target_os = "linux")] -#[derive(Clone)] -pub struct RecvMsgSettings { - // TODO(evanrittenhouse): deprecate store_cmsgs and only store based on what - // cmsg_space can handle. +pub struct RecvMsgSettings<'c> { /// If cmsgs should be stored when receiving a message. If set, cmsgs will - /// be stored in the `cmsg_space` vector. + /// be stored in the [`RecvData`]'s `cmsgs` field. pub store_cmsgs: bool, /// The vector where cmsgs will be stored, if store_cmsgs is set. /// @@ -45,19 +42,15 @@ pub struct RecvMsgSettings { /// [`cmsg_space`] macro. /// /// [`cmsg_space`]: https://docs.rs/nix/latest/nix/macro.cmsg_space.html - pub cmsg_space: Vec, - /// Flags for [`recvmsg`]. See [MsgFlags] for more. - /// - /// [`recvmsg`]: [nix::sys::socket::recvmsg] - pub msg_flags: MsgFlags, + pub cmsg_space: &'c mut Vec, } -impl Default for RecvMsgSettings { - fn default() -> Self { +impl<'c> RecvMsgSettings<'c> { + // Convenience to avoid forcing a specific version of nix + pub fn new(store_cmsgs: bool, cmsg_space: &'c mut Vec) -> Self { Self { - msg_flags: MsgFlags::empty(), - store_cmsgs: false, - cmsg_space: vec![], + store_cmsgs, + cmsg_space, } } } @@ -173,10 +166,3 @@ mod linux_imports { pub(super) use std::net::SocketAddrV6; pub(super) use std::os::fd::AsRawFd; } - -#[cfg(feature = "async")] -mod async_imports { - pub(super) use std::io::ErrorKind; - pub(super) use tokio::io::Interest; - pub(super) use tokio::net::UdpSocket; -} diff --git a/dgram/src/syscalls.rs b/dgram/src/syscalls.rs index ccdeec011d..ddba231676 100644 --- a/dgram/src/syscalls.rs +++ b/dgram/src/syscalls.rs @@ -89,10 +89,10 @@ pub fn send_msg( /// /// # Note /// -/// It is the caller's responsibility to create the cmsg space. `nix` recommends -/// that the space be created via the `cmsg_space!()` macro. Calling this -/// function will clear the cmsg buffer. It is also the caller's responsibility -/// to set any relevant socket options. +/// It is the caller's responsibility to create and clear the cmsg space.`nix` +/// recommends that the space be created via the `cmsg_space!()` macro. Calling +/// this function will clear the cmsg buffer. It is also the caller's +/// responsibility to set any relevant socket options. #[cfg(target_os = "linux")] pub fn recv_msg( fd: impl AsFd, read_buf: &mut [u8], recvmsg_settings: &mut RecvMsgSettings, @@ -102,7 +102,6 @@ pub fn recv_msg( let RecvMsgSettings { store_cmsgs, ref mut cmsg_space, - msg_flags, } = recvmsg_settings; cmsg_space.clear(); @@ -115,7 +114,7 @@ pub fn recv_msg( borrowed.as_raw_fd(), iov_s, Some(cmsg_space), - *msg_flags, + MsgFlags::empty(), ) { Ok(r) => { let bytes = r.bytes; @@ -137,7 +136,6 @@ pub fn recv_msg( }; let mut recv_data = RecvData::new(peer_addr, bytes, cmsg_space_len); - for msg in r.cmsgs() { match msg { ControlMessageOwned::ScmTimestampns(time) => @@ -260,8 +258,10 @@ mod tests { sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?; let mut read_buf = [0; 4]; - let recv_data = - recv_msg(recv, &mut read_buf, &mut RecvMsgSettings::default())?; + let recv_data = recv_msg(recv, &mut read_buf, &mut RecvMsgSettings { + store_cmsgs: false, + cmsg_space: &mut vec![], + })?; assert_eq!(recv_data.bytes, 4); assert_eq!(&read_buf, b"jets"); @@ -317,11 +317,10 @@ mod tests { let iov = [IoSlice::new(send_buf)]; sendmsg(send.as_raw_fd(), &iov, &[], MsgFlags::empty(), Some(&addr))?; - let cmsg_space = cmsg_space!(TimeVal); + let mut cmsg_space = cmsg_space!(TimeVal); let mut recvmsg_settings = RecvMsgSettings { store_cmsgs: true, - cmsg_space, - msg_flags: MsgFlags::empty(), + cmsg_space: &mut cmsg_space, }; let mut read_buf = [0; 4]; diff --git a/dgram/src/tokio.rs b/dgram/src/tokio.rs index 4b04138efa..2402d9867b 100644 --- a/dgram/src/tokio.rs +++ b/dgram/src/tokio.rs @@ -1,7 +1,11 @@ use crate::RecvData; +use std::io::ErrorKind; use std::io::Result; +use std::task::Context; +use std::task::Poll; -use crate::async_imports::*; +use tokio::io::Interest; +use tokio::net::UdpSocket; #[cfg(target_os = "linux")] mod linux { @@ -13,64 +17,74 @@ mod linux { use linux::*; #[cfg(target_os = "linux")] -pub async fn send_to( - socket: &UdpSocket, send_buf: &[u8], send_msg_settings: SendMsgCmsgSettings, -) -> Result { +pub fn poll_send_to( + socket: &UdpSocket, ctx: &mut Context<'_>, send_buf: &[u8], + sendmsg_settings: SendMsgSettings, +) -> Poll> { loop { - // Important to use try_io so that Tokio can clear the socket's readiness - // flag - let res = socket.try_io(Interest::WRITABLE, || { - let fd = socket.as_fd(); - send_msg(fd, send_buf, send_msg_settings).map_err(Into::into) - }); - - match res { - Err(e) if e.kind() == ErrorKind::WouldBlock => - socket.writable().await?, - res => return res, + match socket.poll_send_ready(ctx) { + Poll::Ready(Ok(())) => { + // Important to use try_io so that Tokio can clear the socket's + // readiness flag + match socket.try_io(Interest::WRITABLE, || { + let fd = socket.as_fd(); + send_msg(fd, send_buf, sendmsg_settings).map_err(Into::into) + }) { + Err(e) if e.kind() == ErrorKind::WouldBlock => {}, + io_res => break Poll::Ready(io_res), + } + }, + Poll::Ready(Err(e)) => break Poll::Ready(Err(e)), + Poll::Pending => break Poll::Pending, } } } #[cfg(target_os = "linux")] -pub async fn recv_from( - socket: &UdpSocket, read_buf: &mut [u8], msg_flags: Option, - store_cmsg_settings: &mut RecvMsgCmsgSettings, -) -> Result { - loop { - // Important to use try_io so that Tokio can clear the socket's readiness - // flag - let res = socket.try_io(Interest::READABLE, || { - let fd = socket.as_fd(); - recv_msg( - fd, - read_buf, - msg_flags.unwrap_or(MsgFlags::empty()), - store_cmsg_settings, - ) - .map_err(Into::into) - }); +pub async fn send_to( + socket: &UdpSocket, send_buf: &[u8], sendmsg_settings: SendMsgSettings, +) -> Result { + std::future::poll_fn(|mut cx| { + poll_send_to(socket, &mut cx, send_buf, sendmsg_settings) + }) + .await +} - match res { - Err(e) if e.kind() == ErrorKind::WouldBlock => - socket.readable().await?, - _ => return res, +#[cfg(target_os = "linux")] +pub fn poll_recv_from( + socket: &UdpSocket, ctx: &mut Context<'_>, recv_buf: &mut [u8], + recvmsg_settings: &mut RecvMsgSettings, +) -> Poll> { + loop { + match socket.poll_recv_ready(ctx) { + Poll::Ready(Ok(())) => { + // Important to use try_io so that Tokio can clear the socket's + // readiness flag + match socket.try_io(Interest::READABLE, || { + let fd = socket.as_fd(); + recv_msg(fd, recv_buf, recvmsg_settings).map_err(Into::into) + }) { + // The `poll_recv_ready` future registers the ctx with Tokio. + // We can only return Pending when that + // future is Pending or we won't wake the + // runtime properly + Err(e) if e.kind() == ErrorKind::WouldBlock => {}, + io_res => break Poll::Ready(io_res), + } + }, + Poll::Ready(Err(e)) => break Poll::Ready(Err(e)), + Poll::Pending => break Poll::Pending, } } } -#[cfg(not(target_os = "linux"))] -pub async fn send_to( - socket: &UdpSocket, client_addr: SocketAddr, -) -> Result { - socket.send_to(send_buf, client_addr).await -} - -#[cfg(not(target_os = "linux"))] +#[cfg(target_os = "linux")] pub async fn recv_from( - socket: &UdpSocket, read_buf: &mut [u8], + socket: &UdpSocket, recv_buf: &mut [u8], + recvmsg_settings: &mut RecvMsgSettings<'_>, ) -> Result { - let recv = socket.recv(read_buf).await?; - - Ok(RecvData::from_bytes(bytes)) + std::future::poll_fn(|mut ctx| { + poll_recv_from(socket, &mut ctx, recv_buf, recvmsg_settings) + }) + .await }