Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update BufResult<T, B> to be Result<(T,B), BufError<B>> #267

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/cat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ fn main() {

loop {
// Read a chunk
let (res, b) = file.read_at(buf, pos).await;
let n = res.unwrap();
let (n, b) = file.read_at(buf, pos).await.unwrap();

if n == 0 {
break;
Expand Down
7 changes: 3 additions & 4 deletions examples/mix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,14 @@ fn main() {

loop {
// Read a chunk
let (res, b) = file.read_at(buf, pos).await;
let n = res.unwrap();
let (n, b) = file.read_at(buf, pos).await.unwrap();

if n == 0 {
break;
}

let (res, b) = socket.write(b).submit().await;
pos += res.unwrap() as u64;
let (written, b) = socket.write(b).submit().await.unwrap();
pos += written as u64;

buf = b;
}
Expand Down
6 changes: 2 additions & 4 deletions examples/tcp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,14 @@ fn main() {

let mut buf = vec![0u8; 4096];
loop {
let (result, nbuf) = stream.read(buf).await;
let (read, nbuf) = stream.read(buf).await.unwrap();
buf = nbuf;
let read = result.unwrap();
if read == 0 {
println!("{} closed, {} total ping-ponged", socket_addr, n);
break;
}

let (res, slice) = stream.write_all(buf.slice(..read)).await;
let _ = res.unwrap();
let (_, slice) = stream.write_all(buf.slice(..read)).await.unwrap();
buf = slice.into_inner();
println!("{} all {} bytes ping-ponged", socket_addr, read);
n += read;
Expand Down
6 changes: 2 additions & 4 deletions examples/tcp_listener_fixed_buffers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,15 @@ async fn echo_handler<T: IoBufMut>(
// Each time through the loop, use fbuf and then get it back for the next
// iteration.

let (result, fbuf1) = stream.read_fixed(fbuf).await;
let (read, fbuf1) = stream.read_fixed(fbuf).await.unwrap();
fbuf = {
let read = result.unwrap();
if read == 0 {
break;
}
assert_eq!(4096, fbuf1.len()); // To prove a point.

let (res, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await;
let (_, nslice) = stream.write_fixed_all(fbuf1.slice(..read)).await.unwrap();

let _ = res.unwrap();
println!("peer {} all {} bytes ping-ponged", peer, read);
n += read;

Expand Down
7 changes: 3 additions & 4 deletions examples/tcp_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ fn main() {
let stream = TcpStream::connect(socket_addr).await.unwrap();
let buf = vec![1u8; 128];

let (result, buf) = stream.write(buf).submit().await;
println!("written: {}", result.unwrap());
let (written, buf) = stream.write(buf).submit().await.unwrap();
println!("written: {}", written);

let (result, buf) = stream.read(buf).await;
let read = result.unwrap();
let (read, buf) = stream.read(buf).await.unwrap();
println!("read: {:?}", &buf[..read]);
});
}
7 changes: 3 additions & 4 deletions examples/udp_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ fn main() {

let buf = vec![0u8; 128];

let (result, mut buf) = socket.recv_from(buf).await;
let (read, socket_addr) = result.unwrap();
let ((read, socket_addr), mut buf) = socket.recv_from(buf).await.unwrap();
buf.resize(read, 0);
println!("received from {}: {:?}", socket_addr, &buf[..]);

let (result, _buf) = socket.send_to(buf, socket_addr).await;
println!("sent to {}: {}", socket_addr, result.unwrap());
let (sent, _buf) = socket.send_to(buf, socket_addr).await.unwrap();
println!("sent to {}: {}", socket_addr, sent);
});
}
7 changes: 3 additions & 4 deletions examples/unix_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ fn main() {
tokio_uring::spawn(async move {
let buf = vec![1u8; 128];

let (result, buf) = stream.write(buf).submit().await;
println!("written to {}: {}", &socket_addr, result.unwrap());
let (written, buf) = stream.write(buf).submit().await.unwrap();
println!("written to {}: {}", &socket_addr, written);

let (result, buf) = stream.read(buf).await;
let read = result.unwrap();
let (read, buf) = stream.read(buf).await.unwrap();
println!("read from {}: {:?}", &socket_addr, &buf[..read]);
});
}
Expand Down
7 changes: 3 additions & 4 deletions examples/unix_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ fn main() {
let stream = UnixStream::connect(socket_addr).await.unwrap();
let buf = vec![1u8; 128];

let (result, buf) = stream.write(buf).submit().await;
println!("written: {}", result.unwrap());
let (written, buf) = stream.write(buf).submit().await.unwrap();
println!("written: {}", written);

let (result, buf) = stream.read(buf).await;
let read = result.unwrap();
let (read, buf) = stream.read(buf).await.unwrap();
println!("read: {:?}", &buf[..read]);
});
}
4 changes: 2 additions & 2 deletions examples/wrk-bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ fn main() -> io::Result<()> {
let (stream, _) = listener.accept().await?;

tokio_uring::spawn(async move {
let (result, _) = stream.write(RESPONSE).submit().await;
let result = stream.write(RESPONSE).submit().await;

if let Err(err) = result {
eprintln!("Client connection failed: {}", err);
eprintln!("Client connection failed: {}", err.0);
}
});
}
Expand Down
1 change: 1 addition & 0 deletions src/buf/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::ops;
///
/// assert_eq!(&slice[..], b"hello");
/// ```
#[derive(Debug)]
pub struct Slice<T> {
buf: T,
begin: usize,
Expand Down
108 changes: 44 additions & 64 deletions src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::fs::OpenOptions;
use crate::io::SharedFd;

use crate::runtime::driver::op::Op;
use crate::{UnsubmittedOneshot, UnsubmittedWrite};
use crate::{map_buf, BufError, UnsubmittedOneshot, UnsubmittedWrite};
use std::fmt;
use std::io;
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
Expand Down Expand Up @@ -397,8 +397,9 @@ impl File {
T: BoundedBufMut,
{
let orig_bounds = buf.bounds();
let (res, buf) = self.read_exact_slice_at(buf.slice_full(), pos).await;
(res, T::from_buf_bounds(buf, orig_bounds))
let buf_res = self.read_exact_slice_at(buf.slice_full(), pos).await;
// (res, T::from_buf_bounds(buf, orig_bounds))
map_buf(buf_res, |buf| T::from_buf_bounds(buf, orig_bounds))
}

async fn read_exact_slice_at<T: IoBufMut>(
Expand All @@ -407,28 +408,22 @@ impl File {
mut pos: u64,
) -> crate::BufResult<(), T> {
if pos.checked_add(buf.bytes_total() as u64).is_none() {
return (
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"buffer too large for file",
)),
return Err(BufError(
io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"),
buf.into_inner(),
);
));
}

while buf.bytes_total() != 0 {
let (res, slice) = self.read_at(buf, pos).await;
match res {
Ok(0) => {
return (
Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
)),
let buf_result = self.read_at(buf, pos).await;
match buf_result {
Ok((0, slice)) => {
return Err(BufError(
io::Error::new(io::ErrorKind::UnexpectedEof, "failed to fill whole buffer"),
slice.into_inner(),
)
))
}
Ok(n) => {
Ok((n, slice)) => {
pos += n as u64;
buf = slice.slice(n..);
}
Expand All @@ -437,11 +432,10 @@ impl File {
// crate's design ensures we are not calling the 'wait' option
// in the ENTER syscall. Only an Enter with 'wait' can generate
// an EINTR according to the io_uring man pages.
Err(e) => return (Err(e), slice.into_inner()),
Err(e) => return Err(e.map_buf(|slice| slice.into_inner())),
};
}

(Ok(()), buf.into_inner())
Ok(((), buf.into_inner()))
}

/// Like [`read_at`], but using a pre-mapped buffer
Expand Down Expand Up @@ -589,8 +583,8 @@ impl File {
T: BoundedBuf,
{
let orig_bounds = buf.bounds();
let (res, buf) = self.write_all_slice_at(buf.slice_full(), pos).await;
(res, T::from_buf_bounds(buf, orig_bounds))
let buf_result = self.write_all_slice_at(buf.slice_full(), pos).await;
map_buf(buf_result, |buf| T::from_buf_bounds(buf, orig_bounds))
}

async fn write_all_slice_at<T: IoBuf>(
Expand All @@ -599,28 +593,22 @@ impl File {
mut pos: u64,
) -> crate::BufResult<(), T> {
if pos.checked_add(buf.bytes_init() as u64).is_none() {
return (
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"buffer too large for file",
)),
return Err(BufError(
io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"),
buf.into_inner(),
);
));
}

while buf.bytes_init() != 0 {
let (res, slice) = self.write_at(buf, pos).submit().await;
match res {
Ok(0) => {
return (
Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
let buf_result = self.write_at(buf, pos).submit().await;
match buf_result {
Ok((0, slice)) => {
return Err(BufError(
io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"),
slice.into_inner(),
)
))
}
Ok(n) => {
Ok((n, slice)) => {
pos += n as u64;
buf = slice.slice(n..);
}
Expand All @@ -629,11 +617,10 @@ impl File {
// crate's design ensures we are not calling the 'wait' option
// in the ENTER syscall. Only an Enter with 'wait' can generate
// an EINTR according to the io_uring man pages.
Err(e) => return (Err(e), slice.into_inner()),
Err(e) => return Err(e.map_buf(|slice| slice.into_inner())),
};
}

(Ok(()), buf.into_inner())
Ok(((), buf.into_inner()))
}

/// Like [`write_at`], but using a pre-mapped buffer
Expand Down Expand Up @@ -709,8 +696,8 @@ impl File {
T: BoundedBuf<Buf = FixedBuf>,
{
let orig_bounds = buf.bounds();
let (res, buf) = self.write_fixed_all_at_slice(buf.slice_full(), pos).await;
(res, T::from_buf_bounds(buf, orig_bounds))
let buf_result = self.write_fixed_all_at_slice(buf.slice_full(), pos).await;
map_buf(buf_result, |buf| T::from_buf_bounds(buf, orig_bounds))
}

async fn write_fixed_all_at_slice(
Expand All @@ -719,28 +706,22 @@ impl File {
mut pos: u64,
) -> crate::BufResult<(), FixedBuf> {
if pos.checked_add(buf.bytes_init() as u64).is_none() {
return (
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"buffer too large for file",
)),
return Err(BufError(
io::Error::new(io::ErrorKind::InvalidInput, "buffer too large for file"),
buf.into_inner(),
);
));
}

while buf.bytes_init() != 0 {
let (res, slice) = self.write_fixed_at(buf, pos).await;
match res {
Ok(0) => {
return (
Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write whole buffer",
)),
let buf_result = self.write_fixed_at(buf, pos).await;
match buf_result {
Ok((0, slice)) => {
return Err(BufError(
io::Error::new(io::ErrorKind::WriteZero, "failed to write whole buffer"),
slice.into_inner(),
)
))
}
Ok(n) => {
Ok((n, slice)) => {
pos += n as u64;
buf = slice.slice(n..);
}
Expand All @@ -749,11 +730,10 @@ impl File {
// crate's design ensures we are not calling the 'wait' option
// in the ENTER syscall. Only an Enter with 'wait' can generate
// an EINTR according to the io_uring man pages.
Err(e) => return (Err(e), slice.into_inner()),
Err(e) => return Err(e.map_buf(|slice| slice.into_inner())),
};
}

(Ok(()), buf.into_inner())
Ok(((), buf.into_inner()))
}

/// Attempts to sync all OS-internal metadata to disk.
Expand Down
7 changes: 5 additions & 2 deletions src/io/read.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::buf::BoundedBufMut;
use crate::io::SharedFd;
use crate::BufResult;
use crate::{BufError, BufResult};

use crate::runtime::driver::op::{Completable, CqeResult, Op};
use crate::runtime::CONTEXT;
Expand Down Expand Up @@ -59,6 +59,9 @@ where
}
}

(res, buf)
match res {
Ok(n) => Ok((n, buf)),
Err(e) => Err(BufError(e, buf)),
}
}
}
7 changes: 5 additions & 2 deletions src/io/read_fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::buf::fixed::FixedBuf;
use crate::buf::BoundedBufMut;
use crate::io::SharedFd;
use crate::runtime::driver::op::{self, Completable, Op};
use crate::BufResult;
use crate::{BufError, BufResult};

use crate::runtime::CONTEXT;
use std::io;
Expand Down Expand Up @@ -68,6 +68,9 @@ where
}
}

(res, buf)
match res {
Ok(n) => Ok((n, buf)),
Err(e) => Err(BufError(e, buf)),
}
}
}
Loading
Loading