From 4f4eb13d7021584f4917aa2d2c6122092a21cf53 Mon Sep 17 00:00:00 2001 From: Bernhard Specht Date: Wed, 10 Nov 2021 14:49:02 +0100 Subject: [PATCH 1/2] Support async capture with tokio --- Cargo.toml | 5 ++++ examples/async_capture.rs | 40 ++++++++++++++++++++++++++++++++ src/lib.rs | 48 ++++++++++++++++++++++++++++++++++++++- src/v4l2.rs | 10 +++++--- 4 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 examples/async_capture.rs diff --git a/Cargo.toml b/Cargo.toml index 1492065..56e8286 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,11 @@ edition = "2018" [dependencies] libc = "0.2" thiserror = "1.0.9" +tokio = { version = "1.9.0", features = ["net"] } + +[dev-dependencies] +tokio = { version = "1.9.0", features = ["full"] } [features] no_wrapper = [] +tokio_async = [] diff --git a/examples/async_capture.rs b/examples/async_capture.rs new file mode 100644 index 0000000..8456ea4 --- /dev/null +++ b/examples/async_capture.rs @@ -0,0 +1,40 @@ +use std::time::Duration; + +#[cfg(feature = "tokio_async")] +#[tokio::main] +async fn main() { + loop { + std::thread::sleep(Duration::from_secs(1)); + + let mut camera = match rscam::new("/dev/video0") { + Ok(camera) => camera, + Err(e) => { + eprintln!("failed to open camera: {}", e); + continue; + } + }; + + let res = camera.start(&rscam::Config { + interval: (1, 30), + resolution: (1920, 1080), + format: b"MJPG", + ..Default::default() + }); + + if let Err(e) = res { + eprintln!("failed to start camera: {}", e); + continue; + } + + for i in 1.. { + let frame = match camera.capture().await { + Ok(frame) => frame, + Err(e) => { + eprintln!("failed to capture frame: {}", e); + break; + } + }; + println!("Frame #{} of length {}", i, frame.len()); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 469c535..dafc7cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,9 @@ use std::slice; use std::str; use std::sync::Arc; +#[cfg(feature = "tokio_async")] +use tokio::io::unix::AsyncFd; + pub use self::consts::*; pub use self::v4l2::pubconsts as consts; use self::v4l2::MappedRegion; @@ -245,6 +248,8 @@ enum State { pub struct Camera { fd: RawFd, + #[cfg(feature = "tokio_async")] + async_fd: AsyncFd, state: State, resolution: (u32, u32), format: [u8; 4], @@ -253,8 +258,14 @@ pub struct Camera { impl Camera { pub fn new(device: &str) -> io::Result { + #[cfg(feature = "tokio_async")] + let fd = v4l2::open(device, false)?; + #[cfg(not(feature = "tokio_async"))] + let fd = v4l2::open(device, true)?; Ok(Camera { - fd: v4l2::open(device)?, + fd, + #[cfg(feature = "tokio_async")] + async_fd: AsyncFd::new(fd)?, state: State::Idle, resolution: (0, 0), format: [0; 4], @@ -534,6 +545,7 @@ impl Camera { /// /// # Panics /// If called w/o streaming. + #[cfg(not(feature = "tokio_async"))] pub fn capture(&self) -> io::Result { assert_eq!(self.state, State::Streaming); @@ -552,6 +564,40 @@ impl Camera { }) } + /// Async request of frame. + /// It dequeues buffer from a driver, which will be enqueueed after destructing `Frame`. + /// + /// # Panics + /// If called w/o streaming. + #[cfg(feature = "tokio_async")] + pub async fn capture(&self) -> io::Result { + assert_eq!(self.state, State::Streaming); + + let mut buf = v4l2::Buffer::new(); + + loop { + let mut guard = self.async_fd.readable().await?; + + match guard.try_io(|fd| v4l2::xioctl(*fd.get_ref(), v4l2::VIDIOC_DQBUF, &mut buf)) { + Ok(res) => { + res?; + break; + } + Err(_would_block) => continue, + } + } + assert!(buf.index < self.buffers.len() as u32); + + Ok(Frame { + resolution: self.resolution, + format: self.format, + region: self.buffers[buf.index as usize].clone(), + length: buf.bytesused, + fd: self.fd, + buffer: buf, + }) + } + /// Stop streaming. Otherwise it's called after destructing `Camera`. /// /// # Panics diff --git a/src/v4l2.rs b/src/v4l2.rs index 8aee511..1d5eee1 100644 --- a/src/v4l2.rs +++ b/src/v4l2.rs @@ -6,8 +6,8 @@ use std::ptr::null_mut; use std::{io, mem, usize}; // C types and constants. -use libc::timeval as Timeval; use libc::{c_ulong, c_void, off_t, size_t}; +use libc::{timeval as Timeval, O_NONBLOCK}; use libc::{MAP_SHARED, O_RDWR, PROT_READ, PROT_WRITE}; #[cfg(not(feature = "no_wrapper"))] @@ -55,9 +55,13 @@ macro_rules! check_io( (if $cond { Ok(()) } else { Err(io::Error::last_os_error()) }?) ); -pub fn open(file: &str) -> io::Result { +pub fn open(file: &str, sync: bool) -> io::Result { let c_str = CString::new(file)?; - let fd = unsafe { ll::open(c_str.as_ptr(), O_RDWR, 0) }; + let mut oflag = O_RDWR; + if !sync { + oflag |= O_NONBLOCK; + } + let fd = unsafe { ll::open(c_str.as_ptr(), oflag, 0) }; check_io!(fd != -1); Ok(fd) } From 424a8b9f10e80e5e9d4f83915a47b28a4fbbc6ed Mon Sep 17 00:00:00 2001 From: "b.specht" Date: Mon, 15 Nov 2021 13:38:46 +0100 Subject: [PATCH 2/2] Add option to set timeouts --- Cargo.toml | 5 --- examples/async_capture.rs | 40 --------------------- src/lib.rs | 73 +++++++++++++++------------------------ src/pselect.rs | 63 +++++++++++++++++++++++++++++++++ src/v4l2.rs | 8 ++--- 5 files changed, 92 insertions(+), 97 deletions(-) delete mode 100644 examples/async_capture.rs create mode 100644 src/pselect.rs diff --git a/Cargo.toml b/Cargo.toml index 56e8286..1492065 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,6 @@ edition = "2018" [dependencies] libc = "0.2" thiserror = "1.0.9" -tokio = { version = "1.9.0", features = ["net"] } - -[dev-dependencies] -tokio = { version = "1.9.0", features = ["full"] } [features] no_wrapper = [] -tokio_async = [] diff --git a/examples/async_capture.rs b/examples/async_capture.rs deleted file mode 100644 index 8456ea4..0000000 --- a/examples/async_capture.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::time::Duration; - -#[cfg(feature = "tokio_async")] -#[tokio::main] -async fn main() { - loop { - std::thread::sleep(Duration::from_secs(1)); - - let mut camera = match rscam::new("/dev/video0") { - Ok(camera) => camera, - Err(e) => { - eprintln!("failed to open camera: {}", e); - continue; - } - }; - - let res = camera.start(&rscam::Config { - interval: (1, 30), - resolution: (1920, 1080), - format: b"MJPG", - ..Default::default() - }); - - if let Err(e) = res { - eprintln!("failed to start camera: {}", e); - continue; - } - - for i in 1.. { - let frame = match camera.capture().await { - Ok(frame) => frame, - Err(e) => { - eprintln!("failed to capture frame: {}", e); - break; - } - }; - println!("Frame #{} of length {}", i, frame.len()); - } - } -} diff --git a/src/lib.rs b/src/lib.rs index dafc7cf..1704afb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,14 +36,17 @@ use std::result; use std::slice; use std::str; use std::sync::Arc; +use std::time::Duration; -#[cfg(feature = "tokio_async")] -use tokio::io::unix::AsyncFd; +use libc::timespec; + +use crate::pselect::{make_timespec, pselect, FdSet}; pub use self::consts::*; pub use self::v4l2::pubconsts as consts; use self::v4l2::MappedRegion; +mod pselect; mod v4l2; pub type Result = result::Result; @@ -79,6 +82,8 @@ pub struct Config<'a> { /// Number of buffers in the queue of camera. /// Default is `2`. pub nbuffers: u32, + /// Timeout for capturing a frame. + pub timeout: Option, } impl<'a> Default for Config<'a> { @@ -89,6 +94,7 @@ impl<'a> Default for Config<'a> { format: b"YUYV", field: FIELD_NONE, nbuffers: 2, + timeout: None, } } } @@ -248,31 +254,31 @@ enum State { pub struct Camera { fd: RawFd, - #[cfg(feature = "tokio_async")] - async_fd: AsyncFd, state: State, resolution: (u32, u32), format: [u8; 4], buffers: Vec>, + fd_set: FdSet, + timeout: Option, } impl Camera { pub fn new(device: &str) -> io::Result { - #[cfg(feature = "tokio_async")] - let fd = v4l2::open(device, false)?; - #[cfg(not(feature = "tokio_async"))] - let fd = v4l2::open(device, true)?; + let fd = v4l2::open(device)?; + let mut fd_set = FdSet::new(); + fd_set.set(fd); Ok(Camera { fd, - #[cfg(feature = "tokio_async")] - async_fd: AsyncFd::new(fd)?, state: State::Idle, resolution: (0, 0), format: [0; 4], buffers: vec![], + fd_set, + timeout: None, }) } + /// Sets the timeout for capturing an image. /// Get detailed info about the available formats. pub fn formats(&self) -> FormatIter<'_> { FormatIter { @@ -521,6 +527,7 @@ impl Camera { self.tune_format(config.resolution, *config.format, config.field)?; self.tune_stream(config.interval)?; self.alloc_buffers(config.nbuffers)?; + self.timeout = config.timeout.map(make_timespec); if let Err(err) = self.streamon() { self.free_buffers(); @@ -545,47 +552,21 @@ impl Camera { /// /// # Panics /// If called w/o streaming. - #[cfg(not(feature = "tokio_async"))] - pub fn capture(&self) -> io::Result { + pub fn capture(&mut self) -> io::Result { assert_eq!(self.state, State::Streaming); - let mut buf = v4l2::Buffer::new(); - - v4l2::xioctl(self.fd, v4l2::VIDIOC_DQBUF, &mut buf)?; - assert!(buf.index < self.buffers.len() as u32); - - Ok(Frame { - resolution: self.resolution, - format: self.format, - region: self.buffers[buf.index as usize].clone(), - length: buf.bytesused, - fd: self.fd, - buffer: buf, - }) - } - - /// Async request of frame. - /// It dequeues buffer from a driver, which will be enqueueed after destructing `Frame`. - /// - /// # Panics - /// If called w/o streaming. - #[cfg(feature = "tokio_async")] - pub async fn capture(&self) -> io::Result { - assert_eq!(self.state, State::Streaming); + pselect( + self.fd + 1, + Some(&mut self.fd_set), + None, + None, + self.timeout.as_ref(), + None, + )?; let mut buf = v4l2::Buffer::new(); - loop { - let mut guard = self.async_fd.readable().await?; - - match guard.try_io(|fd| v4l2::xioctl(*fd.get_ref(), v4l2::VIDIOC_DQBUF, &mut buf)) { - Ok(res) => { - res?; - break; - } - Err(_would_block) => continue, - } - } + v4l2::xioctl(self.fd, v4l2::VIDIOC_DQBUF, &mut buf)?; assert!(buf.index < self.buffers.len() as u32); Ok(Frame { diff --git a/src/pselect.rs b/src/pselect.rs new file mode 100644 index 0000000..be8731a --- /dev/null +++ b/src/pselect.rs @@ -0,0 +1,63 @@ +use std::os::unix::io::RawFd; +use std::{io, mem, ptr, time}; + +pub struct FdSet(libc::fd_set); + +impl FdSet { + pub fn new() -> FdSet { + unsafe { + let mut raw_fd_set = mem::MaybeUninit::::uninit(); + libc::FD_ZERO(raw_fd_set.as_mut_ptr()); + FdSet(raw_fd_set.assume_init()) + } + } + + pub fn set(&mut self, fd: RawFd) { + unsafe { + libc::FD_SET(fd, &mut self.0); + } + } +} + +fn to_fdset_ptr(opt: Option<&mut FdSet>) -> *mut libc::fd_set { + match opt { + None => ptr::null_mut(), + Some(&mut FdSet(ref mut raw_fd_set)) => raw_fd_set, + } +} +fn to_ptr(opt: Option<&T>) -> *const T { + match opt { + None => ptr::null::(), + Some(p) => p, + } +} + +pub fn pselect( + nfds: libc::c_int, + readfds: Option<&mut FdSet>, + writefds: Option<&mut FdSet>, + errorfds: Option<&mut FdSet>, + timeout: Option<&libc::timespec>, + sigmask: Option<&libc::sigset_t>, +) -> io::Result { + match unsafe { + libc::pselect( + nfds, + to_fdset_ptr(readfds), + to_fdset_ptr(writefds), + to_fdset_ptr(errorfds), + to_ptr(timeout), + to_ptr(sigmask), + ) + } { + -1 => Err(io::Error::last_os_error()), + res => Ok(res as usize), + } +} + +pub fn make_timespec(duration: time::Duration) -> libc::timespec { + libc::timespec { + tv_sec: duration.as_secs() as i64, + tv_nsec: duration.subsec_nanos() as i64, + } +} diff --git a/src/v4l2.rs b/src/v4l2.rs index 1d5eee1..41a1754 100644 --- a/src/v4l2.rs +++ b/src/v4l2.rs @@ -55,13 +55,9 @@ macro_rules! check_io( (if $cond { Ok(()) } else { Err(io::Error::last_os_error()) }?) ); -pub fn open(file: &str, sync: bool) -> io::Result { +pub fn open(file: &str) -> io::Result { let c_str = CString::new(file)?; - let mut oflag = O_RDWR; - if !sync { - oflag |= O_NONBLOCK; - } - let fd = unsafe { ll::open(c_str.as_ptr(), oflag, 0) }; + let fd = unsafe { ll::open(c_str.as_ptr(), O_RDWR | O_NONBLOCK, 0) }; check_io!(fd != -1); Ok(fd) }