Skip to content

Commit

Permalink
Replace mpsc::{Sender,Receiver} by VecDeque (#260)
Browse files Browse the repository at this point in the history
  • Loading branch information
soywod authored Jan 21, 2025
1 parent a7d22f0 commit 6fe22ed
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 190 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ bufstream = "0.1.3"
imap-proto = "0.16.1"
nom = { version = "7.1.0", default-features = false }
base64 = "0.22"
chrono = { version = "0.4", default-features = false, features = ["std"]}
chrono = { version = "0.4.37", default-features = false, features = ["std"]}
lazy_static = "1.4"
ouroboros = "0.18.0"

Expand Down
75 changes: 37 additions & 38 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ use bufstream::BufStream;
use chrono::{DateTime, FixedOffset};
use imap_proto::Response;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::io::{Read, Write};
use std::ops::{Deref, DerefMut};
use std::str;
use std::sync::mpsc;

use super::authenticator::Authenticator;
use super::error::{Bad, Bye, Error, No, ParseError, Result, TagMismatch, ValidateError};
Expand Down Expand Up @@ -141,11 +141,9 @@ fn validate_sequence_set(
#[derive(Debug)]
pub struct Session<T: Read + Write> {
conn: Connection<T>,
pub(crate) unsolicited_responses_tx: mpsc::Sender<UnsolicitedResponse>,

/// Server responses that are not related to the current command. See also the note on
/// [unilateral server responses in RFC 3501](https://tools.ietf.org/html/rfc3501#section-7).
pub unsolicited_responses: mpsc::Receiver<UnsolicitedResponse>,
pub(crate) unsolicited_responses: VecDeque<UnsolicitedResponse>,
}

/// An (unauthenticated) handle to talk to an IMAP server. This is what you get when first
Expand Down Expand Up @@ -262,7 +260,7 @@ impl<'a, T: Read + Write> AppendCmd<'a, T> {
self.session.stream.flush()?;
self.session
.read_response()
.and_then(|(lines, _)| parse_append(&lines, &mut self.session.unsolicited_responses_tx))
.and_then(|(lines, _)| parse_append(&lines, &mut self.session.unsolicited_responses))
}
}

Expand Down Expand Up @@ -370,10 +368,10 @@ impl<T: Read + Write> Client<T> {
///
/// This allows reading capabilities before authentication.
pub fn capabilities(&mut self) -> Result<Capabilities> {
// Create a temporary channel as we do not care about out of band responses before login
let (mut tx, _rx) = mpsc::channel();
// Create a temporary vec deque as we do not care about out of band responses before login
let mut unsolicited_responses = VecDeque::new();
self.run_command_and_read_response("CAPABILITY")
.and_then(|lines| Capabilities::parse(lines, &mut tx))
.and_then(|lines| Capabilities::parse(lines, &mut unsolicited_responses))
}

/// Log in to the IMAP server. Upon success a [`Session`](struct.Session.html) instance is
Expand Down Expand Up @@ -530,14 +528,17 @@ impl<T: Read + Write> Client<T> {
impl<T: Read + Write> Session<T> {
// not public, just to avoid duplicating the channel creation code
fn new(conn: Connection<T>) -> Self {
let (tx, rx) = mpsc::channel();
Session {
conn,
unsolicited_responses: rx,
unsolicited_responses_tx: tx,
unsolicited_responses: VecDeque::new(),
}
}

/// Takes all the unsolicited responses received thus far.
pub fn take_all_unsolicited(&mut self) -> impl ExactSizeIterator<Item = UnsolicitedResponse> {
std::mem::take(&mut self.unsolicited_responses).into_iter()
}

/// Selects a mailbox
///
/// The `SELECT` command selects a mailbox so that messages in the mailbox can be accessed.
Expand All @@ -561,7 +562,7 @@ impl<T: Read + Write> Session<T> {
"SELECT {}",
validate_str("SELECT", "mailbox", mailbox_name.as_ref())?
))
.and_then(|(lines, _)| parse_mailbox(&lines[..], &mut self.unsolicited_responses_tx))
.and_then(|(lines, _)| parse_mailbox(&lines[..], &mut self.unsolicited_responses))
}

/// The `EXAMINE` command is identical to [`Session::select`] and returns the same output;
Expand All @@ -573,7 +574,7 @@ impl<T: Read + Write> Session<T> {
"EXAMINE {}",
validate_str("EXAMINE", "mailbox", mailbox_name.as_ref())?
))
.and_then(|(lines, _)| parse_mailbox(&lines[..], &mut self.unsolicited_responses_tx))
.and_then(|(lines, _)| parse_mailbox(&lines[..], &mut self.unsolicited_responses))
}

