Skip to content

Commit

Permalink
Support reusable module cache with multithreaded pre-compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
graydon committed Jan 15, 2025
1 parent 06adc32 commit 89d85c2
Show file tree
Hide file tree
Showing 13 changed files with 663 additions and 14 deletions.
7 changes: 7 additions & 0 deletions src/ledger/LedgerManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "catchup/LedgerApplyManager.h"
#include "history/HistoryManager.h"
#include "ledger/NetworkConfig.h"
#include "rust/RustBridge.h"
#include <memory>

namespace stellar
Expand Down Expand Up @@ -200,6 +201,12 @@ class LedgerManager
virtual void manuallyAdvanceLedgerHeader(LedgerHeader const& header) = 0;

virtual SorobanMetrics& getSorobanMetrics() = 0;
virtual rust_bridge::SorobanModuleCache& getModuleCache() = 0;

// Compiles all contracts in the current ledger, for ledger protocols
// starting at minLedgerVersion and running through to
// Config::CURRENT_LEDGER_PROTOCOL_VERSION (to enable upgrades).
virtual void compileAllContractsInLedger(uint32_t minLedgerVersion) = 0;

virtual ~LedgerManager()
{
Expand Down
27 changes: 27 additions & 0 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ledger/LedgerTxn.h"
#include "ledger/LedgerTxnEntry.h"
#include "ledger/LedgerTxnHeader.h"
#include "ledger/SharedModuleCacheCompiler.h"
#include "main/Application.h"
#include "main/Config.h"
#include "main/ErrorMessages.h"
Expand All @@ -42,6 +43,7 @@
#include "work/WorkScheduler.h"
#include "xdrpp/printer.h"

#include <cstdint>
#include <fmt/format.h>

#include "xdr/Stellar-ledger.h"
Expand Down Expand Up @@ -155,6 +157,7 @@ LedgerManagerImpl::LedgerManagerImpl(Application& app)
, mCatchupDuration(
app.getMetrics().NewTimer({"ledger", "catchup", "duration"}))
, mState(LM_BOOTING_STATE)
, mModuleCache(::rust_bridge::new_module_cache())

{
setupLedgerCloseMetaStream();
Expand Down Expand Up @@ -403,6 +406,9 @@ LedgerManagerImpl::loadLastKnownLedger(bool restoreBucketlist)
updateNetworkConfig(ltx);
mSorobanNetworkConfigReadOnly = mSorobanNetworkConfigForApply;
}

// Prime module cache with ledger content.
compileAllContractsInLedger(latestLedgerHeader->ledgerVersion);
}

Database&
Expand Down Expand Up @@ -566,6 +572,27 @@ LedgerManagerImpl::getSorobanMetrics()
return mSorobanMetrics;
}

rust_bridge::SorobanModuleCache&
LedgerManagerImpl::getModuleCache()
{
return *mModuleCache;
}

void
LedgerManagerImpl::compileAllContractsInLedger(uint32_t minLedgerVersion)
{
auto& moduleCache = getModuleCache();
std::vector<uint32_t> ledgerVersions;
for (uint32_t i = minLedgerVersion;
i <= Config::CURRENT_LEDGER_PROTOCOL_VERSION; i++)
{
ledgerVersions.push_back(i);
}
auto compiler = std::make_shared<SharedModuleCacheCompiler>(
mApp, moduleCache, ledgerVersions);
compiler->run();
}

void
LedgerManagerImpl::publishSorobanMetrics()
{
Expand Down
6 changes: 6 additions & 0 deletions src/ledger/LedgerManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "ledger/NetworkConfig.h"
#include "ledger/SorobanMetrics.h"
#include "main/PersistentState.h"
#include "rust/RustBridge.h"
#include "transactions/TransactionFrame.h"
#include "util/XDRStream.h"
#include "xdr/Stellar-ledger.h"
Expand Down Expand Up @@ -152,6 +153,9 @@ class LedgerManagerImpl : public LedgerManager
// Update cached ledger state values managed by this class.
void advanceLedgerPointers(CloseLedgerOutput const& output);

// The reusable / inter-ledger soroban module cache.
::rust::Box<rust_bridge::SorobanModuleCache> mModuleCache;

protected:
// initialLedgerVers must be the ledger version at the start of the ledger
// and currLedgerVers is the ledger version in the current ltx header. These
Expand Down Expand Up @@ -251,5 +255,7 @@ class LedgerManagerImpl : public LedgerManager
{
return mCurrentlyApplyingLedger;
}
rust_bridge::SorobanModuleCache& getModuleCache() override;
void compileAllContractsInLedger(uint32_t minLedgerVersion) override;
};
}
161 changes: 161 additions & 0 deletions src/ledger/SharedModuleCacheCompiler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#include "ledger/SharedModuleCacheCompiler.h"
#include "bucket/SearchableBucketList.h"
#include "crypto/Hex.h"
#include "crypto/SHA.h"
#include "main/Application.h"
#include "rust/RustBridge.h"
#include "util/Logging.h"
#include <chrono>

