Skip to content

Commit

Permalink
Address the migration TODO
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Dec 18, 2024
1 parent 1792481 commit 0936264
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 31 deletions.
3 changes: 3 additions & 0 deletions src/database/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ Database::applySchemaUpgrade(unsigned long vers)
mApp.getHistoryManager().dropSQLBasedPublish();
Upgrades::dropSupportUpgradeHistory(*this);
break;
case 24:
mApp.getPersistentState().migrateToSlotStateTable();
break;
default:
throw std::runtime_error("Unknown DB schema version");
}
Expand Down
2 changes: 1 addition & 1 deletion src/database/Database.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ using PreparedStatementCache =

// smallest schema version supported
static constexpr unsigned long MIN_SCHEMA_VERSION = 21;
static constexpr unsigned long SCHEMA_VERSION = 23;
static constexpr unsigned long SCHEMA_VERSION = 24;

/**
* Helper class for borrowing a SOCI prepared statement handle into a local
Expand Down
6 changes: 3 additions & 3 deletions src/herder/HerderImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2008,7 +2008,7 @@ HerderImpl::restoreSCPState()

// Load all known tx sets
auto latestTxSets = mApp.getPersistentState().getTxSetsForAllSlots();
for (auto const& txSet : latestTxSets)
for (auto const& [_, txSet] : latestTxSets)
{
try
{
Expand Down Expand Up @@ -2037,7 +2037,7 @@ HerderImpl::restoreSCPState()
// load saved state from database
auto latest64 = mApp.getPersistentState().getSCPStateAllSlots();

for (auto const& state : latest64)
for (auto const& [_, state] : latest64)
{
try
{
Expand Down Expand Up @@ -2244,7 +2244,7 @@ HerderImpl::purgeOldPersistedTxSets()
{
auto hashesToDelete =
mApp.getPersistentState().getTxSetHashesForAllSlots();
for (auto const& state :
for (auto const& [_, state] :
mApp.getPersistentState().getSCPStateAllSlots())
{
try
Expand Down
110 changes: 88 additions & 22 deletions src/main/PersistentState.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,25 @@ std::string PersistentState::kSQLCreateSCPStatement =
"state TEXT"
"); ";

std::string PersistentState::LCLTableName = "storestate";
std::string PersistentState::SlotTableName = "slotstate";
std::string PersistentState::kLCLTableName = "storestate";
std::string PersistentState::kSlotTableName = "slotstate";

PersistentState::PersistentState(Application& app) : mApp(app)
{
releaseAssert(threadIsMain());
}

void
PersistentState::deleteTxSets(std::unordered_set<Hash> hashesToDelete)
PersistentState::deleteTxSets(std::unordered_set<Hash> hashesToDelete,
std::string table)
{
releaseAssert(threadIsMain());
soci::transaction tx(mApp.getDatabase().getRawSession());
for (auto const& hash : hashesToDelete)
{
auto name = getStoreStateNameForTxSet(hash);
auto prep = mApp.getDatabase().getPreparedStatement(
"DELETE FROM slotstate WHERE statename = :n;",
fmt::format("DELETE FROM {} WHERE statename = :n;", table),
mApp.getDatabase().getSession());

auto& st = prep.statement();
Expand All @@ -64,6 +65,66 @@ PersistentState::deleteTxSets(std::unordered_set<Hash> hashesToDelete)
tx.commit();
}

void
PersistentState::migrateToSlotStateTable()
{
// No soci::transaction needed, because the migration in Database.cpp wraps
// everything in one transaction anyway.
releaseAssert(threadIsMain());
auto& db = mApp.getDatabase();

// First, create the new table
db.getRawSession() << PersistentState::kSQLCreateSCPStatement;

// Migrate all the tx sets
auto txSets = getTxSetsForAllSlots(kLCLTableName);
std::unordered_set<Hash> keysToDelete;
for (auto const& txSet : txSets)
{
CLOG_INFO(Herder, "Migrating tx set {} to slotstate",
hexAbbrev(txSet.first));
updateDb(getStoreStateNameForTxSet(txSet.first), txSet.second,
db.getSession(), kSlotTableName);
keysToDelete.insert(txSet.first);
}

// Cleanup tx sets from the previous table
deleteTxSets(keysToDelete, kLCLTableName);

// Migrate all SCP slot data
auto scpStates = getSCPStateAllSlots(kLCLTableName);
for (auto const& [i, scpState] : scpStates)
{
CLOG_INFO(Herder, "Migrating SCP state for slot {} to slotstate", i);
setSCPStateForSlot(i, scpState);
auto prep = mApp.getDatabase().getPreparedStatement(
"DELETE FROM storestate WHERE statename = :n;",
mApp.getDatabase().getSession());

auto& st = prep.statement();
st.exchange(soci::use(getStoreStateName(kLastSCPDataXDR, i)));
st.define_and_bind();
st.execute(true);
}

// Migrate upgrade data
auto upgrades = getFromDb(getStoreStateName(kLedgerUpgrades),
db.getSession(), kLCLTableName);
if (!upgrades.empty())
{
updateDb(getStoreStateName(kLedgerUpgrades), upgrades, db.getSession(),
kSlotTableName);
auto prep = mApp.getDatabase().getPreparedStatement(
"DELETE FROM storestate WHERE statename = :n;",
mApp.getDatabase().getSession());

auto& st = prep.statement();
st.exchange(soci::use(getStoreStateName(kLedgerUpgrades)));
st.define_and_bind();
st.execute(true);
}
}

void
PersistentState::dropAll(Database& db)
{
Expand All @@ -72,7 +133,6 @@ PersistentState::dropAll(Database& db)
soci::statement st = db.getRawSession().prepare << kSQLCreateStatement;
st.execute(true);

// TODO: add a migration to move data to slotstate
db.getRawSession() << "DROP TABLE IF EXISTS slotstate;";
soci::statement st2 = db.getRawSession().prepare << kSQLCreateSCPStatement;
st2.execute(true);
Expand Down Expand Up @@ -126,7 +186,7 @@ std::string
PersistentState::getDBForEntry(PersistentState::Entry entry)
{
releaseAssert(entry != kLastEntry);
return entry <= kRebuildLedger ? LCLTableName : SlotTableName;
return entry <= kRebuildLedger ? kLCLTableName : kSlotTableName;
}

std::string
Expand All @@ -144,21 +204,21 @@ PersistentState::setState(PersistentState::Entry entry,
updateDb(getStoreStateName(entry), value, session, getDBForEntry(entry));
}

std::vector<std::string>
PersistentState::getSCPStateAllSlots()
std::unordered_map<uint32_t, std::string>
PersistentState::getSCPStateAllSlots(std::string table)
{
ZoneScoped;
releaseAssert(threadIsMain());

// Collect all slots persisted
std::vector<std::string> states;
std::unordered_map<uint32_t, std::string> states;
for (uint32 i = 0; i <= mApp.getConfig().MAX_SLOTS_TO_REMEMBER; i++)
{
auto val = getFromDb(getStoreStateName(kLastSCPDataXDR, i),
mApp.getDatabase().getSession(), SlotTableName);
mApp.getDatabase().getSession(), table);
if (!val.empty())
{
states.push_back(val);
states.emplace(i, val);
}
}

Expand All @@ -174,7 +234,7 @@ PersistentState::setSCPStateForSlot(uint64 slot, std::string const& value)
auto slotIdx = static_cast<uint32>(
slot % (mApp.getConfig().MAX_SLOTS_TO_REMEMBER + 1));
updateDb(getStoreStateName(kLastSCPDataXDR, slotIdx), value,
mApp.getDatabase().getSession(), SlotTableName);
mApp.getDatabase().getSession(), kSlotTableName);
}

void
Expand All @@ -190,7 +250,7 @@ PersistentState::setSCPStateV1ForSlot(
for (auto const& txSet : txSets)
{
updateDb(getStoreStateNameForTxSet(txSet.first), txSet.second,
mApp.getDatabase().getSession(), SlotTableName);
mApp.getDatabase().getSession(), kSlotTableName);
}
tx.commit();
}
Expand All @@ -202,7 +262,7 @@ PersistentState::shouldRebuildForType(LedgerEntryType let)
releaseAssert(threadIsMain());

return !getFromDb(getStoreStateName(kRebuildLedger, let),
mApp.getDatabase().getSession(), LCLTableName)
mApp.getDatabase().getSession(), kLCLTableName)
.empty();
}

Expand All @@ -213,7 +273,7 @@ PersistentState::clearRebuildForType(LedgerEntryType let)
releaseAssert(threadIsMain());

updateDb(getStoreStateName(kRebuildLedger, let), "",
mApp.getDatabase().getSession(), LCLTableName);
mApp.getDatabase().getSession(), kLCLTableName);
}

void
Expand All @@ -230,7 +290,7 @@ PersistentState::setRebuildForType(LedgerEntryType let)
}

updateDb(getStoreStateName(kRebuildLedger, let), "1",
mApp.getDatabase().getSession(), LCLTableName);
mApp.getDatabase().getSession(), kLCLTableName);
}

void
Expand Down Expand Up @@ -272,21 +332,23 @@ PersistentState::updateDb(std::string const& entry, std::string const& value,
}
}

std::vector<std::string>
PersistentState::getTxSetsForAllSlots()
std::unordered_map<Hash, std::string>
PersistentState::getTxSetsForAllSlots(std::string table)
{
ZoneScoped;
releaseAssert(threadIsMain());

std::vector<std::string> result;
std::unordered_map<Hash, std::string> result;
std::string key;
std::string val;

std::string pattern = mapping[kTxSet] + "%";
std::string statementStr =
"SELECT state FROM slotstate WHERE statename LIKE :n;";
std::string statementStr = fmt::format(
"SELECT statename, state FROM {} WHERE statename LIKE :n;", table);
auto& db = mApp.getDatabase();
auto prep = db.getPreparedStatement(statementStr, db.getSession());
auto& st = prep.statement();
st.exchange(soci::into(key));
st.exchange(soci::into(val));
st.exchange(soci::use(pattern));
st.define_and_bind();
Expand All @@ -295,9 +357,13 @@ PersistentState::getTxSetsForAllSlots()
st.execute(true);
}

Hash hash;
size_t len = binToHex(hash).size();

while (st.got_data())
{
result.push_back(val);
result.emplace(hexToBin256(key.substr(mapping[kTxSet].size(), len)),
val);
st.fetch();
}

Expand Down
14 changes: 9 additions & 5 deletions src/main/PersistentState.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ class PersistentState
SessionWrapper& session);

// Special methods for SCP state (multiple slots)
std::vector<std::string> getSCPStateAllSlots();
std::vector<std::string> getTxSetsForAllSlots();
std::unordered_map<uint32_t, std::string>
getSCPStateAllSlots(std::string table = kSlotTableName);
std::unordered_map<Hash, std::string>
getTxSetsForAllSlots(std::string table = kSlotTableName);
std::unordered_set<Hash> getTxSetHashesForAllSlots();

void
Expand All @@ -57,14 +59,16 @@ class PersistentState
void setRebuildForType(LedgerEntryType let);

bool hasTxSet(Hash const& txSetHash);
void deleteTxSets(std::unordered_set<Hash> hashesToDelete);
void deleteTxSets(std::unordered_set<Hash> hashesToDelete,
std::string table = kSlotTableName);
void migrateToSlotStateTable();

private:
static std::string kSQLCreateStatement;
static std::string kSQLCreateSCPStatement;
static std::string mapping[kLastEntry];
static std::string LCLTableName;
static std::string SlotTableName;
static std::string kLCLTableName;
static std::string kSlotTableName;

Application& mApp;

Expand Down

0 comments on commit 0936264

Please sign in to comment.