diff --git a/async_grpc/rpc.cc b/async_grpc/rpc.cc index f29e84b..0d2b4b2 100644 --- a/async_grpc/rpc.cc +++ b/async_grpc/rpc.cc @@ -102,6 +102,12 @@ void Rpc::OnReadsDone() { handler_->OnReadsDone(); } void Rpc::OnFinish() { handler_->OnFinish(); } +void Rpc::OnServerShutdown() { + if (handler_) { + handler_->OnServerShutdown(); + } +} + void Rpc::RequestNextMethodInvocation() { // Ask gRPC to notify us when the connection terminates. SetRpcEventState(Event::DONE, true); @@ -385,4 +391,11 @@ std::weak_ptr ActiveRpcs::GetWeakPtr(Rpc* rpc) { return it->second; } +void ActiveRpcs::Shutdown() { + common::MutexLocker locker(&lock_); + for (auto it : rpcs_) { + it.second->OnServerShutdown(); + } +} + } // namespace async_grpc diff --git a/async_grpc/rpc.h b/async_grpc/rpc.h index 6f01d08..b9448c1 100644 --- a/async_grpc/rpc.h +++ b/async_grpc/rpc.h @@ -111,6 +111,7 @@ class Rpc { void OnRequest(); void OnReadsDone(); void OnFinish(); + void OnServerShutdown(); void RequestNextMethodInvocation(); void RequestStreamingReadIfNeeded(); void HandleSendQueue(); @@ -198,6 +199,7 @@ class ActiveRpcs { std::shared_ptr Add(std::unique_ptr rpc) EXCLUDES(lock_); bool Remove(Rpc* rpc) EXCLUDES(lock_); Rpc::WeakPtrFactory GetWeakPtrFactory(); + void Shutdown(); private: std::weak_ptr GetWeakPtr(Rpc* rpc); diff --git a/async_grpc/rpc_handler_interface.h b/async_grpc/rpc_handler_interface.h index 655fea9..72b2bd8 100644 --- a/async_grpc/rpc_handler_interface.h +++ b/async_grpc/rpc_handler_interface.h @@ -36,6 +36,7 @@ class RpcHandlerInterface { const ::google::protobuf::Message* request) = 0; virtual void OnReadsDone(){}; virtual void OnFinish(){}; + virtual void OnServerShutdown(){}; virtual Span* trace_span() = 0; template static std::unique_ptr Instantiate() { diff --git a/async_grpc/service.cc b/async_grpc/service.cc index d99529d..00296d6 100644 --- a/async_grpc/service.cc +++ b/async_grpc/service.cc @@ -53,7 +53,10 @@ void Service::StartServing( } } -void Service::StopServing() { shutting_down_ = true; } +void Service::StopServing() { + shutting_down_ = true; + active_rpcs_.Shutdown(); +} void Service::HandleEvent(Rpc::Event event, Rpc* rpc, bool ok) { switch (event) {