Skip to content

Commit

Permalink
feat: converted db.rs to use prepare_cached() for all statements. (#2161
Browse files Browse the repository at this point in the history
)
  • Loading branch information
shawn-zil authored Jan 15, 2025
1 parent 026c32a commit e1dcc0f
Showing 1 changed file with 77 additions and 91 deletions.
168 changes: 77 additions & 91 deletions zilliqa/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,9 @@ impl Db {
connection.pragma_update(None, "cache_size", (1 << 28) / page_size)?;
let cache_size: i32 = connection.pragma_query_value(None, "cache_size", |r| r.get(0))?;

// increase size of prepared cache
connection.set_prepared_statement_cache_capacity(128); // default is 16, which is small

tracing::info!(
?journal_mode,
?journal_size_limit,
Expand Down Expand Up @@ -567,18 +570,15 @@ impl Db {
.db
.lock()
.unwrap()
.query_row_and_then(
"SELECT block_hash FROM blocks WHERE view = ?1",
[view],
|row| row.get(0),
)
.prepare_cached("SELECT block_hash FROM blocks WHERE view = ?1")?
.query_row([view], |row| row.get(0))
.optional()?)
}

pub fn set_finalized_view_with_db_tx(&self, sqlite_tx: &Connection, view: u64) -> Result<()> {
sqlite_tx
.execute("INSERT INTO tip_info (finalized_view) VALUES (?1) ON CONFLICT DO UPDATE SET finalized_view = ?1",
[view])?;
.prepare_cached("INSERT INTO tip_info (finalized_view) VALUES (?1) ON CONFLICT DO UPDATE SET finalized_view = ?1")?
.execute([view])?;
Ok(())
}

Expand All @@ -591,16 +591,17 @@ impl Db {
.db
.lock()
.unwrap()
.query_row("SELECT finalized_view FROM tip_info", (), |row| row.get(0))
.prepare_cached("SELECT finalized_view FROM tip_info")?
.query_row((), |row| row.get(0))
.optional()
.unwrap_or(None))
}

/// Write view and timestamp to table if view is larger than current. Return true if write was successful
pub fn set_view_with_db_tx(&self, sqlite_tx: &Connection, view: u64) -> Result<bool> {
let res = sqlite_tx
.execute("INSERT INTO tip_info (view) VALUES (?1) ON CONFLICT(_single_row) DO UPDATE SET view = ?1 WHERE tip_info.view IS NULL OR tip_info.view < ?1",
[view])?;
.prepare_cached("INSERT INTO tip_info (view) VALUES (?1) ON CONFLICT(_single_row) DO UPDATE SET view = ?1 WHERE tip_info.view IS NULL OR tip_info.view < ?1",)?
.execute([view])?;
Ok(res != 0)
}

Expand All @@ -613,7 +614,8 @@ impl Db {
.db
.lock()
.unwrap()
.query_row("SELECT view FROM tip_info", (), |row| row.get(0))
.prepare_cached("SELECT view FROM tip_info")?
.query_row((), |row| row.get(0))
.optional()
.unwrap_or(None))
}
Expand All @@ -626,11 +628,8 @@ impl Db {
.db
.lock()
.unwrap()
.query_row_and_then(
"SELECT height FROM blocks ORDER BY height DESC LIMIT 1",
(),
|row| row.get(0),
)
.prepare_cached("SELECT height FROM blocks ORDER BY height DESC LIMIT 1")?
.query_row((), |row| row.get(0))
.optional()?)
}