using namespace stellar;

SharedModuleCacheCompiler::SharedModuleCacheCompiler(
Application& app, rust_bridge::SorobanModuleCache& moduleCache,
std::vector<uint32_t> const& ledgerVersions)
: mApp(app)
, mModuleCache(moduleCache)
, mSnap(app.getBucketManager()
.getBucketSnapshotManager()
.copySearchableLiveBucketListSnapshot())
, mNumThreads(
static_cast<size_t>(std::max(2, app.getConfig().WORKER_THREADS) - 1))
, mLedgerVersions(ledgerVersions)
{
}

void
SharedModuleCacheCompiler::pushWasm(xdr::xvector<uint8_t> const& vec)
{
std::unique_lock<std::mutex> lock(mMutex);
mHaveSpace.wait(
lock, [&] { return mBytesLoaded - mBytesCompiled < MAX_MEM_BYTES; });
xdr::xvector<uint8_t> buf(vec);
auto size = buf.size();
mWasms.emplace_back(std::move(buf));
mBytesLoaded += size;
lock.unlock();
mHaveContracts.notify_all();
LOG_INFO(DEFAULT_LOG, "Loaded contract with {} bytes of wasm code", size);
}

bool
SharedModuleCacheCompiler::isFinishedCompiling(
std::unique_lock<std::mutex>& lock)
{
releaseAssert(lock.owns_lock());
return mLoadedAll && mBytesCompiled == mBytesLoaded;
}

void
SharedModuleCacheCompiler::setFinishedLoading()
{
std::unique_lock lock(mMutex);
mLoadedAll = true;
lock.unlock();
mHaveContracts.notify_all();
}

bool
SharedModuleCacheCompiler::popAndCompileWasm(size_t thread,
std::unique_lock<std::mutex>& lock)
{

releaseAssert(lock.owns_lock());

// Wait for a new contract to compile (or being done).
mHaveContracts.wait(
lock, [&] { return !mWasms.empty() || isFinishedCompiling(lock); });

// Check to see if we were woken up due to end-of-compilation.
if (isFinishedCompiling(lock))
{
return false;
}

xdr::xvector<uint8_t> wasm = std::move(mWasms.front());
mWasms.pop_front();

// Make a local shallow copy of the cache, so we don't race on the
// shared host.
auto cache = mModuleCache.shallow_clone();

lock.unlock();

auto start = std::chrono::steady_clock::now();
auto slice = rust::Slice<const uint8_t>(wasm.data(), wasm.size());
try
{
for (auto ledgerVersion : mLedgerVersions)
{
cache->compile(ledgerVersion, slice);
}
}
catch (std::exception const& e)
{
LOG_ERROR(DEFAULT_LOG, "Thread {} failed to compile wasm code: {}",
thread, e.what());
}
auto end = std::chrono::steady_clock::now();
auto dur_us =
std::chrono::duration_cast<std::chrono::microseconds>(end - start);
LOG_INFO(DEFAULT_LOG, "Thread {} compiled {} byte wasm contract {} in {}us",
thread, wasm.size(), binToHex(sha256(wasm)), dur_us.count());
lock.lock();
mTotalCompileTime += dur_us;
mBytesCompiled += wasm.size();
wasm.clear();
mHaveSpace.notify_all();
mHaveContracts.notify_all();
return true;
}

void
SharedModuleCacheCompiler::run()
{
auto self = shared_from_this();
auto start = std::chrono::steady_clock::now();
LOG_INFO(DEFAULT_LOG,
"Launching 1 loading and {} compiling background threads",
mNumThreads);
mApp.postOnBackgroundThread(
[self]() {
self->mSnap->scanForContractCode([&](LedgerEntry const& entry) {
self->pushWasm(entry.data.contractCode().code);
return Loop::INCOMPLETE;
});
self->setFinishedLoading();
},
"contract loading thread");

for (auto thread = 0; thread < self->mNumThreads; ++thread)
{
mApp.postOnBackgroundThread(
[self, thread]() {
size_t nContractsCompiled = 0;
std::unique_lock<std::mutex> lock(self->mMutex);
while (!self->isFinishedCompiling(lock))
{
if (self->popAndCompileWasm(thread, lock))
{
++nContractsCompiled;
}
}
LOG_INFO(DEFAULT_LOG, "Thread {} compiled {} contracts", thread,
nContractsCompiled);
},
fmt::format("compilation thread {}", thread));
}

std::unique_lock lock(self->mMutex);
self->mHaveContracts.wait(
lock, [self, &lock] { return self->isFinishedCompiling(lock); });

auto end = std::chrono::steady_clock::now();
LOG_INFO(DEFAULT_LOG,
"All contracts compiled in {}ms real time, {}ms CPU time",
std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count(),
std::chrono::duration_cast<std::chrono::milliseconds>(
self->mTotalCompileTime)
.count());
}
59 changes: 59 additions & 0 deletions src/ledger/SharedModuleCacheCompiler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once
// Copyright 2024 Stellar Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "bucket/BucketSnapshotManager.h"
#include "rust/RustBridge.h"
#include "xdrpp/types.h"

