Skip to content

Commit

Permalink
Add UDP packets dropped metric to RecvData
Browse files Browse the repository at this point in the history
  • Loading branch information
evanrittenhouse committed Apr 24, 2024
1 parent 7850f24 commit 0837340
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 41 deletions.
2 changes: 1 addition & 1 deletion dgram/src/async_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub async fn recv_from(
Ok(RecvData {
bytes: recv,
peer_addr: None,
cmsgs: vec![],
metrics: None,
gro: None,
rx_time: None,
})
Expand Down
105 changes: 65 additions & 40 deletions dgram/src/syscalls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,6 @@ pub fn send_msg(
.map_err(Into::into)
}

/// Output of a `recvmsg` call
pub struct RecvData {
pub bytes: usize,
pub peer_addr: Option<SocketAddr>,
pub cmsgs: Vec<ControlMessageOwned>,
pub gro: Option<u16>,
pub rx_time: Option<SystemTime>,
}

impl RecvData {
pub fn new(
peer_addr: Option<SocketAddr>, bytes: usize, cmsg_space: usize,
) -> Self {
Self {
peer_addr,
bytes,
cmsgs: Vec::with_capacity(cmsg_space),
gro: None,
rx_time: None,
}
}
}

/// Receive a message via `recvmsg`.
///
/// # Note
Expand All @@ -100,18 +77,17 @@ pub fn recv_msg(
fd: impl AsRawFd, read_buf: &mut [u8], cmsg_space: &mut Vec<u8>,
msg_flags: Option<MsgFlags>,
) -> std::result::Result<RecvData, Errno> {
use nix::sys::socket::getsockopt;
use nix::sys::socket::sockopt::RxqOvfl;

cmsg_space.clear();

let iov_s = &mut [IoSliceMut::new(read_buf)];
let msg_flags = msg_flags.unwrap_or(MsgFlags::empty());
let cmsg_cap = cmsg_space.capacity();

match recvmsg::<SockaddrStorage>(
fd.as_raw_fd(),
iov_s,
Some(cmsg_space),
msg_flags,
) {
let raw = fd.as_raw_fd();

match recvmsg::<SockaddrStorage>(raw, iov_s, Some(cmsg_space), msg_flags) {
Ok(r) => {
let bytes = r.bytes;

Expand All @@ -131,23 +107,72 @@ pub fn recv_msg(
_ => None,
};

let mut recv_data = RecvData::new(peer_addr, bytes, cmsg_cap);

r.cmsgs().for_each(|msg| match msg {
ControlMessageOwned::ScmTimestampns(time) =>
recv_data.rx_time =
SystemTime::UNIX_EPOCH.checked_add(time.into()),
ControlMessageOwned::UdpGroSegments(gro) =>
recv_data.gro = Some(gro),
_ => recv_data.cmsgs.push(msg),
});
let mut recv_data = RecvData::new(peer_addr, bytes);

for msg in r.cmsgs() {
match msg {
ControlMessageOwned::ScmTimestampns(time) =>
recv_data.rx_time =
SystemTime::UNIX_EPOCH.checked_add(time.into()),
ControlMessageOwned::UdpGroSegments(gro) =>
recv_data.gro = Some(gro),
ControlMessageOwned::RxqOvfl(c) => {
if let Ok(1) = getsockopt(raw, RxqOvfl) {
recv_data.metrics = Some(RecvMetrics {
udp_packets_dropped: c as u64,
});
}
},
_ => return Err(Errno::EINVAL),
}
}

Ok(recv_data)
},
Err(e) => Err(e),
}
}

/// Output of a `recvmsg` call.
pub struct RecvData {
/// The number of bytes which `recvmsg` returned.
pub bytes: usize,
/// The peer address for this message.
pub peer_addr: Option<SocketAddr>,
/// Metrics for this `recvmsg` call.
///
/// If no valid metrics exist - for example, when the RXQOVFL sockopt is not
/// set - this will be `None` to prevent confusion when parsing metrics.
pub metrics: Option<RecvMetrics>,
/// The `UDP_GRO_SEGMENTS` control message from the result of `recvmsg`, if
/// it exist.
pub gro: Option<u16>,
/// The RX_TIME control message from the result of `recvmsg`, if it exists.
pub rx_time: Option<SystemTime>,
}

impl RecvData {
pub fn new(peer_addr: Option<SocketAddr>, bytes: usize) -> Self {
Self {
peer_addr,
bytes,
metrics: None,
gro: None,
rx_time: None,
}
}
}

/// Metrics for `recvmsg` calls.
#[derive(Default)]
pub struct RecvMetrics {
/// The number of packets dropped between the last received packet and this
/// one.
///
/// See SO_RXQOVFL for more.
pub udp_packets_dropped: u64,
}

#[cfg(all(test, target_os = "linux"))]
mod tests {
use nix::cmsg_space;
Expand Down

0 comments on commit 0837340

Please sign in to comment.