Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel ledger close #4543

Merged
merged 10 commits into from
Jan 8, 2025
2 changes: 0 additions & 2 deletions src/bucket/BucketListBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ template <typename BucketT>
void
BucketLevel<BucketT>::setNext(FutureBucket<BucketT> const& fb)
{
releaseAssert(threadIsMain());
mNextCurr = fb;
}

Expand All @@ -79,7 +78,6 @@ template <typename BucketT>
void
BucketLevel<BucketT>::setCurr(std::shared_ptr<BucketT> b)
{
releaseAssert(threadIsMain());
mNextCurr.clear();
mCurr = b;
}
Expand Down
2 changes: 0 additions & 2 deletions src/bucket/BucketListSnapshotBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ BucketListSnapshot<BucketT>::BucketListSnapshot(
BucketListBase<BucketT> const& bl, LedgerHeader header)
: mHeader(std::move(header))
{
releaseAssert(threadIsMain());

for (uint32_t i = 0; i < BucketListBase<BucketT>::kNumLevels; ++i)
{
auto const& level = bl.getLevel(i);
Expand Down
77 changes: 38 additions & 39 deletions src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
#include "ledger/LedgerTypeUtils.h"
#include "ledger/NetworkConfig.h"
#include "main/Application.h"
#include "main/Config.h"
#include "util/Fs.h"
Expand Down Expand Up @@ -62,6 +63,7 @@ void
BucketManager::initialize()
{
ZoneScoped;
releaseAssert(threadIsMain());
std::string d = mConfig.BUCKET_DIR_PATH;

if (!fs::exists(d))
Expand Down Expand Up @@ -729,7 +731,7 @@ BucketManager::getBucketListReferencedBuckets() const
}

std::set<Hash>
BucketManager::getAllReferencedBuckets() const
BucketManager::getAllReferencedBuckets(HistoryArchiveState const& has) const
{
ZoneScoped;
auto referenced = getBucketListReferencedBuckets();
Expand All @@ -740,8 +742,7 @@ BucketManager::getAllReferencedBuckets() const

// retain any bucket referenced by the last closed ledger as recorded in the
// database (as merges complete, the bucket list drifts from that state)
auto lclHas = mApp.getLedgerManager().getLastClosedLedgerHAS();
auto lclBuckets = lclHas.allBuckets();
auto lclBuckets = has.allBuckets();
for (auto const& h : lclBuckets)
{
auto rit = referenced.emplace(hexToBin256(h));
Expand All @@ -752,39 +753,38 @@ BucketManager::getAllReferencedBuckets() const
}

// retain buckets that are referenced by a state in the publish queue.
auto pub = mApp.getHistoryManager().getBucketsReferencedByPublishQueue();
for (auto const& h :
HistoryManager::getBucketsReferencedByPublishQueue(mApp.getConfig()))
{
for (auto const& h : pub)
auto rhash = hexToBin256(h);
auto rit = referenced.emplace(rhash);
if (rit.second)
{
auto rhash = hexToBin256(h);
auto rit = referenced.emplace(rhash);
if (rit.second)
{
CLOG_TRACE(Bucket, "{} referenced by publish queue", h);

// Project referenced bucket `rhash` -- which might be a merge
// input captured before a merge finished -- through our weak
// map of merge input/output relationships, to find any outputs
// we'll want to retain in order to resynthesize the merge in
// the future, rather than re-run it.
mFinishedMerges.getOutputsUsingInput(rhash, referenced);
}
CLOG_TRACE(Bucket, "{} referenced by publish queue", h);

// Project referenced bucket `rhash` -- which might be a merge
// input captured before a merge finished -- through our weak
// map of merge input/output relationships, to find any outputs
// we'll want to retain in order to resynthesize the merge in
// the future, rather than re-run it.
mFinishedMerges.getOutputsUsingInput(rhash, referenced);
}
}
return referenced;
}

void
BucketManager::cleanupStaleFiles()
BucketManager::cleanupStaleFiles(HistoryArchiveState const& has)
{
ZoneScoped;
releaseAssert(threadIsMain());
if (mConfig.DISABLE_BUCKET_GC)
{
return;
}

std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
auto referenced = getAllReferencedBuckets();
auto referenced = getAllReferencedBuckets(has);
std::transform(std::begin(mSharedLiveBuckets), std::end(mSharedLiveBuckets),
std::inserter(referenced, std::end(referenced)),
[](std::pair<Hash, std::shared_ptr<LiveBucket>> const& p) {
Expand Down Expand Up @@ -818,11 +818,11 @@ BucketManager::cleanupStaleFiles()
}

void
BucketManager::forgetUnreferencedBuckets()
BucketManager::forgetUnreferencedBuckets(HistoryArchiveState const& has)
{
ZoneScoped;
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
auto referenced = getAllReferencedBuckets();
auto referenced = getAllReferencedBuckets(has);
auto blReferenced = getBucketListReferencedBuckets();

auto bucketMapLoop = [&](auto& bucketMap, auto& futureMap) {
Expand Down Expand Up @@ -867,7 +867,7 @@ BucketManager::forgetUnreferencedBuckets()
Bucket,
"BucketManager::forgetUnreferencedBuckets dropping {}",
filename);
if (!filename.empty() && !mApp.getConfig().DISABLE_BUCKET_GC)
if (!filename.empty() && !mConfig.DISABLE_BUCKET_GC)
{
CLOG_TRACE(Bucket, "removing bucket file: {}", filename);
std::filesystem::remove(filename);
Expand Down Expand Up @@ -1049,15 +1049,15 @@ BucketManager::maybeSetIndex(std::shared_ptr<BucketBase> b,

void
BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq,
uint32_t ledgerVers)
uint32_t ledgerVers,
SorobanNetworkConfig const& cfg)
{
releaseAssert(mSnapshotManager);
releaseAssert(!mEvictionFuture.valid());
releaseAssert(mEvictionStatistics);

auto searchableBL =
mSnapshotManager->copySearchableLiveBucketListSnapshot();
auto const& cfg = mApp.getLedgerManager().getSorobanNetworkConfigForApply();
auto const& sas = cfg.stateArchivalSettings();

using task_t = std::packaged_task<EvictionResultCandidates()>;
Expand All @@ -1078,31 +1078,27 @@ BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq,
}

EvictedStateVectors
BucketManager::resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx,
uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys,
uint32_t ledgerVers)
BucketManager::resolveBackgroundEvictionScan(
AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys, uint32_t ledgerVers,
SorobanNetworkConfig const& networkConfig)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssert(mEvictionStatistics);

if (!mEvictionFuture.valid())
{
startBackgroundEvictionScan(ledgerSeq, ledgerVers);
startBackgroundEvictionScan(ledgerSeq, ledgerVers, networkConfig);
}

auto evictionCandidates = mEvictionFuture.get();

auto const& networkConfig =
mApp.getLedgerManager().getSorobanNetworkConfigForApply();

// If eviction related settings changed during the ledger, we have to
// restart the scan
if (!evictionCandidates.isValid(ledgerSeq,
networkConfig.stateArchivalSettings()))
{
startBackgroundEvictionScan(ledgerSeq, ledgerVers);
startBackgroundEvictionScan(ledgerSeq, ledgerVers, networkConfig);
evictionCandidates = mEvictionFuture.get();
}

Expand Down Expand Up @@ -1229,6 +1225,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
uint32_t maxProtocolVersion, bool restartMerges)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

// TODO: Assume archival bucket state
Expand Down Expand Up @@ -1277,7 +1274,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
has.currentLedger);
}
cleanupStaleFiles();
cleanupStaleFiles(has);
}

