Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide reason string in forced disconnects to allow Zeek to identify backpressure overflow, plus two fixes #436

Merged
merged 3 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions libbroker/broker/expected.hh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public:

// -- constructors, destructors, and assignment operators --------------------

/// Constructs an engaged `expected` from any value `x` whose type `U` is
/// implicitly convertible to `T`. For non-convertible types the
/// `std::enable_if_t` default argument is ill-formed, which removes this
/// overload from overload resolution.
template <class U>
expected(U x, std::enable_if_t<std::is_convertible_v<U, T>>* = nullptr)
: engaged_(true) {
// Construct the `T` in place at the address of `value_`, moving from the
// by-value argument so that `U` is converted to `T` exactly once.
new (std::addressof(value_)) T(std::move(x));
}

/// Constructs an engaged `expected` by moving `x` into the storage at
/// `value_`. The exception specification follows `nothrow_move`, which is
/// declared outside this excerpt (presumably whether `T` is nothrow move
/// constructible -- confirm against its definition).
expected(T&& x) noexcept(nothrow_move) : engaged_(true) {
new (std::addressof(value_)) T(std::move(x));
}
Expand Down
3 changes: 2 additions & 1 deletion libbroker/broker/internal/connector.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1944,7 +1944,8 @@ void connector::run_impl(listener* sub, shared_filter_type* filter) {
} else {
mgr.abort(*i);
}
while ((i->revents & read_mask) && mgr.must_read_more(*i))
while ((i->revents & read_mask) && (i->events & read_mask)
&& mgr.must_read_more(*i))
mgr.continue_reading(*i);
}
} while (--presult > 0 && advance());
Expand Down
4 changes: 2 additions & 2 deletions libbroker/broker/internal/core_actor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ void core_actor_state::shutdown(shutdown_options options) {
void core_actor_state::finalize_shutdown() {
// Drop any remaining state of peers.
for (auto& kvp : peers)
kvp.second->force_disconnect();
kvp.second->force_disconnect("shutting down");
peers.clear();
// Close the shared state for all peers.
peer_statuses->close();
Expand Down Expand Up @@ -873,7 +873,7 @@ caf::error core_actor_state::init_new_peer(endpoint_id peer_id,
.on_backpressure_buffer(peer_buffer_size(), peer_overflow_policy())
.do_on_error([this, ptr, peer_id](const caf::error& what) {
BROKER_INFO("remove peer" << peer_id << "due to:" << what);
ptr->force_disconnect();
ptr->force_disconnect(to_string(what));
})
.as_observable());
// Push messages received from the peer into the central merge point.
Expand Down
19 changes: 14 additions & 5 deletions libbroker/broker/internal/peering.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

#include <caf/scheduled_actor/flow.hpp>

using namespace std::literals;

namespace broker::internal {

namespace {
Expand Down Expand Up @@ -94,9 +96,14 @@ class suffix_generator : public affix_generator {

node_message first() override {
if (ptr_->removed()) {
auto msg = "removed connection to remote peer"s;
if (const auto& reason = ptr_->removed_reason(); !reason.empty()) {
msg += " (";
msg += reason;
msg += ')';
}
return make_status_msg(endpoint_info{ptr_->peer_id(), ptr_->addr()},
sc_constant<sc::peer_removed>(),
"removed connection to remote peer");
sc_constant<sc::peer_removed>(), msg.c_str());
} else {
return make_status_msg(endpoint_info{ptr_->peer_id(), ptr_->addr()},
sc_constant<sc::peer_lost>(),
Expand All @@ -119,18 +126,20 @@ void peering::on_bye_ack() {
bye_timeout_.dispose();
}

void peering::force_disconnect() {
void peering::force_disconnect(std::string reason) {
if (!removed_) {
removed_ = true;
removed_reason_ = std::move(reason);
}
on_bye_ack();
}

void peering::schedule_bye_timeout(caf::scheduled_actor* self) {
bye_timeout_.dispose();
bye_timeout_ =
self->run_delayed(defaults::unpeer_timeout,
[ptr = shared_from_this()] { ptr->force_disconnect(); });
self->run_delayed(defaults::unpeer_timeout, [ptr = shared_from_this()] {
ptr->force_disconnect("timeout during graceful disconnect");
});
}

void peering::assign_bye_token(std::array<std::byte, bye_token_size>& buf) {
Expand Down
9 changes: 8 additions & 1 deletion libbroker/broker/internal/peering.hh
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public:

/// Forces the peering to shut down its connection without performing the BYE
/// handshake.
void force_disconnect();
void force_disconnect(std::string reason);

void schedule_bye_timeout(caf::scheduled_actor* self);

Expand All @@ -63,6 +63,11 @@ public:
return removed_;
}

/// Returns the removal reason, which may be empty. The reason is the string
/// passed to `force_disconnect`, and it is only stored when the peering was
/// not already flagged as removed at the time of that call.
const std::string& removed_reason() const noexcept {
return removed_reason_;
}

/// Tag this peering as removed and send a BYE message on the `snk` for a
/// graceful shutdown.
void remove(caf::scheduled_actor* self,
Expand Down Expand Up @@ -117,6 +122,8 @@ private:
/// BYE message to the peer.
bool removed_ = false;

/// Reason string recorded by `force_disconnect` when it flags this peering as
/// removed; stays empty otherwise.
std::string removed_reason_;

/// Network address as reported from the transport (usually TCP).
network_info addr_;

Expand Down
Loading