Skip to content

Commit

Permalink
WIP: Background tryAdd functionality in TransactionQueue
Browse files Browse the repository at this point in the history
This is a *draft* change that will resolve stellar#4316 when it is complete.
The change makes `TransactionQueue` thread safe and runs the `tryAdd`
function in the background when the feature is enabled. The
implementation closely follows the
[design document](https://docs.google.com/document/d/1pU__XfEp-rR-17TNsuj-VhY6JfyendaFSYLTiq6tIj4/edit?usp=sharing)
I wrote.  The implementation still requires the main thread to
re-broadcast the transactions (for now). I've opened this PR for
visibility / early feedback on the implementation.

This change is very much a work in progress, with the following tasks
remaining:

* [ ] Fix catchup. I seem to have broken catchup in rebasing these
      changes on master. I need to figure out what is going on there and fix
      it.
* [ ] Fix failing tests. These are failing because they don't update
      `TransactionQueue`s new snapshots correctly.
* [ ] Rigorous testing, both for correctness and performance.
* [ ] I'd like to take a look at pushing the cut-point out a bit to
      enable flooding in the background as well. If this is a relatively
      simple change, I'd like to roll it into this PR. If it looks hairy,
      then I'll leave it for a separate change later.
  • Loading branch information
bboston7 committed Jan 9, 2025
1 parent d89edf1 commit 256f322
Show file tree
Hide file tree
Showing 38 changed files with 698 additions and 310 deletions.
94 changes: 63 additions & 31 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ HerderImpl::SCPMetrics::SCPMetrics(Application& app)
}

