Skip to content

Commit

Permalink
Connect defrag-style TUI to sync command
Browse files Browse the repository at this point in the history
  • Loading branch information
str4d committed Apr 22, 2024
1 parent 1dcd4b2 commit 67e75ba
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 12 deletions.
121 changes: 112 additions & 9 deletions src/commands/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,27 @@ use crate::{
remote::connect_to_lightwalletd,
};

#[cfg(feature = "tui")]
use {crate::tui::Tui, zcash_protocol::consensus::NetworkUpgrade};

#[cfg(feature = "tui")]
mod defrag;

const BATCH_SIZE: u32 = 10_000;

// Options accepted for the `sync` command
#[derive(Debug, Options)]
pub(crate) struct Command {}
pub(crate) struct Command {
#[cfg(feature = "tui")]
pub(crate) defrag: bool,
}

impl Command {
pub(crate) async fn run(self, wallet_dir: Option<String>) -> Result<(), anyhow::Error> {
pub(crate) async fn run(
self,
wallet_dir: Option<String>,
#[cfg(feature = "tui")] tui: Tui,
) -> Result<(), anyhow::Error> {
let params = get_wallet_network(wallet_dir.as_ref())?;

let (fsblockdb_root, db_data) = get_db_paths(wallet_dir.as_ref());
Expand All @@ -49,6 +59,24 @@ impl Command {
let mut db_data = WalletDb::for_path(db_data, params)?;
let mut client = connect_to_lightwalletd(&params).await?;

#[cfg(feature = "tui")]
let tui_handle = if self.defrag {
let mut app = defrag::App::new(
db_data
.get_wallet_birthday()?
.unwrap_or_else(|| params.activation_height(NetworkUpgrade::Sapling).unwrap()),
);
let handle = app.handle();
tokio::spawn(async move {
if let Err(e) = app.run(tui).await {
error!("Error while running TUI: {e}");
}
});
Some(handle)
} else {
None
};

// 1) Download note commitment tree data from lightwalletd
// 2) Pass the commitment tree data to the database.
update_subtree_roots(&mut client, &mut db_data).await?;
Expand All @@ -59,13 +87,23 @@ impl Command {
fsblockdb_root: &Path,
db_cache: &mut FsBlockDb,
db_data: &mut WalletDb<rusqlite::Connection, P>,
#[cfg(feature = "tui")] tui_handle: Option<&defrag::AppHandle>,
) -> Result<bool, anyhow::Error> {
// 3) Download chain tip metadata from lightwalletd
// 4) Notify the wallet of the updated chain tip.
update_chain_tip(client, db_data).await?;
let _chain_tip = update_chain_tip(client, db_data).await?;

// 5) Get the suggested scan ranges from the wallet database
info!("Fetching scan ranges");
let mut scan_ranges = db_data.suggest_scan_ranges()?;
info!("Fetched {} scan ranges", scan_ranges.len());
#[cfg(feature = "tui")]
if let Some(handle) = tui_handle {
if handle.set_scan_ranges(&scan_ranges, _chain_tip) {
// TUI exited.
return Ok(false);
}
}

// Store the handles to cached block deletions (which we spawn into separate
// tasks to allow us to continue downloading and scanning other ranges).
Expand All @@ -80,8 +118,15 @@ impl Command {
Some(scan_range) if scan_range.priority() == ScanPriority::Verify => {
// Download the blocks in `scan_range` into the block source,
// overwriting any existing blocks in this range.
let block_meta =
download_blocks(client, fsblockdb_root, db_cache, scan_range).await?;
let block_meta = download_blocks(
client,
fsblockdb_root,
db_cache,
scan_range,
#[cfg(feature = "tui")]
tui_handle,
)
.await?;

let chain_state =
download_chain_state(client, scan_range.block_range().start - 1)
Expand All @@ -97,6 +142,10 @@ impl Command {
db_data,
&chain_state,
scan_range,
#[cfg(feature = "tui")]
tui_handle,
#[cfg(feature = "tui")]
_chain_tip,
)?;

// Delete the now-scanned blocks, because keeping the entire chain
Expand All @@ -106,6 +155,13 @@ impl Command {
if scan_ranges_updated {
// The suggested scan ranges have been updated, so we re-request.
scan_ranges = db_data.suggest_scan_ranges()?;
#[cfg(feature = "tui")]
if let Some(handle) = tui_handle {
if handle.set_scan_ranges(&scan_ranges, _chain_tip) {
// TUI exited.
return Ok(false);
}
}
} else {
// At this point, the cache and scanned data are locally
// consistent (though not necessarily consistent with the
Expand All @@ -126,6 +182,13 @@ impl Command {
// and calling `scan_cached_blocks` on each range.
let scan_ranges = db_data.suggest_scan_ranges()?;
debug!("Suggested ranges: {:?}", scan_ranges);
#[cfg(feature = "tui")]
if let Some(handle) = tui_handle {
if handle.set_scan_ranges(&scan_ranges, _chain_tip) {
// TUI exited.
return Ok(false);
}
}
for scan_range in scan_ranges.into_iter().flat_map(|r| {
// Limit the number of blocks we download and scan at any one time.
(0..).scan(r, |acc, _| {
Expand All @@ -145,8 +208,15 @@ impl Command {
})
}) {
// Download the blocks in `scan_range` into the block source.
let block_meta =
download_blocks(client, fsblockdb_root, db_cache, &scan_range).await?;
let block_meta = download_blocks(
client,
fsblockdb_root,
db_cache,
&scan_range,
#[cfg(feature = "tui")]
tui_handle,
)
.await?;

let chain_state =
download_chain_state(client, scan_range.block_range().start - 1).await?;
Expand All @@ -159,6 +229,10 @@ impl Command {
db_data,
&chain_state,
&scan_range,
#[cfg(feature = "tui")]
tui_handle,
#[cfg(feature = "tui")]
_chain_tip,
)?;

// Delete the now-scanned blocks.
Expand Down Expand Up @@ -188,6 +262,8 @@ impl Command {
fsblockdb_root,
&mut db_cache,
&mut db_data,
#[cfg(feature = "tui")]
tui_handle.as_ref(),
)
.await?
{}
Expand Down Expand Up @@ -248,7 +324,7 @@ async fn update_subtree_roots<P: Parameters>(
async fn update_chain_tip<P: Parameters>(
client: &mut CompactTxStreamerClient<Channel>,
db_data: &mut WalletDb<rusqlite::Connection, P>,
) -> Result<(), anyhow::Error> {
) -> Result<BlockHeight, anyhow::Error> {
let tip_height: BlockHeight = client
.get_latest_block(service::ChainSpec::default())
.await?
Expand All @@ -261,16 +337,21 @@ async fn update_chain_tip<P: Parameters>(
info!("Latest block height is {}", tip_height);
db_data.update_chain_tip(tip_height)?;

Ok(())
Ok(tip_height)
}

async fn download_blocks(
client: &mut CompactTxStreamerClient<Channel>,
fsblockdb_root: &Path,
db_cache: &FsBlockDb,
scan_range: &ScanRange,
#[cfg(feature = "tui")] tui_handle: Option<&defrag::AppHandle>,
) -> Result<Vec<BlockMeta>, anyhow::Error> {
info!("Fetching {}", scan_range);
#[cfg(feature = "tui")]
if let Some(handle) = tui_handle {
handle.set_fetching_range(Some(scan_range.block_range().clone()));
}
let mut start = service::BlockId::default();
start.height = scan_range.block_range().start.into();
let mut end = service::BlockId::default();
Expand Down Expand Up @@ -314,6 +395,10 @@ async fn download_blocks(
.write_block_metadata(&block_meta)
.map_err(error::Error::from)?;

#[cfg(feature = "tui")]
if let Some(handle) = tui_handle {
handle.set_fetching_range(None);
}
Ok(block_meta)
}

Expand Down Expand Up @@ -346,15 +431,22 @@ fn delete_cached_blocks(fsblockdb_root: &Path, block_meta: Vec<BlockMeta>) -> Jo
/// chain tip is out of sync with blockchain history.
///
/// Returns `true` if scanning these blocks materially changed the suggested scan ranges.
#[allow(clippy::too_many_arguments)]
fn scan_blocks<P: Parameters + Send + 'static>(
params: &P,
fsblockdb_root: &Path,
db_cache: &mut FsBlockDb,
db_data: &mut WalletDb<rusqlite::Connection, P>,
initial_chain_state: &ChainState,
scan_range: &ScanRange,
#[cfg(feature = "tui")] tui_handle: Option<&defrag::AppHandle>,
#[cfg(feature = "tui")] chain_tip: BlockHeight,
) -> Result<bool, anyhow::Error> {
info!("Scanning {}", scan_range);
#[cfg(feature = "tui")]
if let Some(handle) = tui_handle {
handle.set_scanning_range(Some(scan_range.block_range().clone()));
}
let scan_result = scan_cached_blocks(
params,
db_cache,
Expand All @@ -363,6 +455,10 @@ fn scan_blocks<P: Parameters + Send + 'static>(
initial_chain_state,
scan_range.len(),
);
#[cfg(feature = "tui")]
if let Some(handle) = tui_handle {
handle.set_scanning_range(None);
}

match scan_result {
Err(ChainError::Scan(err)) if err.is_continuity_error() => {
Expand Down Expand Up @@ -410,6 +506,13 @@ fn scan_blocks<P: Parameters + Send + 'static>(
// If scanning these blocks caused a suggested range to be added that has a
// higher priority than the current range, invalidate the current ranges.
let latest_ranges = db_data.suggest_scan_ranges()?;
#[cfg(feature = "tui")]
if let Some(handle) = tui_handle {
if handle.set_scan_ranges(&latest_ranges, chain_tip) {
// TUI exited.
return Ok(false);
}
}

Ok(if let Some(range) = latest_ranges.first() {
range.priority() > scan_range.priority()
Expand Down
35 changes: 32 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,26 @@ enum Command {
fn main() -> Result<(), anyhow::Error> {
let opts = MyOptions::parse_args_default_or_exit();

let filter = env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned());
tracing_subscriber::fmt().with_env_filter(filter).init();
#[cfg(not(feature = "tui"))]
let log_configured = false;
#[cfg(feature = "tui")]
let log_configured =
if let Some(Command::Sync(commands::sync::Command { defrag: true })) = opts.command {
use tracing_subscriber::layer::SubscriberExt;

tracing::subscriber::set_global_default(
tracing_subscriber::registry().with(tui_logger::tracing_subscriber_layer()),
)
.unwrap();
true
} else {
false
};

if !log_configured {
let filter = env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned());
tracing_subscriber::fmt().with_env_filter(filter).init();
}

rayon::ThreadPoolBuilder::new()
.thread_name(|i| format!("zec-rayon-{}", i))
Expand All @@ -84,11 +102,22 @@ fn main() -> Result<(), anyhow::Error> {
.build()?;

runtime.block_on(async {
#[cfg(feature = "tui")]
let tui = tui::Tui::new()?.tick_rate(4.0).frame_rate(30.0);

match opts.command {
Some(Command::Init(command)) => command.run(opts.wallet_dir).await,
Some(Command::Reset(command)) => command.run(opts.wallet_dir).await,
Some(Command::Upgrade(command)) => command.run(opts.wallet_dir),
Some(Command::Sync(command)) => command.run(opts.wallet_dir).await,
Some(Command::Sync(command)) => {
command
.run(
opts.wallet_dir,
#[cfg(feature = "tui")]
tui,
)
.await
}
Some(Command::Balance(command)) => command.run(opts.wallet_dir),
Some(Command::ListTx(command)) => command.run(opts.wallet_dir),
Some(Command::ListUnspent(command)) => command.run(opts.wallet_dir),
Expand Down

0 comments on commit 67e75ba

Please sign in to comment.