Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: abci state sync #2413

Open
wants to merge 10 commits into
base: v2.0-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,475 changes: 812 additions & 663 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ transport = "routed"
#]
grpc-concurrency = [
{ "check_tx" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "list_snapshots" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "load_snapshot_chunk" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "offer_snapshot" = 1 },
{ "apply_snapshot_chunk" = 1 },
]


Expand Down Expand Up @@ -414,7 +418,7 @@ ttl-num-blocks = {{=it.platform.drive.tenderdash.mempool.ttlNumBlocks}}
# the network to take and serve state machine snapshots. State sync is not attempted if the node
# has any local state (LastBlockHeight > 0). The node will have a truncated block history,
# starting from the height of the snapshot.
enable = false
enable = true

# State sync uses light client verification to verify state. This can be done either through the
# P2P layer or RPC layer. Set this to true to use the P2P layer. If false (default), RPC layer
Expand Down
6 changes: 6 additions & 0 deletions packages/rs-drive-abci/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ ABCI_LOG_STDOUT_FORMAT=pretty
ABCI_LOG_STDOUT_COLOR=true

DB_PATH=/tmp/db

CHECKPOINTS_PATH=${DB_PATH}/checkpoints
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Critical: Production environments configured to use temporary storage

The codebase has a proper default production path (/var/lib/dash-platform/data), but it's being overridden by environment files to use temporary storage:

  • .env.mainnet: DB_PATH=/tmp/db
  • .env.testnet: DB_PATH=/tmp/db

This configuration will lead to data loss on system reboot. Update the environment files to use persistent storage paths instead of /tmp/db.

🔗 Analysis chain

Verify database paths for production environments.

Both CHECKPOINTS_PATH and GROVEDB_LATEST_FILE inherit from DB_PATH which is set to /tmp/db. While this is fine for local development, using /tmp in production could lead to data loss as it's typically cleared on system reboot.

Run this script to check if there are any production environment files with similar configuration:

Also applies to: 19-19

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for production environment files and their DB_PATH configuration
# to ensure they don't use temporary directories.

echo "Searching for environment files:"
fd -t f "^\.env\.(production|prod)" packages/rs-drive-abci/

echo -e "\nChecking DB_PATH in found files:"
rg "^DB_PATH=" packages/rs-drive-abci/.env.*

Length of output: 702


Script:

#!/bin/bash
# Check for deployment documentation and configuration
echo "Searching for deployment-related files and documentation:"
fd -t f "^(deploy|docker|k8s|kubernetes|README)" packages/rs-drive-abci/

echo -e "\nChecking content of found files for database path configuration:"
rg -i "(\bdb[_\s-]path|database[_\s-]path|data[_\s-]dir)" packages/rs-drive-abci/

Length of output: 2638


# GroveDB database file
GROVEDB_LATEST_FILE=${DB_PATH}/latest_state

REJECTIONS_PATH=/tmp/rejected

# Cache size for Data Contracts
Expand Down
88 changes: 76 additions & 12 deletions packages/rs-drive-abci/src/abci/app/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,97 @@
use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication};
use crate::abci::app::{
BlockExecutionApplication, PlatformApplication, SnapshotFetchingApplication,
SnapshotManagerApplication, TransactionalApplication,
};
use crate::abci::handler;
use crate::abci::handler::error::error_into_exception;
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::platform::Platform;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::CoreRPCLike;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;
use std::fmt::Debug;
use std::sync::RwLock;
use tenderdash_abci::proto::abci as proto;
use dapi_grpc::tonic;

/// AbciApp is an implementation of ABCI Application, as defined by Tenderdash.
///
/// AbciApp implements logic that should be triggered when Tenderdash performs various operations, like
/// creating new proposal or finalizing new block.
pub struct ConsensusAbciApplication<'a, C> {
/// 'p: 'tx, means that Platform must outlive the transaction
pub struct ConsensusAbciApplication<'p, C> {
/// Platform
platform: &'a Platform<C>,
platform: &'p Platform<C>,
/// The current GroveDb transaction
transaction: RwLock<Option<Transaction<'a>>>,
transaction: RwLock<Option<Transaction<'p>>>,
/// The current block execution context
block_execution_context: RwLock<Option<BlockExecutionContext>>,
/// The State sync session
snapshot_fetching_session: RwLock<Option<SnapshotFetchingSession<'p>>>,
/// The snapshot manager
snapshot_manager: SnapshotManager,
}

