From 083734076eb81ca8a5b4b4fddd06121bc6f95021 Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Wed, 24 Apr 2024 08:46:29 -0500 Subject: [PATCH] Add UDP packets dropped metric to RecvData --- dgram/src/async_socket.rs | 2 +- dgram/src/syscalls.rs | 105 +++++++++++++++++++++++--------------- 2 files changed, 66 insertions(+), 41 deletions(-) diff --git a/dgram/src/async_socket.rs b/dgram/src/async_socket.rs index f241a81576..e61ebd1e6c 100644 --- a/dgram/src/async_socket.rs +++ b/dgram/src/async_socket.rs @@ -87,7 +87,7 @@ pub async fn recv_from( Ok(RecvData { bytes: recv, peer_addr: None, - cmsgs: vec![], + metrics: None, gro: None, rx_time: None, }) diff --git a/dgram/src/syscalls.rs b/dgram/src/syscalls.rs index bd7d310ce3..e42808d49f 100644 --- a/dgram/src/syscalls.rs +++ b/dgram/src/syscalls.rs @@ -66,29 +66,6 @@ pub fn send_msg( .map_err(Into::into) } -/// Output of a `recvmsg` call -pub struct RecvData { - pub bytes: usize, - pub peer_addr: Option, - pub cmsgs: Vec, - pub gro: Option, - pub rx_time: Option, -} - -impl RecvData { - pub fn new( - peer_addr: Option, bytes: usize, cmsg_space: usize, - ) -> Self { - Self { - peer_addr, - bytes, - cmsgs: Vec::with_capacity(cmsg_space), - gro: None, - rx_time: None, - } - } -} - /// Receive a message via `recvmsg`. /// /// # Note @@ -100,18 +77,17 @@ pub fn recv_msg( fd: impl AsRawFd, read_buf: &mut [u8], cmsg_space: &mut Vec, msg_flags: Option, ) -> std::result::Result { + use nix::sys::socket::getsockopt; + use nix::sys::socket::sockopt::RxqOvfl; + cmsg_space.clear(); let iov_s = &mut [IoSliceMut::new(read_buf)]; let msg_flags = msg_flags.unwrap_or(MsgFlags::empty()); - let cmsg_cap = cmsg_space.capacity(); - match recvmsg::( - fd.as_raw_fd(), - iov_s, - Some(cmsg_space), - msg_flags, - ) { + let raw = fd.as_raw_fd(); + + match recvmsg::(raw, iov_s, Some(cmsg_space), msg_flags) { Ok(r) => { let bytes = r.bytes; @@ -131,16 +107,25 @@ pub fn recv_msg( _ => None, }; - let mut recv_data = RecvData::new(peer_addr, bytes, cmsg_cap); - - r.cmsgs().for_each(|msg| match msg { - ControlMessageOwned::ScmTimestampns(time) => - recv_data.rx_time = - SystemTime::UNIX_EPOCH.checked_add(time.into()), - ControlMessageOwned::UdpGroSegments(gro) => - recv_data.gro = Some(gro), - _ => recv_data.cmsgs.push(msg), - }); + let mut recv_data = RecvData::new(peer_addr, bytes); + + for msg in r.cmsgs() { + match msg { + ControlMessageOwned::ScmTimestampns(time) => + recv_data.rx_time = + SystemTime::UNIX_EPOCH.checked_add(time.into()), + ControlMessageOwned::UdpGroSegments(gro) => + recv_data.gro = Some(gro), + ControlMessageOwned::RxqOvfl(c) => { + if let Ok(1) = getsockopt(raw, RxqOvfl) { + recv_data.metrics = Some(RecvMetrics { + udp_packets_dropped: c as u64, + }); + } + }, + _ => return Err(Errno::EINVAL), + } + } Ok(recv_data) }, @@ -148,6 +133,46 @@ pub fn recv_msg( } } +/// Output of a `recvmsg` call. +pub struct RecvData { + /// The number of bytes which `recvmsg` returned. + pub bytes: usize, + /// The peer address for this message. + pub peer_addr: Option, + /// Metrics for this `recvmsg` call. + /// + /// If no valid metrics exist - for example, when the RXQOVFL sockopt is not + /// set - this will be `None` to prevent confusion when parsing metrics. + pub metrics: Option, + /// The `UDP_GRO_SEGMENTS` control message from the result of `recvmsg`, if + /// it exist. + pub gro: Option, + /// The RX_TIME control message from the result of `recvmsg`, if it exists. + pub rx_time: Option, +} + +impl RecvData { + pub fn new(peer_addr: Option, bytes: usize) -> Self { + Self { + peer_addr, + bytes, + metrics: None, + gro: None, + rx_time: None, + } + } +} + +/// Metrics for `recvmsg` calls. +#[derive(Default)] +pub struct RecvMetrics { + /// The number of packets dropped between the last received packet and this + /// one. + /// + /// See SO_RXQOVFL for more. + pub udp_packets_dropped: u64, +} + #[cfg(all(test, target_os = "linux"))] mod tests { use nix::cmsg_space;