Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

. #3552

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft

. #3552

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions rs/p2p/consensus_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,15 @@ mod metrics;
mod receiver;
mod sender;

type StartConsensusManagerFn =
Box<dyn FnOnce(Arc<dyn Transport>, watch::Receiver<SubnetTopology>) -> Vec<Shutdown>>;
type StartConsensusManagerFn = Box<dyn FnOnce(Arc<dyn Transport>) -> Vec<Shutdown>>;

pub struct AbortableBroadcastChannelManager(Vec<StartConsensusManagerFn>);

impl AbortableBroadcastChannelManager {
pub fn start(
self,
transport: Arc<dyn Transport>,
topology_watcher: watch::Receiver<SubnetTopology>,
) -> Vec<Shutdown> {
pub fn start(self, transport: Arc<dyn Transport>) -> Vec<Shutdown> {
let mut ret = vec![];
for client in self.0 {
ret.append(&mut client(transport.clone(), topology_watcher.clone()));
ret.append(&mut client(transport.clone()));
}
ret
}
Expand Down Expand Up @@ -77,6 +72,7 @@ impl AbortableBroadcastChannelBuilder {
D: ArtifactAssembler<Artifact, WireArtifact>,
>(
&mut self,
topology_watcher: watch::Receiver<SubnetTopology>,
(assembler, assembler_router): (F, Router),
slot_limit: usize,
) -> (
Expand All @@ -102,7 +98,7 @@ impl AbortableBroadcastChannelBuilder {
let rt_handle = self.rt_handle.clone();
let metrics_registry = self.metrics_registry.clone();

let builder = move |transport: Arc<dyn Transport>, topology_watcher| {
let builder = move |transport: Arc<dyn Transport>| {
start_consensus_manager(
log,
&metrics_registry,
Expand Down
12 changes: 8 additions & 4 deletions rs/p2p/consensus_manager/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,21 +342,25 @@ fn load_test(
let mut jhs = vec![];
let mut nodes = vec![];
let mut cms = vec![];
let (nodes, topology_watcher) = fully_connected_localhost_subnet(rt.handle(), log, id, nodes);
for i in 0..num_peers {
let node = node_test_id(i);
let processor = TestConsensus::new(log.clone(), node, 256 * (i as usize + 1), i % 2 == 0);
let (jh, cm) =
start_consensus_manager(no_op_logger(), rt.handle().clone(), processor.clone());
let (jh, cm) = start_consensus_manager(
no_op_logger(),
rt.handle().clone(),
processor.clone(),
topology_watcher.clone(),
);
jhs.push(jh);
let (r, m) = cm.build();
nodes.push((node, r));
cms.push((node, m));
node_advert_map.insert(node, processor);
}
let (nodes, topology_watcher) = fully_connected_localhost_subnet(rt.handle(), log, id, nodes);
for ((node1, transport), (node2, cm)) in nodes.into_iter().zip(cms.into_iter()) {
assert!(node1 == node2);
cm.start(transport, topology_watcher.clone());
cm.start(transport);
}

rt.block_on(async move {
Expand Down
4 changes: 3 additions & 1 deletion rs/p2p/test_utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ pub fn start_consensus_manager(
log: ReplicaLogger,
rt_handle: Handle,
processor: TestConsensus<U64Artifact>,
topology_watcher: tokio::sync::watch::Receiver<SubnetTopology>,
) -> (
Box<dyn JoinGuard>,
ic_consensus_manager::AbortableBroadcastChannelBuilder,
Expand All @@ -467,7 +468,8 @@ pub fn start_consensus_manager(
rt_handle.clone(),
MetricsRegistry::default(),
);
let (outbound_tx, inbound_rx) = cm1.abortable_broadcast_channel(downloader, usize::MAX);
let (outbound_tx, inbound_rx) =
cm1.abortable_broadcast_channel(topology_watcher, downloader, usize::MAX);

let artifact_processor_jh = start_test_processor(
outbound_tx,
Expand Down
9 changes: 6 additions & 3 deletions rs/p2p/test_utils/src/turmoil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,11 @@ pub fn add_transport_to_sim<F>(
bouncer_factory,
MetricsRegistry::default(),
);
let (outbound_tx, inbound_tx) =
consensus_builder.abortable_broadcast_channel(downloader, usize::MAX);
let (outbound_tx, inbound_tx) = consensus_builder.abortable_broadcast_channel(
topology_watcher_clone.clone(),
downloader,
usize::MAX,
);

let artifact_processor_jh = start_test_processor(
outbound_tx,
Expand Down Expand Up @@ -412,7 +415,7 @@ pub fn add_transport_to_sim<F>(
));

if let Some((_, con_manager)) = con {
con_manager.start(transport.clone(), topology_watcher_clone.clone());
con_manager.start(transport.clone());
}

if let Some(state_sync_manager) = state_sync_manager {
Expand Down
Loading