impl<'a, C> ConsensusAbciApplication<'a, C> {
impl<'p, C> ConsensusAbciApplication<'p, C> {
/// Create new ABCI app
pub fn new(platform: &'a Platform<C>) -> Self {
pub fn new(platform: &'p Platform<C>) -> Self {
let snapshot_manager = SnapshotManager::new(
platform
.config
.state_sync_config
.checkpoints_path.clone(),
platform.config.state_sync_config.max_num_snapshots,
platform.config.state_sync_config.snapshots_frequency,
);
Self {
platform,
transaction: Default::default(),
block_execution_context: Default::default(),
snapshot_fetching_session: Default::default(),
snapshot_manager,
}
}
}

impl<'a, C> PlatformApplication<C> for ConsensusAbciApplication<'a, C> {
impl<'p, C> PlatformApplication<C> for ConsensusAbciApplication<'p, C> {
fn platform(&self) -> &Platform<C> {
self.platform
}
}

impl<'a, C> BlockExecutionApplication for ConsensusAbciApplication<'a, C> {
impl<'p, C> SnapshotManagerApplication for ConsensusAbciApplication<'p, C> {
fn snapshot_manager(&self) -> &SnapshotManager {
&self.snapshot_manager
}
}

impl<'p, C> SnapshotFetchingApplication<'p, C> for ConsensusAbciApplication<'p, C> {
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'p>>> {
&self.snapshot_fetching_session
}

fn platform(&self) -> &'p Platform<C> {
self.platform
}
}

impl<'p, C> BlockExecutionApplication for ConsensusAbciApplication<'p, C> {
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>> {
&self.block_execution_context
}
}

impl<'a, C> TransactionalApplication<'a> for ConsensusAbciApplication<'a, C> {
impl<'p, C> TransactionalApplication<'p> for ConsensusAbciApplication<'p, C> {
/// create and store a new transaction
fn start_transaction(&self) {
let transaction = self.platform.drive.grove.start_transaction();
self.transaction.write().unwrap().replace(transaction);
}

fn transaction(&self) -> &RwLock<Option<Transaction<'a>>> {
fn transaction(&self) -> &RwLock<Option<Transaction<'p>>> {
&self.transaction
}

Expand All @@ -77,13 +113,13 @@ impl<'a, C> TransactionalApplication<'a> for ConsensusAbciApplication<'a, C> {
}
}

impl<'a, C> Debug for ConsensusAbciApplication<'a, C> {
impl<'p, C> Debug for ConsensusAbciApplication<'p, C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<ConsensusAbciApplication>")
}
}

impl<'a, C> tenderdash_abci::Application for ConsensusAbciApplication<'a, C>
impl<'p, C> tenderdash_abci::Application for ConsensusAbciApplication<'p, C>
where
C: CoreRPCLike,
{
Expand Down Expand Up @@ -149,4 +185,32 @@ where
) -> Result<proto::ResponseVerifyVoteExtension, proto::ResponseException> {
handler::verify_vote_extension(self, request).map_err(error_into_exception)
}

fn offer_snapshot(
&self,
request: proto::RequestOfferSnapshot,
) -> Result<proto::ResponseOfferSnapshot, proto::ResponseException> {
handler::offer_snapshot(self, request).map_err(error_into_exception)
}

fn apply_snapshot_chunk(
&self,
request: proto::RequestApplySnapshotChunk,
) -> Result<proto::ResponseApplySnapshotChunk, proto::ResponseException> {
handler::apply_snapshot_chunk(self, request).map_err(error_into_exception)
}

fn list_snapshots(
&self,
request: proto::RequestListSnapshots,
) -> Result<proto::ResponseListSnapshots, proto::ResponseException> {
handler::list_snapshots(self, request).map_err(error_into_exception)
}

fn load_snapshot_chunk(
&self,
request: proto::RequestLoadSnapshotChunk,
) -> Result<proto::ResponseLoadSnapshotChunk, proto::ResponseException> {
handler::load_snapshot_chunk(self, request).map_err(error_into_exception)
}
}
64 changes: 63 additions & 1 deletion packages/rs-drive-abci/src/abci/app/full.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication};
use crate::abci::app::{
BlockExecutionApplication, PlatformApplication, SnapshotFetchingApplication,
SnapshotManagerApplication, TransactionalApplication,
};
use crate::abci::handler;
use crate::abci::handler::error::error_into_exception;
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::platform::Platform;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::CoreRPCLike;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;
Expand All @@ -23,15 +27,29 @@ pub struct FullAbciApplication<'a, C> {
pub transaction: RwLock<Option<Transaction<'a>>>,
/// The current block execution context
pub block_execution_context: RwLock<Option<BlockExecutionContext>>,
/// The State sync session
pub snapshot_fetching_session: RwLock<Option<SnapshotFetchingSession<'a>>>,
/// The snapshot manager
pub snapshot_manager: SnapshotManager,
}

