Skip to content
This repository has been archived by the owner on Jan 8, 2025. It is now read-only.

mempool: modify maintain_transaction_pool to prune txs #1485

Merged
merged 15 commits into from
Oct 28, 2024
8 changes: 5 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::{env::var, str::FromStr, sync::Arc};

use dotenvy::dotenv;
use eyre::Result;
use kakarot_rpc::{
Expand All @@ -16,6 +14,7 @@ use starknet::{
core::types::Felt,
providers::{jsonrpc::HttpTransport, JsonRpcClient},
};
use std::{env::var, str::FromStr, sync::Arc, time::Duration};
use tracing_opentelemetry::MetricsLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};

Expand Down Expand Up @@ -57,8 +56,11 @@ async fn main() -> Result<()> {
var("RELAYERS_ADDRESSES")?.split(',').filter_map(|addr| Felt::from_str(addr).ok()).collect::<Vec<_>>();
AccountManager::from_addresses(addresses, Arc::clone(&eth_client)).await?.start();

// Transactions should be pruned after 5 minutes in the mempool
let prune_duration = Duration::from_secs(300);

greged93 marked this conversation as resolved.
Show resolved Hide resolved
// Start the maintenance of the mempool
maintain_transaction_pool(Arc::clone(&eth_client));
maintain_transaction_pool(Arc::clone(&eth_client), prune_duration);

// Setup the RPC module
let kakarot_rpc_module = KakarotRpcModuleBuilder::new(eth_client).rpc_module()?;
Expand Down
46 changes: 42 additions & 4 deletions src/pool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use starknet::{
providers::{jsonrpc::HttpTransport, JsonRpcClient},
};
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tokio::{sync::Mutex, time::Instant};
use tracing::instrument;

/// A type alias for the Kakarot Transaction Validator.
Expand Down Expand Up @@ -268,13 +268,27 @@ where

/// Maintains the transaction pool by periodically polling the database in order to
/// fetch the latest block and mark the block's transactions as mined by the node.
pub fn maintain_transaction_pool<SP>(eth_client: Arc<EthClient<SP>>)
pub fn maintain_transaction_pool<SP>(eth_client: Arc<EthClient<SP>>, prune_duration: Duration)
where
SP: starknet::providers::Provider + Send + Sync + Clone + 'static,
{
tokio::spawn(async move {
let mut block_number = 0u64;

// Mapping to store the transactions in the mempool with a timestamp to potentially prune them
let mut mempool_transactions = HashMap::new();

loop {
// Adding the transactions to the mempool mapping with a timestamp
for tx in eth_client
.mempool()
.queued_transactions()
.into_iter()
.chain(eth_client.mempool().pending_transactions())
tcoratger marked this conversation as resolved.
Show resolved Hide resolved
{
mempool_transactions.entry(*tx.hash()).or_insert_with(Instant::now);
}

// Fetch the latest block number
let Ok(current_block_number) = eth_client.eth_provider().block_number().await else {
tracing::error!(target: "maintain_transaction_pool", "failed to fetch current block number");
Expand Down Expand Up @@ -305,7 +319,7 @@ where
chain_spec.base_fee_params_at_timestamp(latest_header.timestamp + 12),
)
.unwrap_or_default(),
pending_blob_fee: latest_header.next_block_blob_fee(),
pending_blob_fee: None,
};
eth_client.mempool().set_block_info(info);

Expand All @@ -324,7 +338,31 @@ where
}

let sealed_block = latest_block.seal(hash);
let mined_transactions = sealed_block.body.transactions.iter().map(|tx| tx.hash).collect();
let mut mined_transactions: Vec<_> =
sealed_block.body.transactions.iter().map(|tx| tx.hash).collect();

// Prune mined transactions from the mempool mapping
for tx_hash in &mined_transactions {
mempool_transactions.remove(tx_hash);
}

// Prune transactions that have been in the mempool for more than 5 minutes
let now = Instant::now();

for (tx_hash, timestamp) in &mempool_transactions.clone() {
tcoratger marked this conversation as resolved.
Show resolved Hide resolved
// - If the transaction has been in the mempool for more than 5 minutes
// - And the transaction is in the mempool right now
if now.duration_since(*timestamp) > prune_duration && eth_client.mempool().contains(tx_hash)
{
tracing::warn!("Transaction {} in mempool for more than 5 minutes. Pruning.", tx_hash);
tcoratger marked this conversation as resolved.
Show resolved Hide resolved

// Add the transaction to the mined transactions so that it can be pruned
mined_transactions.push(*tx_hash);

// Remove the transaction from the mempool mapping
mempool_transactions.remove(tx_hash);
}
}

// Canonical update
let update = CanonicalStateUpdate {
Expand Down
2 changes: 1 addition & 1 deletion src/providers/eth_provider/database/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ impl<T: Default> EthDatabaseFilterBuilder<T> {
}
}

pub(crate) fn format_hex(value: impl LowerHex, width: usize) -> String {
pub fn format_hex(value: impl LowerHex, width: usize) -> String {
// Add 2 to the width to account for the 0x prefix.
let s = format!("{:#0width$x}", value, width = width + 2);
// `s.len() < width` can happen because of the LowerHex implementation
Expand Down
128 changes: 123 additions & 5 deletions tests/tests/mempool.rs
tcoratger marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
#![allow(clippy::used_underscore_binding)]
#![cfg(feature = "testing")]
use alloy_consensus::TxEip1559;
use alloy_consensus::{TxEip1559, EMPTY_ROOT_HASH};
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::{Address, TxKind, U256};
use alloy_primitives::{Address, TxKind, B64, U256};
use alloy_rpc_types::Header;
use kakarot_rpc::{
providers::eth_provider::{error::SignatureError, ChainProvider},
pool::mempool::maintain_transaction_pool,
providers::eth_provider::{
constant::U64_HEX_STRING_LEN,
database::{
filter::{self, format_hex, EthDatabaseFilterBuilder},
types::header::StoredHeader,
},
error::SignatureError,
ChainProvider,
},
test_utils::{
eoa::Eoa,
fixtures::{katana_empty, setup},
fixtures::{katana, katana_empty, setup},
katana::Katana,
},
};
use mongodb::{
bson::doc,
options::{UpdateModifications, UpdateOptions},
};
use reth_primitives::{sign_message, Transaction, TransactionSigned, TransactionSignedEcRecovered};
use reth_transaction_pool::{EthPooledTransaction, TransactionOrigin, TransactionPool};
use reth_transaction_pool::{EthPooledTransaction, PoolTransaction, TransactionOrigin, TransactionPool};
use revm_primitives::B256;
use rstest::*;
use std::{sync::Arc, time::Duration};

#[rstest]
#[awt]
Expand Down Expand Up @@ -351,3 +367,105 @@ pub async fn create_sample_transactions(
}
Ok(transactions)
}

#[rstest]
#[awt]
#[tokio::test(flavor = "multi_thread")]
async fn test_maintain_mempool(#[future] katana: Katana, _setup: ()) {
let eth_client = Arc::new(katana.eth_client());

// Create two sample transactions at once
let transactions = create_sample_transactions(&katana, 2).await.expect("Failed to create sample transactions");

// Extract and ensure we have two valid transactions from the transaction list.
let ((transaction1, _), (transaction2, _)) = (
transactions.first().expect("Expected at least one transaction").clone(),
transactions.get(1).expect("Expected at least two transactions").clone(),
);

// Add transactions to the mempool
eth_client.mempool().add_transaction(TransactionOrigin::Private, transaction1.clone()).await.unwrap();
eth_client.mempool().add_transaction(TransactionOrigin::Private, transaction2.clone()).await.unwrap();

// Start maintaining the transaction pool
//
// This task will periodically prune the mempool based on the given prune_duration.
// For testing purposes, we set the prune_duration to 1 second.
let prune_duration = Duration::from_secs(1);
let eth_client_clone = Arc::clone(&eth_client);
let maintain_task = tokio::spawn(async move {
maintain_transaction_pool(eth_client_clone, prune_duration);
});

// Initialize the block number based on the current blockchain state from katana.
let mut last_block_number = katana.block_number();

// Loop to simulate new blocks being added to the blockchain every 100 milliseconds.
for _ in 0..10 {
// Sleep for 100 milliseconds to simulate the passage of time between blocks.
tokio::time::sleep(Duration::from_millis(100)).await;

// Increment the block number to simulate the blockchain progressing.
last_block_number += 1;

// Format the block number in both padded and unpadded hexadecimal formats.
let unpadded_block_number = format_hex(last_block_number, 0);
let padded_block_number = format_hex(last_block_number, U64_HEX_STRING_LEN);

// Get the block header collection from the database.
let header_collection = eth_client.eth_provider().database().collection::<StoredHeader>();

// Build a filter for updating the header based on the new block number.
let filter = EthDatabaseFilterBuilder::<filter::Header>::default().with_block_number(last_block_number).build();

// Insert a new header for the new block number in the database.
eth_client
.eth_provider()
.database()
.update_one(
StoredHeader {
header: Header {
hash: B256::random(),
total_difficulty: Some(U256::default()),
mix_hash: Some(B256::default()),
nonce: Some(B64::default()),
withdrawals_root: Some(EMPTY_ROOT_HASH),
base_fee_per_gas: Some(0),
blob_gas_used: Some(0),
excess_blob_gas: Some(0),
number: last_block_number,
..Default::default()
},
},
filter,
true,
)
.await
.expect("Failed to update header in database");

// Update the header collection with the padded block number in the database.
header_collection
.update_one(
doc! {"header.number": unpadded_block_number},
UpdateModifications::Document(doc! {"$set": {"header.number": padded_block_number}}),
)
.with_options(UpdateOptions::builder().upsert(true).build())
.await
.expect("Failed to update block number");
Eikix marked this conversation as resolved.
Show resolved Hide resolved

// Check if both transactions are still in the mempool.
// We expect them to still be in the mempool until 1 second has elapsed.
assert!(eth_client.mempool().contains(transaction1.hash()), "Transaction 1 should still be in the mempool");
assert!(eth_client.mempool().contains(transaction2.hash()), "Transaction 2 should still be in the mempool");
}

// Sleep for some additional time to allow the pruning to occur.
tokio::time::sleep(Duration::from_secs(1)).await;

// Verify that both transactions have been pruned from the mempool after the pruning duration.
assert!(!eth_client.mempool().contains(transaction1.hash()), "Transaction 1 should be pruned after 1 second");
assert!(!eth_client.mempool().contains(transaction2.hash()), "Transaction 2 should be pruned after 1 second");

// Ensure the background task is stopped gracefully.
maintain_task.abort();
}
Loading