Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply buckets optimization #4634

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/bucket/BucketIndexUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ getPageSizeFromConfig(Config const& cfg)
template <class BucketT>
std::unique_ptr<typename BucketT::IndexT const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx)
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher)
{
BUCKET_TYPE_ASSERT(BucketT);

Expand All @@ -41,7 +42,7 @@ createIndex(BucketManager& bm, std::filesystem::path const& filename,
try
{
return std::unique_ptr<typename BucketT::IndexT const>(
new typename BucketT::IndexT(bm, filename, hash, ctx));
new typename BucketT::IndexT(bm, filename, hash, ctx, hasher));
}
// BucketIndex throws if BucketManager shuts down before index finishes,
// so return empty index instead of partial index
Expand Down Expand Up @@ -88,11 +89,12 @@ loadIndex(BucketManager const& bm, std::filesystem::path const& filename,
template std::unique_ptr<typename LiveBucket::IndexT const>
createIndex<LiveBucket>(BucketManager& bm,
std::filesystem::path const& filename, Hash const& hash,
asio::io_context& ctx);
asio::io_context& ctx, std::optional<SHA256>& hasher);
template std::unique_ptr<typename HotArchiveBucket::IndexT const>
createIndex<HotArchiveBucket>(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

template std::unique_ptr<typename LiveBucket::IndexT const>
loadIndex<LiveBucket>(BucketManager const& bm,
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/BucketIndexUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "crypto/SHA.h"
#include "util/GlobalChecks.h"
#include "util/XDROperators.h" // IWYU pragma: keep
#include "xdr/Stellar-ledger-entries.h"
Expand Down Expand Up @@ -100,7 +101,8 @@ std::streamoff getPageSizeFromConfig(Config const& cfg);
template <class BucketT>
std::unique_ptr<typename BucketT::IndexT const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

// Loads index from given file. If file does not exist or if saved
// index does not have expected version or pageSize, return null
Expand Down
12 changes: 11 additions & 1 deletion src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,13 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
releaseAssert(threadIsMain());
std::set<Hash> hashes = getAllReferencedBuckets(has);
std::vector<std::shared_ptr<BasicWork>> seq;

// Persist a map of indexes so we don't have dangling references in
// VerifyBucketsWork. We don't actually need to use the indexes created by
// VerifyBucketsWork here, so a throwaway static map is fine.
static std::map<int, std::unique_ptr<LiveBucketIndex const>> indexMap;

int i = 0;
Comment on lines +1596 to +1602
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused about this. It looks like VerifyBucketWork stores a reference to a unique pointer and this map exists to ensure that by the time that task runs, the pointee hasn't been deallocated due to this function going out of scope. Why not have VerifyBucketWork take ownership of the pointer completely and get rid of this map?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, this seems sketchy

for (auto const& h : hashes)
{
if (isZero(h))
Expand All @@ -1608,8 +1615,11 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
throw std::runtime_error(fmt::format(
FMT_STRING("Missing referenced bucket {}"), binToHex(h)));
}

auto [indexIter, _] = indexMap.emplace(i++, nullptr);
seq.emplace_back(std::make_shared<VerifyBucketWork>(
mApp, b->getFilename().string(), b->getHash(), nullptr));
mApp, b->getFilename().string(), b->getHash(), indexIter->second,
nullptr));
}
return mApp.getWorkScheduler().scheduleWork<WorkSequence>(
"verify-referenced-buckets", seq);
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
if (auto b = bucketManager.getBucketIfExists<BucketT>(hash);
!b || !b->isIndexed())
{
index = createIndex<BucketT>(bucketManager, mFilename, hash, mCtx);
std::optional<SHA256> empty{};
index =
createIndex<BucketT>(bucketManager, mFilename, hash, mCtx, empty);
}

