Skip to content

Commit

Permalink
Merge branch 'master' into hot-archive-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
graydon authored Feb 4, 2025
2 parents 0a599d7 + 39caa20 commit 3c7360a
Show file tree
Hide file tree
Showing 62 changed files with 2,319 additions and 1,555 deletions.
14 changes: 7 additions & 7 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ bucketlist-archive.size.bytes | counter | total size of the hot ar
bucketlist.size.bytes | counter | total size of the BucketList in bytes
bucketlist.entryCounts.-<X> | counter | number of entries of type <X> in the BucketList
bucketlist.entrySizes.-<X> | counter | size of entries of type <X> in the BucketList
bucketlistDB.bloom.lookups | meter | number of bloom filter lookups
bucketlistDB.bloom.misses | meter | number of bloom filter false positives
bucketlistDB.bulk.loads | meter | number of entries BucketListDB queried to prefetch
bucketlistDB.bulk.inflationWinners | timer | time to load inflation winners
bucketlistDB.bulk.poolshareTrustlines | timer | time to load poolshare trustlines by accountID and assetID
bucketlistDB.bulk.prefetch | timer | time to prefetch
bucketlistDB.point.<X> | timer | time to load single entry of type <X> (if no bloom miss occurred)
bucketlistDB-<X>.bloom.lookups | meter | number of bloom filter lookups on BucketList <X> (live/hotArchive)
bucketlistDB-<X>.bloom.misses | meter | number of bloom filter false positives on BucketList <X> (live/hotArchive)
bucketlistDB-<X>.bulk.loads | meter | number of entries BucketListDB queried to prefetch on BucketList <X> (live/hot-archive)
bucketlistDB-live.bulk.inflationWinners | timer | time to load inflation winners
bucketlistDB-live.bulk.poolshareTrustlines | timer | time to load poolshare trustlines by accountID and assetID
bucketlistDB-live.bulk.prefetch | timer | time to prefetch
bucketlistDB-<X>.point.<y> | timer | time to load single entry of type <Y> on BucketList <X> (live/hotArchive)
crypto.verify.hit | meter | number of signature cache hits
crypto.verify.miss | meter | number of signature cache misses
crypto.verify.total | meter | sum of both hits and misses
Expand Down
4 changes: 2 additions & 2 deletions docs/stellar-core_example.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,13 @@ MAX_DEX_TX_OPERATIONS_IN_TX_SET = 0
# 0, indiviudal index is always used. Default page size 16 kb.
BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT = 14

# BUCKETLIST_DB_INDEX_CUTOFF (Integer) default 20
# BUCKETLIST_DB_INDEX_CUTOFF (Integer) default 250
# Size, in MB, determining whether a bucket should have an individual
# key index or a key range index. If bucket size is below this value, range
# based index will be used. If set to 0, all buckets are range indexed. If
# BUCKETLIST_DB_INDEX_PAGE_SIZE_EXPONENT == 0, value ingnored and all
# buckets have individual key index.
BUCKETLIST_DB_INDEX_CUTOFF = 20
BUCKETLIST_DB_INDEX_CUTOFF = 250

# BUCKETLIST_DB_PERSIST_INDEX (bool) default true
# Determines whether BucketListDB indexes are saved to disk for faster
Expand Down
4 changes: 2 additions & 2 deletions src/bucket/BucketApplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ shouldApplyEntry(BucketEntry const& e)
{
if (e.type() == LIVEENTRY || e.type() == INITENTRY)
{
return BucketIndex::typeNotSupported(e.liveEntry().data.type());
return LiveBucketIndex::typeNotSupported(e.liveEntry().data.type());
}

if (e.type() != DEADENTRY)
{
throw std::runtime_error(
"Malformed bucket: unexpected non-INIT/LIVE/DEAD entry.");
}
return BucketIndex::typeNotSupported(e.deadEntry().type());
return LiveBucketIndex::typeNotSupported(e.deadEntry().type());
}

