Skip to content

Commit

Permalink
Merge pull request #232 from QuantumEntangledAndy/keep_alive
Browse files Browse the repository at this point in the history
Limit UDP keep alive
  • Loading branch information
thirtythreeforty authored Dec 12, 2021
2 parents 324d14e + 0145983 commit 57c55e4
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 22 deletions.
31 changes: 16 additions & 15 deletions crates/core/src/bc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,23 +186,24 @@ impl BcCamera {
};

if let Some(conn) = &me.connection {
let keep_alive_msg = Bc {
meta: BcMeta {
msg_id: MSG_ID_UDP_KEEP_ALIVE,
channel_id: me.channel_id,
msg_num: me.new_message_num(),
stream_type: 0,
response_code: 0,
class: 0x6414,
},
body: BcBody::ModernMsg(ModernMsg {
..Default::default()
}),
};
if conn.is_udp() {
let keep_alive_msg = Bc {
meta: BcMeta {
msg_id: MSG_ID_UDP_KEEP_ALIVE,
channel_id: me.channel_id,
msg_num: me.new_message_num(),
stream_type: 0,
response_code: 0,
class: 0x6414,
},
body: BcBody::ModernMsg(ModernMsg {
..Default::default()
}),
};

conn.set_keep_alive_msg(keep_alive_msg);
conn.set_keep_alive_msg(keep_alive_msg);
}
}

Ok(me)
}

Expand Down
32 changes: 25 additions & 7 deletions crates/core/src/bc_protocol/connection/bcconn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use std::net::{SocketAddr, TcpStream};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{atomic::AtomicBool, atomic::Ordering, Arc, Mutex};
use std::thread::JoinHandle;
use std::time::Duration;
use std::time::{Duration, Instant};

/// A shareable connection to a camera. Handles serialization of messages. To send/receive, call
/// .[subscribe()] with a message ID. You can use the BcSubscription to send or receive only
Expand Down Expand Up @@ -46,8 +46,10 @@ impl BcConnection {
let keep_alive_encryption_protocol = connections_encryption_protocol.clone();
let mut context = BcContext::new(connections_encryption_protocol);
let mut result;
let mut last_keep_alive = Instant::now();
let keep_alive_time = Duration::from_millis(500);
loop {
result = Self::poll(&mut context, &conn, &mut subs);
result = Self::poll(&mut context, &conn, &mut subs, &connections_keep_alive_msg);
if poll_abort_rx.load(Ordering::Relaxed) {
break; // Poll has been aborted by request usally during disconnect
}
Expand All @@ -61,11 +63,14 @@ impl BcConnection {
break;
}
// Send a udp keep alive if set
if let Ok(lock) = connections_keep_alive_msg.try_lock() {
if let Some(keep_alive_msg) = lock.as_ref() {
let _ = keep_alive_msg
.serialize(&conn, &keep_alive_encryption_protocol.lock().unwrap());
let _ = conn.flush();
if last_keep_alive.elapsed() > keep_alive_time {
last_keep_alive = Instant::now();
if let Ok(lock) = connections_keep_alive_msg.try_lock() {
if let Some(keep_alive_msg) = lock.as_ref() {
let _ = keep_alive_msg
.serialize(&conn, &keep_alive_encryption_protocol.lock().unwrap());
let _ = conn.flush();
}
}
}
}
Expand Down Expand Up @@ -117,10 +122,15 @@ impl BcConnection {
(*self.encryption_protocol.lock().unwrap()).clone()
}

pub fn is_udp(&self) -> bool {
self.sink.lock().unwrap().is_udp()
}

fn poll(
context: &mut BcContext,
connection: &BcSource,
subscribers: &mut Arc<Mutex<BTreeMap<u32, Sender<Bc>>>>,
connections_keep_alive_msg: &Arc<Mutex<Option<Bc>>>,
) -> Result<()> {
// Don't hold the lock during deserialization so we don't poison the subscribers mutex if
// something goes wrong
Expand All @@ -144,6 +154,14 @@ impl BcConnection {
if msg_id != MSG_ID_UDP_KEEP_ALIVE {
debug!("Ignoring uninteresting message ID {}", msg_id);
trace!("Contents: {:?}", response);
} else {
// This is a keep alive message let see what the camera says about it
if response.meta.response_code != 200 {
// Camera dosen't support the current keep alive message stop sending them
if let Ok(mut lock) = connections_keep_alive_msg.try_lock() {
*lock = None;
}
}
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/bc_protocol/connection/bcsource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ pub enum BcSource {
}

impl BcSource {
pub fn is_udp(&self) -> bool {
matches!(self, BcSource::Udp(_))
}

pub fn new_tcp(addr: SocketAddr, timeout: Duration) -> Result<Self> {
let source = TcpSource::new(addr, timeout)?;
Ok(BcSource::Tcp(Mutex::new(source)))
Expand Down

0 comments on commit 57c55e4

Please sign in to comment.