Skip to content

Commit

Permalink
fix(asio): Fix the crash caused by early invalidated socket
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Sep 9, 2024
1 parent d0e9260 commit 221f5b3
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 25 deletions.
21 changes: 15 additions & 6 deletions src/runtime/rpc/asio_rpc_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void asio_rpc_session::do_read(int read_next)
} else {
LOG_ERROR("asio read from {} failed: {}", _remote_addr, ec.message());
}
on_failure();
on_failure(false);
} else {
_reader.mark_read(length);

Expand All @@ -151,7 +151,7 @@ void asio_rpc_session::do_read(int read_next)

if (read_next == -1) {
LOG_ERROR("asio read from {} failed", _remote_addr);
on_failure();
on_failure(false);
} else {
start_read_next(read_next);
}
Expand Down Expand Up @@ -197,16 +197,25 @@ asio_rpc_session::asio_rpc_session(asio_network_provider &net,
set_options();
}

void asio_rpc_session::close()
asio_rpc_session::~asio_rpc_session()
{
// Because every async_* invoking adds the reference counter and releases the reference counter
// in corresponding callback, it's certain that the reference counter is zero in its
// destructor, which means there is no inflight invoking, then it's safe to close the socket.
asio_rpc_session::close();
}

void asio_rpc_session::close()
{
boost::system::error_code ec;
_socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec);
if (ec)
if (ec) {
LOG_WARNING("asio socket shutdown failed, error = {}", ec.message());
}
_socket->close(ec);
if (ec)
if (ec) {
LOG_WARNING("asio socket close failed, error = {}", ec.message());
}
}

void asio_rpc_session::connect()
Expand All @@ -222,7 +231,7 @@ void asio_rpc_session::connect()

set_options();
set_connected();
on_send_completed();
on_send_completed(0);
start_read_next();
} else {
LOG_ERROR(
Expand Down
9 changes: 5 additions & 4 deletions src/runtime/rpc/asio_rpc_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ class asio_rpc_session : public rpc_session
message_parser_ptr &parser,
bool is_client);

~asio_rpc_session() override = default;
~asio_rpc_session() override;

void send(uint64_t signature) override;

// The under layer socket will be invalidated after being closed.
//
// It's needed to prevent the '_socket' to be closed while the socket's async_* interfaces are
// in flight.
void close() override;

void connect() override;
Expand All @@ -69,9 +73,6 @@ class asio_rpc_session : public rpc_session
}
}

private:
// boost::asio::socket is thread-unsafe, must use lock to prevent a
// reading/writing socket being modified or closed concurrently.
std::shared_ptr<boost::asio::ip::tcp::socket> _socket;
};

Expand Down
8 changes: 7 additions & 1 deletion src/runtime/rpc/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,14 @@ bool rpc_session::on_disconnected(bool is_write)

void rpc_session::on_failure(bool is_write)
{
// Just update the state machine here.
if (on_disconnected(is_write)) {
close();
// The under layer socket may be used by async_* interfaces concurrently, it's not thread
// safe to invalidate the '_socket', it should be invalidated when the session is
// destroyed.
LOG_WARNING("disconnect to remote %s, the socket will be lazily closed when the session "
"destroyed",
_remote_addr.to_string());
}
}

Expand Down
6 changes: 2 additions & 4 deletions src/runtime/rpc/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ class rpc_session : public ref_counter
// should always be called in lock
bool unlink_message_for_send();
virtual void send(uint64_t signature) = 0;
void on_send_completed(uint64_t signature = 0);
virtual void on_failure(bool is_write = false);
void on_send_completed(uint64_t signature);
virtual void on_failure(bool is_write);

protected:
///
Expand Down Expand Up @@ -314,7 +314,6 @@ class rpc_session : public ref_counter
uint64_t _message_sent;
// ]

protected:
///
/// change status and check status
///
Expand All @@ -327,7 +326,6 @@ class rpc_session : public ref_counter
void clear_send_queue(bool resend_msgs);
bool on_disconnected(bool is_write);

protected:
// constant info
connection_oriented_network &_net;
dsn::rpc_address _remote_addr;
Expand Down
20 changes: 10 additions & 10 deletions src/runtime/rpc/network.sim.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ class sim_client_session : public rpc_session
::dsn::rpc_address remote_addr,
message_parser_ptr &parser);

virtual void connect();
void connect() override;

virtual void send(uint64_t signature) override;
void send(uint64_t signature) override;

virtual void do_read(int sz) override {}
void do_read(int sz) override {}

virtual void close() override {}
void close() override {}

virtual void on_failure(bool is_write = false) override {}
void on_failure(bool is_write) override {}
};

class sim_server_session : public rpc_session
Expand All @@ -69,15 +69,15 @@ class sim_server_session : public rpc_session
rpc_session_ptr &client,
message_parser_ptr &parser);

virtual void send(uint64_t signature) override;
void send(uint64_t signature) override;

virtual void connect() {}
void connect() override {}

virtual void do_read(int sz) override {}
void do_read(int sz) override {}

virtual void close() override {}
void close() override {}

virtual void on_failure(bool is_write = false) override {}
void on_failure(bool is_write) override {}

private:
rpc_session_ptr _client;
Expand Down

0 comments on commit 221f5b3

Please sign in to comment.