Skip to content

Commit

Permalink
Index bucket during verify step
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Jan 30, 2025
1 parent 3424a6b commit b4dfd1e
Show file tree
Hide file tree
Showing 19 changed files with 100 additions and 53 deletions.
10 changes: 6 additions & 4 deletions src/bucket/BucketIndexUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ getPageSizeFromConfig(Config const& cfg)
template <class BucketT>
std::unique_ptr<typename BucketT::IndexT const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx)
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher)
{
BUCKET_TYPE_ASSERT(BucketT);

Expand All @@ -40,7 +41,7 @@ createIndex(BucketManager& bm, std::filesystem::path const& filename,
try
{
return std::unique_ptr<typename BucketT::IndexT const>(
new typename BucketT::IndexT(bm, filename, hash, ctx));
new typename BucketT::IndexT(bm, filename, hash, ctx, hasher));
}
// BucketIndex throws if BucketManager shuts down before index finishes,
// so return empty index instead of partial index
Expand Down Expand Up @@ -87,11 +88,12 @@ loadIndex(BucketManager const& bm, std::filesystem::path const& filename,
template std::unique_ptr<typename LiveBucket::IndexT const>
createIndex<LiveBucket>(BucketManager& bm,
std::filesystem::path const& filename, Hash const& hash,
asio::io_context& ctx);
asio::io_context& ctx, std::optional<SHA256>& hasher);
template std::unique_ptr<typename HotArchiveBucket::IndexT const>
createIndex<HotArchiveBucket>(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

template std::unique_ptr<typename LiveBucket::IndexT const>
loadIndex<LiveBucket>(BucketManager const& bm,
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/BucketIndexUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "crypto/SHA.h"
#include "util/GlobalChecks.h"
#include "util/XDROperators.h" // IWYU pragma: keep
#include "xdr/Stellar-ledger-entries.h"
Expand Down Expand Up @@ -99,7 +100,8 @@ std::streamoff getPageSizeFromConfig(Config const& cfg);
template <class BucketT>
std::unique_ptr<typename BucketT::IndexT const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

// Loads index from given file. If file does not exist or if saved
// index does not have expected version or pageSize, return null
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1592,8 +1592,9 @@ BucketManager::scheduleVerifyReferencedBucketsWork(
throw std::runtime_error(fmt::format(
FMT_STRING("Missing referenced bucket {}"), binToHex(h)));
}
std::unique_ptr<LiveBucketIndex const> index;
seq.emplace_back(std::make_shared<VerifyBucketWork>(
mApp, b->getFilename().string(), b->getHash(), nullptr));
mApp, b->getFilename().string(), b->getHash(), index, nullptr));
}
return mApp.getWorkScheduler().scheduleWork<WorkSequence>(
"verify-referenced-buckets", seq);
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
if (auto b = bucketManager.getBucketIfExists<BucketT>(hash);
!b || !b->isIndexed())
{
index = createIndex<BucketT>(bucketManager, mFilename, hash, mCtx);
std::optional<SHA256> empty{};
index =
createIndex<BucketT>(bucketManager, mFilename, hash, mCtx, empty);
}

