Skip to content

Commit

Permalink
Implement parallel ledger close, off by default
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Jan 6, 2025
1 parent a757b61 commit 9dce230
Show file tree
Hide file tree
Showing 68 changed files with 1,133 additions and 621 deletions.
2 changes: 0 additions & 2 deletions src/bucket/BucketListBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ template <typename BucketT>
void
BucketLevel<BucketT>::setNext(FutureBucket<BucketT> const& fb)
{
releaseAssert(threadIsMain());
mNextCurr = fb;
}

Expand All @@ -79,7 +78,6 @@ template <typename BucketT>
void
BucketLevel<BucketT>::setCurr(std::shared_ptr<BucketT> b)
{
releaseAssert(threadIsMain());
mNextCurr.clear();
mCurr = b;
}
Expand Down
2 changes: 0 additions & 2 deletions src/bucket/BucketListSnapshotBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ BucketListSnapshot<BucketT>::BucketListSnapshot(
BucketListBase<BucketT> const& bl, LedgerHeader header)
: mHeader(std::move(header))
{
releaseAssert(threadIsMain());

for (uint32_t i = 0; i < BucketListBase<BucketT>::kNumLevels; ++i)
{
auto const& level = bl.getLevel(i);
Expand Down
74 changes: 36 additions & 38 deletions src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ void
BucketManager::initialize()
{
ZoneScoped;
releaseAssert(threadIsMain());
std::string d = mConfig.BUCKET_DIR_PATH;

if (!fs::exists(d))
Expand Down Expand Up @@ -729,7 +730,7 @@ BucketManager::getBucketListReferencedBuckets() const
}

std::set<Hash>
BucketManager::getAllReferencedBuckets() const
BucketManager::getAllReferencedBuckets(HistoryArchiveState const& has) const
{
ZoneScoped;
auto referenced = getBucketListReferencedBuckets();
Expand All @@ -740,8 +741,7 @@ BucketManager::getAllReferencedBuckets() const

// retain any bucket referenced by the last closed ledger as recorded in the
// database (as merges complete, the bucket list drifts from that state)
auto lclHas = mApp.getLedgerManager().getLastClosedLedgerHAS();
auto lclBuckets = lclHas.allBuckets();
auto lclBuckets = has.allBuckets();
for (auto const& h : lclBuckets)
{
auto rit = referenced.emplace(hexToBin256(h));
Expand All @@ -752,39 +752,38 @@ BucketManager::getAllReferencedBuckets() const
}

// retain buckets that are referenced by a state in the publish queue.
auto pub = mApp.getHistoryManager().getBucketsReferencedByPublishQueue();
for (auto const& h :
HistoryManager::getBucketsReferencedByPublishQueue(mApp.getConfig()))
{
for (auto const& h : pub)
auto rhash = hexToBin256(h);
auto rit = referenced.emplace(rhash);
if (rit.second)
{
auto rhash = hexToBin256(h);
auto rit = referenced.emplace(rhash);
if (rit.second)
{
CLOG_TRACE(Bucket, "{} referenced by publish queue", h);

// Project referenced bucket `rhash` -- which might be a merge
// input captured before a merge finished -- through our weak
// map of merge input/output relationships, to find any outputs
// we'll want to retain in order to resynthesize the merge in
// the future, rather than re-run it.
mFinishedMerges.getOutputsUsingInput(rhash, referenced);
}
CLOG_TRACE(Bucket, "{} referenced by publish queue", h);

// Project referenced bucket `rhash` -- which might be a merge
// input captured before a merge finished -- through our weak
// map of merge input/output relationships, to find any outputs
// we'll want to retain in order to resynthesize the merge in
// the future, rather than re-run it.
mFinishedMerges.getOutputsUsingInput(rhash, referenced);
}
}
return referenced;
}

void
BucketManager::cleanupStaleFiles()
BucketManager::cleanupStaleFiles(HistoryArchiveState const& has)
{
ZoneScoped;
releaseAssert(threadIsMain());
if (mConfig.DISABLE_BUCKET_GC)
{
return;
}

std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
auto referenced = getAllReferencedBuckets();
auto referenced = getAllReferencedBuckets(has);
std::transform(std::begin(mSharedLiveBuckets), std::end(mSharedLiveBuckets),
std::inserter(referenced, std::end(referenced)),
[](std::pair<Hash, std::shared_ptr<LiveBucket>> const& p) {
Expand Down Expand Up @@ -818,11 +817,11 @@ BucketManager::cleanupStaleFiles()
}

void
BucketManager::forgetUnreferencedBuckets()
BucketManager::forgetUnreferencedBuckets(HistoryArchiveState const& has)
{
ZoneScoped;
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
auto referenced = getAllReferencedBuckets();
auto referenced = getAllReferencedBuckets(has);
auto blReferenced = getBucketListReferencedBuckets();

auto bucketMapLoop = [&](auto& bucketMap, auto& futureMap) {
Expand Down Expand Up @@ -867,7 +866,7 @@ BucketManager::forgetUnreferencedBuckets()
Bucket,
"BucketManager::forgetUnreferencedBuckets dropping {}",
filename);
if (!filename.empty() && !mApp.getConfig().DISABLE_BUCKET_GC)
if (!filename.empty() && !mConfig.DISABLE_BUCKET_GC)
{
CLOG_TRACE(Bucket, "removing bucket file: {}", filename);
std::filesystem::remove(filename);
Expand Down Expand Up @@ -1048,15 +1047,15 @@ BucketManager::maybeSetIndex(std::shared_ptr<BucketBase> b,
}

void
BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq)
BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq,
SorobanNetworkConfig const& cfg)
{
releaseAssert(mSnapshotManager);
releaseAssert(!mEvictionFuture.valid());
releaseAssert(mEvictionStatistics);

auto searchableBL =
mSnapshotManager->copySearchableLiveBucketListSnapshot();
auto const& cfg = mApp.getLedgerManager().getSorobanNetworkConfigForApply();
auto const& sas = cfg.stateArchivalSettings();

using task_t = std::packaged_task<EvictionResult()>;
Expand All @@ -1076,30 +1075,26 @@ BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq)
}

void
BucketManager::resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx,
uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys)
BucketManager::resolveBackgroundEvictionScan(
AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys, SorobanNetworkConfig& networkConfig)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssert(mEvictionStatistics);

if (!mEvictionFuture.valid())
{
startBackgroundEvictionScan(ledgerSeq);
startBackgroundEvictionScan(ledgerSeq, networkConfig);
}

auto evictionCandidates = mEvictionFuture.get();

auto const& networkConfig =
mApp.getLedgerManager().getSorobanNetworkConfigForApply();

// If eviction related settings changed during the ledger, we have to
// restart the scan
if (!evictionCandidates.isValid(ledgerSeq,
networkConfig.stateArchivalSettings()))
{
startBackgroundEvictionScan(ledgerSeq);
startBackgroundEvictionScan(ledgerSeq, networkConfig);
evictionCandidates = mEvictionFuture.get();
}

Expand Down Expand Up @@ -1209,6 +1204,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
uint32_t maxProtocolVersion, bool restartMerges)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

// TODO: Assume archival bucket state
Expand Down Expand Up @@ -1257,7 +1253,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
has.currentLedger);
}
cleanupStaleFiles();
cleanupStaleFiles(has);
}

