diff --git a/crates/bytes-util/src/bytes_cursor.rs b/crates/bytes-util/src/bytes_cursor.rs index 3fcf7d4b6..a18d80930 100644 --- a/crates/bytes-util/src/bytes_cursor.rs +++ b/crates/bytes-util/src/bytes_cursor.rs @@ -23,21 +23,37 @@ pub trait BytesCursor { impl BytesCursor for io::Cursor { fn remaining(&self) -> usize { - self.get_ref().len() - self.position() as usize + // We have to use a saturating sub here because the position can be + // greater than the length of the bytes. + self.get_ref().len().saturating_sub(self.position() as usize) } fn extract_remaining(&mut self) -> Bytes { - self.extract_bytes(self.remaining()) - .expect("somehow we read past the end of the file") + // We don't really care if we fail here since the desired behavior is + // to return all bytes remaining in the cursor. If we fail its because + // there are not enough bytes left in the cursor to read. + self.extract_bytes(self.remaining()).unwrap_or_default() } fn extract_bytes(&mut self, size: usize) -> io::Result { - let position = self.position() as usize; - if position + size > self.get_ref().len() { + // If the size is zero we can just return an empty bytes slice. + if size == 0 { + return Ok(Bytes::new()); + } + + // If the size is greater than the remaining bytes we can just return an + // error. + if size > self.remaining() { return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "not enough bytes")); } + let position = self.position() as usize; + + // We slice bytes here which is a O(1) operation as it only modifies a few + // reference counters and does not copy the memory. let slice = self.get_ref().slice(position..position + size); + + // We advance the cursor because we have now "read" the bytes. self.set_position((position + size) as u64); Ok(slice) @@ -45,13 +61,53 @@ impl BytesCursor for io::Cursor { } #[cfg(test)] +#[cfg_attr(all(test, coverage_nightly), coverage(off))] mod tests { use super::*; #[test] - fn test_bytes_cursor() { + fn test_bytes_cursor_extract_remaining() { let mut cursor = io::Cursor::new(Bytes::from_static(&[1, 2, 3, 4, 5])); let remaining = cursor.extract_remaining(); assert_eq!(remaining, Bytes::from_static(&[1, 2, 3, 4, 5])); } + + #[test] + fn test_bytes_cursor_extract_bytes() { + let mut cursor = io::Cursor::new(Bytes::from_static(&[1, 2, 3, 4, 5])); + let bytes = cursor.extract_bytes(3).unwrap(); + assert_eq!(bytes, Bytes::from_static(&[1, 2, 3])); + assert_eq!(cursor.remaining(), 2); + + let bytes = cursor.extract_bytes(2).unwrap(); + assert_eq!(bytes, Bytes::from_static(&[4, 5])); + assert_eq!(cursor.remaining(), 0); + + let bytes = cursor.extract_bytes(1).unwrap_err(); + assert_eq!(bytes.kind(), io::ErrorKind::UnexpectedEof); + + let bytes = cursor.extract_bytes(0).unwrap(); + assert_eq!(bytes, Bytes::from_static(&[])); + assert_eq!(cursor.remaining(), 0); + + let bytes = cursor.extract_remaining(); + assert_eq!(bytes, Bytes::from_static(&[])); + assert_eq!(cursor.remaining(), 0); + } + + #[test] + fn seek_out_of_bounds() { + let mut cursor = io::Cursor::new(Bytes::from_static(&[1, 2, 3, 4, 5])); + cursor.set_position(10); + assert_eq!(cursor.remaining(), 0); + + let bytes = cursor.extract_remaining(); + assert_eq!(bytes, Bytes::from_static(&[])); + + let bytes = cursor.extract_bytes(1); + assert_eq!(bytes.unwrap_err().kind(), io::ErrorKind::UnexpectedEof); + + let bytes = cursor.extract_bytes(0); + assert_eq!(bytes.unwrap(), Bytes::from_static(&[])); + } } diff --git a/crates/rtmp/src/session/server_session.rs b/crates/rtmp/src/session/server_session.rs index 6b969455e..db25a89f4 100644 --- a/crates/rtmp/src/session/server_session.rs +++ b/crates/rtmp/src/session/server_session.rs @@ -128,9 +128,6 @@ impl Session { self.flush().await?; } - println!("write_buf: cap: {}; len: {}", self.write_buf.capacity(), self.write_buf.len()); - println!("read_buf: cap: {}; len: {}", self.read_buf.capacity(), self.read_buf.len()); - // We should technically check the stream_map here // However most clients just disconnect without cleanly stopping the subscrition // streams (play streams) So we just check that all publishers have disconnected