diff --git a/src/runtime/rpc/asio_rpc_session.cpp b/src/runtime/rpc/asio_rpc_session.cpp index 39ac3e6c04..331d0253e2 100644 --- a/src/runtime/rpc/asio_rpc_session.cpp +++ b/src/runtime/rpc/asio_rpc_session.cpp @@ -130,7 +130,7 @@ void asio_rpc_session::do_read(int read_next) } else { LOG_ERROR("asio read from {} failed: {}", _remote_addr, ec.message()); } - on_failure(); + on_failure(false); } else { _reader.mark_read(length); @@ -151,7 +151,7 @@ void asio_rpc_session::do_read(int read_next) if (read_next == -1) { LOG_ERROR("asio read from {} failed", _remote_addr); - on_failure(); + on_failure(false); } else { start_read_next(read_next); } @@ -197,16 +197,25 @@ asio_rpc_session::asio_rpc_session(asio_network_provider &net, set_options(); } -void asio_rpc_session::close() +asio_rpc_session::~asio_rpc_session() { + // Every async_* invocation adds to the reference counter and releases it again in the + // corresponding callback, so the reference counter is certain to be zero in the + // destructor: no invocation is still in flight, and it is safe to close the socket.
+ asio_rpc_session::close(); +} +void asio_rpc_session::close() +{ boost::system::error_code ec; _socket->shutdown(boost::asio::socket_base::shutdown_type::shutdown_both, ec); - if (ec) + if (ec) { LOG_WARNING("asio socket shutdown failed, error = {}", ec.message()); + } _socket->close(ec); - if (ec) + if (ec) { LOG_WARNING("asio socket close failed, error = {}", ec.message()); + } } void asio_rpc_session::connect() @@ -222,7 +231,7 @@ void asio_rpc_session::connect() set_options(); set_connected(); - on_send_completed(); + on_send_completed(0); start_read_next(); } else { LOG_ERROR( diff --git a/src/runtime/rpc/asio_rpc_session.h b/src/runtime/rpc/asio_rpc_session.h index e3f5da4e21..7a7cda8b35 100644 --- a/src/runtime/rpc/asio_rpc_session.h +++ b/src/runtime/rpc/asio_rpc_session.h @@ -51,10 +51,14 @@ class asio_rpc_session : public rpc_session message_parser_ptr &parser, bool is_client); - ~asio_rpc_session() override = default; + ~asio_rpc_session() override; void send(uint64_t signature) override; + // The underlying socket is invalidated once it has been closed. + // + // It is necessary to prevent '_socket' from being closed while any of the socket's async_* + // interfaces are in flight. void close() override; void connect() override; @@ -69,9 +73,6 @@ class asio_rpc_session : public rpc_session } } -private: - // boost::asio::socket is thread-unsafe, must use lock to prevent a - // reading/writing socket being modified or closed concurrently. std::shared_ptr _socket; }; diff --git a/src/runtime/rpc/network.cpp b/src/runtime/rpc/network.cpp index c572b19a8d..ce1dcc757d 100644 --- a/src/runtime/rpc/network.cpp +++ b/src/runtime/rpc/network.cpp @@ -429,8 +429,14 @@ bool rpc_session::on_disconnected(bool is_write) void rpc_session::on_failure(bool is_write) { + // Just update the state machine here.
if (on_disconnected(is_write)) { - close(); + // The underlying socket may be in use by async_* interfaces concurrently, so it is not + // thread safe to invalidate '_socket' here; it should be invalidated when the session is + // destroyed. + LOG_WARNING("disconnect to remote {}, the socket will be lazily closed when the session " + "destroyed", + _remote_addr.to_string()); } } diff --git a/src/runtime/rpc/network.h b/src/runtime/rpc/network.h index 5a9bb06094..aca926743d 100644 --- a/src/runtime/rpc/network.h +++ b/src/runtime/rpc/network.h @@ -274,8 +274,8 @@ class rpc_session : public ref_counter // should always be called in lock bool unlink_message_for_send(); virtual void send(uint64_t signature) = 0; - void on_send_completed(uint64_t signature = 0); - virtual void on_failure(bool is_write = false); + void on_send_completed(uint64_t signature); + virtual void on_failure(bool is_write); protected: /// @@ -314,7 +314,6 @@ class rpc_session : public ref_counter uint64_t _message_sent; // ] -protected: /// /// change status and check status /// @@ -327,7 +326,6 @@ class rpc_session : public ref_counter void clear_send_queue(bool resend_msgs); bool on_disconnected(bool is_write); -protected: // constant info connection_oriented_network &_net; dsn::rpc_address _remote_addr; diff --git a/src/runtime/rpc/network.sim.h b/src/runtime/rpc/network.sim.h index f7954afbf1..1e26333444 100644 --- a/src/runtime/rpc/network.sim.h +++ b/src/runtime/rpc/network.sim.h @@ -50,15 +50,15 @@ class sim_client_session : public rpc_session ::dsn::rpc_address remote_addr, message_parser_ptr &parser); - virtual void connect(); + void connect() override; - virtual void send(uint64_t signature) override; + void send(uint64_t signature) override; - virtual void do_read(int sz) override {} + void do_read(int sz) override {} - virtual void close() override {} + void close() override {} - virtual void on_failure(bool is_write = false) override {} + void on_failure(bool is_write) override {} }; class
sim_server_session : public rpc_session @@ -69,15 +69,15 @@ class sim_server_session : public rpc_session rpc_session_ptr &client, message_parser_ptr &parser); - virtual void send(uint64_t signature) override; + void send(uint64_t signature) override; - virtual void connect() {} + void connect() override {} - virtual void do_read(int sz) override {} + void do_read(int sz) override {} - virtual void close() override {} + void close() override {} - virtual void on_failure(bool is_write = false) override {} + void on_failure(bool is_write) override {} private: rpc_session_ptr _client;