Expand All @@ -639,9 +638,9 @@ impl Db {
.db
.lock()
.unwrap()
.query_row_and_then(
// Two queries here are deliberate to ensure the index on `height` column is used
"SELECT height from (SELECT height, is_canonical FROM blocks ORDER BY height DESC) WHERE is_canonical = 1 LIMIT 1",
// Two queries here are deliberate to ensure the index on `height` column is used
.prepare_cached("SELECT height from (SELECT height, is_canonical FROM blocks ORDER BY height DESC) WHERE is_canonical = 1 LIMIT 1",)?
.query_row(
(),
|row| row.get(0),
)
Expand All @@ -663,8 +662,8 @@ impl Db {
sqlite_tx: &Connection,
high_qc: QuorumCertificate,
) -> Result<()> {
sqlite_tx.execute(
"INSERT INTO tip_info (high_qc, high_qc_updated_at) VALUES (:high_qc, :timestamp) ON CONFLICT DO UPDATE SET high_qc = :high_qc, high_qc_updated_at = :timestamp",
sqlite_tx.prepare_cached("INSERT INTO tip_info (high_qc, high_qc_updated_at) VALUES (:high_qc, :timestamp) ON CONFLICT DO UPDATE SET high_qc = :high_qc, high_qc_updated_at = :timestamp",)?
.execute(
named_params! {
":high_qc": high_qc,
":timestamp": SystemTimeSqlable(SystemTime::now())
Expand All @@ -681,7 +680,8 @@ impl Db {
.db
.lock()
.unwrap()
.query_row("SELECT high_qc FROM tip_info", (), |row| row.get(0))
.prepare_cached("SELECT high_qc FROM tip_info")?
.query_row((), |row| row.get(0))
.optional()?
.flatten())
}
Expand All @@ -691,9 +691,8 @@ impl Db {
.db
.lock()
.unwrap()
.query_row("SELECT high_qc_updated_at FROM tip_info", (), |row| {
row.get::<_, SystemTimeSqlable>(0)
})
.prepare_cached("SELECT high_qc_updated_at FROM tip_info")?
.query_row((), |row| row.get::<_, SystemTimeSqlable>(0))
.optional()
.unwrap_or(None)
.map(Into::<SystemTime>::into))
Expand All @@ -705,10 +704,11 @@ impl Db {
address: Address,
txn_hash: Hash,
) -> Result<()> {
sqlite_tx.execute(
"INSERT OR IGNORE INTO touched_address_index (address, tx_hash) VALUES (?1, ?2)",
(AddressSqlable(address), txn_hash),
)?;
sqlite_tx
.prepare_cached(
"INSERT OR IGNORE INTO touched_address_index (address, tx_hash) VALUES (?1, ?2)",
)?
.execute((AddressSqlable(address), txn_hash))?;
Ok(())
}

Expand All @@ -733,11 +733,8 @@ impl Db {
.db
.lock()
.unwrap()
.query_row(
"SELECT data FROM transactions WHERE tx_hash = ?1",
[txn_hash],
|row| row.get(0),
)
.prepare_cached("SELECT data FROM transactions WHERE tx_hash = ?1")?
.query_row([txn_hash], |row| row.get(0))
.optional()?)
}

Expand All @@ -746,11 +743,8 @@ impl Db {
.db
.lock()
.unwrap()
.query_row(
"SELECT 1 FROM transactions WHERE tx_hash = ?1",
[hash],
|row| row.get::<_, i64>(0),
)
.prepare_cached("SELECT 1 FROM transactions WHERE tx_hash = ?1")?
.query_row([hash], |row| row.get::<_, i64>(0))
.optional()?
.is_some())
}
Expand All @@ -761,10 +755,9 @@ impl Db {
hash: &Hash,
tx: &SignedTransaction,
) -> Result<()> {
sqlite_tx.execute(
"INSERT OR IGNORE INTO transactions (tx_hash, data) VALUES (?1, ?2)",
(hash, tx),
)?;
sqlite_tx
.prepare_cached("INSERT OR IGNORE INTO transactions (tx_hash, data) VALUES (?1, ?2)")?
.execute((hash, tx))?;
Ok(())
}

Expand All @@ -776,8 +769,7 @@ impl Db {

pub fn remove_transactions_executed_in_block(&self, block_hash: &Hash) -> Result<()> {
// foreign key triggers will take care of receipts and touched_address_index
self.db.lock().unwrap().execute(
"DELETE FROM transactions WHERE tx_hash IN (SELECT tx_hash FROM receipts WHERE block_hash = ?1)",
self.db.lock().unwrap().prepare_cached("DELETE FROM transactions WHERE tx_hash IN (SELECT tx_hash FROM receipts WHERE block_hash = ?1)",)?.execute(
[block_hash],
)?;
Ok(())
Expand All @@ -788,11 +780,8 @@ impl Db {
.db
.lock()
.unwrap()
.query_row(
"SELECT block_hash FROM receipts WHERE tx_hash = ?1",
[tx_hash],
|row| row.get(0),
)
.prepare_cached("SELECT block_hash FROM receipts WHERE tx_hash = ?1")?
.query_row([tx_hash], |row| row.get(0))
.optional()?)
}

Expand All @@ -806,10 +795,9 @@ impl Db {
hash: Hash,
block: &Block,
) -> Result<()> {
sqlite_tx.execute(
"INSERT INTO blocks
(block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg, is_canonical)
VALUES (:block_hash, :view, :height, :qc, :signature, :state_root_hash, :transactions_root_hash, :receipts_root_hash, :timestamp, :gas_used, :gas_limit, :agg, TRUE)",
sqlite_tx.prepare_cached( "INSERT INTO blocks
(block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg, is_canonical)
VALUES (:block_hash, :view, :height, :qc, :signature, :state_root_hash, :transactions_root_hash, :receipts_root_hash, :timestamp, :gas_used, :gas_limit, :agg, TRUE)",)?.execute(
named_params! {
":block_hash": hash,
":view": block.header.view,
Expand All @@ -828,18 +816,20 @@ impl Db {
}

pub fn mark_block_as_canonical(&self, hash: Hash) -> Result<()> {
self.db.lock().unwrap().execute(
"UPDATE blocks SET is_canonical = TRUE WHERE block_hash = ?1",
[hash],
)?;
self.db
.lock()
.unwrap()
.prepare_cached("UPDATE blocks SET is_canonical = TRUE WHERE block_hash = ?1")?
.execute([hash])?;
Ok(())
}

pub fn mark_block_as_non_canonical(&self, hash: Hash) -> Result<()> {
self.db.lock().unwrap().execute(
"UPDATE blocks SET is_canonical = FALSE WHERE block_hash = ?1",
[hash],
)?;
self.db
.lock()
.unwrap()
.prepare_cached("UPDATE blocks SET is_canonical = FALSE WHERE block_hash = ?1")?
.execute([hash])?;
Ok(())
}

Expand All @@ -848,10 +838,11 @@ impl Db {
}

pub fn remove_block(&self, block: &Block) -> Result<()> {
self.db.lock().unwrap().execute(
"DELETE FROM blocks WHERE block_hash = ?1",
[block.header.hash],
)?;
self.db
.lock()
.unwrap()
.prepare_cached("DELETE FROM blocks WHERE block_hash = ?1")?
.execute([block.header.hash])?;
Ok(())
}

Expand All @@ -877,7 +868,7 @@ impl Db {
}
macro_rules! query_block {
($cond: tt, $key: tt) => {
self.db.lock().unwrap().query_row(concat!("SELECT block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg FROM blocks WHERE ", $cond), [$key], make_block).optional()?
self.db.lock().unwrap().prepare_cached(concat!("SELECT block_hash, view, height, qc, signature, state_root_hash, transactions_root_hash, receipts_root_hash, timestamp, gas_used, gas_limit, agg FROM blocks WHERE ", $cond),)?.query_row([$key], make_block).optional()?
};
}
Ok(match filter {
Expand Down Expand Up @@ -925,11 +916,8 @@ impl Db {
.db
.lock()
.unwrap()
.query_row(
"SELECT 1 FROM blocks WHERE block_hash = ?1",
[block_hash],
|row| row.get::<_, i64>(0),
)
.prepare_cached("SELECT 1 FROM blocks WHERE block_hash = ?1")?
.query_row([block_hash], |row| row.get::<_, i64>(0))
.optional()?
.is_some())
}
Expand Down Expand Up @@ -966,10 +954,9 @@ impl Db {
sqlite_tx: &Connection,
receipt: TransactionReceipt,
) -> Result<()> {
sqlite_tx.execute(
"INSERT INTO receipts
sqlite_tx.prepare_cached("INSERT INTO receipts
(tx_hash, block_hash, tx_index, success, gas_used, cumulative_gas_used, contract_address, logs, transitions, accepted, errors, exceptions)
VALUES (:tx_hash, :block_hash, :tx_index, :success, :gas_used, :cumulative_gas_used, :contract_address, :logs, :transitions, :accepted, :errors, :exceptions)",
VALUES (:tx_hash, :block_hash, :tx_index, :success, :gas_used, :cumulative_gas_used, :contract_address, :logs, :transitions, :accepted, :errors, :exceptions)",)?.execute(
named_params! {
":tx_hash": receipt.tx_hash,
":block_hash": receipt.block_hash,
Expand All @@ -993,7 +980,7 @@ impl Db {
}

pub fn get_transaction_receipt(&self, txn_hash: &Hash) -> Result<Option<TransactionReceipt>> {
Ok(self.db.lock().unwrap().query_row("SELECT tx_hash, block_hash, tx_index, success, gas_used, cumulative_gas_used, contract_address, logs, transitions, accepted, errors, exceptions FROM receipts WHERE tx_hash = ?1", [txn_hash], Self::make_receipt).optional()?)
Ok(self.db.lock().unwrap().prepare_cached("SELECT tx_hash, block_hash, tx_index, success, gas_used, cumulative_gas_used, contract_address, logs, transitions, accepted, errors, exceptions FROM receipts WHERE tx_hash = ?1",)?.query_row( [txn_hash], Self::make_receipt).optional()?)
}

pub fn get_transaction_receipts_in_block(
Expand All @@ -1007,7 +994,8 @@ impl Db {
self.db
.lock()
.unwrap()
.execute("DELETE FROM receipts WHERE block_hash = ?1", [block_hash])?;
.prepare_cached("DELETE FROM receipts WHERE block_hash = ?1")?
.execute([block_hash])?;
Ok(())
}

Expand Down Expand Up @@ -1050,16 +1038,15 @@ impl Db {
pub fn forget_block_range(&self, blocks: Range<u64>) -> Result<()> {
self.with_sqlite_tx(move |tx| {
// Remove everything!
tx.execute("DELETE FROM tip_info WHERE finalized_view IN (SELECT view FROM blocks WHERE height >= :low AND height < :high)",
tx.prepare_cached("DELETE FROM tip_info WHERE finalized_view IN (SELECT view FROM blocks WHERE height >= :low AND height < :high)",)?.execute(
named_params! {
":low" : blocks.start,
":high" : blocks.end } )?;
tx.execute("DELETE FROM receipts WHERE block_hash IN (SELECT block_hash FROM blocks WHERE height >= :low AND height < :high)",
tx.prepare_cached("DELETE FROM receipts WHERE block_hash IN (SELECT block_hash FROM blocks WHERE height >= :low AND height < :high)",)?.execute(
named_params! {
":low": blocks.start,
":high": blocks.end })?;
tx.execute(
"DELETE FROM blocks WHERE height >= :low AND height < :high",
tx.prepare_cached( "DELETE FROM blocks WHERE height >= :low AND height < :high",)?.execute(
named_params! {
":low": blocks.start,
":high": blocks.end },
Expand Down Expand Up @@ -1243,7 +1230,8 @@ impl TrieStorage {
self.db
.lock()
.unwrap()
.execute(&query, rusqlite::params_from_iter(params))?;
.prepare_cached(&query)?
.execute(rusqlite::params_from_iter(params))?;
for (key, value) in keys.iter().zip(values) {
let _ = self
.cache
Expand All @@ -1269,11 +1257,8 @@ impl eth_trie::DB for TrieStorage {
.db
.lock()
.unwrap()
.query_row(
"SELECT value FROM state_trie WHERE key = ?1",
[key],
|row| row.get(0),
)
.prepare_cached("SELECT value FROM state_trie WHERE key = ?1")?
.query_row([key], |row| row.get(0))
.optional()?;

let mut cache = self.cache.lock().unwrap();
Expand All @@ -1287,10 +1272,11 @@ impl eth_trie::DB for TrieStorage {
}

fn insert(&self, key: &[u8], value: Vec<u8>) -> Result<(), Self::Error> {
self.db.lock().unwrap().execute(
"INSERT OR REPLACE INTO state_trie (key, value) VALUES (?1, ?2)",
(key, &value),
)?;
self.db
.lock()
.unwrap()
.prepare_cached("INSERT OR REPLACE INTO state_trie (key, value) VALUES (?1, ?2)")?
.execute((key, &value))?;
let _ = self.cache.lock().unwrap().insert(key.to_vec(), value);
Ok(())
}
Expand Down

0 comments on commit e1dcc0f

Please sign in to comment.