void
Expand Down Expand Up @@ -1378,7 +1375,7 @@ std::shared_ptr<LiveBucket>
BucketManager::mergeBuckets(HistoryArchiveState const& has)
{
ZoneScoped;

releaseAssert(threadIsMain());
std::map<LedgerKey, LedgerEntry> ledgerMap = loadCompleteLedgerState(has);
BucketMetadata meta;
MergeCounters mc;
Expand Down Expand Up @@ -1568,9 +1565,11 @@ BucketManager::visitLedgerEntries(
}

std::shared_ptr<BasicWork>
BucketManager::scheduleVerifyReferencedBucketsWork()
BucketManager::scheduleVerifyReferencedBucketsWork(
HistoryArchiveState const& has)
{
std::set<Hash> hashes = getAllReferencedBuckets();
releaseAssert(threadIsMain());
std::set<Hash> hashes = getAllReferencedBuckets(has);
std::vector<std::shared_ptr<BasicWork>> seq;
for (auto const& h : hashes)
{
Expand Down
22 changes: 16 additions & 6 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class BucketSnapshotManager;
class SearchableLiveBucketListSnapshot;
struct BucketEntryCounters;
enum class LedgerEntryTypeAndDurability : uint32_t;
class SorobanNetworkConfig;

struct HistoryArchiveState;

Expand Down Expand Up @@ -70,6 +71,11 @@ class BucketManager : NonMovableOrCopyable

static std::string const kLockFilename;

// NB: ideally, BucketManager should have no access to mApp, as it's too
// dangerous in the context of parallel application. BucketManager is quite
// bloated, with lots of legacy code, so to ensure safety, annotate all
// functions using mApp with `releaseAssert(threadIsMain())` and avoid
// accessing mApp in the background.
Application& mApp;
std::unique_ptr<LiveBucketList> mLiveBucketList;
std::unique_ptr<HotArchiveBucketList> mHotArchiveBucketList;
Expand Down Expand Up @@ -124,7 +130,7 @@ class BucketManager : NonMovableOrCopyable

std::atomic<bool> mIsShutdown{false};

void cleanupStaleFiles();
void cleanupStaleFiles(HistoryArchiveState const& has);
void deleteTmpDirAndUnlockBucketDir();
void deleteEntireBucketDir();

Expand Down Expand Up @@ -260,7 +266,7 @@ class BucketManager : NonMovableOrCopyable
// not immediately cause the buckets to delete themselves, if someone else
// is using them via a shared_ptr<>, but the BucketManager will no longer
// independently keep them alive.
void forgetUnreferencedBuckets();
void forgetUnreferencedBuckets(HistoryArchiveState const& has);

// Feed a new batch of entries to the bucket list. This interface expects to
// be given separate init (created) and live (updated) entry vectors. The
Expand Down Expand Up @@ -290,7 +296,8 @@ class BucketManager : NonMovableOrCopyable
// Scans BucketList for non-live entries to evict starting at the entry
// pointed to by EvictionIterator. Evicts until `maxEntriesToEvict` entries
// have been evicted or maxEvictionScanSize bytes have been scanned.
void startBackgroundEvictionScan(uint32_t ledgerSeq, uint32_t ledgerVers);
void startBackgroundEvictionScan(uint32_t ledgerSeq, uint32_t ledgerVers,
SorobanNetworkConfig const& cfg);

// Returns a pair of vectors representing entries evicted this ledger, where
// the first vector contains all deleted keys (TTL and temporary), and
Expand All @@ -300,7 +307,8 @@ class BucketManager : NonMovableOrCopyable
EvictedStateVectors
resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys,
uint32_t ledgerVers);
uint32_t ledgerVers,
SorobanNetworkConfig const& networkConfig);

medida::Meter& getBloomMissMeter() const;
medida::Meter& getBloomLookupMeter() const;
Expand All @@ -325,7 +333,8 @@ class BucketManager : NonMovableOrCopyable

// Return the set of buckets referenced by the BucketList, LCL HAS,
// and publish queue.
std::set<Hash> getAllReferencedBuckets() const;
std::set<Hash>
getAllReferencedBuckets(HistoryArchiveState const& has) const;

// Check for missing bucket files that would prevent `assumeState` from
// succeeding
Expand Down Expand Up @@ -382,7 +391,8 @@ class BucketManager : NonMovableOrCopyable

// Schedule a Work class that verifies the hashes of all referenced buckets
// on background threads.
std::shared_ptr<BasicWork> scheduleVerifyReferencedBucketsWork();
std::shared_ptr<BasicWork>
scheduleVerifyReferencedBucketsWork(HistoryArchiveState const& has);

Config const& getConfig() const;

Expand Down
3 changes: 0 additions & 3 deletions src/bucket/BucketSnapshotManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ BucketSnapshotManager::recordBulkLoadMetrics(std::string const& label,
{
// For now, only keep metrics for the main thread. We can decide on what
// metrics make sense when more background services are added later.
releaseAssert(threadIsMain());

if (numEntries != 0)
{
Expand Down Expand Up @@ -153,8 +152,6 @@ BucketSnapshotManager::updateCurrentSnapshot(
SnapshotPtrT<LiveBucket>&& liveSnapshot,
SnapshotPtrT<HotArchiveBucket>&& hotArchiveSnapshot)
{
releaseAssert(threadIsMain());

auto updateSnapshot = [numHistoricalSnapshots = mNumHistoricalSnapshots](
auto& currentSnapshot, auto& historicalSnapshots,
auto&& newSnapshot) {
Expand Down
55 changes: 0 additions & 55 deletions src/bucket/PublishQueueBuckets.cpp

This file was deleted.

Loading
Loading