size_t
Expand Down
103 changes: 49 additions & 54 deletions src/bucket/BucketBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
// else.
#include "util/asio.h" // IWYU pragma: keep
#include "bucket/BucketBase.h"
#include "bucket/BucketIndex.h"
#include "bucket/BucketInputIterator.h"
#include "bucket/BucketManager.h"
#include "bucket/BucketOutputIterator.h"
Expand All @@ -30,36 +29,35 @@
namespace stellar
{

BucketIndex const&
BucketBase::getIndex() const
template <class BucketT, class IndexT>
IndexT const&
BucketBase<BucketT, IndexT>::getIndex() const
{
ZoneScoped;
releaseAssertOrThrow(!mFilename.empty());
releaseAssertOrThrow(mIndex);
return *mIndex;
}

template <class BucketT, class IndexT>
bool
BucketBase::isIndexed() const
BucketBase<BucketT, IndexT>::isIndexed() const
{
return static_cast<bool>(mIndex);
}

std::optional<std::pair<std::streamoff, std::streamoff>>
BucketBase::getOfferRange() const
{
return getIndex().getOfferRange();
}

template <class BucketT, class IndexT>
void
BucketBase::setIndex(std::unique_ptr<BucketIndex const>&& index)
BucketBase<BucketT, IndexT>::setIndex(std::unique_ptr<IndexT const>&& index)
{
releaseAssertOrThrow(!mIndex);
mIndex = std::move(index);
}

BucketBase::BucketBase(std::string const& filename, Hash const& hash,
std::unique_ptr<BucketIndex const>&& index)
template <class BucketT, class IndexT>
BucketBase<BucketT, IndexT>::BucketBase(std::string const& filename,
Hash const& hash,
std::unique_ptr<IndexT const>&& index)
: mFilename(filename), mHash(hash), mIndex(std::move(index))
{
releaseAssert(filename.empty() || fs::exists(filename));
Expand All @@ -71,30 +69,34 @@ BucketBase::BucketBase(std::string const& filename, Hash const& hash,
}
}

BucketBase::BucketBase()
template <class BucketT, class IndexT> BucketBase<BucketT, IndexT>::BucketBase()
{
}

template <class BucketT, class IndexT>
Hash const&
BucketBase::getHash() const
BucketBase<BucketT, IndexT>::getHash() const
{
return mHash;
}

template <class BucketT, class IndexT>
std::filesystem::path const&
BucketBase::getFilename() const
BucketBase<BucketT, IndexT>::getFilename() const
{
return mFilename;
}

template <class BucketT, class IndexT>
size_t
BucketBase::getSize() const
BucketBase<BucketT, IndexT>::getSize() const
{
return mSize;
}

template <class BucketT, class IndexT>
bool
BucketBase::isEmpty() const
BucketBase<BucketT, IndexT>::isEmpty() const
{
if (mFilename.empty() || isZero(mHash))
{
Expand All @@ -105,14 +107,17 @@ BucketBase::isEmpty() const
return false;
}

template <class BucketT, class IndexT>
void
BucketBase::freeIndex()
BucketBase<BucketT, IndexT>::freeIndex()
{
mIndex.reset(nullptr);
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomFileName(std::string const& tmpDir, std::string ext)
BucketBase<BucketT, IndexT>::randomFileName(std::string const& tmpDir,
std::string ext)
{
ZoneScoped;
for (;;)
Expand All @@ -127,14 +132,16 @@ BucketBase::randomFileName(std::string const& tmpDir, std::string ext)
}
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomBucketName(std::string const& tmpDir)
BucketBase<BucketT, IndexT>::randomBucketName(std::string const& tmpDir)
{
return randomFileName(tmpDir, ".xdr");
}

template <class BucketT, class IndexT>
std::string
BucketBase::randomBucketIndexName(std::string const& tmpDir)
BucketBase<BucketT, IndexT>::randomBucketIndexName(std::string const& tmpDir)
{
return randomFileName(tmpDir, ".index");
}
Expand Down Expand Up @@ -172,7 +179,7 @@ BucketBase::randomBucketIndexName(std::string const& tmpDir)
// and shadowing protocol simultaneously, the moment the first new-protocol
// bucket enters the youngest level. At least one new bucket is in every merge's
// shadows from then on in, so they all upgrade (and preserve lifecycle events).
template <class BucketT>
template <class BucketT, class IndexT>
static void
calculateMergeProtocolVersion(
MergeCounters& mc, uint32_t maxProtocolVersion,
Expand Down Expand Up @@ -253,7 +260,7 @@ calculateMergeProtocolVersion(
// side, or entries that compare non-equal. In all these cases we just
// take the lesser (or existing) entry and advance only one iterator,
// not scrutinizing the entry type further.
template <class BucketT>
template <class BucketT, class IndexT>
static bool
mergeCasesWithDefaultAcceptance(
BucketEntryIdCmp<BucketT> const& cmp, MergeCounters& mc,
Expand Down Expand Up @@ -299,14 +306,15 @@ mergeCasesWithDefaultAcceptance(
return false;
}

template <class BucketT>
template <class BucketT, class IndexT>
std::shared_ptr<BucketT>
BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents,
asio::io_context& ctx, bool doFsync)
BucketBase<BucketT, IndexT>::merge(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
std::shared_ptr<BucketT> const& newBucket,
std::vector<std::shared_ptr<BucketT>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync)
{
BUCKET_TYPE_ASSERT(BucketT);

Expand All @@ -326,9 +334,9 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,

uint32_t protocolVersion;
bool keepShadowedLifecycleEntries;
calculateMergeProtocolVersion<BucketT>(mc, maxProtocolVersion, oi, ni,
shadowIterators, protocolVersion,
keepShadowedLifecycleEntries);
calculateMergeProtocolVersion<BucketT, IndexT>(
mc, maxProtocolVersion, oi, ni, shadowIterators, protocolVersion,
keepShadowedLifecycleEntries);

auto timer = bucketManager.getMergeTimer().TimeScope();
BucketMetadata meta;
Expand All @@ -340,14 +348,14 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
{
releaseAssertOrThrow(protocolVersionStartsFrom(
maxProtocolVersion,
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
BucketT::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
meta.ext = ni.getMetadata().ext;
}
else if (oi.getMetadata().ext.v() == 1)
{
releaseAssertOrThrow(protocolVersionStartsFrom(
maxProtocolVersion,
BucketBase::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
BucketT::FIRST_PROTOCOL_SUPPORTING_PERSISTENT_EVICTION));
meta.ext = oi.getMetadata().ext;
}

Expand All @@ -374,9 +382,9 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
}
}

if (!mergeCasesWithDefaultAcceptance(cmp, mc, oi, ni, out,
shadowIterators, protocolVersion,
keepShadowedLifecycleEntries))
if (!mergeCasesWithDefaultAcceptance<BucketT, IndexT>(
cmp, mc, oi, ni, out, shadowIterators, protocolVersion,
keepShadowedLifecycleEntries))
{
BucketT::mergeCasesWithEqualKeys(mc, oi, ni, out, shadowIterators,
protocolVersion,
Expand All @@ -400,19 +408,6 @@ BucketBase::merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
return out.getBucket(bucketManager, &mk);
}

template std::shared_ptr<LiveBucket> BucketBase::merge<LiveBucket>(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<LiveBucket> const& oldBucket,
std::shared_ptr<LiveBucket> const& newBucket,
std::vector<std::shared_ptr<LiveBucket>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);

template std::shared_ptr<HotArchiveBucket> BucketBase::merge<HotArchiveBucket>(
BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<HotArchiveBucket> const& oldBucket,
std::shared_ptr<HotArchiveBucket> const& newBucket,
std::vector<std::shared_ptr<HotArchiveBucket>> const& shadows,
bool keepTombstoneEntries, bool countMergeEvents, asio::io_context& ctx,
bool doFsync);
template class BucketBase<LiveBucket, LiveBucket::IndexT>;
template class BucketBase<HotArchiveBucket, HotArchiveBucket::IndexT>;
}
42 changes: 27 additions & 15 deletions src/bucket/BucketBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// under the apache license, version 2.0. see the copying file at the root
// of this distribution or at http://www.apache.org/licenses/license-2.0

#include "bucket/BucketIndex.h"
#include "bucket/BucketUtils.h"
#include "util/NonCopyable.h"
#include "util/ProtocolVersion.h"
#include "xdr/Stellar-types.h"
Expand Down Expand Up @@ -47,17 +47,37 @@ enum class Loop
INCOMPLETE
};

class HotArchiveBucket;
class LiveBucket;
class LiveBucketIndex;
class HotArchiveBucketIndex;

template <class BucketT, class IndexT>
class BucketBase : public NonMovableOrCopyable
{
BUCKET_TYPE_ASSERT(BucketT);

// Because of the CRTP design with derived Bucket classes, this base class
// does not have direct access to BucketT::IndexT, so we take two templates
// and make this assert.
static_assert(
std::is_same_v<
IndexT,
std::conditional_t<
std::is_same_v<BucketT, LiveBucket>, LiveBucketIndex,
std::conditional_t<std::is_same_v<BucketT, HotArchiveBucket>,
HotArchiveBucketIndex, void>>>,
"IndexT must match BucketT::IndexT");

protected:
std::filesystem::path const mFilename;
Hash const mHash;
size_t mSize{0};

std::unique_ptr<BucketIndex const> mIndex{};
std::unique_ptr<IndexT const> mIndex{};

// Returns index, throws if index not yet initialized
BucketIndex const& getIndex() const;
IndexT const& getIndex() const;

static std::string randomFileName(std::string const& tmpDir,
std::string ext);
Expand All @@ -74,7 +94,7 @@ class BucketBase : public NonMovableOrCopyable
// exists, but does not check that the hash is the bucket's hash. Caller
// needs to ensure that.
BucketBase(std::string const& filename, Hash const& hash,
std::unique_ptr<BucketIndex const>&& index);
std::unique_ptr<IndexT const>&& index);

Hash const& getHash() const;
std::filesystem::path const& getFilename() const;
Expand All @@ -88,13 +108,8 @@ class BucketBase : public NonMovableOrCopyable
// Returns true if bucket is indexed, false otherwise
bool isIndexed() const;

// Returns [lowerBound, upperBound) of file offsets for all offers in the
// bucket, or std::nullopt if no offers exist
std::optional<std::pair<std::streamoff, std::streamoff>>
getOfferRange() const;

// Sets index, throws if index is already set
void setIndex(std::unique_ptr<BucketIndex const>&& index);
void setIndex(std::unique_ptr<IndexT const>&& index);

// Merge two buckets together, producing a fresh one. Entries in `oldBucket`
// are overridden in the fresh bucket by keywise-equal entries in
Expand All @@ -107,7 +122,6 @@ class BucketBase : public NonMovableOrCopyable
// `maxProtocolVersion` bounds this (for error checking) and should usually
// be the protocol of the ledger header at which the merge is starting. An
// exception will be thrown if any provided bucket versions exceed it.
template <class BucketT>
static std::shared_ptr<BucketT>
merge(BucketManager& bucketManager, uint32_t maxProtocolVersion,
std::shared_ptr<BucketT> const& oldBucket,
Expand All @@ -120,16 +134,14 @@ class BucketBase : public NonMovableOrCopyable
static std::string randomBucketIndexName(std::string const& tmpDir);

#ifdef BUILD_TESTS
BucketIndex const&
IndexT const&
getIndexForTesting() const
{
return getIndex();
}

#endif // BUILD_TESTS

virtual uint32_t getBucketVersion() const = 0;

template <class BucketT> friend class BucketSnapshotBase;
template <class T> friend class BucketSnapshotBase;
};
}
Loading

0 comments on commit 3c7360a

Please sign in to comment.