return bucketManager.adoptFileAsBucket<BucketT>(mFilename.string(), hash,
Expand Down
6 changes: 4 additions & 2 deletions src/bucket/DiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ template <class BucketT>
DiskIndex<BucketT>::DiskIndex(BucketManager& bm,
std::filesystem::path const& filename,
std::streamoff pageSize, Hash const& hash,
asio::io_context& ctx)
asio::io_context& ctx,
std::optional<SHA256>& hasher)
: mBloomLookupMeter(bm.getBloomLookupMeter<BucketT>())
, mBloomMissMeter(bm.getBloomMissMeter<BucketT>())
{
Expand All @@ -147,8 +148,9 @@ DiskIndex<BucketT>::DiskIndex(BucketManager& bm,

std::vector<uint64_t> keyHashes;
auto seed = shortHash::getShortHashInitKey();
SHA256* hasherPtr = hasher.has_value() ? &hasher.value() : nullptr;

while (in && in.readOne(be))
while (in && in.readOne(be, hasherPtr))
{
// peridocially check if bucket manager is exiting to stop indexing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// peridocially check if bucket manager is exiting to stop indexing
// periodically check if bucket manager is exiting to stop indexing

// gracefully
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/DiskIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class io_context;
namespace stellar
{
class BucketManager;
class SHA256;

// maps smallest and largest LedgerKey on a given page inclusively
// [lowerBound, upperbound]
Expand Down Expand Up @@ -118,7 +119,8 @@ template <class BucketT> class DiskIndex : public NonMovableOrCopyable

// Constructor for creating a fresh index.
DiskIndex(BucketManager& bm, std::filesystem::path const& filename,
std::streamoff pageSize, Hash const& hash, asio::io_context& ctx);
std::streamoff pageSize, Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

// Constructor for loading pre-existing index from disk. Must call preLoad
// before calling this constructor to properly deserialize index.
Expand Down
5 changes: 3 additions & 2 deletions src/bucket/HotArchiveBucketIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ namespace stellar

HotArchiveBucketIndex::HotArchiveBucketIndex(
BucketManager& bm, std::filesystem::path const& filename, Hash const& hash,
asio::io_context& ctx)
: mDiskIndex(bm, filename, getPageSize(bm.getConfig(), 0), hash, ctx)
asio::io_context& ctx, std::optional<SHA256>& hasher)
: mDiskIndex(bm, filename, getPageSize(bm.getConfig(), 0), hash, ctx,
hasher)
{
ZoneScoped;
releaseAssert(!filename.empty());
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/HotArchiveBucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class HotArchiveBucketIndex : public NonMovableOrCopyable

HotArchiveBucketIndex(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

template <class Archive>
HotArchiveBucketIndex(BucketManager const& bm, Archive& ar,
Expand Down
6 changes: 4 additions & 2 deletions src/bucket/InMemoryIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ InMemoryBucketState::scan(IterT start, LedgerKey const& searchKey) const
}

InMemoryIndex::InMemoryIndex(BucketManager const& bm,
std::filesystem::path const& filename)
std::filesystem::path const& filename,
std::optional<SHA256>& hasher)
{
XDRInputFileStream in;
in.open(filename.string());
Expand All @@ -59,8 +60,9 @@ InMemoryIndex::InMemoryIndex(BucketManager const& bm,
std::streamoff lastOffset = 0;
std::optional<std::streamoff> firstOffer;
std::optional<std::streamoff> lastOffer;
SHA256* hasherPtr = hasher.has_value() ? &hasher.value() : nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Why not pass the pointer all around? Pointer is still the best we have now for optional<reference> intended here.


while (in && in.readOne(be))
while (in && in.readOne(be, hasherPtr))
{
if (++iter >= 1000)
{
Expand Down
5 changes: 4 additions & 1 deletion src/bucket/InMemoryIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
namespace stellar
{

class SHA256;

// For small Buckets, we can cache all contents in memory. Because we cache all
// entries, the index is just as large as the Bucket itself, so we never persist
// this index type. It is always recreated on startup.
Expand Down Expand Up @@ -66,7 +68,8 @@ class InMemoryIndex
using IterT = InMemoryBucketState::IterT;

InMemoryIndex(BucketManager const& bm,
std::filesystem::path const& filename);
std::filesystem::path const& filename,
std::optional<SHA256>& hasher);

IterT
begin() const
Expand Down
7 changes: 4 additions & 3 deletions src/bucket/LiveBucketIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ LiveBucketIndex::getPageSize(Config const& cfg, size_t bucketSize)

LiveBucketIndex::LiveBucketIndex(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx)
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher)
: mCacheHitMeter(bm.getCacheHitMeter())
, mCacheMissMeter(bm.getCacheMissMeter())
{
Expand All @@ -54,7 +55,7 @@ LiveBucketIndex::LiveBucketIndex(BucketManager& bm,
"LiveBucketIndex::createIndex() using in-memory index for "
"bucket {}",
filename);
mInMemoryIndex = std::make_unique<InMemoryIndex>(bm, filename);
mInMemoryIndex = std::make_unique<InMemoryIndex>(bm, filename, hasher);
}
else
{
Expand All @@ -63,7 +64,7 @@ LiveBucketIndex::LiveBucketIndex(BucketManager& bm,
"page size {} in bucket {}",
pageSize, filename);
mDiskIndex = std::make_unique<DiskIndex<LiveBucket>>(
bm, filename, pageSize, hash, ctx);
bm, filename, pageSize, hash, ctx, hasher);

auto percentCached = bm.getConfig().BUCKETLIST_DB_CACHED_PERCENT;
if (percentCached > 0)
Expand Down
5 changes: 3 additions & 2 deletions src/bucket/LiveBucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ namespace stellar
*/

class BucketManager;

class SHA256;
class LiveBucketIndex : public NonMovableOrCopyable
{
public:
Expand Down Expand Up @@ -98,7 +98,8 @@ class LiveBucketIndex : public NonMovableOrCopyable

// Constructor for creating new index from Bucketfile
LiveBucketIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

// Constructor for loading pre-existing index from disk
template <class Archive>
Expand Down
8 changes: 5 additions & 3 deletions src/catchup/IndexBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "bucket/BucketManager.h"
#include "bucket/DiskIndex.h"
#include "bucket/LiveBucket.h"
#include "crypto/SHA.h"
#include "util/Fs.h"
#include "util/Logging.h"
#include "util/UnorderedSet.h"
Expand Down Expand Up @@ -86,9 +87,10 @@ IndexBucketsWork::IndexWork::postWork()
if (!self->mIndex)
{
// TODO: Fix this when archive BucketLists assume state
self->mIndex =
createIndex<LiveBucket>(bm, self->mBucket->getFilename(),
self->mBucket->getHash(), ctx);
std::optional<SHA256> empty{};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass std::nullopt directly?

self->mIndex = createIndex<LiveBucket>(
bm, self->mBucket->getFilename(), self->mBucket->getHash(),
ctx, empty);
}

app.postOnMainThread(
Expand Down
20 changes: 17 additions & 3 deletions src/historywork/DownloadBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,35 @@ DownloadBucketsWork::yieldMoreWork()
};
std::weak_ptr<DownloadBucketsWork> weak(
std::static_pointer_cast<DownloadBucketsWork>(shared_from_this()));
auto successCb = [weak, ft, hash](Application& app) -> bool {

auto currId = mIndexId++;
auto [indexIter, inserted] = mIndexMap.emplace(currId, nullptr);
releaseAssertOrThrow(inserted);

auto successCb = [weak, ft, hash, currId](Application& app) -> bool {
auto self = weak.lock();
if (self)
{
// To avoid dangling references, maintain a map of index pointers
// and do a lookup inside the callback instead of capturing anything
// by reference.
auto indexIter = self->mIndexMap.find(currId);
releaseAssertOrThrow(indexIter != self->mIndexMap.end());
releaseAssertOrThrow(indexIter->second);

auto bucketPath = ft.localPath_nogz();
auto b = app.getBucketManager().adoptFileAsBucket<LiveBucket>(
bucketPath, hexToBin256(hash),
/*mergeKey=*/nullptr,
/*index=*/nullptr);
/*index=*/std::move(indexIter->second));
self->mBuckets[hash] = b;
self->mIndexMap.erase(currId);
}
return true;
};
auto w2 = std::make_shared<VerifyBucketWork>(mApp, ft.localPath_nogz(),
hexToBin256(hash), failureCb);
hexToBin256(hash),
indexIter->second, failureCb);
auto w3 = std::make_shared<WorkWithCallback>(mApp, "adopt-verified-bucket",
successCb);
std::vector<std::shared_ptr<BasicWork>> seq{w1, w2, w3};
Expand Down
4 changes: 4 additions & 0 deletions src/historywork/DownloadBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class DownloadBucketsWork : public BatchWork
TmpDir const& mDownloadDir;
std::shared_ptr<HistoryArchive> mArchive;

// Store indexes of downloaded buckets
std::map<int, std::unique_ptr<LiveBucketIndex const>> mIndexMap;
int mIndexId{0};

public:
DownloadBucketsWork(
Application& app,
Expand Down
3 changes: 3 additions & 0 deletions src/historywork/GetRemoteFileWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ GetRemoteFileWork::getCommand()
{
mCurrentArchive = mApp.getHistoryArchiveManager()
.selectRandomReadableHistoryArchive();
CLOG_INFO(History, "Selected archive {} to download {}",
mCurrentArchive->getName(),
std::filesystem::path(mRemote).filename().string());
}
releaseAssert(mCurrentArchive);
releaseAssert(mCurrentArchive->hasGetCmd());
Expand Down
38 changes: 15 additions & 23 deletions src/historywork/VerifyBucketWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "historywork/VerifyBucketWork.h"
#include "bucket/LiveBucketIndex.h"
#include "crypto/Hex.h"
#include "crypto/SHA.h"
#include "main/Application.h"
Expand All @@ -20,13 +21,13 @@
namespace stellar
{

VerifyBucketWork::VerifyBucketWork(Application& app,
std::string const& bucketFile,
uint256 const& hash,
OnFailureCallback failureCb)
VerifyBucketWork::VerifyBucketWork(
Application& app, std::string const& bucketFile, uint256 const& hash,
std::unique_ptr<LiveBucketIndex const>& index, OnFailureCallback failureCb)
: BasicWork(app, "verify-bucket-hash-" + bucketFile, BasicWork::RETRY_NEVER)
, mBucketFile(bucketFile)
, mHash(hash)
, mIndex(index)
, mOnFailure(failureCb)
{
}
Expand Down Expand Up @@ -57,8 +58,8 @@ VerifyBucketWork::spawnVerifier()
std::weak_ptr<VerifyBucketWork> weak(
std::static_pointer_cast<VerifyBucketWork>(shared_from_this()));
app.postOnBackgroundThread(
[&app, filename, weak, hash]() {
SHA256 hasher;
[&app, filename, weak, hash, &index = mIndex]() {
auto hasher = std::make_optional<SHA256>();
asio::error_code ec;

// No point in verifying buckets if things are shutting down
Expand All @@ -71,24 +72,15 @@ VerifyBucketWork::spawnVerifier()
try
{
ZoneNamedN(verifyZone, "bucket verify", true);
CLOG_INFO(History, "Verifying bucket {}", binToHex(hash));
CLOG_INFO(History, "Verifying and indexing bucket {}",
binToHex(hash));

// ensure that the stream gets its own scope to avoid race with
// main thread
std::ifstream in(filename, std::ifstream::binary);
if (!in)
{
throw std::runtime_error(fmt::format(
FMT_STRING("Error opening file {}"), filename));
}
in.exceptions(std::ios::badbit);
char buf[4096];
while (in)
{
in.read(buf, sizeof(buf));
hasher.add(ByteSlice(buf, in.gcount()));
}
uint256 vHash = hasher.finish();
index = createIndex<LiveBucket>(
app.getBucketManager(), filename, hash,
app.getWorkerIOContext(), hasher);
releaseAssertOrThrow(index);

uint256 vHash = hasher->finish();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: Assert hasher.has_value() before dereferencing here or use hasher.value().finish.

if (vHash == hash)
{
CLOG_DEBUG(History, "Verified hash ({}) for {}",
Expand Down
Loading
Loading