From 0fe62f60f75a02a9a5fbe91976597ba5aa0b11a8 Mon Sep 17 00:00:00 2001 From: Alan Frindell Date: Sun, 21 Apr 2024 16:38:06 -0700 Subject: [PATCH] Coalesce some onWrite callbacks Summary: If there are multiple writes in the same loop, add up the bytes written and only deliver onWrite stats callback once. Reviewed By: mjoras Differential Revision: D56315021 fbshipit-source-id: b4a1005f9f9c61a3667d41febb32e836b85e3ea8 --- proxygen/lib/http/session/HQSession.cpp | 14 +++++++------- proxygen/lib/http/session/HTTPSession.cpp | 8 +++++++- .../lib/http/session/test/HQSessionTestCommon.h | 2 +- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/proxygen/lib/http/session/HQSession.cpp b/proxygen/lib/http/session/HQSession.cpp index 0c303e79a8..25b933091c 100644 --- a/proxygen/lib/http/session/HQSession.cpp +++ b/proxygen/lib/http/session/HQSession.cpp @@ -1066,12 +1066,19 @@ void HQSession::runLoopCallback() noexcept { // Then handle the writes // Write all the control streams first + auto maxToSendOrig = maxToSend_; maxToSend_ -= writeControlStreams(maxToSend_); // Then write the request streams if (!txnEgressQueue_.empty() && maxToSend_ > 0) { // TODO: we could send FIN only? maxToSend_ = writeRequestStreams(maxToSend_); } + auto sent = maxToSendOrig - maxToSend_; + if (sent > 0) { + if (infoCallback_) { + infoCallback_->onWrite(*this, sent); + } + } // Zero out maxToSend_ here. We won't egress anything else until the next // onWriteReady call maxToSend_ = 0; @@ -1769,10 +1776,6 @@ uint64_t HQSession::controlStreamWriteImpl(HQControlStream* ctrlStream, << __func__ << " after write sess=" << *this << ": streamID=" << ctrlStream->getEgressStreamId() << " sent=" << sendLen << " buflen=" << static_cast(ctrlStream->writeBuf_.chainLength()); - if (infoCallback_) { - infoCallback_->onWrite(*this, sendLen); - } - return sendLen; } @@ -2035,9 +2038,6 @@ uint64_t HQSession::requestStreamWriteImpl(HQStreamTransportBase* hqStream, << " buflen=" << hqStream->writeBufferSize() << " hasPendingBody=" << hqStream->txn_.hasPendingBody() << " EOM=" << hqStream->pendingEOM_; - if (infoCallback_) { - infoCallback_->onWrite(*this, sent); - } CHECK_GE(maxEgress, sent); bool flowControlBlocked = (sent == streamSendWindow && !sendEof); diff --git a/proxygen/lib/http/session/HTTPSession.cpp b/proxygen/lib/http/session/HTTPSession.cpp index f3d6b447ad..2b58fe2a34 100644 --- a/proxygen/lib/http/session/HTTPSession.cpp +++ b/proxygen/lib/http/session/HTTPSession.cpp @@ -2236,6 +2236,7 @@ void HTTPSession::runLoopCallback() noexcept { }); } + uint64_t bytesWritten = 0; for (uint32_t i = 0; i < kMaxWritesPerLoop; ++i) { bodyBytesPerWriteBuf_ = 0; bool cork = true; @@ -2281,8 +2282,13 @@ void HTTPSession::runLoopCallback() noexcept { // updateWriteBufSize called in scope guard break; } + bytesWritten += len; // writeChain can result in a writeError and trigger the shutdown code path } + if (infoCallback_ && bytesWritten > 0) { + infoCallback_->onWrite(*this, bytesWritten); + } + if (numActiveWrites_ == 0 && !writesShutdown() && hasMoreWrites() && (!connFlowControl_ || connFlowControl_->getAvailableSend())) { scheduleWrite(); @@ -2800,7 +2806,7 @@ void HTTPSession::writeSuccess() noexcept { writeTimeout_.cancelTimeout(); pendingWrite_.reset(); - if (infoCallback_) { + if (infoCallback_ && !inLoopCallback_) { infoCallback_->onWrite(*this, bytesWritten); } diff --git a/proxygen/lib/http/session/test/HQSessionTestCommon.h b/proxygen/lib/http/session/test/HQSessionTestCommon.h index 93658e7397..d7e82e9eee 100644 --- a/proxygen/lib/http/session/test/HQSessionTestCommon.h +++ b/proxygen/lib/http/session/test/HQSessionTestCommon.h @@ -169,7 +169,7 @@ class HQSessionTest auto dirModifier = (direction_ == proxygen::TransportDirection::DOWNSTREAM) ? 0 : 1; EXPECT_CALL(infoCb_, onWrite(testing::_, testing::_)) - .Times(testing::AtLeast(numCtrlStreams_)); + .Times(testing::AtLeast(1)); for (auto i = 0; i < numCtrlStreams_; i++) { folly::Optional expectedStreamID = i * 4 + 2 + dirModifier;