#include <cstdint>
#include <deque>

#include <condition_variable>
#include <memory>
#include <mutex>

namespace stellar
{
class Application;
}

// This class encapsulates a multithreaded strategy for loading contracts
// out of the database (on one thread) and compiling them (on N-1 others).
class SharedModuleCacheCompiler
: public std::enable_shared_from_this<SharedModuleCacheCompiler>
{
stellar::Application& mApp;
stellar::rust_bridge::SorobanModuleCache& mModuleCache;
stellar::SearchableSnapshotConstPtr mSnap;
std::deque<xdr::xvector<uint8_t>> mWasms;

const size_t mNumThreads;
const size_t MAX_MEM_BYTES = 10 * 1024 * 1024;
bool mLoadedAll{false};
size_t mBytesLoaded{0};
size_t mBytesCompiled{0};
std::vector<uint32_t> mLedgerVersions;

std::mutex mMutex;
std::condition_variable mHaveSpace;
std::condition_variable mHaveContracts;

std::chrono::microseconds mTotalCompileTime{0};

void setFinishedLoading();
bool isFinishedCompiling(std::unique_lock<std::mutex>& lock);
// This gets called in a loop on the loader/producer thread.
void pushWasm(xdr::xvector<uint8_t> const& vec);
// This gets called in a loop on the compiler/consumer threads. It returns
// true if anything was actually compiled.
bool popAndCompileWasm(size_t thread, std::unique_lock<std::mutex>& lock);

public:
SharedModuleCacheCompiler(
stellar::Application& app,
stellar::rust_bridge::SorobanModuleCache& moduleCache,
std::vector<uint32_t> const& ledgerVersions);
void run();
};
15 changes: 15 additions & 0 deletions src/main/ApplicationUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "main/PersistentState.h"
#include "main/StellarCoreVersion.h"
#include "overlay/OverlayManager.h"
#include "rust/RustBridge.h"
#include "scp/LocalNode.h"
#include "util/GlobalChecks.h"
#include "util/Logging.h"
Expand Down Expand Up @@ -1034,4 +1035,18 @@ listContracts(Config const& cfg)
return 0;
}

int
compileContracts(Config const& cfg)
{
VirtualClock clock(VirtualClock::REAL_TIME);
auto config = cfg;
config.setNoListen();
auto app = Application::create(clock, config, /* newDB */ false);
// Initializing the ledgerManager will, in restoring the last known ledger,
// also cause all contracts to be compiled. All we need to do is start and
// stop the application.
app->start();
return 0;
}

}
1 change: 1 addition & 0 deletions src/main/ApplicationUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,5 @@ std::optional<uint32_t>
getStellarCoreMajorReleaseVersion(std::string const& vstr);

int listContracts(Config const& cfg);
int compileContracts(Config const& cfg);
}
9 changes: 9 additions & 0 deletions src/main/CommandLine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1478,8 +1478,15 @@ runListContracts(CommandLineArgs const& args)

return runWithHelp(args, {configurationParser(configOption)},
[&] { return listContracts(configOption.getConfig()); });
}

int
runCompileContracts(CommandLineArgs const& args)
{
CommandLine::ConfigOption configOption;

return runWithHelp(args, {configurationParser(configOption)}, [&] {
return compileContracts(configOption.getConfig());
});
}

Expand Down Expand Up @@ -1960,6 +1967,8 @@ handleCommandLine(int argc, char* const* argv)
{"list-contracts",
"List sha256 hashes of all contract code entries in the bucket list",
runListContracts},
{"compile-contracts", "Compile wasm files and cache them",
runCompileContracts},
{"version", "print version information", runVersion}}};

auto adjustedCommandLine = commandLine.adjustCommandLine({argc, argv});
Expand Down
Loading

0 comments on commit 89d85c2

Please sign in to comment.