Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Improve quorum caching (again) #5761

Merged
merged 10 commits into from
Dec 20, 2023
6 changes: 1 addition & 5 deletions src/llmq/dkgsessionmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,6 @@ void CDKGSessionManager::CleanupOldContributions() const
const auto prefixes = {DB_VVEC, DB_SKCONTRIB, DB_ENC_CONTRIB};

for (const auto& params : Params().GetConsensus().llmqs) {
// For how many blocks recent DKG info should be kept
const int MAX_CYCLES = params.useRotation ? params.keepOldKeys / params.signingActiveQuorumCount : params.keepOldKeys;
const int MAX_STORE_DEPTH = MAX_CYCLES * params.dkgInterval;

LogPrint(BCLog::LLMQ, "CDKGSessionManager::%s -- looking for old entries for llmq type %d\n", __func__, ToUnderlying(params.type));

CDBBatch batch(*db);
Expand All @@ -487,7 +483,7 @@ void CDKGSessionManager::CleanupOldContributions() const
}
cnt_all++;
const CBlockIndex* pindexQuorum = m_chainstate.m_blockman.LookupBlockIndex(std::get<2>(k));
if (pindexQuorum == nullptr || m_chainstate.m_chain.Tip()->nHeight - pindexQuorum->nHeight > MAX_STORE_DEPTH) {
if (pindexQuorum == nullptr || m_chainstate.m_chain.Tip()->nHeight - pindexQuorum->nHeight > utils::max_store_depth(params)) {
// not found or too old
batch.Erase(k);
cnt_old++;
Expand Down
75 changes: 62 additions & 13 deletions src/llmq/quorums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate,
m_peerman(peerman)
{
utils::InitQuorumsCache(mapQuorumsCache, false);
utils::InitQuorumsCache(scanQuorumsCache, false);

quorumThreadInterrupt.reset();
}

Expand Down Expand Up @@ -502,14 +500,45 @@ std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp
return {};
}

const CBlockIndex* pIndexScanCommitments{pindexStart};
gsl::not_null<const CBlockIndex*> pindexStore{pindexStart};
const auto& llmq_params_opt = GetLLMQParams(llmqType);
assert(llmq_params_opt.has_value());
PastaPastaPasta marked this conversation as resolved.
Show resolved Hide resolved

// Quorum sets can only change during the mining phase of DKG.
// Find the closest known block index.
const int quorumCycleStartHeight = pindexStart->nHeight - (pindexStart->nHeight % llmq_params_opt->dkgInterval);
const int quorumCycleMiningStartHeight = quorumCycleStartHeight + llmq_params_opt->dkgMiningWindowStart;
const int quorumCycleMiningEndHeight = quorumCycleStartHeight + llmq_params_opt->dkgMiningWindowEnd;

if (pindexStart->nHeight < quorumCycleMiningStartHeight) {
// too early for this cycle, use the previous one
// bail out if it's below genesis block
if (quorumCycleMiningEndHeight < llmq_params_opt->dkgInterval) return {};
pindexStore = pindexStart->GetAncestor(quorumCycleMiningEndHeight - llmq_params_opt->dkgInterval);
} else if (pindexStart->nHeight > quorumCycleMiningEndHeight) {
// we are past the mining phase of this cycle, use it
pindexStore = pindexStart->GetAncestor(quorumCycleMiningEndHeight);
}
// everything else is inside the mining phase of this cycle, no pindexStore adjustment needed

gsl::not_null<const CBlockIndex*> pIndexScanCommitments{pindexStore};
size_t nScanCommitments{nCountRequested};
std::vector<CQuorumCPtr> vecResultQuorums;

{
LOCK(cs_scan_quorums);
if (scanQuorumsCache.empty()) {
for (const auto& llmq : Params().GetConsensus().llmqs) {
// NOTE: We store it for each block hash in the DKG mining phase here
// and not for a single quorum hash per quorum like we do for other caches.
// And we only do this for max_cycles() of the most recent quorums
// because signing by old quorums requires the exact quorum hash to be specified
// and quorum scanning isn't needed there.
scanQuorumsCache.try_emplace(llmq.type, utils::max_cycles(llmq, llmq.keepOldConnections) * (llmq.dkgMiningWindowEnd - llmq.dkgMiningWindowStart));
}
}
auto& cache = scanQuorumsCache[llmqType];
bool fCacheExists = cache.get(pindexStart->GetBlockHash(), vecResultQuorums);
bool fCacheExists = cache.get(pindexStore->GetBlockHash(), vecResultQuorums);
if (fCacheExists) {
// We have exactly what requested so just return it
if (vecResultQuorums.size() == nCountRequested) {
Expand All @@ -523,17 +552,17 @@ std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp
// scanning for the rests
if (!vecResultQuorums.empty()) {
nScanCommitments -= vecResultQuorums.size();
// bail out if it's below genesis block
if (vecResultQuorums.back()->m_quorum_base_block_index->pprev == nullptr) return {};
pIndexScanCommitments = vecResultQuorums.back()->m_quorum_base_block_index->pprev;
}
} else {
// If there is nothing in cache request at least cache.max_size() because this gets cached then later
nScanCommitments = std::max(nCountRequested, cache.max_size());
// If there is nothing in cache request at least keepOldConnections because this gets cached then later
nScanCommitments = std::max(nCountRequested, static_cast<size_t>(llmq_params_opt->keepOldConnections));
PastaPastaPasta marked this conversation as resolved.
Show resolved Hide resolved
}
}

// Get the block indexes of the mined commitments to build the required quorums from
const auto& llmq_params_opt = GetLLMQParams(llmqType);
assert(llmq_params_opt.has_value());
std::vector<const CBlockIndex*> pQuorumBaseBlockIndexes{ llmq_params_opt->useRotation ?
quorumBlockProcessor.GetMinedCommitmentsIndexedUntilBlock(llmqType, pIndexScanCommitments, nScanCommitments) :
quorumBlockProcessor.GetMinedCommitmentsUntilBlock(llmqType, pIndexScanCommitments, nScanCommitments)
Expand All @@ -550,10 +579,12 @@ std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp
const size_t nCountResult{vecResultQuorums.size()};
if (nCountResult > 0) {
LOCK(cs_scan_quorums);
// Don't cache more than cache.max_size() elements
// Don't cache more than keepOldConnections elements
// because signing by old quorums requires the exact quorum hash
// to be specified and quorum scanning isn't needed there.
auto& cache = scanQuorumsCache[llmqType];
const size_t nCacheEndIndex = std::min(nCountResult, cache.max_size());
cache.emplace(pindexStart->GetBlockHash(), {vecResultQuorums.begin(), vecResultQuorums.begin() + nCacheEndIndex});
const size_t nCacheEndIndex = std::min(nCountResult, static_cast<size_t>(llmq_params_opt->keepOldConnections));
PastaPastaPasta marked this conversation as resolved.
Show resolved Hide resolved
cache.emplace(pindexStore->GetBlockHash(), {vecResultQuorums.begin(), vecResultQuorums.begin() + nCacheEndIndex});
}
// Don't return more than nCountRequested elements
const size_t nResultEndIndex = std::min(nCountResult, nCountRequested);
Expand Down Expand Up @@ -1022,13 +1053,31 @@ void CQuorumManager::StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex)
workerPool.push([pIndex, t, this](int threadId) {
std::set<uint256> dbKeysToSkip;

if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) {
utils::InitQuorumsCache(cleanupQuorumsCache, false);
}
for (const auto& params : Params().GetConsensus().llmqs) {
if (quorumThreadInterrupt) {
break;
}
for (const auto& pQuorum : ScanQuorums(params.type, pIndex, params.keepOldKeys)) {
dbKeysToSkip.insert(MakeQuorumKey(*pQuorum));
LOCK(cs_cleanup);
auto& cache = cleanupQuorumsCache[params.type];
const CBlockIndex* pindex_loop{pIndex};
std::set<uint256> quorum_keys;
while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < utils::max_store_depth(params)) {
uint256 quorum_key;
if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) {
quorum_keys.insert(quorum_key);
if (quorum_keys.size() >= params.keepOldKeys) break; // extra safety belt
}
pindex_loop = pindex_loop->pprev;
}
for (const auto& pQuorum : ScanQuorums(params.type, pIndex, params.keepOldKeys - quorum_keys.size())) {
const uint256 quorum_key = MakeQuorumKey(*pQuorum);
quorum_keys.insert(quorum_key);
cache.insert(pQuorum->m_quorum_base_block_index->GetBlockHash(), quorum_key);
}
dbKeysToSkip.merge(quorum_keys);
}

if (!quorumThreadInterrupt) {
Expand Down
2 changes: 2 additions & 0 deletions src/llmq/quorums.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ class CQuorumManager
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, CQuorumPtr, StaticSaltedHasher>> mapQuorumsCache GUARDED_BY(cs_map_quorums);
mutable RecursiveMutex cs_scan_quorums;
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>> scanQuorumsCache GUARDED_BY(cs_scan_quorums);
mutable Mutex cs_cleanup;
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, uint256, StaticSaltedHasher>> cleanupQuorumsCache GUARDED_BY(cs_cleanup);

mutable ctpl::thread_pool workerPool;
mutable CThreadInterrupt quorumThreadInterrupt;
Expand Down
1 change: 1 addition & 0 deletions src/llmq/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,7 @@ template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache
template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>>>(std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>>& cache, bool limit_by_connections);
template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::shared_ptr<llmq::CQuorum>, StaticSaltedHasher, 0ul, 0ul>, std::less<Consensus::LLMQType>, std::allocator<std::pair<Consensus::LLMQType const, unordered_lru_cache<uint256, std::shared_ptr<llmq::CQuorum>, StaticSaltedHasher, 0ul, 0ul>>>>>(std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::shared_ptr<llmq::CQuorum>, StaticSaltedHasher, 0ul, 0ul>, std::less<Consensus::LLMQType>, std::allocator<std::pair<Consensus::LLMQType const, unordered_lru_cache<uint256, std::shared_ptr<llmq::CQuorum>, StaticSaltedHasher, 0ul, 0ul>>>>&cache, bool limit_by_connections);
template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache<uint256, int, StaticSaltedHasher>>>(std::map<Consensus::LLMQType, unordered_lru_cache<uint256, int, StaticSaltedHasher>>& cache, bool limit_by_connections);
template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache<uint256, uint256, StaticSaltedHasher>>>(std::map<Consensus::LLMQType, unordered_lru_cache<uint256, uint256, StaticSaltedHasher>>& cache, bool limit_by_connections);

} // namespace utils

Expand Down
11 changes: 11 additions & 0 deletions src/llmq/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,17 @@ void IterateNodesRandom(NodesContainer& nodeStates, Continue&& cont, Callback&&
template <typename CacheType>
void InitQuorumsCache(CacheType& cache, bool limit_by_connections = true);

[[ nodiscard ]] static constexpr int max_cycles(const Consensus::LLMQParams& llmqParams, int quorums_count)
{
return llmqParams.useRotation ? quorums_count / llmqParams.signingActiveQuorumCount : quorums_count;
}

[[ nodiscard ]] static constexpr int max_store_depth(const Consensus::LLMQParams& llmqParams)
{
// For how many blocks recent DKG info should be kept
return max_cycles(llmqParams, llmqParams.keepOldKeys) * llmqParams.dkgInterval;
}

} // namespace utils

[[ nodiscard ]] const std::optional<Consensus::LLMQParams> GetLLMQParams(Consensus::LLMQType llmqType);
Expand Down
11 changes: 8 additions & 3 deletions src/rpc/quorums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ static void quorum_list_help(const JSONRPCRequest& request)
RPCHelpMan{"quorum list",
"List of on-chain quorums\n",
{
{"count", RPCArg::Type::NUM, /* default */ "", "Number of quorums to list. Will list active quorums if \"count\" is not specified."},
{"count", RPCArg::Type::NUM, /* default */ "",
"Number of quorums to list. Will list active quorums if \"count\" is not specified.\n"
"Can be CPU/disk heavy when the value is larger than the number of active quorums."
},
},
RPCResult{
RPCResult::Type::OBJ, "", "",
Expand Down Expand Up @@ -365,8 +368,10 @@ static void quorum_memberof_help(const JSONRPCRequest& request)
{
{"proTxHash", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "ProTxHash of the masternode."},
{"scanQuorumsCount", RPCArg::Type::NUM, /* default */ "",
"Number of quorums to scan for. If not specified,\n"
"the active quorum count for each specific quorum type is used."},
"Number of quorums to scan for.\n"
"If not specified, the active quorum count for each specific quorum type is used.\n"
"Can be CPU/disk heavy when the value is larger than the number of active quorums."
},
},
RPCResults{},
RPCExamples{""},
Expand Down
Loading