Parallel ledger close (#4543)
Resolves #4317
Concludes #4128

The implementation of this proposal requires massive changes to the
stellar-core codebase, and touches almost every subsystem. There are
some paradigm shifts in how the program executes, which I will discuss
below for posterity. The same ideas are reflected in code comments as
well, as they'll be important for code maintenance and extensibility.

## Database access
Currently, only the Postgres DB backend is supported, as it required minimal
changes to how DB queries are structured (Postgres provides a fairly
nice concurrency model).

SQLite concurrency support is a lot more rudimentary: only a single
writer is allowed, and the whole database is locked during writing. This
necessitates further changes in core (such as splitting the database
into two). Given that most network infrastructure is on Postgres right
now, SQLite support can be added later.

### Reduced responsibilities of SQL

SQL tables have been trimmed as much as possible to avoid conflicts;
essentially, we only store persistent state such as the latest LCL and
SCP history, as well as the legacy OFFER table.

## Asynchronous externalize flow
There are three important subsystems in core that are in charge of
tracking consensus, externalizing and applying ledgers, and advancing
the state machine to catchup or synced state:

- Herder: receives SCP messages, forwards them to SCP, decides if a
ledger is externalized, triggers voting for the next ledger
- LedgerManager: implements closing of a ledger, sets catchup vs synced
state, advances and persists last closed ledger.
- CatchupManager: keeps track of any externalized ledgers that are not
LCL+1. That is, it keeps track of future externalized ledgers, attempts
to apply them to keep core in sync, and triggers catchup if needed.

Prior to this change, externalize handling had two different flows:

- If core received LCL+1, it would immediately apply it, which means the
flow externalize → closeLedger → set “synced” state happened in one
synchronous function. After application, core would trigger the next
ledger, usually asynchronously, as it needs to wait to meet the 5s
ledger requirement.
- If core received ledger LCL+2..LCL+N, it would asynchronously buffer
it, and continue buffering new ledgers. If core couldn’t close the gap
and apply everything sequentially, it would go into the catchup flow.

With the new changes, triggering the ledger close flow moved entirely to
CatchupManager. Essentially, CatchupManager::processLedger became the
centralized place to decide whether to apply a ledger or trigger
catchup. Because ledger close happens in the background, the transition
between externalize and “closeLedger → set synced” becomes asynchronous.
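
As a rough illustration of the new control flow, here is a
self-contained toy of that decision point. Only
CatchupManager::processLedger is a real name from this change; the enum,
threshold, and member names below are made up for the example.

```cpp
#include <cstdint>
#include <map>

// Toy model of the centralized decision in CatchupManager::processLedger.
// All names, types, and the gap threshold are illustrative stand-ins,
// not the actual stellar-core implementation.
enum class Action
{
    ApplyInBackground, // ledger is LCL+1: kick off asynchronous ledger close
    Buffer,            // future ledger: keep it and wait for the gap to close
    TriggerCatchup,    // gap is too large to close by sequential application
    Ignore             // already applied (at or below LCL)
};

struct ToyCatchupManager
{
    uint32_t mLcl = 100;                      // last closed ledger
    std::map<uint32_t, bool> mSyncingLedgers; // buffered externalized ledgers
    static constexpr uint32_t kMaxBufferedGap = 12;

    Action
    processLedger(uint32_t seq)
    {
        if (seq <= mLcl)
        {
            return Action::Ignore;
        }
        mSyncingLedgers.emplace(seq, true);
        if (seq == mLcl + 1)
        {
            return Action::ApplyInBackground;
        }
        return seq > mLcl + kMaxBufferedGap ? Action::TriggerCatchup
                                            : Action::Buffer;
    }
};
```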

## Concurrent ledger close
Below is a list of core items that moved to the background, each
followed by an explanation of why it is safe to do so:
### Emitting meta
Ledger application is the only process that touches the meta pipe, so
there are no conflicts with other subsystems.
### Writing checkpoint files
Only the background thread writes in-progress checkpoint files. The main
thread deals exclusively with “complete” checkpoints, which, after
completion, must not be touched by any subsystem except publishing.
### Updating ledger state
The rest of the system operates strictly on read-only BucketList
snapshots, and is unaffected by the changing state. Note: there are
still some calls to LedgerTxn in the codebase, but those only appear on
startup during setup (when the node is not operational) or in offline
commands.
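
To make the access pattern concrete, here is a minimal, self-contained
sketch of the snapshot discipline. Only updateCurrentSnapshot and
copySearchableLiveBucketListSnapshot are real names touched by this
change; everything else below is a simplified stand-in.

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Toy model of the read-only snapshot pattern: the apply thread publishes
// a fresh immutable snapshot after each ledger close, and every other
// subsystem reads only through the snapshot it holds.
class ToySnapshotManager
{
    using Ledger = std::map<std::string, std::string>;
    std::mutex mMutex;
    std::shared_ptr<Ledger const> mCurrent = std::make_shared<Ledger>();

  public:
    // Apply thread: publish a new immutable snapshot after ledger close.
    void
    updateCurrentSnapshot(Ledger next)
    {
        auto snap = std::make_shared<Ledger const>(std::move(next));
        std::lock_guard<std::mutex> lock(mMutex);
        mCurrent = std::move(snap);
    }

    // Any thread: copy the current snapshot pointer, then read lock-free.
    // A concurrent ledger close cannot invalidate entries in this snapshot.
    std::shared_ptr<Ledger const>
    copySnapshot()
    {
        std::lock_guard<std::mutex> lock(mMutex);
        return mCurrent;
    }
};
```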
### Incrementing current LCL
Because ledger close moved to the background, guarantees about ledger
state and its staleness are now different. Previously, ledger state
queried by subsystems outside of apply was always up-to-date. With this
change, it is possible that the snapshot used by the main thread becomes
slightly stale (if the background thread has just closed a new ledger,
but the main thread hasn't refreshed its snapshot yet). There are
different use cases of the main thread's ledger state, which must be
treated with caution and evaluated individually:
- When it is safe: in cases where LCL is used more like a heuristic or
an approximation, and program correctness does not depend on the exact
state of LCL. Example: post-externalize cleanup of the transaction
queue. We load LCL’s close time to purge invalid transactions from the
queue. This is safe because if LCL has been updated while we call this,
the queue is still in a consistent state. In fact, anything in the
transaction queue is essentially an approximation, so a slightly stale
snapshot should be safe to use.
- When it is not safe: when LCL is needed in places where the latest
ledger state is critical, like voting in SCP, validating blocks, etc. To
avoid any unnecessary headaches, we introduce a new invariant:
“applying” is a new state in the state machine (sketched below), which
does not allow voting or triggering next ledgers. Core must first
complete applying to be able to vote on the “latest state”. In the
meantime, if ledgers arrive while applying, we treat them like “future
ledgers” and apply the same procedures in Herder that we do today (don’t
perform validation checks, don’t vote on them, and buffer them in a
separate queue). The state machine remains on the main thread _only_,
which ensures SCP can safely execute as long as the state transitions
are correct (for example, a block production function can safely grab
the LCL at the beginning of the function without worrying that it might
change in the background).
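
A self-contained toy of this invariant; all identifiers below are
illustrative, not the actual Herder code.

```cpp
#include <cstdint>
#include <map>

// Toy model of the "applying" invariant. While a background ledger close
// is in progress, externalized slots are treated as future ledgers:
// buffered, not validated, and not voted on.
enum class NodeState
{
    Synced,
    Applying, // background ledger close in progress: no voting/triggering
    CatchingUp
};

struct ToyHerder
{
    NodeState mState = NodeState::Synced;
    std::map<uint64_t, int> mBufferedSlots; // slot -> externalized value

    void
    valueExternalized(uint64_t slot, int value)
    {
        if (mState == NodeState::Applying)
        {
            // Treat the slot like a "future ledger": don't validate or
            // vote, just buffer it until apply completes.
            mBufferedSlots.emplace(slot, value);
            return;
        }
        // Safe to read LCL here: state transitions happen on the main
        // thread only, so LCL cannot move underneath this function.
        startApply(slot, value);
    }

    void
    startApply(uint64_t /*slot*/, int /*value*/)
    {
        mState = NodeState::Applying; // cleared once background apply is done
    }
};
```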

### Reflecting state change in the bucketlist
Ledger close is the only place in the code that updates the BucketList.
Other subsystems may only read it. An example is garbage collection,
which queries the latest BucketList state to decide which buckets to
delete. These reads are protected with a mutex (the same LCL mutex used
in LedgerManager, as the BucketList is conceptually part of LCL as
well).
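
A minimal toy of that discipline; the identifiers are illustrative, and
the real guard is LedgerManager's LCL mutex.

```cpp
#include <mutex>
#include <set>
#include <string>

// Toy model of the locking discipline around the BucketList / LCL state.
// The ledger-close path is the single writer; readers such as bucket
// garbage collection take the same lock to get a consistent view.
struct ToyLedgerState
{
    std::recursive_mutex mLclMutex;
    std::set<std::string> mReferencedBuckets; // stand-in for BucketList state

    // Apply thread: the single writer, called at the end of ledger close.
    void
    advanceLcl(std::set<std::string> newBuckets)
    {
        std::lock_guard<std::recursive_mutex> lock(mLclMutex);
        mReferencedBuckets = std::move(newBuckets);
    }

    // Main thread (e.g. garbage collection): copy a consistent view under
    // the lock, then decide which bucket files to delete lock-free.
    std::set<std::string>
    referencedSnapshot()
    {
        std::lock_guard<std::recursive_mutex> lock(mLclMutex);
        return mReferencedBuckets;
    }
};
```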
marta-lokhova authored Jan 8, 2025
2 parents d89edf1 + e417314 commit c669c8f
Showing 108 changed files with 2,039 additions and 1,371 deletions.
2 changes: 0 additions & 2 deletions src/bucket/BucketListBase.cpp
@@ -57,7 +57,6 @@ template <typename BucketT>
void
BucketLevel<BucketT>::setNext(FutureBucket<BucketT> const& fb)
{
releaseAssert(threadIsMain());
mNextCurr = fb;
}

@@ -79,7 +78,6 @@ template <typename BucketT>
void
BucketLevel<BucketT>::setCurr(std::shared_ptr<BucketT> b)
{
releaseAssert(threadIsMain());
mNextCurr.clear();
mCurr = b;
}
2 changes: 0 additions & 2 deletions src/bucket/BucketListSnapshotBase.cpp
@@ -19,8 +19,6 @@ BucketListSnapshot<BucketT>::BucketListSnapshot(
BucketListBase<BucketT> const& bl, LedgerHeader header)
: mHeader(std::move(header))
{
releaseAssert(threadIsMain());

for (uint32_t i = 0; i < BucketListBase<BucketT>::kNumLevels; ++i)
{
auto const& level = bl.getLevel(i);
77 changes: 38 additions & 39 deletions src/bucket/BucketManager.cpp
@@ -18,6 +18,7 @@
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
#include "ledger/LedgerTypeUtils.h"
#include "ledger/NetworkConfig.h"
#include "main/Application.h"
#include "main/Config.h"
#include "util/Fs.h"
@@ -62,6 +63,7 @@ void
BucketManager::initialize()
{
ZoneScoped;
releaseAssert(threadIsMain());
std::string d = mConfig.BUCKET_DIR_PATH;

if (!fs::exists(d))
@@ -729,7 +731,7 @@ BucketManager::getBucketListReferencedBuckets() const
}

std::set<Hash>
BucketManager::getAllReferencedBuckets() const
BucketManager::getAllReferencedBuckets(HistoryArchiveState const& has) const
{
ZoneScoped;
auto referenced = getBucketListReferencedBuckets();
@@ -740,8 +742,7 @@ BucketManager::getAllReferencedBuckets() const

// retain any bucket referenced by the last closed ledger as recorded in the
// database (as merges complete, the bucket list drifts from that state)
auto lclHas = mApp.getLedgerManager().getLastClosedLedgerHAS();
auto lclBuckets = lclHas.allBuckets();
auto lclBuckets = has.allBuckets();
for (auto const& h : lclBuckets)
{
auto rit = referenced.emplace(hexToBin256(h));
@@ -752,39 +753,38 @@ BucketManager::getAllReferencedBuckets() const
}

// retain buckets that are referenced by a state in the publish queue.
auto pub = mApp.getHistoryManager().getBucketsReferencedByPublishQueue();
for (auto const& h :
HistoryManager::getBucketsReferencedByPublishQueue(mApp.getConfig()))
{
for (auto const& h : pub)
auto rhash = hexToBin256(h);
auto rit = referenced.emplace(rhash);
if (rit.second)
{
auto rhash = hexToBin256(h);
auto rit = referenced.emplace(rhash);
if (rit.second)
{
CLOG_TRACE(Bucket, "{} referenced by publish queue", h);

// Project referenced bucket `rhash` -- which might be a merge
// input captured before a merge finished -- through our weak
// map of merge input/output relationships, to find any outputs
// we'll want to retain in order to resynthesize the merge in
// the future, rather than re-run it.
mFinishedMerges.getOutputsUsingInput(rhash, referenced);
}
CLOG_TRACE(Bucket, "{} referenced by publish queue", h);

// Project referenced bucket `rhash` -- which might be a merge
// input captured before a merge finished -- through our weak
// map of merge input/output relationships, to find any outputs
// we'll want to retain in order to resynthesize the merge in
// the future, rather than re-run it.
mFinishedMerges.getOutputsUsingInput(rhash, referenced);
}
}
return referenced;
}

void
BucketManager::cleanupStaleFiles()
BucketManager::cleanupStaleFiles(HistoryArchiveState const& has)
{
ZoneScoped;
releaseAssert(threadIsMain());
if (mConfig.DISABLE_BUCKET_GC)
{
return;
}

std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
auto referenced = getAllReferencedBuckets();
auto referenced = getAllReferencedBuckets(has);
std::transform(std::begin(mSharedLiveBuckets), std::end(mSharedLiveBuckets),
std::inserter(referenced, std::end(referenced)),
[](std::pair<Hash, std::shared_ptr<LiveBucket>> const& p) {
@@ -818,11 +818,11 @@ BucketManager::cleanupStaleFiles()
}

void
BucketManager::forgetUnreferencedBuckets()
BucketManager::forgetUnreferencedBuckets(HistoryArchiveState const& has)
{
ZoneScoped;
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
auto referenced = getAllReferencedBuckets();
auto referenced = getAllReferencedBuckets(has);
auto blReferenced = getBucketListReferencedBuckets();

auto bucketMapLoop = [&](auto& bucketMap, auto& futureMap) {
@@ -867,7 +867,7 @@ BucketManager::forgetUnreferencedBuckets()
Bucket,
"BucketManager::forgetUnreferencedBuckets dropping {}",
filename);
if (!filename.empty() && !mApp.getConfig().DISABLE_BUCKET_GC)
if (!filename.empty() && !mConfig.DISABLE_BUCKET_GC)
{
CLOG_TRACE(Bucket, "removing bucket file: {}", filename);
std::filesystem::remove(filename);
@@ -1049,15 +1049,15 @@ BucketManager::maybeSetIndex(std::shared_ptr<BucketBase> b,

void
BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq,
uint32_t ledgerVers)
uint32_t ledgerVers,
SorobanNetworkConfig const& cfg)
{
releaseAssert(mSnapshotManager);
releaseAssert(!mEvictionFuture.valid());
releaseAssert(mEvictionStatistics);

auto searchableBL =
mSnapshotManager->copySearchableLiveBucketListSnapshot();
auto const& cfg = mApp.getLedgerManager().getSorobanNetworkConfigForApply();
auto const& sas = cfg.stateArchivalSettings();

using task_t = std::packaged_task<EvictionResultCandidates()>;
@@ -1078,31 +1078,27 @@ BucketManager::startBackgroundEvictionScan(uint32_t ledgerSeq,
}

EvictedStateVectors
BucketManager::resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx,
uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys,
uint32_t ledgerVers)
BucketManager::resolveBackgroundEvictionScan(
AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys, uint32_t ledgerVers,
SorobanNetworkConfig const& networkConfig)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssert(mEvictionStatistics);

if (!mEvictionFuture.valid())
{
startBackgroundEvictionScan(ledgerSeq, ledgerVers);
startBackgroundEvictionScan(ledgerSeq, ledgerVers, networkConfig);
}

auto evictionCandidates = mEvictionFuture.get();

auto const& networkConfig =
mApp.getLedgerManager().getSorobanNetworkConfigForApply();

// If eviction related settings changed during the ledger, we have to
// restart the scan
if (!evictionCandidates.isValid(ledgerSeq,
networkConfig.stateArchivalSettings()))
{
startBackgroundEvictionScan(ledgerSeq, ledgerVers);
startBackgroundEvictionScan(ledgerSeq, ledgerVers, networkConfig);
evictionCandidates = mEvictionFuture.get();
}

@@ -1229,6 +1225,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
uint32_t maxProtocolVersion, bool restartMerges)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

// TODO: Assume archival bucket state
@@ -1277,7 +1274,7 @@ BucketManager::assumeState(HistoryArchiveState const& has,
mLiveBucketList->restartMerges(mApp, maxProtocolVersion,
has.currentLedger);
}
cleanupStaleFiles();
cleanupStaleFiles(has);
}

void
@@ -1378,7 +1375,7 @@ std::shared_ptr<LiveBucket>
BucketManager::mergeBuckets(HistoryArchiveState const& has)
{
ZoneScoped;

releaseAssert(threadIsMain());
std::map<LedgerKey, LedgerEntry> ledgerMap = loadCompleteLedgerState(has);
BucketMetadata meta;
MergeCounters mc;
@@ -1568,9 +1565,11 @@ BucketManager::visitLedgerEntries(
}

std::shared_ptr<BasicWork>
BucketManager::scheduleVerifyReferencedBucketsWork()
BucketManager::scheduleVerifyReferencedBucketsWork(
HistoryArchiveState const& has)
{
std::set<Hash> hashes = getAllReferencedBuckets();
releaseAssert(threadIsMain());
std::set<Hash> hashes = getAllReferencedBuckets(has);
std::vector<std::shared_ptr<BasicWork>> seq;
for (auto const& h : hashes)
{
22 changes: 16 additions & 6 deletions src/bucket/BucketManager.h
@@ -36,6 +36,7 @@ class BucketSnapshotManager;
class SearchableLiveBucketListSnapshot;
struct BucketEntryCounters;
enum class LedgerEntryTypeAndDurability : uint32_t;
class SorobanNetworkConfig;

struct HistoryArchiveState;

@@ -70,6 +71,11 @@ class BucketManager : NonMovableOrCopyable

static std::string const kLockFilename;

// NB: ideally, BucketManager should have no access to mApp, as it's too
// dangerous in the context of parallel application. BucketManager is quite
// bloated, with lots of legacy code, so to ensure safety, annotate all
// functions using mApp with `releaseAssert(threadIsMain())` and avoid
// accessing mApp in the background.
Application& mApp;
std::unique_ptr<LiveBucketList> mLiveBucketList;
std::unique_ptr<HotArchiveBucketList> mHotArchiveBucketList;
@@ -124,7 +130,7 @@

std::atomic<bool> mIsShutdown{false};

void cleanupStaleFiles();
void cleanupStaleFiles(HistoryArchiveState const& has);
void deleteTmpDirAndUnlockBucketDir();
void deleteEntireBucketDir();

@@ -260,7 +266,7 @@ class BucketManager : NonMovableOrCopyable
// not immediately cause the buckets to delete themselves, if someone else
// is using them via a shared_ptr<>, but the BucketManager will no longer
// independently keep them alive.
void forgetUnreferencedBuckets();
void forgetUnreferencedBuckets(HistoryArchiveState const& has);

// Feed a new batch of entries to the bucket list. This interface expects to
// be given separate init (created) and live (updated) entry vectors. The
@@ -290,7 +296,8 @@ class BucketManager : NonMovableOrCopyable
// Scans BucketList for non-live entries to evict starting at the entry
// pointed to by EvictionIterator. Evicts until `maxEntriesToEvict` entries
// have been evicted or maxEvictionScanSize bytes have been scanned.
void startBackgroundEvictionScan(uint32_t ledgerSeq, uint32_t ledgerVers);
void startBackgroundEvictionScan(uint32_t ledgerSeq, uint32_t ledgerVers,
SorobanNetworkConfig const& cfg);

// Returns a pair of vectors representing entries evicted this ledger, where
// the first vector contains all deleted keys (TTL and temporary), and
@@ -300,7 +307,8 @@ class BucketManager : NonMovableOrCopyable
EvictedStateVectors
resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys,
uint32_t ledgerVers);
uint32_t ledgerVers,
SorobanNetworkConfig const& networkConfig);

medida::Meter& getBloomMissMeter() const;
medida::Meter& getBloomLookupMeter() const;
@@ -325,7 +333,8 @@ class BucketManager : NonMovableOrCopyable

// Return the set of buckets referenced by the BucketList, LCL HAS,
// and publish queue.
std::set<Hash> getAllReferencedBuckets() const;
std::set<Hash>
getAllReferencedBuckets(HistoryArchiveState const& has) const;

// Check for missing bucket files that would prevent `assumeState` from
// succeeding
@@ -382,7 +391,8 @@ class BucketManager : NonMovableOrCopyable

// Schedule a Work class that verifies the hashes of all referenced buckets
// on background threads.
std::shared_ptr<BasicWork> scheduleVerifyReferencedBucketsWork();
std::shared_ptr<BasicWork>
scheduleVerifyReferencedBucketsWork(HistoryArchiveState const& has);

Config const& getConfig() const;

3 changes: 0 additions & 3 deletions src/bucket/BucketSnapshotManager.cpp
@@ -98,7 +98,6 @@ BucketSnapshotManager::recordBulkLoadMetrics(std::string const& label,
{
// For now, only keep metrics for the main thread. We can decide on what
// metrics make sense when more background services are added later.
releaseAssert(threadIsMain());

if (numEntries != 0)
{
@@ -153,8 +152,6 @@
SnapshotPtrT<LiveBucket>&& liveSnapshot,
SnapshotPtrT<HotArchiveBucket>&& hotArchiveSnapshot)
{
releaseAssert(threadIsMain());

auto updateSnapshot = [numHistoricalSnapshots = mNumHistoricalSnapshots](
auto& currentSnapshot, auto& historicalSnapshots,
auto&& newSnapshot) {
55 changes: 0 additions & 55 deletions src/bucket/PublishQueueBuckets.cpp

This file was deleted.

