Skip to content

Commit

Permalink
Some cleanup post-rebase, refresh comments
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Jan 7, 2025
1 parent ffc54e4 commit 67e7306
Show file tree
Hide file tree
Showing 17 changed files with 80 additions and 60 deletions.
3 changes: 2 additions & 1 deletion src/bucket/BucketManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
#include "ledger/LedgerTypeUtils.h"
#include "ledger/NetworkConfig.h"
#include "main/Application.h"
#include "main/Config.h"
#include "util/Fs.h"
Expand Down Expand Up @@ -1080,7 +1081,7 @@ EvictedStateVectors
BucketManager::resolveBackgroundEvictionScan(
AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys, uint32_t ledgerVers,
SorobanNetworkConfig& networkConfig)
SorobanNetworkConfig const& networkConfig)
{
ZoneScoped;
releaseAssert(mEvictionStatistics);
Expand Down
3 changes: 2 additions & 1 deletion src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class BucketSnapshotManager;
class SearchableLiveBucketListSnapshot;
struct BucketEntryCounters;
enum class LedgerEntryTypeAndDurability : uint32_t;
class SorobanNetworkConfig;

struct HistoryArchiveState;

Expand Down Expand Up @@ -307,7 +308,7 @@ class BucketManager : NonMovableOrCopyable
resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys,
uint32_t ledgerVers,
SorobanNetworkConfig& networkConfig);
SorobanNetworkConfig const& networkConfig);