void
Expand Down Expand Up @@ -1358,7 +1354,7 @@ std::shared_ptr<LiveBucket>
BucketManager::mergeBuckets(HistoryArchiveState const& has)
{
ZoneScoped;

releaseAssert(threadIsMain());
std::map<LedgerKey, LedgerEntry> ledgerMap = loadCompleteLedgerState(has);
BucketMetadata meta;
MergeCounters mc;
Expand Down Expand Up @@ -1548,9 +1544,11 @@ BucketManager::visitLedgerEntries(
}

std::shared_ptr<BasicWork>
BucketManager::scheduleVerifyReferencedBucketsWork()
BucketManager::scheduleVerifyReferencedBucketsWork(
HistoryArchiveState const& has)
{
std::set<Hash> hashes = getAllReferencedBuckets();
releaseAssert(threadIsMain());
std::set<Hash> hashes = getAllReferencedBuckets(has);
std::vector<std::shared_ptr<BasicWork>> seq;
for (auto const& h : hashes)
{
Expand Down
21 changes: 15 additions & 6 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ class BucketManager : NonMovableOrCopyable

static std::string const kLockFilename;

// NB: ideally, BucketManager should have no access to mApp, as it's too
// dangerous in the context of parallel application. BucketManager is quite
// bloated, with lots of legacy code, so to ensure safety, annotate all
// functions using mApp with `releaseAssert(threadIsMain())` and avoid
// accessing mApp in the background.
Application& mApp;
std::unique_ptr<LiveBucketList> mLiveBucketList;
std::unique_ptr<HotArchiveBucketList> mHotArchiveBucketList;
Expand Down Expand Up @@ -124,7 +129,7 @@ class BucketManager : NonMovableOrCopyable

std::atomic<bool> mIsShutdown{false};

void cleanupStaleFiles();
void cleanupStaleFiles(HistoryArchiveState const& has);
void deleteTmpDirAndUnlockBucketDir();
void deleteEntireBucketDir();

Expand Down Expand Up @@ -260,7 +265,7 @@ class BucketManager : NonMovableOrCopyable
// not immediately cause the buckets to delete themselves, if someone else
// is using them via a shared_ptr<>, but the BucketManager will no longer
// independently keep them alive.
void forgetUnreferencedBuckets();
void forgetUnreferencedBuckets(HistoryArchiveState const& has);

// Feed a new batch of entries to the bucket list. This interface expects to
// be given separate init (created) and live (updated) entry vectors. The
Expand Down Expand Up @@ -290,10 +295,12 @@ class BucketManager : NonMovableOrCopyable
// Scans BucketList for non-live entries to evict starting at the entry
// pointed to by EvictionIterator. Evicts until `maxEntriesToEvict` entries
// have been evicted or maxEvictionScanSize bytes have been scanned.
void startBackgroundEvictionScan(uint32_t ledgerSeq);
void startBackgroundEvictionScan(uint32_t ledgerSeq,
SorobanNetworkConfig const& cfg);
void resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx,
uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys);
LedgerKeySet const& modifiedKeys,
SorobanNetworkConfig& networkConfig);

medida::Meter& getBloomMissMeter() const;
medida::Meter& getBloomLookupMeter() const;
Expand All @@ -318,7 +325,8 @@ class BucketManager : NonMovableOrCopyable

// Return the set of buckets referenced by the BucketList, LCL HAS,
// and publish queue.
std::set<Hash> getAllReferencedBuckets() const;
std::set<Hash>
getAllReferencedBuckets(HistoryArchiveState const& has) const;

// Check for missing bucket files that would prevent `assumeState` from
// succeeding
Expand Down Expand Up @@ -375,7 +383,8 @@ class BucketManager : NonMovableOrCopyable

// Schedule a Work class that verifies the hashes of all referenced buckets
// on background threads.
std::shared_ptr<BasicWork> scheduleVerifyReferencedBucketsWork();
std::shared_ptr<BasicWork>
scheduleVerifyReferencedBucketsWork(HistoryArchiveState const& has);

Config const& getConfig() const;

Expand Down
3 changes: 0 additions & 3 deletions src/bucket/BucketSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ BucketSnapshotManager::recordBulkLoadMetrics(std::string const& label,
{
// For now, only keep metrics for the main thread. We can decide on what
// metrics make sense when more background services are added later.
releaseAssert(threadIsMain());

if (numEntries != 0)
{
Expand Down Expand Up @@ -153,8 +152,6 @@ BucketSnapshotManager::updateCurrentSnapshot(
SnapshotPtrT<LiveBucket>&& liveSnapshot,
SnapshotPtrT<HotArchiveBucket>&& hotArchiveSnapshot)
{
releaseAssert(threadIsMain());

auto updateSnapshot = [numHistoricalSnapshots = mNumHistoricalSnapshots](
auto& currentSnapshot, auto& historicalSnapshots,
auto&& newSnapshot) {
Expand Down
3 changes: 3 additions & 0 deletions src/bucket/LiveBucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

namespace stellar
{

class SorobanNetworkConfig;

// The LiveBucketList stores the current canonical state of the ledger. It is
// made up of LiveBucket buckets, which in turn store individual entries of type
// BucketEntry. When an entry is "evicted" from the ledger, it is removed from
Expand Down
2 changes: 0 additions & 2 deletions src/bucket/SearchableBucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ SearchableLiveBucketListSnapshot::loadPoolShareTrustLinesByAccountAndAsset(
ZoneScoped;

// This query should only be called during TX apply
releaseAssert(threadIsMain());
releaseAssert(mSnapshot);

LedgerKeySet trustlinesToLoad;
Expand Down Expand Up @@ -153,7 +152,6 @@ SearchableLiveBucketListSnapshot::loadInflationWinners(size_t maxWinners,

// This is a legacy query, should only be called by main thread during
// catchup
releaseAssert(threadIsMain());
auto timer = mSnapshotManager.recordBulkLoadMetrics("inflationWinners", 0)
.TimeScope();

Expand Down
2 changes: 1 addition & 1 deletion src/bucket/test/BucketListTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ TEST_CASE_VERSIONS("network config snapshots BucketList size", "[bucketlist]")
LedgerManagerForBucketTests& lm = app->getLedgerManager();

auto& networkConfig =
app->getLedgerManager().getSorobanNetworkConfigReadOnly();
app->getLedgerManager().getMutableSorobanNetworkConfig();

uint32_t windowSize = networkConfig.stateArchivalSettings()
.bucketListSizeWindowSampleSize;
Expand Down
Loading

0 comments on commit 9dce230

Please sign in to comment.