/// Fetch retrieves data associated with a set of messages in the mailbox.
Expand Down Expand Up @@ -640,15 +641,15 @@ impl<T: Read + Write> Session<T> {
query: impl AsRef<str>,
) -> Result<Fetches> {
if sequence_set.as_ref().is_empty() {
Fetches::parse(vec![], &mut self.unsolicited_responses_tx)
Fetches::parse(vec![], &mut self.unsolicited_responses)
} else {
let synopsis = "FETCH";
self.run_command_and_read_response(&format!(
"FETCH {} {}",
validate_sequence_set(synopsis, "seq", sequence_set.as_ref())?,
validate_str_noquote(synopsis, "query", query.as_ref())?
))
.and_then(|lines| Fetches::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| Fetches::parse(lines, &mut self.unsolicited_responses))
}
}

Expand All @@ -660,22 +661,22 @@ impl<T: Read + Write> Session<T> {
query: impl AsRef<str>,
) -> Result<Fetches> {
if uid_set.as_ref().is_empty() {
Fetches::parse(vec![], &mut self.unsolicited_responses_tx)
Fetches::parse(vec![], &mut self.unsolicited_responses)
} else {
let synopsis = "UID FETCH";
self.run_command_and_read_response(&format!(
"UID FETCH {} {}",
validate_sequence_set(synopsis, "seq", uid_set.as_ref())?,
validate_str_noquote(synopsis, "query", query.as_ref())?
))
.and_then(|lines| Fetches::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| Fetches::parse(lines, &mut self.unsolicited_responses))
}
}

/// Noop always succeeds, and it does nothing.
pub fn noop(&mut self) -> Result<()> {
self.run_command_and_read_response("NOOP")
.and_then(|lines| parse_noop(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| parse_noop(lines, &mut self.unsolicited_responses))
}

/// Logout informs the server that the client is done with the connection.
Expand Down Expand Up @@ -807,7 +808,7 @@ impl<T: Read + Write> Session<T> {
/// one of the listed capabilities. See [`Capabilities`] for further details.
pub fn capabilities(&mut self) -> Result<Capabilities> {
self.run_command_and_read_response("CAPABILITY")
.and_then(|lines| Capabilities::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| Capabilities::parse(lines, &mut self.unsolicited_responses))
}

/// The [`EXPUNGE` command](https://tools.ietf.org/html/rfc3501#section-6.4.3) permanently
Expand All @@ -816,7 +817,7 @@ impl<T: Read + Write> Session<T> {
pub fn expunge(&mut self) -> Result<Deleted> {
self.run_command("EXPUNGE")?;
self.read_response()
.and_then(|(lines, _)| parse_expunge(lines, &mut self.unsolicited_responses_tx))
.and_then(|(lines, _)| parse_expunge(lines, &mut self.unsolicited_responses))
}

/// The [`UID EXPUNGE` command](https://tools.ietf.org/html/rfc4315#section-2.1) permanently
Expand Down Expand Up @@ -844,7 +845,7 @@ impl<T: Read + Write> Session<T> {
pub fn uid_expunge(&mut self, uid_set: impl AsRef<str>) -> Result<Deleted> {
self.run_command(&format!("UID EXPUNGE {}", uid_set.as_ref()))?;
self.read_response()
.and_then(|(lines, _)| parse_expunge(lines, &mut self.unsolicited_responses_tx))
.and_then(|(lines, _)| parse_expunge(lines, &mut self.unsolicited_responses))
}

/// The [`CHECK` command](https://tools.ietf.org/html/rfc3501#section-6.4.1) requests a
Expand Down Expand Up @@ -934,7 +935,7 @@ impl<T: Read + Write> Session<T> {
sequence_set.as_ref(),
query.as_ref()
))
.and_then(|lines| Fetches::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| Fetches::parse(lines, &mut self.unsolicited_responses))
}

/// Equivalent to [`Session::store`], except that all identifiers in `sequence_set` are
Expand All @@ -949,7 +950,7 @@ impl<T: Read + Write> Session<T> {
uid_set.as_ref(),
query.as_ref()
))
.and_then(|lines| Fetches::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| Fetches::parse(lines, &mut self.unsolicited_responses))
}

/// The [`COPY` command](https://tools.ietf.org/html/rfc3501#section-6.4.7) copies the
Expand Down Expand Up @@ -1084,7 +1085,7 @@ impl<T: Read + Write> Session<T> {
quote!(reference_name.unwrap_or("")),
mailbox_pattern.unwrap_or("\"\"")
))
.and_then(|lines| Names::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| Names::parse(lines, &mut self.unsolicited_responses))
}

