Skip to content

Commit

Permalink
Support multiple sessions in Database class
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Jan 3, 2025
1 parent 483b558 commit 229beb4
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 67 deletions.
121 changes: 70 additions & 51 deletions src/database/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ using namespace std;

bool Database::gDriversRegistered = false;

// smallest schema version supported
static unsigned long const MIN_SCHEMA_VERSION = 21;
static unsigned long const SCHEMA_VERSION = 24;

// These should always match our compiled version precisely, since we are
// using a bundled version to get access to carray(). But in case someone
// overrides that or our build configuration changes, it's nicer to get a
Expand Down Expand Up @@ -184,6 +180,7 @@ Database::Database(Application& app)
: mApp(app)
, mQueryMeter(
app.getMetrics().NewMeter({"database", "query", "exec"}, "query"))
, mSession("main")
, mStatementsSize(
app.getMetrics().NewCounter({"database", "memory", "statements"}))
{
Expand All @@ -198,17 +195,17 @@ Database::Database(Application& app)
void
Database::open()
{
mSession.open(mApp.getConfig().DATABASE.value);
DatabaseConfigureSessionOp op(mSession);
doDatabaseTypeSpecificOperation(op);
mSession.session().open(mApp.getConfig().DATABASE.value);
DatabaseConfigureSessionOp op(mSession.session());
doDatabaseTypeSpecificOperation(op, mSession);
}

void
Database::applySchemaUpgrade(unsigned long vers)
{
clearPreparedStatementCache();
clearPreparedStatementCache(mSession);

soci::transaction tx(mSession);
soci::transaction tx(mSession.session());
switch (vers)
{
case 22:
Expand All @@ -220,6 +217,7 @@ Database::applySchemaUpgrade(unsigned long vers)
break;
case 24:
getSession() << "DROP TABLE IF EXISTS pubsub;";
mApp.getPersistentState().migrateToSlotStateTable();
break;
default:
throw std::runtime_error("Unknown DB schema version");
Expand Down Expand Up @@ -263,62 +261,66 @@ Database::upgradeToCurrentSchema()
void
Database::maybeUpgradeToBucketListDB()
{
if (mApp.getPersistentState().getState(PersistentState::kDBBackend) !=
if (mApp.getPersistentState().getState(PersistentState::kDBBackend,
getSession()) !=
BucketIndex::DB_BACKEND_STATE)
{
CLOG_INFO(Database, "Upgrading to BucketListDB");

// Drop all LedgerEntry tables except for offers
CLOG_INFO(Database, "Dropping table accounts");
getSession() << "DROP TABLE IF EXISTS accounts;";
getRawSession() << "DROP TABLE IF EXISTS accounts;";

CLOG_INFO(Database, "Dropping table signers");
getSession() << "DROP TABLE IF EXISTS signers;";
getRawSession() << "DROP TABLE IF EXISTS signers;";

CLOG_INFO(Database, "Dropping table claimablebalance");
getSession() << "DROP TABLE IF EXISTS claimablebalance;";
getRawSession() << "DROP TABLE IF EXISTS claimablebalance;";

CLOG_INFO(Database, "Dropping table configsettings");
getSession() << "DROP TABLE IF EXISTS configsettings;";
getRawSession() << "DROP TABLE IF EXISTS configsettings;";

CLOG_INFO(Database, "Dropping table contractcode");
getSession() << "DROP TABLE IF EXISTS contractcode;";
getRawSession() << "DROP TABLE IF EXISTS contractcode;";

CLOG_INFO(Database, "Dropping table contractdata");
getSession() << "DROP TABLE IF EXISTS contractdata;";
getRawSession() << "DROP TABLE IF EXISTS contractdata;";

CLOG_INFO(Database, "Dropping table accountdata");
getSession() << "DROP TABLE IF EXISTS accountdata;";
getRawSession() << "DROP TABLE IF EXISTS accountdata;";

CLOG_INFO(Database, "Dropping table liquiditypool");
getSession() << "DROP TABLE IF EXISTS liquiditypool;";
getRawSession() << "DROP TABLE IF EXISTS liquiditypool;";

CLOG_INFO(Database, "Dropping table trustlines");
getSession() << "DROP TABLE IF EXISTS trustlines;";
getRawSession() << "DROP TABLE IF EXISTS trustlines;";

CLOG_INFO(Database, "Dropping table ttl");
getSession() << "DROP TABLE IF EXISTS ttl;";
getRawSession() << "DROP TABLE IF EXISTS ttl;";

mApp.getPersistentState().setState(PersistentState::kDBBackend,
BucketIndex::DB_BACKEND_STATE);
BucketIndex::DB_BACKEND_STATE,
getSession());
}
}

void
Database::putSchemaVersion(unsigned long vers)
{
mApp.getPersistentState().setState(PersistentState::kDatabaseSchema,
std::to_string(vers));
std::to_string(vers),
mApp.getDatabase().getSession());
}

unsigned long
Database::getDBSchemaVersion()
{
releaseAssert(threadIsMain());
unsigned long vers = 0;
try
{
auto vstr = mApp.getPersistentState().getState(
PersistentState::kDatabaseSchema);
PersistentState::kDatabaseSchema, getSession());
vers = std::stoul(vstr);
}
catch (...)
Expand All @@ -332,16 +334,9 @@ Database::getDBSchemaVersion()
return vers;
}

unsigned long
Database::getAppSchemaVersion()
{
return SCHEMA_VERSION;
}

medida::TimerContext
Database::getInsertTimer(std::string const& entityName)
{
mEntityTypes.insert(entityName);
mQueryMeter.Mark();
return mApp.getMetrics()
.NewTimer({"database", "insert", entityName})
Expand All @@ -351,7 +346,6 @@ Database::getInsertTimer(std::string const& entityName)
medida::TimerContext
Database::getSelectTimer(std::string const& entityName)
{
mEntityTypes.insert(entityName);
mQueryMeter.Mark();
return mApp.getMetrics()
.NewTimer({"database", "select", entityName})
Expand All @@ -361,7 +355,6 @@ Database::getSelectTimer(std::string const& entityName)
medida::TimerContext
Database::getDeleteTimer(std::string const& entityName)
{
mEntityTypes.insert(entityName);
mQueryMeter.Mark();
return mApp.getMetrics()
.NewTimer({"database", "delete", entityName})
Expand All @@ -371,7 +364,6 @@ Database::getDeleteTimer(std::string const& entityName)
medida::TimerContext
Database::getUpdateTimer(std::string const& entityName)
{
mEntityTypes.insert(entityName);
mQueryMeter.Mark();
return mApp.getMetrics()
.NewTimer({"database", "update", entityName})
Expand All @@ -381,7 +373,6 @@ Database::getUpdateTimer(std::string const& entityName)
medida::TimerContext
Database::getUpsertTimer(std::string const& entityName)
{
mEntityTypes.insert(entityName);
mQueryMeter.Mark();
return mApp.getMetrics()
.NewTimer({"database", "upsert", entityName})
Expand All @@ -393,7 +384,8 @@ Database::setCurrentTransactionReadOnly()
{
if (!isSqlite())
{
auto prep = getPreparedStatement("SET TRANSACTION READ ONLY");
auto prep =
getPreparedStatement("SET TRANSACTION READ ONLY", getSession());
auto& st = prep.statement();
st.define_and_bind();
st.execute(false);
Expand Down Expand Up @@ -429,14 +421,31 @@ Database::canUsePool() const
void
Database::clearPreparedStatementCache()
{
std::lock_guard<std::mutex> lock(mStatementsMutex);
for (auto& c : mCaches)
{
for (auto& st : c.second)
{
st.second->clean_up(true);
}
}
mCaches.clear();
mStatementsSize.set_count(0);
}

void
Database::clearPreparedStatementCache(SessionWrapper& session)
{
std::lock_guard<std::mutex> lock(mStatementsMutex);

// Flush all prepared statements; in sqlite they represent open cursors
// and will conflict with any DROP TABLE commands issued below
for (auto st : mStatements)
for (auto st : mCaches[session.getSessionName()])
{
st.second->clean_up(true);
mStatementsSize.dec();
}
mStatements.clear();
mStatementsSize.set_count(mStatements.size());
mCaches.erase(session.getSessionName());
}

void
Expand All @@ -452,8 +461,9 @@ Database::initialize()
int i;
std::string databaseName, databaseLocation;
soci::statement st =
(mSession.prepare << "PRAGMA database_list;", soci::into(i),
soci::into(databaseName), soci::into(databaseLocation));
(getRawSession().prepare << "PRAGMA database_list;",
soci::into(i), soci::into(databaseName),
soci::into(databaseLocation));
st.execute(true);
while (st.got_data())
{
Expand All @@ -466,7 +476,7 @@ Database::initialize()
}
if (!fn.empty() && fs::exists(fn))
{
mSession.close();
getRawSession().close();
std::remove(fn.c_str());
open();
}
Expand All @@ -476,7 +486,7 @@ Database::initialize()

// only time this section should be modified is when
// consolidating changes found in applySchemaUpgrade here
Upgrades::dropAll(*this);
Upgrades::dropSupportUpgradeHistory(*this);
OverlayManager::dropAll(*this);
PersistentState::dropAll(*this);
LedgerHeaderUtils::dropAll(*this);
Expand All @@ -487,21 +497,26 @@ Database::initialize()
HerderPersistence::dropAll(*this);
BanManager::dropAll(*this);
putSchemaVersion(MIN_SCHEMA_VERSION);
mApp.getHerderPersistence().createQuorumTrackingTable(mSession);

LOG_INFO(DEFAULT_LOG, "* ");
LOG_INFO(DEFAULT_LOG, "* The database has been initialized");
LOG_INFO(DEFAULT_LOG, "* ");
}

soci::session&
SessionWrapper&
Database::getSession()
{
// global session can only be used from the main thread
releaseAssert(threadIsMain());
return mSession;
}

soci::session&
Database::getRawSession()
{
return getSession().session();
}

soci::connection_pool&
Database::getPool()
{
Expand Down Expand Up @@ -568,17 +583,21 @@ class SQLLogContext : NonCopyable
};

StatementContext
Database::getPreparedStatement(std::string const& query)
Database::getPreparedStatement(std::string const& query,
SessionWrapper& session)
{
auto i = mStatements.find(query);
std::lock_guard<std::mutex> lock(mStatementsMutex);

auto& cache = mCaches[session.getSessionName()];
auto i = cache.find(query);
std::shared_ptr<soci::statement> p;
if (i == mStatements.end())
if (i == cache.end())
{
p = std::make_shared<soci::statement>(mSession);
p = std::make_shared<soci::statement>(session.session());
p->alloc();
p->prepare(query);
mStatements.insert(std::make_pair(query, p));
mStatementsSize.set_count(mStatements.size());
cache.insert(std::make_pair(query, p));
mStatementsSize.inc();
}
else
{
Expand All @@ -591,6 +610,6 @@ Database::getPreparedStatement(std::string const& query)
std::shared_ptr<SQLLogContext>
Database::captureAndLogSQL(std::string contextName)
{
return make_shared<SQLLogContext>(contextName, mSession);
return make_shared<SQLLogContext>(contextName, mSession.session());
}
}
Loading

0 comments on commit 229beb4

Please sign in to comment.