return bucketManager.adoptFileAsBucket<BucketT>(mFilename.string(), hash,
Expand Down
6 changes: 4 additions & 2 deletions src/bucket/DiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ template <class BucketT>
DiskIndex<BucketT>::DiskIndex(BucketManager& bm,
std::filesystem::path const& filename,
std::streamoff pageSize, Hash const& hash,
asio::io_context& ctx)
asio::io_context& ctx,
std::optional<SHA256>& hasher)
: mBloomLookupMeter(bm.getBloomLookupMeter<BucketT>())
, mBloomMissMeter(bm.getBloomMissMeter<BucketT>())
{
Expand All @@ -147,8 +148,9 @@ DiskIndex<BucketT>::DiskIndex(BucketManager& bm,

std::vector<uint64_t> keyHashes;
auto seed = shortHash::getShortHashInitKey();
SHA256* hasherPtr = hasher.has_value() ? &hasher.value() : nullptr;

while (in && in.readOne(be))
while (in && in.readOne(be, hasherPtr))
{
// peridocially check if bucket manager is exiting to stop indexing
// gracefully
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/DiskIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class io_context;
namespace stellar
{
class BucketManager;
class SHA256;

// maps smallest and largest LedgerKey on a given page inclusively
// [lowerBound, upperbound]
Expand Down Expand Up @@ -118,7 +119,8 @@ template <class BucketT> class DiskIndex : public NonMovableOrCopyable

// Constructor for creating a fresh index.
DiskIndex(BucketManager& bm, std::filesystem::path const& filename,
std::streamoff pageSize, Hash const& hash, asio::io_context& ctx);
std::streamoff pageSize, Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

// Constructor for loading pre-existing index from disk. Must call preLoad
// before calling this constructor to properly deserialize index.
Expand Down
5 changes: 3 additions & 2 deletions src/bucket/HotArchiveBucketIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ namespace stellar

HotArchiveBucketIndex::HotArchiveBucketIndex(
BucketManager& bm, std::filesystem::path const& filename, Hash const& hash,
asio::io_context& ctx)
: mDiskIndex(bm, filename, getPageSize(bm.getConfig(), 0), hash, ctx)
asio::io_context& ctx, std::optional<SHA256>& hasher)
: mDiskIndex(bm, filename, getPageSize(bm.getConfig(), 0), hash, ctx,
hasher)
{
ZoneScoped;
releaseAssert(!filename.empty());
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/HotArchiveBucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ class HotArchiveBucketIndex : public NonMovableOrCopyable

HotArchiveBucketIndex(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

template <class Archive>
HotArchiveBucketIndex(BucketManager const& bm, Archive& ar,
Expand Down
6 changes: 4 additions & 2 deletions src/bucket/InMemoryIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ InMemoryBucketState::scan(IterT start, LedgerKey const& searchKey) const
}

InMemoryIndex::InMemoryIndex(BucketManager const& bm,
std::filesystem::path const& filename)
std::filesystem::path const& filename,
std::optional<SHA256>& hasher)
{
XDRInputFileStream in;
in.open(filename.string());
Expand All @@ -60,8 +61,9 @@ InMemoryIndex::InMemoryIndex(BucketManager const& bm,
std::streamoff lastOffset = 0;
std::optional<std::streamoff> firstOffer;
std::optional<std::streamoff> lastOffer;
SHA256* hasherPtr = hasher.has_value() ? &hasher.value() : nullptr;

while (in && in.readOne(be))
while (in && in.readOne(be, hasherPtr))
{
if (++iter >= 1000)
{
Expand Down
5 changes: 4 additions & 1 deletion src/bucket/InMemoryIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
namespace stellar
{

class SHA256;

// For small Buckets, we can cache all contents in memory. Because we cache all
// entries, the index is just as large as the Bucket itself, so we never persist
// this index type. It is always recreated on startup.
Expand Down Expand Up @@ -66,7 +68,8 @@ class InMemoryIndex
using IterT = InMemoryBucketState::IterT;

InMemoryIndex(BucketManager const& bm,
std::filesystem::path const& filename);
std::filesystem::path const& filename,
std::optional<SHA256>& hasher);

IterT
begin() const
Expand Down
7 changes: 4 additions & 3 deletions src/bucket/LiveBucketIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ LiveBucketIndex::getPageSize(Config const& cfg, size_t bucketSize)

LiveBucketIndex::LiveBucketIndex(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx)
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher)
{
ZoneScoped;
releaseAssert(!filename.empty());
Expand All @@ -50,7 +51,7 @@ LiveBucketIndex::LiveBucketIndex(BucketManager& bm,
"LiveBucketIndex::createIndex() using in-memory index for "
"bucket {}",
filename);
mInMemoryIndex = std::make_unique<InMemoryIndex>(bm, filename);
mInMemoryIndex = std::make_unique<InMemoryIndex>(bm, filename, hasher);
}
else
{
Expand All @@ -59,7 +60,7 @@ LiveBucketIndex::LiveBucketIndex(BucketManager& bm,
"page size {} in bucket {}",
pageSize, filename);
mDiskIndex = std::make_unique<DiskIndex<LiveBucket>>(
bm, filename, pageSize, hash, ctx);
bm, filename, pageSize, hash, ctx, hasher);
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/bucket/LiveBucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace stellar
*/

class BucketManager;

class SHA256;
class LiveBucketIndex : public NonMovableOrCopyable
{
public:
Expand Down Expand Up @@ -74,7 +74,8 @@ class LiveBucketIndex : public NonMovableOrCopyable

// Constructor for creating new index from Bucketfile
LiveBucketIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash, asio::io_context& ctx);
Hash const& hash, asio::io_context& ctx,
std::optional<SHA256>& hasher);

// Constructor for loading pre-existing index from disk
template <class Archive>
Expand Down
8 changes: 5 additions & 3 deletions src/catchup/IndexBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "bucket/BucketManager.h"
#include "bucket/DiskIndex.h"
#include "bucket/LiveBucket.h"
#include "crypto/SHA.h"
#include "util/Fs.h"
#include "util/Logging.h"
#include "util/UnorderedSet.h"
Expand Down Expand Up @@ -86,9 +87,10 @@ IndexBucketsWork::IndexWork::postWork()
if (!self->mIndex)
{
// TODO: Fix this when archive BucketLists assume state
self->mIndex =
createIndex<LiveBucket>(bm, self->mBucket->getFilename(),
self->mBucket->getHash(), ctx);
std::optional<SHA256> empty{};
self->mIndex = createIndex<LiveBucket>(
bm, self->mBucket->getFilename(), self->mBucket->getHash(),
ctx, empty);
}

app.postOnMainThread(
Expand Down
20 changes: 17 additions & 3 deletions src/historywork/DownloadBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,21 +90,35 @@ DownloadBucketsWork::yieldMoreWork()
};
std::weak_ptr<DownloadBucketsWork> weak(
std::static_pointer_cast<DownloadBucketsWork>(shared_from_this()));
auto successCb = [weak, ft, hash](Application& app) -> bool {

auto currId = mIndexId++;
auto [indexIter, inserted] = mIndexMap.emplace(currId, nullptr);
releaseAssertOrThrow(inserted);

auto successCb = [weak, ft, hash, currId](Application& app) -> bool {
auto self = weak.lock();
if (self)
{
// To avoid dangling references, maintain a map of index pointers
// and do a lookup inside the callback instead of capturing anything
// by reference.
auto indexIter = self->mIndexMap.find(currId);
releaseAssertOrThrow(indexIter != self->mIndexMap.end());
releaseAssertOrThrow(indexIter->second);

auto bucketPath = ft.localPath_nogz();
auto b = app.getBucketManager().adoptFileAsBucket<LiveBucket>(
bucketPath, hexToBin256(hash),
/*mergeKey=*/nullptr,
/*index=*/nullptr);
/*index=*/std::move(indexIter->second));
self->mBuckets[hash] = b;
self->mIndexMap.erase(currId);
}
return true;
};
auto w2 = std::make_shared<VerifyBucketWork>(mApp, ft.localPath_nogz(),
hexToBin256(hash), failureCb);
hexToBin256(hash),
indexIter->second, failureCb);
auto w3 = std::make_shared<WorkWithCallback>(mApp, "adopt-verified-bucket",
successCb);
std::vector<std::shared_ptr<BasicWork>> seq{w1, w2, w3};
Expand Down
4 changes: 4 additions & 0 deletions src/historywork/DownloadBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ class DownloadBucketsWork : public BatchWork
TmpDir const& mDownloadDir;
std::shared_ptr<HistoryArchive> mArchive;

// Store indexes of downloaded buckets
std::map<int, std::unique_ptr<LiveBucketIndex const>> mIndexMap;
int mIndexId{0};

public:
DownloadBucketsWork(
Application& app,
Expand Down
3 changes: 3 additions & 0 deletions src/historywork/GetRemoteFileWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ GetRemoteFileWork::getCommand()
{
mCurrentArchive = mApp.getHistoryArchiveManager()
.selectRandomReadableHistoryArchive();
CLOG_INFO(History, "Selected archive {} to download {}",
mCurrentArchive->getName(),
std::filesystem::path(mRemote).filename().string());
}
releaseAssert(mCurrentArchive);
releaseAssert(mCurrentArchive->hasGetCmd());
Expand Down
38 changes: 15 additions & 23 deletions src/historywork/VerifyBucketWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "historywork/VerifyBucketWork.h"
#include "bucket/LiveBucketIndex.h"
#include "crypto/Hex.h"
#include "crypto/SHA.h"
#include "main/Application.h"
Expand All @@ -20,13 +21,13 @@
namespace stellar
{

VerifyBucketWork::VerifyBucketWork(Application& app,
std::string const& bucketFile,
uint256 const& hash,
OnFailureCallback failureCb)
VerifyBucketWork::VerifyBucketWork(
Application& app, std::string const& bucketFile, uint256 const& hash,
std::unique_ptr<LiveBucketIndex const>& index, OnFailureCallback failureCb)
: BasicWork(app, "verify-bucket-hash-" + bucketFile, BasicWork::RETRY_NEVER)
, mBucketFile(bucketFile)
, mHash(hash)
, mIndex(index)
, mOnFailure(failureCb)
{
}
Expand Down Expand Up @@ -57,8 +58,8 @@ VerifyBucketWork::spawnVerifier()
std::weak_ptr<VerifyBucketWork> weak(
std::static_pointer_cast<VerifyBucketWork>(shared_from_this()));
app.postOnBackgroundThread(
[&app, filename, weak, hash]() {
SHA256 hasher;
[&app, filename, weak, hash, &index = mIndex]() {
auto hasher = std::make_optional<SHA256>();
asio::error_code ec;

// No point in verifying buckets if things are shutting down
Expand All @@ -71,24 +72,15 @@ VerifyBucketWork::spawnVerifier()
try
{
ZoneNamedN(verifyZone, "bucket verify", true);
CLOG_INFO(History, "Verifying bucket {}", binToHex(hash));
CLOG_INFO(History, "Verifying and indexing bucket {}",
binToHex(hash));

// ensure that the stream gets its own scope to avoid race with
// main thread
std::ifstream in(filename, std::ifstream::binary);
if (!in)
{
throw std::runtime_error(fmt::format(
FMT_STRING("Error opening file {}"), filename));
}
in.exceptions(std::ios::badbit);
char buf[4096];
while (in)
{
in.read(buf, sizeof(buf));
hasher.add(ByteSlice(buf, in.gcount()));
}
uint256 vHash = hasher.finish();
index = createIndex<LiveBucket>(
app.getBucketManager(), filename, hash,
app.getWorkerIOContext(), hasher);
releaseAssertOrThrow(index);

uint256 vHash = hasher->finish();
if (vHash == hash)
{
CLOG_DEBUG(History, "Verified hash ({}) for {}",
Expand Down
Loading

0 comments on commit b4dfd1e

Please sign in to comment.