medida::Meter& getBloomMissMeter() const;
medida::Meter& getBloomLookupMeter() const;
Expand Down
2 changes: 0 additions & 2 deletions src/bucket/LiveBucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
namespace stellar
{

class SorobanNetworkConfig;

// The LiveBucketList stores the current canonical state of the ledger. It is
// made up of LiveBucket buckets, which in turn store individual entries of type
// BucketEntry. When an entry is "evicted" from the ledger, it is removed from
Expand Down
2 changes: 1 addition & 1 deletion src/bucket/test/BucketListTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ TEST_CASE_VERSIONS("network config snapshots BucketList size", "[bucketlist]")
LedgerManagerForBucketTests& lm = app->getLedgerManager();

auto& networkConfig =
app->getLedgerManager().getMutableSorobanNetworkConfig();
app->getLedgerManager().getSorobanNetworkConfigReadOnly();

uint32_t windowSize = networkConfig.stateArchivalSettings()
.bucketListSizeWindowSampleSize;
Expand Down
6 changes: 1 addition & 5 deletions src/bucket/test/BucketTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,6 @@ closeLedger(Application& app, std::optional<SecretKey> skToSignValue,
app.getHerder().externalizeValue(TxSetXDRFrame::makeEmpty(lcl), ledgerNum,
lcl.header.scpValue.closeTime, upgrades,
skToSignValue);
testutil::crankUntil(
app,
[&lm, ledgerNum]() { return lm.getLastClosedLedgerNum() == ledgerNum; },
std::chrono::seconds(10));
return lm.getLastClosedLedgerHeader().hash;
}

Expand Down Expand Up @@ -238,7 +234,7 @@ LedgerManagerForBucketTests::transferLedgerEntriesToBucketList(
mApp.getBucketManager().resolveBackgroundEvictionScan(
ltxEvictions, lh.ledgerSeq, keys, initialLedgerVers,
mApp.getLedgerManager()
.getMutableSorobanNetworkConfig());
.getSorobanNetworkConfigForApply());

if (protocolVersionStartsFrom(
initialLedgerVers,
Expand Down
2 changes: 1 addition & 1 deletion src/catchup/CatchupWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ CatchupWork::runCatchupStep()
// In this case we should actually have been caught-up during
// the replay process and, if judged successful, our LCL should
// be the one provided as well.
auto lastClosed =
auto& lastClosed =
mApp.getLedgerManager().getLastClosedLedgerHeader();
releaseAssert(mLastApplied.hash == lastClosed.hash);
releaseAssert(mLastApplied.header == lastClosed.header);
Expand Down
2 changes: 2 additions & 0 deletions src/database/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ class Database : NonMovableOrCopyable
// Save `vers` as schema version.
void putSchemaVersion(unsigned long vers);

// Prepared statements cache may be accessed by mutliple threads (each using
// a different session), so use a mutex to synchronize access.
std::mutex mutable mStatementsMutex;

public:
Expand Down
19 changes: 16 additions & 3 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,17 @@ HerderImpl::valueExternalized(uint64 slotIndex, StellarValue const& value,

// Check to see if quorums have changed and we need to reanalyze.
checkAndMaybeReanalyzeQuorumMap();

// heart beat *after* doing all the work (ensures that we do not include
// the overhead of externalization in the way we track SCP)
// Note: this only makes sense in the context of synchronous ledger
// application on the main thread.
if (!mApp.getConfig().parallelLedgerClose())
{
// heart beat *after* doing all the work (ensures that we do not
// include the overhead of externalization in the way we track SCP)
trackingHeartBeat();
}
}
else
{
Expand Down Expand Up @@ -1156,7 +1167,7 @@ HerderImpl::lastClosedLedgerIncreased(bool latest, TxSetXDRFrameConstPtr txSet)
{
// Re-start heartbeat tracking _after_ applying the most up-to-date
// ledger. This guarantees out-of-sync timer won't fire while we have
// ledgers to apply.
// ledgers to apply (applicable during parallel ledger close).
trackingHeartBeat();

// Ensure out of sync recovery did not get triggered while we were
Expand Down Expand Up @@ -1369,6 +1380,8 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
// If applying, the next ledger will trigger voting
if (mLedgerManager.isApplying())
{
// This can only happen when closing ledgers in parallel
releaseAssert(mApp.getConfig().parallelLedgerClose());
CLOG_DEBUG(Herder, "triggerNextLedger: skipping (applying) : {}",
mApp.getStateHuman());
return;
Expand Down Expand Up @@ -1560,7 +1573,7 @@ HerderImpl::getUpgradesJson()
void
HerderImpl::forceSCPStateIntoSyncWithLastClosedLedger()
{
auto header = mLedgerManager.getLastClosedLedgerHeader().header;
auto const& header = mLedgerManager.getLastClosedLedgerHeader().header;
setTrackingSCPState(header.ledgerSeq, header.scpValue,
/* isTrackingNetwork */ true);
}
Expand Down Expand Up @@ -2360,7 +2373,7 @@ HerderImpl::herderOutOfSync()
// are no ledgers queued to be applied. If there are ledgers queued, it's
// possible the rest of the network is waiting for this node to vote. In
// this case we should _still_ remain in tracking and emit nomination; If
// the nodes does not hear anything from the network after that, then node
// the node does not hear anything from the network after that, then node
// can go into out of sync recovery.
releaseAssert(threadIsMain());
releaseAssert(!mLedgerManager.isApplying());
Expand Down
13 changes: 7 additions & 6 deletions src/herder/HerderSCPDriver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@ HerderSCPDriver::validateValueHelper(uint64_t slotIndex, StellarValue const& b,
}
}

auto lhhe = mLedgerManager.getLastClosedLedgerHeader();
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
// when checking close time, start with what we have locally
lastCloseTime = lhhe.header.scpValue.closeTime;
lastCloseTime = lcl.header.scpValue.closeTime;

// if this value is not for our local state,
// perform as many checks as we can
if (slotIndex != (lhhe.header.ledgerSeq + 1))
if (slotIndex != (lcl.header.ledgerSeq + 1))
{
if (slotIndex == lhhe.header.ledgerSeq)
if (slotIndex == lcl.header.ledgerSeq)
{
// previous ledger
if (b.closeTime != lastCloseTime)
Expand All @@ -240,7 +240,7 @@ HerderSCPDriver::validateValueHelper(uint64_t slotIndex, StellarValue const& b,
return SCPDriver::kInvalidValue;
}
}
else if (slotIndex < lhhe.header.ledgerSeq)
else if (slotIndex < lcl.header.ledgerSeq)
{
// basic sanity check on older value
if (b.closeTime >= lastCloseTime)
Expand Down Expand Up @@ -323,7 +323,7 @@ HerderSCPDriver::validateValueHelper(uint64_t slotIndex, StellarValue const& b,

res = SCPDriver::kInvalidValue;
}
else if (!checkAndCacheTxSetValid(*txSet, lhhe, closeTimeOffset))
else if (!checkAndCacheTxSetValid(*txSet, lcl, closeTimeOffset))
{
CLOG_DEBUG(Herder,
"HerderSCPDriver::validateValue i: {} invalid txSet {}",
Expand Down Expand Up @@ -614,6 +614,7 @@ HerderSCPDriver::combineCandidates(uint64_t slotIndex,
std::set<TransactionFramePtr> aggSet;

releaseAssert(!mLedgerManager.isApplying());
releaseAssert(threadIsMain());
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();

Hash candidatesHash;
Expand Down
1 change: 1 addition & 0 deletions src/herder/TxSetFrame.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ phaseTxsAreValid(TxSetTransactions const& phase, Application& app,
uint64_t upperBoundCloseTimeOffset)
{
ZoneScoped;
releaseAssert(threadIsMain());
// This is done so minSeqLedgerGap is validated against the next
// ledgerSeq, which is what will be used at apply time

Expand Down
1 change: 0 additions & 1 deletion src/invariant/BucketListIsConsistentWithDatabase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include "bucket/BucketManager.h"
#include "bucket/LiveBucket.h"
#include "bucket/LiveBucketList.h"
#include "crypto/Hex.h"
#include "database/Database.h"
#include "history/HistoryArchive.h"
#include "invariant/InvariantManager.h"
Expand Down
47 changes: 31 additions & 16 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ LedgerManagerImpl::startNewLedger(LedgerHeader const& genesisLedger)
CLOG_INFO(Ledger, "Root account seed: {}", skey.getStrKeySeed().value);
auto output =
ledgerClosed(ltx, /*ledgerCloseMeta*/ nullptr, /*initialLedgerVers*/ 0);
updateCurrentLedgerState(output);
advanceLedgerPointers(output);

ltx.commit();
}
Expand Down Expand Up @@ -385,8 +385,8 @@ LedgerManagerImpl::loadLastKnownLedger(bool restoreBucketlist)
}

// Step 4. Restore LedgerManager's internal state
auto output = advanceLedgerPointers(*latestLedgerHeader, has);
updateCurrentLedgerState(output);
auto output = advanceLedgerStateSnapshot(*latestLedgerHeader, has);
advanceLedgerPointers(output);

// Maybe truncate checkpoint files if we're restarting after a crash
// in closeLedger (in which case any modifications to the ledger state have
Expand Down Expand Up @@ -766,7 +766,10 @@ LedgerManagerImpl::ledgerCloseComplete(uint32_t lcl, bool calledViaExternalize,
releaseAssert(latestQueuedToApply <= latestHeardFromNetwork);
}

if (lcl == latestQueuedToApply)
// Without parallel ledger close, this should always be true
bool doneApplying = lcl == latestQueuedToApply;
releaseAssert(doneApplying || mApp.getConfig().parallelLedgerClose());
if (doneApplying)
{
mCurrentlyApplyingLedger = false;
}
Expand Down Expand Up @@ -816,8 +819,20 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData,

LedgerTxn ltx(mApp.getLedgerTxnRoot());
auto header = ltx.loadHeader();
// Note: closeLedger should be able to work correctly based on ledger header
// stored in LedgerTxn. The issue is that in tests LedgerTxn is somtimes
// modified manually, which changes ledger header hash compared to the
// cached one and causes tests to fail.
auto prevHeader =
threadIsMain() ? getLastClosedLedgerHeader().header : header.current();
#ifdef BUILD_TESTS
if (app.getConfig().MODE_USES_IN_MEMORY_LEDGER)
{
releaseAssert(threadIsMain());
getLastClosedLedgerHeader().header;
}
else
#endif
header.current();
auto prevHash = xdrSha256(prevHeader);

auto initialLedgerVers = header.current().ledgerVersion;
Expand All @@ -830,7 +845,8 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData,

auto now = mApp.getClock().now();
mLedgerAgeClosed.Update(now - mLastClose);
// mLastClose is only accessed by a single thread
// mLastClose is only accessed by a single thread, so no synchronization
// needed
mLastClose = now;
mLedgerAge.set_count(0);

Expand Down Expand Up @@ -1016,7 +1032,7 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData,
emitNextMeta();
}

// The next 5 steps happen in a relatively non-obvious, subtle order.
// The next 7 steps happen in a relatively non-obvious, subtle order.
// This is unfortunate and it would be nice if we could make it not
// be so subtle, but for the time being this is where we are.
//
Expand Down Expand Up @@ -1075,7 +1091,7 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData,
[this, txs, ledgerSeq, calledViaExternalize, ledgerData,
ledgerOutput = std::move(closeLedgerResult)]() mutable {
releaseAssert(threadIsMain());
updateCurrentLedgerState(ledgerOutput);
advanceLedgerPointers(ledgerOutput);

// Step 5. Maybe kick off publishing on complete checkpoint files
auto& hm = mApp.getHistoryManager();
Expand Down Expand Up @@ -1144,7 +1160,7 @@ LedgerManagerImpl::setLastClosedLedger(
ltx.commit();

mRebuildInMemoryState = false;
updateCurrentLedgerState(advanceLedgerPointers(lastClosed.header, has));
advanceLedgerPointers(advanceLedgerStateSnapshot(lastClosed.header, has));

LedgerTxn ltx2(mApp.getLedgerTxnRoot());
if (protocolVersionStartsFrom(ltx2.loadHeader().current().ledgerVersion,
Expand All @@ -1167,8 +1183,8 @@ LedgerManagerImpl::manuallyAdvanceLedgerHeader(LedgerHeader const& header)
has.fromString(mApp.getPersistentState().getState(
PersistentState::kHistoryArchiveState,
mApp.getDatabase().getSession()));
auto output = advanceLedgerPointers(header, has, false);
updateCurrentLedgerState(output);
auto output = advanceLedgerStateSnapshot(header, has);
advanceLedgerPointers(output);
}

void
Expand Down Expand Up @@ -1315,7 +1331,7 @@ LedgerManagerImpl::getCurrentLedgerStateSnaphot()
}

void
LedgerManagerImpl::updateCurrentLedgerState(CloseLedgerOutput const& output)
LedgerManagerImpl::advanceLedgerPointers(CloseLedgerOutput const& output)
{
releaseAssert(threadIsMain());
CLOG_DEBUG(
Expand All @@ -1330,9 +1346,8 @@ LedgerManagerImpl::updateCurrentLedgerState(CloseLedgerOutput const& output)
}

LedgerManagerImpl::CloseLedgerOutput
LedgerManagerImpl::advanceLedgerPointers(LedgerHeader const& header,
HistoryArchiveState const& has,
bool debugLog)
LedgerManagerImpl::advanceLedgerStateSnapshot(LedgerHeader const& header,
HistoryArchiveState const& has)
{
auto ledgerHash = xdrSha256(header);

Expand Down Expand Up @@ -1839,7 +1854,7 @@ LedgerManagerImpl::ledgerClosed(
mApp.getBucketManager().snapshotLedger(lh);
auto has = storeCurrentLedger(lh, /* storeHeader */ true,
/* appendToCheckpoint */ true);
res = advanceLedgerPointers(lh, has);
res = advanceLedgerStateSnapshot(lh, has);
});

return res;
Expand Down
Loading

0 comments on commit 67e7306

Please sign in to comment.