impl<'a, C> FullAbciApplication<'a, C> {
/// Create new ABCI app
pub fn new(platform: &'a Platform<C>) -> Self {
let snapshot_manager = SnapshotManager::new(
platform
.config
.state_sync_config
.checkpoints_path.clone(),
platform.config.state_sync_config.max_num_snapshots,
platform.config.state_sync_config.snapshots_frequency,
);
Comment on lines +39 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add path validation for checkpoints directory

Consider validating that the checkpoints directory exists and is writable before initializing the SnapshotManager. This would prevent runtime errors during snapshot operations.

 let snapshot_manager = SnapshotManager::new(
-    platform
-        .config
-        .state_sync_config
-        .checkpoints_path.clone(),
+    {
+        let path = platform.config.state_sync_config.checkpoints_path.clone();
+        std::fs::create_dir_all(&path).map_err(|e| {
+            Error::InitializationError(format!(
+                "Failed to create checkpoints directory: {}",
+                e
+            ))
+        })?;
+        path
+    },
     platform.config.state_sync_config.max_num_snapshots,
     platform.config.state_sync_config.snapshots_frequency,
 );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let snapshot_manager = SnapshotManager::new(
platform
.config
.state_sync_config
.checkpoints_path.clone(),
platform.config.state_sync_config.max_num_snapshots,
platform.config.state_sync_config.snapshots_frequency,
);
let snapshot_manager = SnapshotManager::new(
{
let path = platform.config.state_sync_config.checkpoints_path.clone();
std::fs::create_dir_all(&path).map_err(|e| {
Error::InitializationError(format!(
"Failed to create checkpoints directory: {}",
e
))
})?;
path
},
platform.config.state_sync_config.max_num_snapshots,
platform.config.state_sync_config.snapshots_frequency,
);

Self {
platform,
transaction: Default::default(),
block_execution_context: Default::default(),
snapshot_fetching_session: Default::default(),
snapshot_manager,
}
}
}
Expand All @@ -42,6 +60,22 @@ impl<'a, C> PlatformApplication<C> for FullAbciApplication<'a, C> {
}
}

impl<'a, C> SnapshotManagerApplication for FullAbciApplication<'a, C> {
fn snapshot_manager(&self) -> &SnapshotManager {
&self.snapshot_manager
}
}

impl<'a, C> SnapshotFetchingApplication<'a, C> for FullAbciApplication<'a, C> {
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'a>>> {
&self.snapshot_fetching_session
}

fn platform(&self) -> &'a Platform<C> {
self.platform
}
}

impl<'a, C> BlockExecutionApplication for FullAbciApplication<'a, C> {
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>> {
&self.block_execution_context
Expand Down Expand Up @@ -150,4 +184,32 @@ where
) -> Result<proto::ResponseVerifyVoteExtension, proto::ResponseException> {
handler::verify_vote_extension(self, request).map_err(error_into_exception)
}

fn offer_snapshot(
&self,
request: proto::RequestOfferSnapshot,
) -> Result<proto::ResponseOfferSnapshot, proto::ResponseException> {
handler::offer_snapshot(self, request).map_err(error_into_exception)
}

fn apply_snapshot_chunk(
&self,
request: proto::RequestApplySnapshotChunk,
) -> Result<proto::ResponseApplySnapshotChunk, proto::ResponseException> {
handler::apply_snapshot_chunk(self, request).map_err(error_into_exception)
}

fn list_snapshots(
&self,
request: proto::RequestListSnapshots,
) -> Result<proto::ResponseListSnapshots, proto::ResponseException> {
handler::list_snapshots(self, request).map_err(error_into_exception)
}

fn load_snapshot_chunk(
&self,
request: proto::RequestLoadSnapshotChunk,
) -> Result<proto::ResponseLoadSnapshotChunk, proto::ResponseException> {
handler::load_snapshot_chunk(self, request).map_err(error_into_exception)
}
}
22 changes: 20 additions & 2 deletions packages/rs-drive-abci/src/abci/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,36 @@ mod consensus;
/// Convert state transition execution result into ABCI response
pub mod execution_result;
mod full;
mod state_source;

use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::DefaultCoreRPC;
pub use check_tx::CheckTxAbciApplication;
pub use consensus::ConsensusAbciApplication;
use dpp::version::PlatformVersion;
pub use full::FullAbciApplication;
pub use state_source::StateSourceAbciApplication;

/// Platform-based ABCI application
pub trait PlatformApplication<C = DefaultCoreRPC> {
/// Returns Platform
fn platform(&self) -> &Platform<C>;
}

/// Platform-based ABCI application
pub trait SnapshotManagerApplication {
/// Returns Platform
fn snapshot_manager(&self) -> &SnapshotManager;
}

/// Transactional ABCI application
pub trait TransactionalApplication<'a> {
pub trait TransactionalApplication<'p> {
/// Creates and keeps a new transaction
fn start_transaction(&self);

/// Returns the current transaction
fn transaction(&self) -> &RwLock<Option<Transaction<'a>>>;
fn transaction(&self) -> &RwLock<Option<Transaction<'p>>>;

/// Commits created transaction
fn commit_transaction(&self, platform_version: &PlatformVersion) -> Result<(), Error>;
Expand All @@ -39,3 +48,12 @@ pub trait BlockExecutionApplication {
/// Returns the current block execution context
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>>;
}

/// Application that can maintain state sync
pub trait SnapshotFetchingApplication<'p, C> {
/// Returns the current snapshot fetching session
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'p>>>;

/// Returns platform reference
fn platform(&self) -> &'p Platform<C>;
}
Loading