Skip to content

Commit

Permalink
Move all mvfst use-cases to the new Eventbase, Timer, and Socket inte…
Browse files Browse the repository at this point in the history
…rfaces

Summary:
This is the major transition that updates mvfst code to use the new interfaces. The new Folly implementations of the interfaces maintain all the existing behavior of folly types so this should not introduce any functional change. The core changes are:
- Update the BatchWriters to use the new interfaces.
- Update the FunctionLooper to use the new interfaces.
- Change QuicServerTransport to take the folly types and wrap them in the new types for use in the QuicTransportBase.

The rest of the diff is for updating all the existing uses of the QuicTrasnport to initialize the necessary types and pass them to the QUIC transport instead of directly passing folly types.

Reviewed By: mjoras

Differential Revision: D51413481

fbshipit-source-id: 5ed607e12b9a52b96148ad9b4f8f43899655d936
  • Loading branch information
jbeshay authored and facebook-github-bot committed Dec 14, 2023
1 parent 9c0e9b7 commit b9516dd
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 31 deletions.
8 changes: 6 additions & 2 deletions proxygen/httpserver/samples/hq/H1QDownstreamSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <proxygen/lib/http/session/HTTPDownstreamSession.h>
#include <quic/api/QuicSocket.h>
#include <quic/api/QuicStreamAsyncTransport.h>
#include <quic/common/events/FollyQuicEventBase.h>

