Skip to content

Commit

Permalink
@clux's feedback
Browse files Browse the repository at this point in the history
Signed-off-by: Matei David <[email protected]>
  • Loading branch information
mateiidavid committed Apr 17, 2024
1 parent de2eda1 commit 276b75e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
10 changes: 10 additions & 0 deletions kube-runtime/src/reflector/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone,
{
/// Creates and returns a new self that wraps a broadcast sender and an
/// inactive broadcast receiver
///
/// A buffer size is required to create the underlying broadcast channel.
/// Messages will be buffered until all active readers have received a copy
/// of the message. When the channel is full, senders will apply
/// backpressure by waiting for space to free up.
//
// N.B messages are eagerly broadcasted, meaning no active receivers are
// required for a message to be broadcasted.
pub(crate) fn new(buf_size: usize) -> Dispatcher<K> {
// Create a broadcast (tx, rx) pair
let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
Expand Down
11 changes: 2 additions & 9 deletions kube-runtime/src/reflector/object_ref.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use derivative::Derivative;
use k8s_openapi::{api::core::v1::ObjectReference, apimachinery::pkg::apis::meta::v1::OwnerReference};
#[cfg(doc)] use kube_client::core::ObjectMeta;
#[cfg(doc)]
use kube_client::core::ObjectMeta;
use kube_client::{
api::{DynamicObject, Resource},
core::api_version_from_group_version,
Expand All @@ -9,7 +10,6 @@ use std::{
borrow::Cow,
fmt::{Debug, Display},
hash::Hash,
sync::Arc,
};

/// Minimal lookup behaviour needed by a [reflector store](super::Store).
Expand Down Expand Up @@ -202,13 +202,6 @@ impl<K: Lookup> ObjectRef<K> {
obj.to_object_ref(dyntype)
}

pub fn from_shared_obj_with(obj: &Arc<K>, dyntype: K::DynamicType) -> Self
where
K: Lookup,
{
obj.as_ref().to_object_ref(dyntype)
}

/// Create an `ObjectRef` from an `OwnerReference`
///
/// Returns `None` if the types do not match.
Expand Down
7 changes: 5 additions & 2 deletions kube-runtime/src/reflector/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ where
/// If the dynamic type is default-able (for example when writer is used with
/// `k8s_openapi` types) you can use `Default` instead.
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn new_shared(dyntype: K::DynamicType, buf_size: usize) -> Self {
pub fn new_shared(buf_size: usize, dyntype: K::DynamicType) -> Self {
let (ready_tx, ready_rx) = DelayedInit::new();
Writer {
store: Default::default(),
Expand Down Expand Up @@ -264,6 +264,9 @@ where
/// The resulting `Writer` can be subscribed on in order to fan out events from
/// a watcher. The `Writer` should be passed to a [`reflector`](crate::reflector()),
/// and the [`Store`] is a read-only handle.
///
/// A buffer size is used for the underlying message channel. When the buffer is
/// full, backpressure will be applied by waiting for capacity.
#[must_use]
#[allow(clippy::module_name_repetitions)]
#[cfg(feature = "unstable-runtime-subscribe")]
Expand All @@ -272,7 +275,7 @@ where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + Hash + Clone + Default,
{
let w = Writer::<K>::new_shared(Default::default(), buf_size);
let w = Writer::<K>::new_shared(buf_size, Default::default());
let r = w.as_reader();
(r, w)
}
Expand Down

0 comments on commit 276b75e

Please sign in to comment.