From 67e73060f8de1c8134799d9d0d6d171ed6ddf92b Mon Sep 17 00:00:00 2001 From: marta-lokhova Date: Mon, 6 Jan 2025 19:06:56 -0800 Subject: [PATCH] Some cleanup post-rebase, refresh comments --- src/bucket/BucketManager.cpp | 3 +- src/bucket/BucketManager.h | 3 +- src/bucket/LiveBucketList.h | 2 - src/bucket/test/BucketListTests.cpp | 2 +- src/bucket/test/BucketTestUtils.cpp | 6 +-- src/catchup/CatchupWork.cpp | 2 +- src/database/Database.h | 2 + src/herder/HerderImpl.cpp | 19 ++++++-- src/herder/HerderSCPDriver.cpp | 13 ++--- src/herder/TxSetFrame.cpp | 1 + .../BucketListIsConsistentWithDatabase.cpp | 1 - src/ledger/LedgerManagerImpl.cpp | 47 ++++++++++++------- src/ledger/LedgerManagerImpl.h | 23 +++++---- src/main/AppConnector.cpp | 11 ----- src/main/AppConnector.h | 1 - src/main/ApplicationImpl.cpp | 2 +- src/main/ApplicationUtils.cpp | 2 +- 17 files changed, 80 insertions(+), 60 deletions(-) diff --git a/src/bucket/BucketManager.cpp b/src/bucket/BucketManager.cpp index 209d828caa..9aecee8713 100644 --- a/src/bucket/BucketManager.cpp +++ b/src/bucket/BucketManager.cpp @@ -18,6 +18,7 @@ #include "ledger/LedgerManager.h" #include "ledger/LedgerTxn.h" #include "ledger/LedgerTypeUtils.h" +#include "ledger/NetworkConfig.h" #include "main/Application.h" #include "main/Config.h" #include "util/Fs.h" @@ -1080,7 +1081,7 @@ EvictedStateVectors BucketManager::resolveBackgroundEvictionScan( AbstractLedgerTxn& ltx, uint32_t ledgerSeq, LedgerKeySet const& modifiedKeys, uint32_t ledgerVers, - SorobanNetworkConfig& networkConfig) + SorobanNetworkConfig const& networkConfig) { ZoneScoped; releaseAssert(mEvictionStatistics); diff --git a/src/bucket/BucketManager.h b/src/bucket/BucketManager.h index d17517838f..32043b8870 100644 --- a/src/bucket/BucketManager.h +++ b/src/bucket/BucketManager.h @@ -36,6 +36,7 @@ class BucketSnapshotManager; class SearchableLiveBucketListSnapshot; struct BucketEntryCounters; enum class LedgerEntryTypeAndDurability : uint32_t; +class SorobanNetworkConfig; struct HistoryArchiveState; @@ -307,7 +308,7 @@ class BucketManager : NonMovableOrCopyable resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx, uint32_t ledgerSeq, LedgerKeySet const& modifiedKeys, uint32_t ledgerVers, - SorobanNetworkConfig& networkConfig); + SorobanNetworkConfig const& networkConfig); medida::Meter& getBloomMissMeter() const; medida::Meter& getBloomLookupMeter() const; diff --git a/src/bucket/LiveBucketList.h b/src/bucket/LiveBucketList.h index 688f0acd22..c78f4bc22b 100644 --- a/src/bucket/LiveBucketList.h +++ b/src/bucket/LiveBucketList.h @@ -10,8 +10,6 @@ namespace stellar { -class SorobanNetworkConfig; - // The LiveBucketList stores the current canonical state of the ledger. It is // made up of LiveBucket buckets, which in turn store individual entries of type // BucketEntry. When an entry is "evicted" from the ledger, it is removed from diff --git a/src/bucket/test/BucketListTests.cpp b/src/bucket/test/BucketListTests.cpp index 5dfb6572f6..0a5b545097 100644 --- a/src/bucket/test/BucketListTests.cpp +++ b/src/bucket/test/BucketListTests.cpp @@ -869,7 +869,7 @@ TEST_CASE_VERSIONS("network config snapshots BucketList size", "[bucketlist]") LedgerManagerForBucketTests& lm = app->getLedgerManager(); auto& networkConfig = - app->getLedgerManager().getMutableSorobanNetworkConfig(); + app->getLedgerManager().getSorobanNetworkConfigReadOnly(); uint32_t windowSize = networkConfig.stateArchivalSettings() .bucketListSizeWindowSampleSize; diff --git a/src/bucket/test/BucketTestUtils.cpp b/src/bucket/test/BucketTestUtils.cpp index 34122c63d2..ea6d0f351f 100644 --- a/src/bucket/test/BucketTestUtils.cpp +++ b/src/bucket/test/BucketTestUtils.cpp @@ -101,10 +101,6 @@ closeLedger(Application& app, std::optional skToSignValue, app.getHerder().externalizeValue(TxSetXDRFrame::makeEmpty(lcl), ledgerNum, lcl.header.scpValue.closeTime, upgrades, skToSignValue); - testutil::crankUntil( - app, - [&lm, ledgerNum]() { return lm.getLastClosedLedgerNum() == ledgerNum; }, - std::chrono::seconds(10)); return lm.getLastClosedLedgerHeader().hash; } @@ -238,7 +234,7 @@ LedgerManagerForBucketTests::transferLedgerEntriesToBucketList( mApp.getBucketManager().resolveBackgroundEvictionScan( ltxEvictions, lh.ledgerSeq, keys, initialLedgerVers, mApp.getLedgerManager() - .getMutableSorobanNetworkConfig()); + .getSorobanNetworkConfigForApply()); if (protocolVersionStartsFrom( initialLedgerVers, diff --git a/src/catchup/CatchupWork.cpp b/src/catchup/CatchupWork.cpp index 89fc839791..dc46ae37c6 100644 --- a/src/catchup/CatchupWork.cpp +++ b/src/catchup/CatchupWork.cpp @@ -524,7 +524,7 @@ CatchupWork::runCatchupStep() // In this case we should actually have been caught-up during // the replay process and, if judged successful, our LCL should // be the one provided as well. - auto lastClosed = + auto& lastClosed = mApp.getLedgerManager().getLastClosedLedgerHeader(); releaseAssert(mLastApplied.hash == lastClosed.hash); releaseAssert(mLastApplied.header == lastClosed.header); diff --git a/src/database/Database.h b/src/database/Database.h index e58391b7a6..f96062a418 100644 --- a/src/database/Database.h +++ b/src/database/Database.h @@ -141,6 +141,8 @@ class Database : NonMovableOrCopyable // Save `vers` as schema version. void putSchemaVersion(unsigned long vers); + // Prepared statements cache may be accessed by mutliple threads (each using + // a different session), so use a mutex to synchronize access. std::mutex mutable mStatementsMutex; public: diff --git a/src/herder/HerderImpl.cpp b/src/herder/HerderImpl.cpp index a3ab30b28b..61ccf80497 100644 --- a/src/herder/HerderImpl.cpp +++ b/src/herder/HerderImpl.cpp @@ -480,6 +480,17 @@ HerderImpl::valueExternalized(uint64 slotIndex, StellarValue const& value, // Check to see if quorums have changed and we need to reanalyze. checkAndMaybeReanalyzeQuorumMap(); + + // heart beat *after* doing all the work (ensures that we do not include + // the overhead of externalization in the way we track SCP) + // Note: this only makes sense in the context of synchronous ledger + // application on the main thread. + if (!mApp.getConfig().parallelLedgerClose()) + { + // heart beat *after* doing all the work (ensures that we do not + // include the overhead of externalization in the way we track SCP) + trackingHeartBeat(); + } } else { @@ -1156,7 +1167,7 @@ HerderImpl::lastClosedLedgerIncreased(bool latest, TxSetXDRFrameConstPtr txSet) { // Re-start heartbeat tracking _after_ applying the most up-to-date // ledger. This guarantees out-of-sync timer won't fire while we have - // ledgers to apply. + // ledgers to apply (applicable during parallel ledger close). trackingHeartBeat(); // Ensure out of sync recovery did not get triggered while we were @@ -1369,6 +1380,8 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger, // If applying, the next ledger will trigger voting if (mLedgerManager.isApplying()) { + // This can only happen when closing ledgers in parallel + releaseAssert(mApp.getConfig().parallelLedgerClose()); CLOG_DEBUG(Herder, "triggerNextLedger: skipping (applying) : {}", mApp.getStateHuman()); return; @@ -1560,7 +1573,7 @@ HerderImpl::getUpgradesJson() void HerderImpl::forceSCPStateIntoSyncWithLastClosedLedger() { - auto header = mLedgerManager.getLastClosedLedgerHeader().header; + auto const& header = mLedgerManager.getLastClosedLedgerHeader().header; setTrackingSCPState(header.ledgerSeq, header.scpValue, /* isTrackingNetwork */ true); } @@ -2360,7 +2373,7 @@ HerderImpl::herderOutOfSync() // are no ledgers queued to be applied. If there are ledgers queued, it's // possible the rest of the network is waiting for this node to vote. In // this case we should _still_ remain in tracking and emit nomination; If - // the nodes does not hear anything from the network after that, then node + // the node does not hear anything from the network after that, then node // can go into out of sync recovery. releaseAssert(threadIsMain()); releaseAssert(!mLedgerManager.isApplying()); diff --git a/src/herder/HerderSCPDriver.cpp b/src/herder/HerderSCPDriver.cpp index 13f5bde947..89e3fddd14 100644 --- a/src/herder/HerderSCPDriver.cpp +++ b/src/herder/HerderSCPDriver.cpp @@ -221,15 +221,15 @@ HerderSCPDriver::validateValueHelper(uint64_t slotIndex, StellarValue const& b, } } - auto lhhe = mLedgerManager.getLastClosedLedgerHeader(); + auto const& lcl = mLedgerManager.getLastClosedLedgerHeader(); // when checking close time, start with what we have locally - lastCloseTime = lhhe.header.scpValue.closeTime; + lastCloseTime = lcl.header.scpValue.closeTime; // if this value is not for our local state, // perform as many checks as we can - if (slotIndex != (lhhe.header.ledgerSeq + 1)) + if (slotIndex != (lcl.header.ledgerSeq + 1)) { - if (slotIndex == lhhe.header.ledgerSeq) + if (slotIndex == lcl.header.ledgerSeq) { // previous ledger if (b.closeTime != lastCloseTime) @@ -240,7 +240,7 @@ HerderSCPDriver::validateValueHelper(uint64_t slotIndex, StellarValue const& b, return SCPDriver::kInvalidValue; } } - else if (slotIndex < lhhe.header.ledgerSeq) + else if (slotIndex < lcl.header.ledgerSeq) { // basic sanity check on older value if (b.closeTime >= lastCloseTime) @@ -323,7 +323,7 @@ HerderSCPDriver::validateValueHelper(uint64_t slotIndex, StellarValue const& b, res = SCPDriver::kInvalidValue; } - else if (!checkAndCacheTxSetValid(*txSet, lhhe, closeTimeOffset)) + else if (!checkAndCacheTxSetValid(*txSet, lcl, closeTimeOffset)) { CLOG_DEBUG(Herder, "HerderSCPDriver::validateValue i: {} invalid txSet {}", @@ -614,6 +614,7 @@ HerderSCPDriver::combineCandidates(uint64_t slotIndex, std::set aggSet; releaseAssert(!mLedgerManager.isApplying()); + releaseAssert(threadIsMain()); auto const& lcl = mLedgerManager.getLastClosedLedgerHeader(); Hash candidatesHash; diff --git a/src/herder/TxSetFrame.cpp b/src/herder/TxSetFrame.cpp index dce3cef463..72b58598bc 100644 --- a/src/herder/TxSetFrame.cpp +++ b/src/herder/TxSetFrame.cpp @@ -233,6 +233,7 @@ phaseTxsAreValid(TxSetTransactions const& phase, Application& app, uint64_t upperBoundCloseTimeOffset) { ZoneScoped; + releaseAssert(threadIsMain()); // This is done so minSeqLedgerGap is validated against the next // ledgerSeq, which is what will be used at apply time diff --git a/src/invariant/BucketListIsConsistentWithDatabase.cpp b/src/invariant/BucketListIsConsistentWithDatabase.cpp index 529077e721..d16a9bcdf3 100644 --- a/src/invariant/BucketListIsConsistentWithDatabase.cpp +++ b/src/invariant/BucketListIsConsistentWithDatabase.cpp @@ -7,7 +7,6 @@ #include "bucket/BucketManager.h" #include "bucket/LiveBucket.h" #include "bucket/LiveBucketList.h" -#include "crypto/Hex.h" #include "database/Database.h" #include "history/HistoryArchive.h" #include "invariant/InvariantManager.h" diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index f61f0f8207..7e7a691cf8 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -252,7 +252,7 @@ LedgerManagerImpl::startNewLedger(LedgerHeader const& genesisLedger) CLOG_INFO(Ledger, "Root account seed: {}", skey.getStrKeySeed().value); auto output = ledgerClosed(ltx, /*ledgerCloseMeta*/ nullptr, /*initialLedgerVers*/ 0); - updateCurrentLedgerState(output); + advanceLedgerPointers(output); ltx.commit(); } @@ -385,8 +385,8 @@ LedgerManagerImpl::loadLastKnownLedger(bool restoreBucketlist) } // Step 4. Restore LedgerManager's internal state - auto output = advanceLedgerPointers(*latestLedgerHeader, has); - updateCurrentLedgerState(output); + auto output = advanceLedgerStateSnapshot(*latestLedgerHeader, has); + advanceLedgerPointers(output); // Maybe truncate checkpoint files if we're restarting after a crash // in closeLedger (in which case any modifications to the ledger state have @@ -766,7 +766,10 @@ LedgerManagerImpl::ledgerCloseComplete(uint32_t lcl, bool calledViaExternalize, releaseAssert(latestQueuedToApply <= latestHeardFromNetwork); } - if (lcl == latestQueuedToApply) + // Without parallel ledger close, this should always be true + bool doneApplying = lcl == latestQueuedToApply; + releaseAssert(doneApplying || mApp.getConfig().parallelLedgerClose()); + if (doneApplying) { mCurrentlyApplyingLedger = false; } @@ -816,8 +819,20 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, LedgerTxn ltx(mApp.getLedgerTxnRoot()); auto header = ltx.loadHeader(); + // Note: closeLedger should be able to work correctly based on ledger header + // stored in LedgerTxn. The issue is that in tests LedgerTxn is somtimes + // modified manually, which changes ledger header hash compared to the + // cached one and causes tests to fail. auto prevHeader = - threadIsMain() ? getLastClosedLedgerHeader().header : header.current(); +#ifdef BUILD_TESTS + if (app.getConfig().MODE_USES_IN_MEMORY_LEDGER) + { + releaseAssert(threadIsMain()); + getLastClosedLedgerHeader().header; + } + else +#endif + header.current(); auto prevHash = xdrSha256(prevHeader); auto initialLedgerVers = header.current().ledgerVersion; @@ -830,7 +845,8 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, auto now = mApp.getClock().now(); mLedgerAgeClosed.Update(now - mLastClose); - // mLastClose is only accessed by a single thread + // mLastClose is only accessed by a single thread, so no synchronization + // needed mLastClose = now; mLedgerAge.set_count(0); @@ -1016,7 +1032,7 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, emitNextMeta(); } - // The next 5 steps happen in a relatively non-obvious, subtle order. + // The next 7 steps happen in a relatively non-obvious, subtle order. // This is unfortunate and it would be nice if we could make it not // be so subtle, but for the time being this is where we are. // @@ -1075,7 +1091,7 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData, [this, txs, ledgerSeq, calledViaExternalize, ledgerData, ledgerOutput = std::move(closeLedgerResult)]() mutable { releaseAssert(threadIsMain()); - updateCurrentLedgerState(ledgerOutput); + advanceLedgerPointers(ledgerOutput); // Step 5. Maybe kick off publishing on complete checkpoint files auto& hm = mApp.getHistoryManager(); @@ -1144,7 +1160,7 @@ LedgerManagerImpl::setLastClosedLedger( ltx.commit(); mRebuildInMemoryState = false; - updateCurrentLedgerState(advanceLedgerPointers(lastClosed.header, has)); + advanceLedgerPointers(advanceLedgerStateSnapshot(lastClosed.header, has)); LedgerTxn ltx2(mApp.getLedgerTxnRoot()); if (protocolVersionStartsFrom(ltx2.loadHeader().current().ledgerVersion, @@ -1167,8 +1183,8 @@ LedgerManagerImpl::manuallyAdvanceLedgerHeader(LedgerHeader const& header) has.fromString(mApp.getPersistentState().getState( PersistentState::kHistoryArchiveState, mApp.getDatabase().getSession())); - auto output = advanceLedgerPointers(header, has, false); - updateCurrentLedgerState(output); + auto output = advanceLedgerStateSnapshot(header, has); + advanceLedgerPointers(output); } void @@ -1315,7 +1331,7 @@ LedgerManagerImpl::getCurrentLedgerStateSnaphot() } void -LedgerManagerImpl::updateCurrentLedgerState(CloseLedgerOutput const& output) +LedgerManagerImpl::advanceLedgerPointers(CloseLedgerOutput const& output) { releaseAssert(threadIsMain()); CLOG_DEBUG( @@ -1330,9 +1346,8 @@ LedgerManagerImpl::updateCurrentLedgerState(CloseLedgerOutput const& output) } LedgerManagerImpl::CloseLedgerOutput -LedgerManagerImpl::advanceLedgerPointers(LedgerHeader const& header, - HistoryArchiveState const& has, - bool debugLog) +LedgerManagerImpl::advanceLedgerStateSnapshot(LedgerHeader const& header, + HistoryArchiveState const& has) { auto ledgerHash = xdrSha256(header); @@ -1839,7 +1854,7 @@ LedgerManagerImpl::ledgerClosed( mApp.getBucketManager().snapshotLedger(lh); auto has = storeCurrentLedger(lh, /* storeHeader */ true, /* appendToCheckpoint */ true); - res = advanceLedgerPointers(lh, has); + res = advanceLedgerStateSnapshot(lh, has); }); return res; diff --git a/src/ledger/LedgerManagerImpl.h b/src/ledger/LedgerManagerImpl.h index 20eaae711f..6f50381c6e 100644 --- a/src/ledger/LedgerManagerImpl.h +++ b/src/ledger/LedgerManagerImpl.h @@ -57,8 +57,7 @@ class LedgerManagerImpl : public LedgerManager std::filesystem::path mMetaDebugPath; private: - // Cache LCL state, updates once a ledger (synchronized with - // mLedgerStateMutex) + // Cache LCL state, accessible only from main thread LedgerHeaderHistoryEntry mLastClosedLedger; // Read-only Soroban network configuration, accessible by main thread only. @@ -74,6 +73,8 @@ class LedgerManagerImpl : public LedgerManager // variable is not synchronized, since it should only be used by one thread // (main or ledger close). std::shared_ptr mSorobanNetworkConfigForApply; + + // Cache most recent HAS, accessible only from main thread HistoryArchiveState mLastClosedLedgerHAS; SorobanMetrics mSorobanMetrics; @@ -94,13 +95,15 @@ class LedgerManagerImpl : public LedgerManager bool mRebuildInMemoryState{false}; SearchableSnapshotConstPtr mReadOnlyLedgerStateSnapshot; - // Use mutex to guard read access to LCL and Soroban network config + // Use mutex to guard ledger state during apply mutable std::recursive_mutex mLedgerStateMutex; medida::Timer& mCatchupDuration; std::unique_ptr mNextMetaToEmit; + // Use in the context of parallel ledger close to indicate background thread + // is currently closing a ledger or has ledgers queued to apply. bool mCurrentlyApplyingLedger{false}; static std::vector processFeesSeqNums( @@ -150,7 +153,8 @@ class LedgerManagerImpl : public LedgerManager // as the actual ledger usage. void publishSorobanMetrics(); - void updateCurrentLedgerState(CloseLedgerOutput const& output); + // Update cached ledger state values managed by this class. + void advanceLedgerPointers(CloseLedgerOutput const& output); protected: // initialLedgerVers must be the ledger version at the start of the ledger @@ -166,11 +170,12 @@ class LedgerManagerImpl : public LedgerManager std::unique_ptr const& ledgerCloseMeta, LedgerHeader lh, uint32_t initialLedgerVers); - // Update in-memory cached LCL state (this only happens at the end of ledger - // close) - CloseLedgerOutput advanceLedgerPointers(LedgerHeader const& header, - HistoryArchiveState const& has, - bool debugLog = true); + // Update ledger state snapshot, and construct CloseLedgerOutput return + // value, which contains all information relevant to ledger state (HAS, + // ledger header, network config, bucketlist snapshot). + CloseLedgerOutput + advanceLedgerStateSnapshot(LedgerHeader const& header, + HistoryArchiveState const& has); void logTxApplyMetrics(AbstractLedgerTxn& ltx, size_t numTxs, size_t numOps); diff --git a/src/main/AppConnector.cpp b/src/main/AppConnector.cpp index 904d6396f1..49b24d31c7 100644 --- a/src/main/AppConnector.cpp +++ b/src/main/AppConnector.cpp @@ -56,7 +56,6 @@ AppConnector::getSorobanNetworkConfigReadOnly() const SorobanNetworkConfig const& AppConnector::getSorobanNetworkConfigForApply() const { - // releaseAssert(!threadIsMain() || !mConfig.parallelLedgerClose()); return mApp.getLedgerManager().getSorobanNetworkConfigForApply(); } @@ -77,8 +76,6 @@ AppConnector::checkOnOperationApply(Operation const& operation, OperationResult const& opres, LedgerTxnDelta const& ltxDelta) { - // Only one thread can call this method - releaseAssert(threadIsMain() || mConfig.parallelLedgerClose()); mApp.getInvariantManager().checkOnOperationApply(operation, opres, ltxDelta); } @@ -142,12 +139,4 @@ AppConnector::checkScheduledAndCache( { return mApp.getOverlayManager().checkScheduledAndCache(msgTracker); } - -LedgerHeaderHistoryEntry -AppConnector::getLastClosedLedgerHeader() const -{ - // LCL is thread-safe (it's a copy) - return mApp.getLedgerManager().getLastClosedLedgerHeader(); -} - } \ No newline at end of file diff --git a/src/main/AppConnector.h b/src/main/AppConnector.h index 4f2c565982..9cdeb9be30 100644 --- a/src/main/AppConnector.h +++ b/src/main/AppConnector.h @@ -57,6 +57,5 @@ class AppConnector SorobanNetworkConfig const& getSorobanNetworkConfigForApply() const; medida::MetricsRegistry& getMetrics() const; - LedgerHeaderHistoryEntry getLastClosedLedgerHeader() const; }; } \ No newline at end of file diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index 022f09785f..c39db76013 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -591,7 +591,7 @@ ApplicationImpl::~ApplicationImpl() try { // First, shutdown ledger close queue _before_ shutting down all the - // subsystems This ensures that any ledger currently being closed + // subsystems. This ensures that any ledger currently being closed // finishes okay shutdownLedgerCloseThread(); shutdownWorkScheduler(); diff --git a/src/main/ApplicationUtils.cpp b/src/main/ApplicationUtils.cpp index 81c0cacc13..54476fe79a 100644 --- a/src/main/ApplicationUtils.cpp +++ b/src/main/ApplicationUtils.cpp @@ -271,7 +271,7 @@ setAuthenticatedLedgerHashPair(Application::pointer app, if (lm.isSynced()) { - auto lhe = lm.getLastClosedLedgerHeader(); + auto const& lhe = lm.getLastClosedLedgerHeader(); tryCheckpoint(lhe.header.ledgerSeq, lhe.hash); } else