/// The [`LSUB` command](https://tools.ietf.org/html/rfc3501#section-6.3.9) returns a subset of
Expand Down Expand Up @@ -1112,7 +1113,7 @@ impl<T: Read + Write> Session<T> {
quote!(reference_name.unwrap_or("")),
mailbox_pattern.unwrap_or("")
))
.and_then(|lines| Names::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| Names::parse(lines, &mut self.unsolicited_responses))
}

/// The [`STATUS` command](https://tools.ietf.org/html/rfc3501#section-6.3.10) requests the
Expand Down Expand Up @@ -1161,9 +1162,7 @@ impl<T: Read + Write> Session<T> {
validate_str("STATUS", "mailbox", mailbox_name)?,
data_items.as_ref()
))
.and_then(|lines| {
parse_status(&lines[..], mailbox_name, &mut self.unsolicited_responses_tx)
})
.and_then(|lines| parse_status(&lines[..], mailbox_name, &mut self.unsolicited_responses))
}

/// This method returns a handle that lets you use the [`IDLE`
Expand Down Expand Up @@ -1264,15 +1263,15 @@ impl<T: Read + Write> Session<T> {
/// - `SINCE <date>`: Messages whose internal date (disregarding time and timezone) is within or later than the specified date.
pub fn search(&mut self, query: impl AsRef<str>) -> Result<HashSet<Seq>> {
self.run_command_and_read_response(&format!("SEARCH {}", query.as_ref()))
.and_then(|lines| parse_id_set(&lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| parse_id_set(&lines, &mut self.unsolicited_responses))
}

/// Equivalent to [`Session::search`], except that the returned identifiers
/// are [`Uid`] instead of [`Seq`]. See also the [`UID`
/// command](https://tools.ietf.org/html/rfc3501#section-6.4.8).
pub fn uid_search(&mut self, query: impl AsRef<str>) -> Result<HashSet<Uid>> {
self.run_command_and_read_response(&format!("UID SEARCH {}", query.as_ref()))
.and_then(|lines| parse_id_set(&lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| parse_id_set(&lines, &mut self.unsolicited_responses))
}

/// This issues the [SORT command](https://tools.ietf.org/html/rfc5256#section-3),
Expand All @@ -1292,7 +1291,7 @@ impl<T: Read + Write> Session<T> {
charset,
query.as_ref()
))
.and_then(|lines| parse_id_seq(&lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| parse_id_seq(&lines, &mut self.unsolicited_responses))
}

/// Equivalent to [`Session::sort`], except that it returns [`Uid`]s.
Expand All @@ -1310,7 +1309,7 @@ impl<T: Read + Write> Session<T> {
charset,
query.as_ref()
))
.and_then(|lines| parse_id_seq(&lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| parse_id_seq(&lines, &mut self.unsolicited_responses))
}

/// The [`SETACL` command](https://datatracker.ietf.org/doc/html/rfc4314#section-3.1)
Expand Down Expand Up @@ -1373,7 +1372,7 @@ impl<T: Read + Write> Session<T> {
"GETACL {}",
validate_str("GETACL", "mailbox", mailbox_name.as_ref())?
))
.and_then(|lines| AclResponse::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| AclResponse::parse(lines, &mut self.unsolicited_responses))
}

/// The [`LISTRIGHTS` command](https://datatracker.ietf.org/doc/html/rfc4314#section-3.4)
Expand All @@ -1394,7 +1393,7 @@ impl<T: Read + Write> Session<T> {
validate_str("LISTRIGHTS", "mailbox", mailbox_name.as_ref())?,
validate_str("LISTRIGHTS", "identifier", identifier.as_ref())?
))
.and_then(|lines| ListRightsResponse::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| ListRightsResponse::parse(lines, &mut self.unsolicited_responses))
}

/// The [`MYRIGHTS` command](https://datatracker.ietf.org/doc/html/rfc4314#section-3.5)
Expand All @@ -1408,7 +1407,7 @@ impl<T: Read + Write> Session<T> {
"MYRIGHTS {}",
validate_str("MYRIGHTS", "mailbox", mailbox_name.as_ref())?,
))
.and_then(|lines| MyRightsResponse::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| MyRightsResponse::parse(lines, &mut self.unsolicited_responses))
}

/// The [`SETQUOTA` command](https://datatracker.ietf.org/doc/html/rfc2087#section-4.1)
Expand All @@ -1428,7 +1427,7 @@ impl<T: Read + Write> Session<T> {
validate_str("SETQUOTA", "quota_root", quota_root.as_ref())?,
limits,
))
.and_then(|lines| QuotaResponse::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| QuotaResponse::parse(lines, &mut self.unsolicited_responses))
}