HerderImpl::HerderImpl(Application& app)
: mTransactionQueue(app, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
TRANSACTION_QUEUE_SIZE_MULTIPLIER)
, mPendingEnvelopes(app, *this)
: mPendingEnvelopes(app, *this)
, mHerderSCPDriver(app, *this, mUpgrades, mPendingEnvelopes)
, mLastSlotSaved(0)
, mTrackingTimer(app)
Expand Down Expand Up @@ -280,7 +277,10 @@ HerderImpl::shutdown()
"Shutdown interrupting quorum transitive closure analysis.");
mLastQuorumMapIntersectionState.mInterruptFlag = true;
}
mTransactionQueue.shutdown();
if (mTransactionQueue)
{
mTransactionQueue->shutdown();
}
if (mSorobanTransactionQueue)
{
mSorobanTransactionQueue->shutdown();
Expand Down Expand Up @@ -594,7 +594,7 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
mSorobanTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
!tx->isSoroban();
bool hasClassic =
mTransactionQueue.sourceAccountPending(tx->getSourceID()) &&
mTransactionQueue->sourceAccountPending(tx->getSourceID()) &&
tx->isSoroban();
if (hasSoroban || hasClassic)
{
Expand All @@ -608,11 +608,31 @@ HerderImpl::recvTransaction(TransactionFrameBasePtr tx, bool submittedFromSelf)
}
else if (!tx->isSoroban())
{
result = mTransactionQueue.tryAdd(tx, submittedFromSelf);
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
{
mApp.postOnOverlayThread(
[this, tx]() { mTransactionQueue->tryAdd(tx, false); },
"try add tx");
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
}
else
{
result = mTransactionQueue->tryAdd(tx, submittedFromSelf);
}
}
else if (mSorobanTransactionQueue)
{
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
if (mApp.getConfig().BACKGROUND_TX_QUEUE && !submittedFromSelf)
{
mApp.postOnOverlayThread(
[this, tx]() { mSorobanTransactionQueue->tryAdd(tx, false); },
"try add tx");
result.code = TransactionQueue::AddResultCode::ADD_STATUS_UNKNOWN;
}
else
{
result = mSorobanTransactionQueue->tryAdd(tx, submittedFromSelf);
}
}
else
{
Expand Down Expand Up @@ -914,7 +934,7 @@ HerderImpl::externalizeValue(TxSetXDRFrameConstPtr txSet, uint32_t ledgerSeq,
bool
HerderImpl::sourceAccountPending(AccountID const& accountID) const
{
bool accPending = mTransactionQueue.sourceAccountPending(accountID);
bool accPending = mTransactionQueue->sourceAccountPending(accountID);
if (mSorobanTransactionQueue)
{
accPending = accPending ||
Expand Down Expand Up @@ -1083,7 +1103,7 @@ HerderImpl::getPendingEnvelopes()
ClassicTransactionQueue&
HerderImpl::getTransactionQueue()
{
return mTransactionQueue;
return *mTransactionQueue;
}
SorobanTransactionQueue&
HerderImpl::getSorobanTransactionQueue()
Expand Down Expand Up @@ -1351,7 +1371,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
// during last few ledger closes
auto const& lcl = mLedgerManager.getLastClosedLedgerHeader();
PerPhaseTransactionList txPhases;
txPhases.emplace_back(mTransactionQueue.getTransactions(lcl.header));
txPhases.emplace_back(mTransactionQueue->getTransactions(lcl.header));

if (protocolVersionStartsFrom(lcl.header.ledgerVersion,
SOROBAN_PROTOCOL_VERSION))
Expand Down Expand Up @@ -1430,7 +1450,7 @@ HerderImpl::triggerNextLedger(uint32_t ledgerSeqToTrigger,
invalidTxPhases[static_cast<size_t>(TxSetPhase::SOROBAN)]);
}

mTransactionQueue.ban(
mTransactionQueue->ban(
invalidTxPhases[static_cast<size_t>(TxSetPhase::CLASSIC)]);

auto txSetHash = proposedSet->getContentsHash();
Expand Down Expand Up @@ -2129,9 +2149,11 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
{
if (!mSorobanTransactionQueue)
{
releaseAssert(mTxQueueBucketSnapshot);
mSorobanTransactionQueue =
std::make_unique<SorobanTransactionQueue>(
mApp, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
mApp, mTxQueueBucketSnapshot,
TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS,
SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER);
}
Expand All @@ -2146,6 +2168,15 @@ HerderImpl::maybeSetupSorobanQueue(uint32_t protocolVersion)
void
HerderImpl::start()
{
releaseAssert(!mTxQueueBucketSnapshot);
mTxQueueBucketSnapshot = mApp.getBucketManager()
.getBucketSnapshotManager()
.copySearchableLiveBucketListSnapshot();
releaseAssert(!mTransactionQueue);
mTransactionQueue = std::make_unique<ClassicTransactionQueue>(
mApp, mTxQueueBucketSnapshot, TRANSACTION_QUEUE_TIMEOUT_LEDGERS,
TRANSACTION_QUEUE_BAN_LEDGERS, TRANSACTION_QUEUE_SIZE_MULTIPLIER);

mMaxTxSize = mApp.getHerder().getMaxClassicTxSize();
{
uint32_t version = mApp.getLedgerManager()
Expand Down Expand Up @@ -2289,23 +2320,23 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)

auto lhhe = mLedgerManager.getLastClosedLedgerHeader();

auto updateQueue = [&](auto& queue, auto const& applied) {
queue.removeApplied(applied);
queue.shift();

auto txs = queue.getTransactions(lhhe.header);

auto invalidTxs = TxSetUtils::getInvalidTxList(
auto filterInvalidTxs = [&](TxFrameList const& txs) {
return TxSetUtils::getInvalidTxList(
txs, mApp, 0,
getUpperBoundCloseTimeOffset(mApp, lhhe.header.scpValue.closeTime));
queue.ban(invalidTxs);

queue.rebroadcast();
getUpperBoundCloseTimeOffset(mApp.getAppConnector(),
lhhe.header.scpValue.closeTime));
};
// Update bucket list snapshot, if needed. Note that this modifies the
// pointer itself on update, so we need to pass the potentially new pointer
// to the tx queues.
mApp.getBucketManager()
.getBucketSnapshotManager()
.maybeCopySearchableBucketListSnapshot(mTxQueueBucketSnapshot);
if (txsPerPhase.size() > static_cast<size_t>(TxSetPhase::CLASSIC))
{
updateQueue(mTransactionQueue,
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)]);
mTransactionQueue->update(
txsPerPhase[static_cast<size_t>(TxSetPhase::CLASSIC)], lhhe.header,
mTxQueueBucketSnapshot, filterInvalidTxs);
}

// Even if we're in protocol 20, still check for number of phases, in case
Expand All @@ -2314,8 +2345,9 @@ HerderImpl::updateTransactionQueue(TxSetXDRFrameConstPtr externalizedTxSet)
if (mSorobanTransactionQueue != nullptr &&
txsPerPhase.size() > static_cast<size_t>(TxSetPhase::SOROBAN))
{
updateQueue(*mSorobanTransactionQueue,
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)]);
mSorobanTransactionQueue->update(
txsPerPhase[static_cast<size_t>(TxSetPhase::SOROBAN)], lhhe.header,
mTxQueueBucketSnapshot, filterInvalidTxs);
}
}

Expand Down Expand Up @@ -2423,7 +2455,7 @@ HerderImpl::isNewerNominationOrBallotSt(SCPStatement const& oldSt,
size_t
HerderImpl::getMaxQueueSizeOps() const
{
return mTransactionQueue.getMaxQueueSizeOps();
return mTransactionQueue->getMaxQueueSizeOps();
}

size_t
Expand All @@ -2437,7 +2469,7 @@ HerderImpl::getMaxQueueSizeSorobanOps() const
bool
HerderImpl::isBannedTx(Hash const& hash) const
{
auto banned = mTransactionQueue.isBanned(hash);
auto banned = mTransactionQueue->isBanned(hash);
if (mSorobanTransactionQueue)
{
banned = banned || mSorobanTransactionQueue->isBanned(hash);
Expand All @@ -2448,7 +2480,7 @@ HerderImpl::isBannedTx(Hash const& hash) const
TransactionFrameBaseConstPtr
HerderImpl::getTx(Hash const& hash) const
{
auto classic = mTransactionQueue.getTx(hash);
auto classic = mTransactionQueue->getTx(hash);
if (!classic && mSorobanTransactionQueue)
{
return mSorobanTransactionQueue->getTx(hash);
Expand Down
5 changes: 4 additions & 1 deletion src/herder/HerderImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ class HerderImpl : public Herder
void purgeOldPersistedTxSets();
void writeDebugTxSet(LedgerCloseData const& lcd);

ClassicTransactionQueue mTransactionQueue;
std::unique_ptr<ClassicTransactionQueue> mTransactionQueue;
std::unique_ptr<SorobanTransactionQueue> mSorobanTransactionQueue;

void updateTransactionQueue(TxSetXDRFrameConstPtr txSet);
Expand Down Expand Up @@ -298,6 +298,9 @@ class HerderImpl : public Herder
Application& mApp;
LedgerManager& mLedgerManager;

// Bucket list snapshot to use for transaction queues
SearchableSnapshotConstPtr mTxQueueBucketSnapshot;

struct SCPMetrics
{
medida::Meter& mLostSync;
Expand Down
Loading

0 comments on commit 256f322

Please sign in to comment.