From 67e75babca9c69e4a1c11fc9e5ecb2f770768feb Mon Sep 17 00:00:00 2001 From: Jack Grigg Date: Sat, 20 Apr 2024 21:40:48 +0000 Subject: [PATCH] Connect defrag-style TUI to sync command --- src/commands/sync.rs | 121 +++++++++++++++++++++++++++++++++++++++---- src/main.rs | 35 +++++++++++-- 2 files changed, 144 insertions(+), 12 deletions(-) diff --git a/src/commands/sync.rs b/src/commands/sync.rs index 70d07f9..af37ab3 100644 --- a/src/commands/sync.rs +++ b/src/commands/sync.rs @@ -30,6 +30,9 @@ use crate::{ remote::connect_to_lightwalletd, }; +#[cfg(feature = "tui")] +use {crate::tui::Tui, zcash_protocol::consensus::NetworkUpgrade}; + #[cfg(feature = "tui")] mod defrag; @@ -37,10 +40,17 @@ const BATCH_SIZE: u32 = 10_000; // Options accepted for the `sync` command #[derive(Debug, Options)] -pub(crate) struct Command {} +pub(crate) struct Command { + #[cfg(feature = "tui")] + pub(crate) defrag: bool, +} impl Command { - pub(crate) async fn run(self, wallet_dir: Option) -> Result<(), anyhow::Error> { + pub(crate) async fn run( + self, + wallet_dir: Option, + #[cfg(feature = "tui")] tui: Tui, + ) -> Result<(), anyhow::Error> { let params = get_wallet_network(wallet_dir.as_ref())?; let (fsblockdb_root, db_data) = get_db_paths(wallet_dir.as_ref()); @@ -49,6 +59,24 @@ impl Command { let mut db_data = WalletDb::for_path(db_data, params)?; let mut client = connect_to_lightwalletd(¶ms).await?; + #[cfg(feature = "tui")] + let tui_handle = if self.defrag { + let mut app = defrag::App::new( + db_data + .get_wallet_birthday()? + .unwrap_or_else(|| params.activation_height(NetworkUpgrade::Sapling).unwrap()), + ); + let handle = app.handle(); + tokio::spawn(async move { + if let Err(e) = app.run(tui).await { + error!("Error while running TUI: {e}"); + } + }); + Some(handle) + } else { + None + }; + // 1) Download note commitment tree data from lightwalletd // 2) Pass the commitment tree data to the database. update_subtree_roots(&mut client, &mut db_data).await?; @@ -59,13 +87,23 @@ impl Command { fsblockdb_root: &Path, db_cache: &mut FsBlockDb, db_data: &mut WalletDb, + #[cfg(feature = "tui")] tui_handle: Option<&defrag::AppHandle>, ) -> Result { // 3) Download chain tip metadata from lightwalletd // 4) Notify the wallet of the updated chain tip. - update_chain_tip(client, db_data).await?; + let _chain_tip = update_chain_tip(client, db_data).await?; // 5) Get the suggested scan ranges from the wallet database + info!("Fetching scan ranges"); let mut scan_ranges = db_data.suggest_scan_ranges()?; + info!("Fetched {} scan ranges", scan_ranges.len()); + #[cfg(feature = "tui")] + if let Some(handle) = tui_handle { + if handle.set_scan_ranges(&scan_ranges, _chain_tip) { + // TUI exited. + return Ok(false); + } + } // Store the handles to cached block deletions (which we spawn into separate // tasks to allow us to continue downloading and scanning other ranges). @@ -80,8 +118,15 @@ impl Command { Some(scan_range) if scan_range.priority() == ScanPriority::Verify => { // Download the blocks in `scan_range` into the block source, // overwriting any existing blocks in this range. - let block_meta = - download_blocks(client, fsblockdb_root, db_cache, scan_range).await?; + let block_meta = download_blocks( + client, + fsblockdb_root, + db_cache, + scan_range, + #[cfg(feature = "tui")] + tui_handle, + ) + .await?; let chain_state = download_chain_state(client, scan_range.block_range().start - 1) @@ -97,6 +142,10 @@ impl Command { db_data, &chain_state, scan_range, + #[cfg(feature = "tui")] + tui_handle, + #[cfg(feature = "tui")] + _chain_tip, )?; // Delete the now-scanned blocks, because keeping the entire chain @@ -106,6 +155,13 @@ impl Command { if scan_ranges_updated { // The suggested scan ranges have been updated, so we re-request. scan_ranges = db_data.suggest_scan_ranges()?; + #[cfg(feature = "tui")] + if let Some(handle) = tui_handle { + if handle.set_scan_ranges(&scan_ranges, _chain_tip) { + // TUI exited. + return Ok(false); + } + } } else { // At this point, the cache and scanned data are locally // consistent (though not necessarily consistent with the @@ -126,6 +182,13 @@ impl Command { // and calling `scan_cached_blocks` on each range. let scan_ranges = db_data.suggest_scan_ranges()?; debug!("Suggested ranges: {:?}", scan_ranges); + #[cfg(feature = "tui")] + if let Some(handle) = tui_handle { + if handle.set_scan_ranges(&scan_ranges, _chain_tip) { + // TUI exited. + return Ok(false); + } + } for scan_range in scan_ranges.into_iter().flat_map(|r| { // Limit the number of blocks we download and scan at any one time. (0..).scan(r, |acc, _| { @@ -145,8 +208,15 @@ impl Command { }) }) { // Download the blocks in `scan_range` into the block source. - let block_meta = - download_blocks(client, fsblockdb_root, db_cache, &scan_range).await?; + let block_meta = download_blocks( + client, + fsblockdb_root, + db_cache, + &scan_range, + #[cfg(feature = "tui")] + tui_handle, + ) + .await?; let chain_state = download_chain_state(client, scan_range.block_range().start - 1).await?; @@ -159,6 +229,10 @@ impl Command { db_data, &chain_state, &scan_range, + #[cfg(feature = "tui")] + tui_handle, + #[cfg(feature = "tui")] + _chain_tip, )?; // Delete the now-scanned blocks. @@ -188,6 +262,8 @@ impl Command { fsblockdb_root, &mut db_cache, &mut db_data, + #[cfg(feature = "tui")] + tui_handle.as_ref(), ) .await? {} @@ -248,7 +324,7 @@ async fn update_subtree_roots( async fn update_chain_tip( client: &mut CompactTxStreamerClient, db_data: &mut WalletDb, -) -> Result<(), anyhow::Error> { +) -> Result { let tip_height: BlockHeight = client .get_latest_block(service::ChainSpec::default()) .await? @@ -261,7 +337,7 @@ async fn update_chain_tip( info!("Latest block height is {}", tip_height); db_data.update_chain_tip(tip_height)?; - Ok(()) + Ok(tip_height) } async fn download_blocks( @@ -269,8 +345,13 @@ async fn download_blocks( fsblockdb_root: &Path, db_cache: &FsBlockDb, scan_range: &ScanRange, + #[cfg(feature = "tui")] tui_handle: Option<&defrag::AppHandle>, ) -> Result, anyhow::Error> { info!("Fetching {}", scan_range); + #[cfg(feature = "tui")] + if let Some(handle) = tui_handle { + handle.set_fetching_range(Some(scan_range.block_range().clone())); + } let mut start = service::BlockId::default(); start.height = scan_range.block_range().start.into(); let mut end = service::BlockId::default(); @@ -314,6 +395,10 @@ async fn download_blocks( .write_block_metadata(&block_meta) .map_err(error::Error::from)?; + #[cfg(feature = "tui")] + if let Some(handle) = tui_handle { + handle.set_fetching_range(None); + } Ok(block_meta) } @@ -346,6 +431,7 @@ fn delete_cached_blocks(fsblockdb_root: &Path, block_meta: Vec) -> Jo /// chain tip is out of sync with blockchain history. /// /// Returns `true` if scanning these blocks materially changed the suggested scan ranges. +#[allow(clippy::too_many_arguments)] fn scan_blocks( params: &P, fsblockdb_root: &Path, @@ -353,8 +439,14 @@ fn scan_blocks( db_data: &mut WalletDb, initial_chain_state: &ChainState, scan_range: &ScanRange, + #[cfg(feature = "tui")] tui_handle: Option<&defrag::AppHandle>, + #[cfg(feature = "tui")] chain_tip: BlockHeight, ) -> Result { info!("Scanning {}", scan_range); + #[cfg(feature = "tui")] + if let Some(handle) = tui_handle { + handle.set_scanning_range(Some(scan_range.block_range().clone())); + } let scan_result = scan_cached_blocks( params, db_cache, @@ -363,6 +455,10 @@ fn scan_blocks( initial_chain_state, scan_range.len(), ); + #[cfg(feature = "tui")] + if let Some(handle) = tui_handle { + handle.set_scanning_range(None); + } match scan_result { Err(ChainError::Scan(err)) if err.is_continuity_error() => { @@ -410,6 +506,13 @@ fn scan_blocks( // If scanning these blocks caused a suggested range to be added that has a // higher priority than the current range, invalidate the current ranges. let latest_ranges = db_data.suggest_scan_ranges()?; + #[cfg(feature = "tui")] + if let Some(handle) = tui_handle { + if handle.set_scan_ranges(&latest_ranges, chain_tip) { + // TUI exited. + return Ok(false); + } + } Ok(if let Some(range) = latest_ranges.first() { range.priority() > scan_range.priority() diff --git a/src/main.rs b/src/main.rs index 31bf5da..9948005 100644 --- a/src/main.rs +++ b/src/main.rs @@ -66,8 +66,26 @@ enum Command { fn main() -> Result<(), anyhow::Error> { let opts = MyOptions::parse_args_default_or_exit(); - let filter = env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()); - tracing_subscriber::fmt().with_env_filter(filter).init(); + #[cfg(not(feature = "tui"))] + let log_configured = false; + #[cfg(feature = "tui")] + let log_configured = + if let Some(Command::Sync(commands::sync::Command { defrag: true })) = opts.command { + use tracing_subscriber::layer::SubscriberExt; + + tracing::subscriber::set_global_default( + tracing_subscriber::registry().with(tui_logger::tracing_subscriber_layer()), + ) + .unwrap(); + true + } else { + false + }; + + if !log_configured { + let filter = env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()); + tracing_subscriber::fmt().with_env_filter(filter).init(); + } rayon::ThreadPoolBuilder::new() .thread_name(|i| format!("zec-rayon-{}", i)) @@ -84,11 +102,22 @@ fn main() -> Result<(), anyhow::Error> { .build()?; runtime.block_on(async { + #[cfg(feature = "tui")] + let tui = tui::Tui::new()?.tick_rate(4.0).frame_rate(30.0); + match opts.command { Some(Command::Init(command)) => command.run(opts.wallet_dir).await, Some(Command::Reset(command)) => command.run(opts.wallet_dir).await, Some(Command::Upgrade(command)) => command.run(opts.wallet_dir), - Some(Command::Sync(command)) => command.run(opts.wallet_dir).await, + Some(Command::Sync(command)) => { + command + .run( + opts.wallet_dir, + #[cfg(feature = "tui")] + tui, + ) + .await + } Some(Command::Balance(command)) => command.run(opts.wallet_dir), Some(Command::ListTx(command)) => command.run(opts.wallet_dir), Some(Command::ListUnspent(command)) => command.run(opts.wallet_dir),