/// The [`GETQUOTA` command](https://datatracker.ietf.org/doc/html/rfc2087#section-4.2)
Expand All @@ -1439,7 +1438,7 @@ impl<T: Read + Write> Session<T> {
"GETQUOTA {}",
validate_str("GETQUOTA", "quota_root", quota_root.as_ref())?
))
.and_then(|lines| QuotaResponse::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| QuotaResponse::parse(lines, &mut self.unsolicited_responses))
}

/// The [`GETQUOTAROOT` command](https://datatracker.ietf.org/doc/html/rfc2087#section-4.3)
Expand All @@ -1450,7 +1449,7 @@ impl<T: Read + Write> Session<T> {
"GETQUOTAROOT {}",
validate_str("GETQUOTAROOT", "mailbox", mailbox_name.as_ref())?
))
.and_then(|lines| QuotaRootResponse::parse(lines, &mut self.unsolicited_responses_tx))
.and_then(|lines| QuotaRootResponse::parse(lines, &mut self.unsolicited_responses))
}

// these are only here because they are public interface, the rest is in `Connection`
Expand Down
16 changes: 8 additions & 8 deletions src/extensions/list_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ use crate::parse::try_handle_unilateral;
use crate::types::{Mailbox, Name, UnsolicitedResponse};
use imap_proto::types::{MailboxDatum, Response, StatusAttribute};
use ouroboros::self_referencing;
use std::collections::VecDeque;
use std::io::{Read, Write};
use std::slice::Iter;
use std::sync::mpsc;

/// A wrapper for one or more [`Name`] responses paired with optional [`Mailbox`] responses.
///
Expand All @@ -27,7 +27,7 @@ impl ExtendedNames {
/// Parse one or more LIST-STATUS responses from a response buffer
pub(crate) fn parse(
owned: Vec<u8>,
unsolicited: &mut mpsc::Sender<UnsolicitedResponse>,
unsolicited: &mut VecDeque<UnsolicitedResponse>,
) -> core::result::Result<Self, Error> {
ExtendedNamesTryBuilder {
data: owned,
Expand Down Expand Up @@ -145,13 +145,13 @@ impl<T: Read + Write> Session<T> {
data_items: &str,
) -> Result<ExtendedNames> {
let reference = validate_str("LIST-STATUS", "reference", reference_name.unwrap_or(""))?;
self.run_command_and_read_response(&format!(
let lines = self.run_command_and_read_response(format!(
"LIST {} {} RETURN (STATUS {})",
&reference,
mailbox_pattern.unwrap_or("\"\""),
data_items
))
.and_then(|lines| ExtendedNames::parse(lines, &mut self.unsolicited_responses_tx))
))?;
ExtendedNames::parse(lines, &mut self.unsolicited_responses)
}
}

Expand All @@ -171,9 +171,9 @@ mod tests {
* LIST (\\UnMarked) \".\" feeds\r\n\
* LIST () \".\" feeds.test\r\n\
* STATUS feeds.test (HIGHESTMODSEQ 757)\r\n";
let (mut send, recv) = mpsc::channel();
let fetches = ExtendedNames::parse(lines.to_vec(), &mut send).unwrap();
assert!(recv.try_recv().is_err());
let mut queue = VecDeque::new();
let fetches = ExtendedNames::parse(lines.to_vec(), &mut queue).unwrap();
assert_eq!(queue.pop_front(), None);
assert!(!fetches.is_empty());
assert_eq!(fetches.len(), 4);
let (name, status) = fetches.get(0).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions src/extensions/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use crate::error::{Error, ParseError, Result};
use crate::parse::try_handle_unilateral;
use crate::types::*;
use imap_proto::types::{MailboxDatum, Metadata, Response, ResponseCode};
use std::collections::VecDeque;
use std::io::{Read, Write};
use std::sync::mpsc;

// for intra-doc links
#[allow(unused_imports)]
Expand Down Expand Up @@ -83,7 +83,7 @@ impl MetadataDepth {

fn parse_metadata<'a>(
mut lines: &'a [u8],
unsolicited: &'a mut mpsc::Sender<UnsolicitedResponse>,
unsolicited: &'a mut VecDeque<UnsolicitedResponse>,
) -> Result<Vec<Metadata>> {
let mut res: Vec<Metadata> = Vec::new();
loop {
Expand Down Expand Up @@ -197,7 +197,7 @@ impl<T: Read + Write> Session<T> {
.as_str(),
);
let (lines, ok) = self.run(command)?;
let meta = parse_metadata(&lines[..ok], &mut self.unsolicited_responses_tx)?;
let meta = parse_metadata(&lines[..ok], &mut self.unsolicited_responses)?;
let missed = if maxsize.is_some() {
if let Ok((_, Response::Done { code, .. })) =
imap_proto::parser::parse_response(&lines[ok..])
Expand Down
Loading

0 comments on commit 6fe22ed

Please sign in to comment.