From b9516ddbb70bd4f73ea4c673237659acf10764f7 Mon Sep 17 00:00:00 2001 From: Joseph Beshay Date: Thu, 14 Dec 2023 00:24:12 -0800 Subject: [PATCH] Move all mvfst use-cases to the new Eventbase, Timer, and Socket interfaces Summary: This is the major transition that updates mvfst code to use the new interfaces. The new Folly implementations of the interfaces maintain all the existing behavior of folly types so this should not introduce any functional change. The core changes are: - Update the BatchWriters to use the new interfaces. - Update the FunctionLooper to use the new interfaces. - Change QuicServerTransport to take the folly types and wrap them in the new types for use in the QuicTransportBase. The rest of the diff is for updating all the existing uses of the QuicTrasnport to initialize the necessary types and pass them to the QUIC transport instead of directly passing folly types. Reviewed By: mjoras Differential Revision: D51413481 fbshipit-source-id: 5ed607e12b9a52b96148ad9b4f8f43899655d936 --- .../httpserver/samples/hq/H1QDownstreamSession.h | 8 ++++++-- proxygen/httpserver/samples/hq/H1QUpstreamSession.h | 8 ++++++-- proxygen/httpserver/samples/hq/HQClient.cpp | 12 +++++++----- proxygen/httpserver/samples/hq/HQClient.h | 5 +++-- proxygen/httpserver/samples/hq/HQServer.cpp | 5 +++-- proxygen/lib/http/HQConnector.cpp | 12 +++++++----- proxygen/lib/http/HQConnector.h | 1 + proxygen/lib/http/session/HQSession.cpp | 9 ++++----- proxygen/lib/http/session/HQSession.h | 7 +++++-- proxygen/lib/http/session/HQStreamDispatcher.h | 2 ++ proxygen/lib/http/session/HQUpstreamSession.cpp | 6 ++++-- proxygen/lib/http/session/HQUpstreamSession.h | 1 + .../lib/http/session/test/MockQuicSocketDriver.h | 6 ++++-- proxygen/lib/transport/H3DatagramAsyncSocket.cpp | 6 ++++-- 14 files changed, 57 insertions(+), 31 deletions(-) diff --git a/proxygen/httpserver/samples/hq/H1QDownstreamSession.h b/proxygen/httpserver/samples/hq/H1QDownstreamSession.h index 9fd01bcc98..d1a8763d93 100644 --- a/proxygen/httpserver/samples/hq/H1QDownstreamSession.h +++ b/proxygen/httpserver/samples/hq/H1QDownstreamSession.h @@ -12,6 +12,7 @@ #include #include #include +#include namespace quic::samples { @@ -54,8 +55,11 @@ class H1QDownstreamSession : public quic::QuicSocket::ConnectionCallback { /*force1_1=*/false); wangle::TransportInfo tinfo; auto session = new proxygen::HTTPDownstreamSession( - proxygen::WheelTimerInstance(std::chrono::seconds(5), - sock_->getEventBase()), + proxygen::WheelTimerInstance( + std::chrono::seconds(5), + sock_->getEventBase() + ->getTypedEventBase() + ->getBackingEventBase()), std::move(streamTransport), sock_->getLocalAddress(), sock_->getPeerAddress(), diff --git a/proxygen/httpserver/samples/hq/H1QUpstreamSession.h b/proxygen/httpserver/samples/hq/H1QUpstreamSession.h index b8a114f9cd..0c3470fffa 100644 --- a/proxygen/httpserver/samples/hq/H1QUpstreamSession.h +++ b/proxygen/httpserver/samples/hq/H1QUpstreamSession.h @@ -13,6 +13,7 @@ #include #include #include +#include namespace quic::samples { @@ -49,8 +50,11 @@ class H1QUpstreamSession codec->setReleaseEgressAfterRequest(true); wangle::TransportInfo tinfo; auto session = new proxygen::HTTPUpstreamSession( - proxygen::WheelTimerInstance(std::chrono::seconds(5), - sock_->getEventBase()), + proxygen::WheelTimerInstance( + std::chrono::seconds(5), + sock_->getEventBase() + ->getTypedEventBase() + ->getBackingEventBase()), std::move(streamTransport), sock_->getLocalAddress(), sock_->getPeerAddress(), diff --git a/proxygen/httpserver/samples/hq/HQClient.cpp b/proxygen/httpserver/samples/hq/HQClient.cpp index 02c7fdcc5c..91ad7b54c5 100644 --- a/proxygen/httpserver/samples/hq/HQClient.cpp +++ b/proxygen/httpserver/samples/hq/HQClient.cpp @@ -26,15 +26,17 @@ #include #include #include +#include #include #include #include namespace quic::samples { -HQClient::HQClient(const HQToolClientParams& params) : params_(params) { +HQClient::HQClient(const HQToolClientParams& params) + : params_(params), qEvb_(std::make_shared(&evb_)) { if (params_.transportSettings.pacingEnabled) { - pacingTimer_ = TimerHighRes::newTimer( + pacingTimer_ = std::make_shared( &evb_, params_.transportSettings.pacingTimerResolution); } } @@ -53,7 +55,7 @@ int HQClient::start() { evb_.loopForever(); if (params_.migrateClient) { quicClient_->onNetworkSwitch( - std::make_unique(&evb_)); + std::make_unique(qEvb_)); sendRequests(true, quicClient_->getNumOpenableBidirectionalStreams()); } evb_.loop(); @@ -259,9 +261,9 @@ void HQClient::connectError(const quic::QuicError& error) { } void HQClient::initializeQuicClient() { - auto sock = std::make_unique(&evb_); + auto sock = std::make_unique(qEvb_); auto client = std::make_shared( - &evb_, + qEvb_, std::move(sock), quic::FizzClientQuicHandshakeContext::Builder() .setFizzClientContext( diff --git a/proxygen/httpserver/samples/hq/HQClient.h b/proxygen/httpserver/samples/hq/HQClient.h index 2dd21906e5..fa911dd278 100644 --- a/proxygen/httpserver/samples/hq/HQClient.h +++ b/proxygen/httpserver/samples/hq/HQClient.h @@ -14,7 +14,7 @@ #include #include #include -#include +#include namespace quic { @@ -83,8 +83,9 @@ class HQClient : private quic::QuicSocket::ConnectionSetupCallback { std::shared_ptr quicClient_; - TimerHighRes::SharedPtr pacingTimer_; + QuicTimer::SharedPtr pacingTimer_; + std::shared_ptr qEvb_; folly::EventBase evb_; // H3 diff --git a/proxygen/httpserver/samples/hq/HQServer.cpp b/proxygen/httpserver/samples/hq/HQServer.cpp index 71f11808ae..c80191f414 100644 --- a/proxygen/httpserver/samples/hq/HQServer.cpp +++ b/proxygen/httpserver/samples/hq/HQServer.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -39,7 +40,7 @@ class HQServerTransportFactory : public quic::QuicServerTransportFactory { // Creates new quic server transport quic::QuicServerTransport::Ptr make( folly::EventBase* evb, - std::unique_ptr socket, + std::unique_ptr socket, const folly::SocketAddress& /* peerAddr */, quic::QuicVersion quicVersion, std::shared_ptr ctx) noexcept @@ -212,7 +213,7 @@ HQServerTransportFactory::HQServerTransportFactory( QuicServerTransport::Ptr HQServerTransportFactory::make( folly::EventBase* evb, - std::unique_ptr socket, + std::unique_ptr socket, const folly::SocketAddress& /* peerAddr */, quic::QuicVersion, std::shared_ptr ctx) noexcept { diff --git a/proxygen/lib/http/HQConnector.cpp b/proxygen/lib/http/HQConnector.cpp index 565035f3f0..486e5e3ece 100644 --- a/proxygen/lib/http/HQConnector.cpp +++ b/proxygen/lib/http/HQConnector.cpp @@ -12,10 +12,11 @@ #include #include #include +#include +#include #include #include -using namespace folly; using namespace std; using namespace fizz::client; @@ -47,13 +48,13 @@ void HQConnector::setQuicPskCache( } void HQConnector::connect( - EventBase* eventBase, + folly::EventBase* eventBase, folly::Optional localAddr, const folly::SocketAddress& connectAddr, std::shared_ptr fizzContext, std::shared_ptr verifier, std::chrono::milliseconds connectTimeout, - const SocketOptionMap& socketOptions, + const folly::SocketOptionMap& socketOptions, folly::Optional sni, std::shared_ptr qLogger, std::shared_ptr quicLoopDetectorCallback, @@ -61,9 +62,10 @@ void HQConnector::connect( quicTransportStatsCallback) { DCHECK(!isBusy()); - auto sock = std::make_unique(eventBase); + auto qEvb = std::make_shared(eventBase); + auto sock = std::make_unique(qEvb); auto quicClient = quic::QuicClientTransport::newClient( - eventBase, + std::move(qEvb), std::move(sock), quic::FizzClientQuicHandshakeContext::Builder() .setFizzClientContext(fizzContext) diff --git a/proxygen/lib/http/HQConnector.h b/proxygen/lib/http/HQConnector.h index 331e4d9016..c45102dc5f 100644 --- a/proxygen/lib/http/HQConnector.h +++ b/proxygen/lib/http/HQConnector.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include diff --git a/proxygen/lib/http/session/HQSession.cpp b/proxygen/lib/http/session/HQSession.cpp index 2a0b0be510..1b0f7a3c80 100644 --- a/proxygen/lib/http/session/HQSession.cpp +++ b/proxygen/lib/http/session/HQSession.cpp @@ -1097,10 +1097,9 @@ void HQSession::scheduleWrite() { } void HQSession::scheduleLoopCallback(bool thisIteration) { - if (!isLoopCallbackScheduled()) { - auto evb = getEventBase(); - if (evb) { - evb->runInLoop(this, thisIteration); + if (sock_ && sock_->getEventBase()) { + if (!isLoopCallbackScheduled()) { + getEventBase()->runInLoop(this, thisIteration); } } } @@ -1617,7 +1616,7 @@ void HQSession::onPushPriority(hq::PushId pushId, const HTTPPriority& pri) { void HQSession::notifyEgressBodyBuffered(int64_t bytes) { if (HTTPSessionBase::notifyEgressBodyBuffered(bytes, true) && !inLoopCallback_ && !isLoopCallbackScheduled() && sock_) { - sock_->getEventBase()->runInLoop(this); + getEventBase()->runInLoop(this); } } diff --git a/proxygen/lib/http/session/HQSession.h b/proxygen/lib/http/session/HQSession.h index 576134a6e3..16ebe4a939 100644 --- a/proxygen/lib/http/session/HQSession.h +++ b/proxygen/lib/http/session/HQSession.h @@ -34,6 +34,7 @@ #include #include #include +#include namespace proxygen { @@ -256,8 +257,10 @@ class HQSession } folly::EventBase* getEventBase() const override { - if (sock_) { - return sock_->getEventBase(); + if (sock_ && sock_->getEventBase()) { + return sock_->getEventBase() + ->getTypedEventBase() + ->getBackingEventBase(); } return nullptr; } diff --git a/proxygen/lib/http/session/HQStreamDispatcher.h b/proxygen/lib/http/session/HQStreamDispatcher.h index 5a928c95c7..80333914d2 100644 --- a/proxygen/lib/http/session/HQStreamDispatcher.h +++ b/proxygen/lib/http/session/HQStreamDispatcher.h @@ -13,6 +13,8 @@ #include #include +#include + namespace proxygen { // Base class for the unidirectional stream callbacks diff --git a/proxygen/lib/http/session/HQUpstreamSession.cpp b/proxygen/lib/http/session/HQUpstreamSession.cpp index 61275d9d25..430d121afb 100644 --- a/proxygen/lib/http/session/HQUpstreamSession.cpp +++ b/proxygen/lib/http/session/HQUpstreamSession.cpp @@ -7,6 +7,7 @@ */ #include +#include #include namespace proxygen { @@ -130,12 +131,13 @@ void HQUpstreamSession::attachThreadLocals( txnEgressQueue_.attachThreadLocals(timeout); setController(controller); setSessionStats(stats); + auto qEvbWrapper = std::make_shared(eventBase); if (sock_) { - sock_->attachEventBase(eventBase); + sock_->attachEventBase(std::move(qEvbWrapper)); } codec_.foreach (fn); setHeaderCodecStats(headerCodecStats); - sock_->getEventBase()->runInLoop(this); + getEventBase()->runInLoop(this); // The caller MUST re-add the connection to a new connection manager. } diff --git a/proxygen/lib/http/session/HQUpstreamSession.h b/proxygen/lib/http/session/HQUpstreamSession.h index 233fb11016..aba27253cc 100644 --- a/proxygen/lib/http/session/HQUpstreamSession.h +++ b/proxygen/lib/http/session/HQUpstreamSession.h @@ -10,6 +10,7 @@ #include #include +#include namespace proxygen { diff --git a/proxygen/lib/http/session/test/MockQuicSocketDriver.h b/proxygen/lib/http/session/test/MockQuicSocketDriver.h index e9acc41e2d..1e811e9ed3 100644 --- a/proxygen/lib/http/session/test/MockQuicSocketDriver.h +++ b/proxygen/lib/http/session/test/MockQuicSocketDriver.h @@ -9,11 +9,11 @@ #pragma once #include -#include #include #include #include #include +#include #include namespace quic { @@ -115,6 +115,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { TransportEnum transportType, std::string alpn = "h3") : eventBase_(eventBase), + quicEventBase_(std::make_shared(eventBase_)), transportType_(transportType), sock_(std::make_shared(eventBase, connSetupCb, connCb)), alpn_(alpn) { @@ -176,7 +177,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { .WillRepeatedly(testing::ReturnPointee(&sockGood_)); EXPECT_CALL(*sock_, getEventBase()) - .WillRepeatedly(testing::ReturnPointee(&eventBase_)); + .WillRepeatedly(testing::Return(quicEventBase_)); EXPECT_CALL(*sock_, setControlStream(testing::_)) .WillRepeatedly(testing::Invoke( @@ -1753,6 +1754,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback { bool strictErrorCheck_{true}; folly::EventBase* eventBase_; + std::shared_ptr quicEventBase_; TransportSettings transportSettings_; uint64_t bufferAvailable_{std::numeric_limits::max()}; // keeping this ordered for better debugging diff --git a/proxygen/lib/transport/H3DatagramAsyncSocket.cpp b/proxygen/lib/transport/H3DatagramAsyncSocket.cpp index 5a4da22094..af6354a93e 100644 --- a/proxygen/lib/transport/H3DatagramAsyncSocket.cpp +++ b/proxygen/lib/transport/H3DatagramAsyncSocket.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -192,14 +193,15 @@ void H3DatagramAsyncSocket::startClient() { transportSettings.datagramConfig.readBufSize = rcvBufPkts_; } if (!upstreamSession_) { - auto sock = std::make_unique(evb_); + auto qEvb = std::make_shared(evb_); + auto sock = std::make_unique(qEvb); auto fizzClientContext = quic::FizzClientQuicHandshakeContext::Builder() .setFizzClientContext(createFizzClientContext()) .setCertificateVerifier(options_.certVerifier_) .build(); auto client = std::make_shared( - evb_, std::move(sock), fizzClientContext); + qEvb, std::move(sock), fizzClientContext); CHECK(connectAddress_.isInitialized()); client->addNewPeerAddress(connectAddress_); if (bindAddress_.isInitialized()) {