namespace quic::samples {

Expand Down Expand Up @@ -54,8 +55,11 @@ class H1QDownstreamSession : public quic::QuicSocket::ConnectionCallback {
/*force1_1=*/false);
wangle::TransportInfo tinfo;
auto session = new proxygen::HTTPDownstreamSession(
proxygen::WheelTimerInstance(std::chrono::seconds(5),
sock_->getEventBase()),
proxygen::WheelTimerInstance(
std::chrono::seconds(5),
sock_->getEventBase()
->getTypedEventBase<FollyQuicEventBase>()
->getBackingEventBase()),
std::move(streamTransport),
sock_->getLocalAddress(),
sock_->getPeerAddress(),
Expand Down
8 changes: 6 additions & 2 deletions proxygen/httpserver/samples/hq/H1QUpstreamSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <proxygen/lib/http/session/HTTPUpstreamSession.h>
#include <quic/api/QuicSocket.h>
#include <quic/api/QuicStreamAsyncTransport.h>
#include <quic/common/events/FollyQuicEventBase.h>

namespace quic::samples {

Expand Down Expand Up @@ -49,8 +50,11 @@ class H1QUpstreamSession
codec->setReleaseEgressAfterRequest(true);
wangle::TransportInfo tinfo;
auto session = new proxygen::HTTPUpstreamSession(
proxygen::WheelTimerInstance(std::chrono::seconds(5),
sock_->getEventBase()),
proxygen::WheelTimerInstance(
std::chrono::seconds(5),
sock_->getEventBase()
->getTypedEventBase<FollyQuicEventBase>()
->getBackingEventBase()),
std::move(streamTransport),
sock_->getLocalAddress(),
sock_->getPeerAddress(),
Expand Down
12 changes: 7 additions & 5 deletions proxygen/httpserver/samples/hq/HQClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@
#include <proxygen/lib/utils/UtilInl.h>
#include <quic/api/QuicSocket.h>
#include <quic/client/QuicClientTransport.h>
#include <quic/common/udpsocket/FollyQuicAsyncUDPSocket.h>
#include <quic/congestion_control/CongestionControllerFactory.h>
#include <quic/fizz/client/handshake/FizzClientQuicHandshakeContext.h>
#include <quic/logging/FileQLogger.h>

namespace quic::samples {

HQClient::HQClient(const HQToolClientParams& params) : params_(params) {
HQClient::HQClient(const HQToolClientParams& params)
: params_(params), qEvb_(std::make_shared<FollyQuicEventBase>(&evb_)) {
if (params_.transportSettings.pacingEnabled) {
pacingTimer_ = TimerHighRes::newTimer(
pacingTimer_ = std::make_shared<HighResQuicTimer>(
&evb_, params_.transportSettings.pacingTimerResolution);
}
}
Expand All @@ -53,7 +55,7 @@ int HQClient::start() {
evb_.loopForever();
if (params_.migrateClient) {
quicClient_->onNetworkSwitch(
std::make_unique<quic::QuicAsyncUDPSocketWrapperImpl>(&evb_));
std::make_unique<FollyQuicAsyncUDPSocket>(qEvb_));
sendRequests(true, quicClient_->getNumOpenableBidirectionalStreams());
}
evb_.loop();
Expand Down Expand Up @@ -259,9 +261,9 @@ void HQClient::connectError(const quic::QuicError& error) {
}

void HQClient::initializeQuicClient() {
auto sock = std::make_unique<quic::QuicAsyncUDPSocketWrapperImpl>(&evb_);
auto sock = std::make_unique<FollyQuicAsyncUDPSocket>(qEvb_);
auto client = std::make_shared<quic::QuicClientTransport>(
&evb_,
qEvb_,
std::move(sock),
quic::FizzClientQuicHandshakeContext::Builder()
.setFizzClientContext(
Expand Down
5 changes: 3 additions & 2 deletions proxygen/httpserver/samples/hq/HQClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#include <proxygen/httpserver/samples/hq/H1QUpstreamSession.h>
#include <proxygen/httpserver/samples/hq/HQCommandLine.h>
#include <proxygen/lib/http/session/HQUpstreamSession.h>
#include <quic/common/Timers.h>
#include <quic/common/events/HighResQuicTimer.h>

namespace quic {

Expand Down Expand Up @@ -83,8 +83,9 @@ class HQClient : private quic::QuicSocket::ConnectionSetupCallback {

std::shared_ptr<quic::QuicClientTransport> quicClient_;

TimerHighRes::SharedPtr pacingTimer_;
QuicTimer::SharedPtr pacingTimer_;

std::shared_ptr<FollyQuicEventBase> qEvb_;
folly::EventBase evb_;

// H3
Expand Down
5 changes: 3 additions & 2 deletions proxygen/httpserver/samples/hq/HQServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <proxygen/httpserver/samples/hq/HQServer.h>

#include <ostream>
#include <quic/common/udpsocket/FollyQuicAsyncUDPSocket.h>
#include <string>

#include <folly/io/async/EventBaseLocal.h>
Expand Down Expand Up @@ -39,7 +40,7 @@ class HQServerTransportFactory : public quic::QuicServerTransportFactory {
// Creates new quic server transport
quic::QuicServerTransport::Ptr make(
folly::EventBase* evb,
std::unique_ptr<quic::QuicAsyncUDPSocketWrapper> socket,
std::unique_ptr<quic::FollyAsyncUDPSocketAlias> socket,
const folly::SocketAddress& /* peerAddr */,
quic::QuicVersion quicVersion,
std::shared_ptr<const fizz::server::FizzServerContext> ctx) noexcept
Expand Down Expand Up @@ -212,7 +213,7 @@ HQServerTransportFactory::HQServerTransportFactory(

QuicServerTransport::Ptr HQServerTransportFactory::make(
folly::EventBase* evb,
std::unique_ptr<quic::QuicAsyncUDPSocketWrapper> socket,
std::unique_ptr<quic::FollyAsyncUDPSocketAlias> socket,
const folly::SocketAddress& /* peerAddr */,
quic::QuicVersion,
std::shared_ptr<const FizzServerContext> ctx) noexcept {
Expand Down
12 changes: 7 additions & 5 deletions proxygen/lib/http/HQConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
#include <folly/io/async/AsyncSSLSocket.h>
#include <proxygen/lib/http/session/HQSession.h>
#include <quic/api/QuicSocket.h>
#include <quic/common/events/FollyQuicEventBase.h>
#include <quic/common/udpsocket/FollyQuicAsyncUDPSocket.h>
#include <quic/congestion_control/CongestionControllerFactory.h>
#include <quic/fizz/client/handshake/FizzClientQuicHandshakeContext.h>

using namespace folly;
using namespace std;
using namespace fizz::client;

Expand Down Expand Up @@ -47,23 +48,24 @@ void HQConnector::setQuicPskCache(
}

void HQConnector::connect(
EventBase* eventBase,
folly::EventBase* eventBase,
folly::Optional<folly::SocketAddress> localAddr,
const folly::SocketAddress& connectAddr,
std::shared_ptr<const FizzClientContext> fizzContext,
std::shared_ptr<const fizz::CertificateVerifier> verifier,
std::chrono::milliseconds connectTimeout,
const SocketOptionMap& socketOptions,
const folly::SocketOptionMap& socketOptions,
folly::Optional<std::string> sni,
std::shared_ptr<quic::QLogger> qLogger,
std::shared_ptr<quic::LoopDetectorCallback> quicLoopDetectorCallback,
std::shared_ptr<quic::QuicTransportStatsCallback>
quicTransportStatsCallback) {

DCHECK(!isBusy());
auto sock = std::make_unique<quic::QuicAsyncUDPSocketWrapperImpl>(eventBase);
auto qEvb = std::make_shared<quic::FollyQuicEventBase>(eventBase);
auto sock = std::make_unique<quic::FollyQuicAsyncUDPSocket>(qEvb);
auto quicClient = quic::QuicClientTransport::newClient(
eventBase,
std::move(qEvb),
std::move(sock),
quic::FizzClientQuicHandshakeContext::Builder()
.setFizzClientContext(fizzContext)
Expand Down
1 change: 1 addition & 0 deletions proxygen/lib/http/HQConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <quic/api/LoopDetectorCallback.h>
#include <quic/api/QuicSocket.h>
#include <quic/client/QuicClientTransport.h>
#include <quic/common/events/FollyQuicEventBase.h>
#include <quic/fizz/client/handshake/QuicPskCache.h>
#include <quic/logging/QLogger.h>

Expand Down
9 changes: 4 additions & 5 deletions proxygen/lib/http/session/HQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1097,10 +1097,9 @@ void HQSession::scheduleWrite() {
}

void HQSession::scheduleLoopCallback(bool thisIteration) {
if (!isLoopCallbackScheduled()) {
auto evb = getEventBase();
if (evb) {
evb->runInLoop(this, thisIteration);
if (sock_ && sock_->getEventBase()) {
if (!isLoopCallbackScheduled()) {
getEventBase()->runInLoop(this, thisIteration);
}
}
}
Expand Down Expand Up @@ -1617,7 +1616,7 @@ void HQSession::onPushPriority(hq::PushId pushId, const HTTPPriority& pri) {
void HQSession::notifyEgressBodyBuffered(int64_t bytes) {
if (HTTPSessionBase::notifyEgressBodyBuffered(bytes, true) &&
!inLoopCallback_ && !isLoopCallbackScheduled() && sock_) {
sock_->getEventBase()->runInLoop(this);
getEventBase()->runInLoop(this);
}
}

Expand Down
7 changes: 5 additions & 2 deletions proxygen/lib/http/session/HQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <proxygen/lib/utils/ConditionalGate.h>
#include <quic/api/QuicSocket.h>
#include <quic/common/BufUtil.h>
#include <quic/common/events/FollyQuicEventBase.h>

namespace proxygen {

Expand Down Expand Up @@ -256,8 +257,10 @@ class HQSession
}

folly::EventBase* getEventBase() const override {
if (sock_) {
return sock_->getEventBase();
if (sock_ && sock_->getEventBase()) {
return sock_->getEventBase()
->getTypedEventBase<quic::FollyQuicEventBase>()
->getBackingEventBase();
}
return nullptr;
}
Expand Down
2 changes: 2 additions & 0 deletions proxygen/lib/http/session/HQStreamDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <proxygen/lib/http/codec/HQFramer.h>
#include <proxygen/lib/http/codec/HQUnidirectionalCodec.h>

#include <folly/io/async/EventBase.h>

namespace proxygen {

// Base class for the unidirectional stream callbacks
Expand Down
6 changes: 4 additions & 2 deletions proxygen/lib/http/session/HQUpstreamSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

#include <proxygen/lib/http/session/HQUpstreamSession.h>
#include <quic/common/events/FollyQuicEventBase.h>
#include <wangle/acceptor/ConnectionManager.h>

namespace proxygen {
Expand Down Expand Up @@ -130,12 +131,13 @@ void HQUpstreamSession::attachThreadLocals(
txnEgressQueue_.attachThreadLocals(timeout);
setController(controller);
setSessionStats(stats);
auto qEvbWrapper = std::make_shared<quic::FollyQuicEventBase>(eventBase);
if (sock_) {
sock_->attachEventBase(eventBase);
sock_->attachEventBase(std::move(qEvbWrapper));
}
codec_.foreach (fn);
setHeaderCodecStats(headerCodecStats);
sock_->getEventBase()->runInLoop(this);
getEventBase()->runInLoop(this);
// The caller MUST re-add the connection to a new connection manager.
}

Expand Down
1 change: 1 addition & 0 deletions proxygen/lib/http/session/HQUpstreamSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <proxygen/lib/http/session/HQSession.h>

#include <folly/io/async/HHWheelTimer.h>
#include <quic/common/events/FollyQuicEventBase.h>

namespace proxygen {

Expand Down
6 changes: 4 additions & 2 deletions proxygen/lib/http/session/test/MockQuicSocketDriver.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
#pragma once

#include <folly/Format.h>
#include <folly/io/async/EventBase.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <proxygen/lib/transport/test/MockAsyncTransportCertificate.h>
#include <quic/api/test/MockQuicSocket.h>
#include <quic/common/events/FollyQuicEventBase.h>
#include <unordered_map>

namespace quic {
Expand Down Expand Up @@ -115,6 +115,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
TransportEnum transportType,
std::string alpn = "h3")
: eventBase_(eventBase),
quicEventBase_(std::make_shared<quic::FollyQuicEventBase>(eventBase_)),
transportType_(transportType),
sock_(std::make_shared<MockQuicSocket>(eventBase, connSetupCb, connCb)),
alpn_(alpn) {
Expand Down Expand Up @@ -176,7 +177,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {
.WillRepeatedly(testing::ReturnPointee(&sockGood_));

EXPECT_CALL(*sock_, getEventBase())
.WillRepeatedly(testing::ReturnPointee(&eventBase_));
.WillRepeatedly(testing::Return(quicEventBase_));

EXPECT_CALL(*sock_, setControlStream(testing::_))
.WillRepeatedly(testing::Invoke(
Expand Down Expand Up @@ -1753,6 +1754,7 @@ class MockQuicSocketDriver : public folly::EventBase::LoopCallback {

bool strictErrorCheck_{true};
folly::EventBase* eventBase_;
std::shared_ptr<quic::FollyQuicEventBase> quicEventBase_;
TransportSettings transportSettings_;
uint64_t bufferAvailable_{std::numeric_limits<uint64_t>::max()};
// keeping this ordered for better debugging
Expand Down
6 changes: 4 additions & 2 deletions proxygen/lib/transport/H3DatagramAsyncSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <proxygen/lib/transport/H3DatagramAsyncSocket.h>

#include <folly/FileUtil.h>
#include <quic/common/udpsocket/FollyQuicAsyncUDPSocket.h>
#include <quic/fizz/client/handshake/FizzClientQuicHandshakeContext.h>
#include <utility>
#include <wangle/acceptor/TransportInfo.h>
Expand Down Expand Up @@ -192,14 +193,15 @@ void H3DatagramAsyncSocket::startClient() {
transportSettings.datagramConfig.readBufSize = rcvBufPkts_;
}
if (!upstreamSession_) {
auto sock = std::make_unique<quic::QuicAsyncUDPSocketWrapperImpl>(evb_);
auto qEvb = std::make_shared<quic::FollyQuicEventBase>(evb_);
auto sock = std::make_unique<quic::FollyQuicAsyncUDPSocket>(qEvb);
auto fizzClientContext =
quic::FizzClientQuicHandshakeContext::Builder()
.setFizzClientContext(createFizzClientContext())
.setCertificateVerifier(options_.certVerifier_)
.build();
auto client = std::make_shared<quic::QuicClientTransport>(
evb_, std::move(sock), fizzClientContext);
qEvb, std::move(sock), fizzClientContext);
CHECK(connectAddress_.isInitialized());
client->addNewPeerAddress(connectAddress_);
if (bindAddress_.isInitialized()) {
Expand Down

0 comments on commit b9516dd

Please sign in to comment.