diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f23deba..dee56d0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.12.0-pre.11 (Nov 21, 2024) +* Added parallelization for node preloads during audit proof generation +* Removed the `parallel_azks` feature in favor of an explicit parallelism configuration object, allowing +static or dynamic parallelism to be set independently for insertion and preloads + ## 0.12.0-pre.10 (Oct 17, 2024) * Added parallelization for node preloads during insertion * Added support for [tracing](https://docs.rs/tracing/latest/tracing/index.html) diff --git a/README.md b/README.md index 6e80a6bc..23ab715a 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ Installation Add the following line to the dependencies of your `Cargo.toml`: ``` -akd = "0.12.0-pre.10" +akd = "0.12.0-pre.11" ``` ### Minimum Supported Rust Version diff --git a/akd/Cargo.toml b/akd/Cargo.toml index 94fabaeb..2d2eff48 100644 --- a/akd/Cargo.toml +++ b/akd/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akd" -version = "0.12.0-pre.10" +version = "0.12.0-pre.11" authors = ["akd contributors"] description = "An implementation of an auditable key directory" license = "MIT OR Apache-2.0" @@ -18,7 +18,6 @@ experimental = ["akd_core/experimental"] default = [ "public_auditing", "parallel_vrf", - "parallel_azks", "preload_history", "greedy_lookup_preload", "experimental", @@ -28,8 +27,6 @@ bench = ["experimental", "public_tests", "tokio/rt-multi-thread"] # Greedy loading of lookup proof nodes greedy_lookup_preload = [] public_auditing = ["dep:protobuf", "akd_core/protobuf"] -# Parallelize node fetch and insertion during publish -parallel_azks = [] # Parallelize VRF calculations during publish parallel_vrf = ["akd_core/parallel_vrf"] # Enable pre-loading of the nodes when generating history proofs @@ -56,7 +53,7 @@ tracing_instrument = ["tracing/attributes"] [dependencies] ## Required dependencies ## -akd_core = { version = "0.12.0-pre.10", path = "../akd_core", default-features = false, features = [ +akd_core = { version = "0.12.0-pre.11", path = "../akd_core", default-features = false, features = [ "vrf", ] } async-recursion = "1" diff --git a/akd/benches/azks.rs b/akd/benches/azks.rs index 5b338348..10427dc8 100644 --- a/akd/benches/azks.rs +++ b/akd/benches/azks.rs @@ -10,7 +10,7 @@ extern crate criterion; mod common; -use akd::append_only_zks::InsertMode; +use akd::append_only_zks::{AzksParallelismConfig, InsertMode}; use akd::auditor; use akd::storage::manager::StorageManager; use akd::storage::memory::AsyncInMemoryDatabase; @@ -57,6 +57,7 @@ fn batch_insertion(c: &mut Criterion) { &db, initial_node_set.clone(), InsertMode::Directory, + AzksParallelismConfig::default(), )) .unwrap(); (azks, db, node_set.clone()) @@ -67,6 +68,7 @@ fn batch_insertion(c: &mut Criterion) { &db, node_set, InsertMode::Directory, + AzksParallelismConfig::default(), )) .unwrap(); }, @@ -107,6 +109,7 @@ fn audit_verify(c: &mut Criterion) { &db, initial_node_set.clone(), InsertMode::Directory, + AzksParallelismConfig::default(), )) .unwrap(); @@ -118,12 +121,18 @@ fn audit_verify(c: &mut Criterion) { &db, node_set.clone(), InsertMode::Directory, + AzksParallelismConfig::default(), )) .unwrap(); let end_hash = runtime.block_on(azks.get_root_hash::(&db)).unwrap(); let proof = runtime - .block_on(azks.get_append_only_proof::(&db, 1, 2)) + .block_on(azks.get_append_only_proof::( + &db, + 1, + 2, + AzksParallelismConfig::default(), + )) .unwrap(); (start_hash, end_hash, proof) @@ -157,7 +166,12 @@ fn 
audit_generate(c: &mut Criterion) { for _epoch in 0..num_epochs { let node_set = gen_nodes(&mut rng, num_leaves); runtime - .block_on(azks.batch_insert_nodes::(&db, node_set, InsertMode::Directory)) + .block_on(azks.batch_insert_nodes::( + &db, + node_set, + InsertMode::Directory, + AzksParallelismConfig::default(), + )) .unwrap(); } let epoch = azks.get_latest_epoch(); @@ -172,7 +186,12 @@ fn audit_generate(c: &mut Criterion) { || {}, |_| { let _proof = runtime - .block_on(azks.get_append_only_proof::(&db, epoch - 1, epoch)) + .block_on(azks.get_append_only_proof::( + &db, + epoch - 1, + epoch, + AzksParallelismConfig::default(), + )) .unwrap(); }, BatchSize::PerIteration, diff --git a/akd/benches/directory.rs b/akd/benches/directory.rs index 03f2a686..9c7a155a 100644 --- a/akd/benches/directory.rs +++ b/akd/benches/directory.rs @@ -10,6 +10,7 @@ extern crate criterion; mod common; +use akd::append_only_zks::AzksParallelismConfig; use akd::ecvrf::HardCodedAkdVRF; use akd::storage::manager::StorageManager; use akd::storage::memory::AsyncInMemoryDatabase; @@ -56,7 +57,9 @@ fn history_generation(c: &mut Criterion) { ); let db_clone = db.clone(); let directory = runtime - .block_on(async move { Directory::::new(db, vrf).await }) + .block_on(async move { + Directory::::new(db, vrf, AzksParallelismConfig::default()).await + }) .unwrap(); for _epoch in 1..num_updates { diff --git a/akd/src/append_only_zks.rs b/akd/src/append_only_zks.rs index 2ab9450a..d8b8ab89 100644 --- a/akd/src/append_only_zks.rs +++ b/akd/src/append_only_zks.rs @@ -36,11 +36,6 @@ use std::sync::Arc; /// The default azks key pub const DEFAULT_AZKS_KEY: u8 = 1u8; -/// The default available parallelism for parallel batch insertions, used when -/// available parallelism cannot be determined at runtime. Should be > 1 -#[cfg(feature = "parallel_azks")] -pub const DEFAULT_AVAILABLE_PARALLELISM: usize = 32; - async fn tic_toc(f: impl core::future::Future) -> (T, Option) { #[cfg(feature = "runtime_metrics")] { @@ -53,44 +48,6 @@ async fn tic_toc(f: impl core::future::Future) -> (T, Option (f.await, None) } -fn get_parallel_levels() -> Option { - #[cfg(not(feature = "parallel_azks"))] - return None; - - #[cfg(feature = "parallel_azks")] - { - // Based on profiling results, the best performance is achieved when the - // number of spawned tasks is equal to the number of available threads. - // We therefore get the number of available threads and calculate the - // number of levels that should be executed in parallel to give the - // number of tasks closest to the number of threads. While there might - // be other tasks that are running on the threads, this is a reasonable - // approximation that should yield good performance in most cases. - let available_parallelism = std::thread::available_parallelism() - .map_or(DEFAULT_AVAILABLE_PARALLELISM, |v| v.into()); - // The number of tasks spawned at a level is the number of leaves at - // the level. As we are using a binary tree, the number of leaves at a - // level is 2^level. Therefore, the number of levels that should be - // executed in parallel is the log2 of the number of available threads. - let parallel_levels = (available_parallelism as f32).log2().ceil() as u8; - - info!( - "Parallel levels requested (available parallelism: {}, parallel levels: {})", - available_parallelism, parallel_levels - ); - Some(parallel_levels) - } -} - -/// Determines parallelism for node preloading. 
-#[derive(Debug, Clone, Copy, PartialEq)] -pub(crate) enum PreloadParallelism { - /// Parallelism will never be used regardless of configuration. - Disabled, - /// Parallelism will be used if configuration is eligible. - Default, -} - /// An azks is built both by the [crate::directory::Directory] and the auditor. /// However, both constructions have very minor differences, and the insert /// mode enum is used to differentiate between the two. @@ -243,6 +200,77 @@ impl AzksElementSet { } } +/// Parallelism configuration for [Azks] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)] +pub struct AzksParallelismConfig { + /// Parallelization for node insertion. + pub insertion: AzksParallelismOption, + /// Parallelization for node preloading, during insertion and auditing. + pub preload: AzksParallelismOption, +} + +impl AzksParallelismConfig { + /// The default fallback parallelism for parallel azks operations, used when + /// available parallelism cannot be determined automatically at runtime. Should be > 1 + const DEFAULT_FALLBACK_PARALLELISM: u32 = 32; + + /// Instantiate a parallelism config with no parallelism set for all fields. + pub fn disabled() -> Self { + Self { + insertion: AzksParallelismOption::Disabled, + preload: AzksParallelismOption::Disabled, + } + } +} + +impl Default for AzksParallelismConfig { + fn default() -> Self { + Self { + insertion: AzksParallelismOption::AvailableOr(Self::DEFAULT_FALLBACK_PARALLELISM), + preload: AzksParallelismOption::AvailableOr(Self::DEFAULT_FALLBACK_PARALLELISM), + } + } +} + +/// Parallelism setting for a given field in [AzksParallelismConfig]. +#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)] +pub enum AzksParallelismOption { + /// No parallelism. + Disabled, + /// Set parallelism to a static value. + Static(u32), + /// Dynamically derive parallelism from the number of available cores, + /// falling back to the passed value if available cores cannot be retrieved. + AvailableOr(u32), +} + +impl AzksParallelismOption { + fn get_parallel_levels(&self) -> Option { + let parallelism = match *self { + AzksParallelismOption::Disabled => return None, + AzksParallelismOption::Static(parallelism) => parallelism, + AzksParallelismOption::AvailableOr(fallback_parallelism) => { + std::thread::available_parallelism() + .map_or(fallback_parallelism, |v| v.get() as u32) + } + }; + + // We calculate the number of levels that should be executed in parallel + // to give the number of tasks closest to the available parallelism. + // The number of tasks spawned at a level is the number of leaves at + // the level. As we are using a binary tree, the number of leaves at a + // level is 2^level. Therefore, the number of levels that should be + // executed in parallel is the log2 of the number of available threads. + let parallel_levels = (parallelism as f32).log2().ceil() as u8; + + info!( + "Parallel levels requested (parallelism: {}, parallel levels: {})", + parallelism, parallel_levels + ); + Some(parallel_levels) + } +} + /// An append-only zero knowledge set, the data structure used to efficiently implement /// a auditable key directory. 
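The `AzksParallelismConfig`/`AzksParallelismOption` pair defined in the hunk above replaces the compile-time `parallel_azks` feature with a per-instance runtime choice. A minimal sketch of the three configuration styles (the bindings and concrete values are illustrative; both types are re-exported from the crate root later in this diff):

```rust
use akd::append_only_zks::{AzksParallelismConfig, AzksParallelismOption};

fn main() {
    // Default: insertion and preloads both scale with available cores,
    // falling back to 32 if the core count cannot be determined.
    let _dynamic = AzksParallelismConfig::default();

    // Fully sequential; behaves like a build without the old `parallel_azks` feature.
    let _sequential = AzksParallelismConfig::disabled();

    // Mixed: cap insertion at 8 tasks, keep preloads dynamic.
    let _mixed = AzksParallelismConfig {
        insertion: AzksParallelismOption::Static(8),
        ..Default::default()
    };
    // Per get_parallel_levels() above, Static(8) gives ceil(log2(8)) = 3
    // parallel levels, i.e. at most 2^3 = 8 tasks at the deepest parallel level.
}
```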
#[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord)] @@ -311,13 +339,13 @@ impl Azks { storage: &StorageManager, nodes: Vec, insert_mode: InsertMode, + parallelism_config: AzksParallelismConfig, ) -> Result<(), AkdError> { let azks_element_set = AzksElementSet::from(nodes); // preload the nodes that we will visit during the insertion let (fallible_load_count, time_s) = - tic_toc(self.preload_nodes(storage, &azks_element_set, PreloadParallelism::Default)) - .await; + tic_toc(self.preload_nodes(storage, &azks_element_set, parallelism_config)).await; let load_count = fallible_load_count?; if let Some(time) = time_s { info!( @@ -342,7 +370,7 @@ impl Azks { azks_element_set, self.latest_epoch, insert_mode, - get_parallel_levels(), + parallelism_config.insertion.get_parallel_levels(), ) .await?; root_node.write_to_storage(storage, is_new).await?; @@ -663,7 +691,7 @@ impl Azks { self.preload_nodes( storage, &AzksElementSet::from(lookup_nodes), - PreloadParallelism::Disabled, + AzksParallelismConfig::disabled(), ) .await } @@ -673,7 +701,7 @@ impl Azks { &self, storage: &StorageManager, azks_element_set: &AzksElementSet, - parallelism: PreloadParallelism, + parallelism_config: AzksParallelismConfig, ) -> Result { if !storage.has_cache() { info!("No cache found, skipping preload"); @@ -688,11 +716,7 @@ impl Azks { let azks_element_set = Arc::new(azks_element_set.clone()); let epoch = self.get_latest_epoch(); let node_keys = vec![NodeKey(NodeLabel::root())]; - let parallel_levels = if parallelism == PreloadParallelism::Disabled { - None - } else { - get_parallel_levels() - }; + let parallel_levels = parallelism_config.preload.get_parallel_levels(); let load_count = Azks::recursive_preload_nodes( storage, @@ -878,6 +902,7 @@ impl Azks { storage: &StorageManager, start_epoch: u64, end_epoch: u64, + parallelism_config: AzksParallelismConfig, ) -> Result { let latest_epoch = self.get_latest_epoch(); if latest_epoch < end_epoch || end_epoch <= start_epoch { @@ -897,7 +922,7 @@ impl Azks { latest_epoch, start_epoch, end_epoch, - PreloadParallelism::Default, + parallelism_config, )) .await; let load_count = fallible_load_count?; @@ -925,7 +950,7 @@ impl Azks { ep, ep + 1, 0, - get_parallel_levels(), + parallelism_config.insertion.get_parallel_levels(), ) .await?; info!("Generated audit proof for {} -> {}", ep, ep + 1); @@ -945,7 +970,7 @@ impl Azks { latest_epoch: u64, start_epoch: u64, end_epoch: u64, - parallelism: PreloadParallelism, + parallelism_config: AzksParallelismConfig, ) -> Result { if !storage.has_cache() { info!("No cache found, skipping preload"); @@ -953,11 +978,7 @@ impl Azks { } let node_keys = vec![NodeKey(NodeLabel::root())]; - let parallel_levels = if parallelism == PreloadParallelism::Disabled { - None - } else { - get_parallel_levels() - }; + let parallel_levels = parallelism_config.preload.get_parallel_levels(); let load_count = Azks::recursive_preload_audit_nodes( storage, @@ -1101,58 +1122,33 @@ impl Azks { let maybe_task: Option< tokio::task::JoinHandle, Vec), AkdError>>, > = if let Some(left_child) = node.left_child { - #[cfg(feature = "parallel_azks")] - { - if parallel_levels.map(|p| p as u64 > level).unwrap_or(false) { - // we can parallelise further! 
- let storage_clone = storage.clone(); - let tsk: tokio::task::JoinHandle> = - tokio::spawn(async move { - let my_storage = storage_clone; - let child_node = TreeNode::get_from_storage( - &my_storage, - &NodeKey(left_child), - latest_epoch, - ) - .await?; - Self::get_append_only_proof_helper::( - latest_epoch, - &my_storage, - child_node, - start_epoch, - end_epoch, - level + 1, - parallel_levels, - ) - .await - }); - - Some(tsk) - } else { - // Enough parallelism already, STOP IT! Don't make me get the belt! - let child_node = - TreeNode::get_from_storage(storage, &NodeKey(left_child), latest_epoch) - .await?; - let (mut inner_unchanged, mut inner_leaf) = + if parallel_levels.map(|p| p as u64 > level).unwrap_or(false) { + // we can parallelise further! + let storage_clone = storage.clone(); + let tsk: tokio::task::JoinHandle> = + tokio::spawn(async move { + let my_storage = storage_clone; + let child_node = TreeNode::get_from_storage( + &my_storage, + &NodeKey(left_child), + latest_epoch, + ) + .await?; Self::get_append_only_proof_helper::( latest_epoch, - storage, + &my_storage, child_node, start_epoch, end_epoch, level + 1, parallel_levels, ) - .await?; - unchanged.append(&mut inner_unchanged); - leaves.append(&mut inner_leaf); - None - } - } + .await + }); - #[cfg(not(feature = "parallel_azks"))] - { - // NO Parallelism, BAD! parallelism. Get your nose out of the garbage! + Some(tsk) + } else { + // Enough parallelism already, STOP IT! Don't make me get the belt! let child_node = TreeNode::get_from_storage(storage, &NodeKey(left_child), latest_epoch) .await?; @@ -1443,7 +1439,12 @@ mod tests { let mut azks2 = Azks::new::(&db2).await?; azks2 - .batch_insert_nodes::(&db2, azks_element_set, InsertMode::Directory) + .batch_insert_nodes::( + &db2, + azks_element_set, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) .await?; assert_eq!( @@ -1522,8 +1523,13 @@ mod tests { let mut azks = Azks::new::(&db).await?; for i in 0..8 { let node = nodes[7 - i]; - azks.batch_insert_nodes::(&db, vec![node], InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + vec![node], + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; } let root_digest = azks.get_root_hash::(&db).await.unwrap(); @@ -1572,7 +1578,12 @@ mod tests { let mut azks2 = Azks::new::(&db2).await?; azks2 - .batch_insert_nodes::(&db2, azks_element_set, InsertMode::Directory) + .batch_insert_nodes::( + &db2, + azks_element_set, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) .await?; assert_eq!( @@ -1614,8 +1625,13 @@ mod tests { }) .collect(); - azks.batch_insert_nodes::(&db, nodes, InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + nodes, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; // expected nodes inserted: 3 leaves, 2 internal nodes // - @@ -1645,8 +1661,13 @@ mod tests { }) .collect(); - azks.batch_insert_nodes::(&db, nodes, InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + nodes, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; // expected nodes inserted: 2 leaves, 1 internal node // - @@ -1738,7 +1759,10 @@ mod tests { .preload_nodes( &storage_manager, &azks_element_set, - PreloadParallelism::Default, + AzksParallelismConfig { + preload: AzksParallelismOption::Static(32), + ..Default::default() + }, ) .await .expect("Failed to preload nodes"); @@ -1753,7 +1777,7 @@ mod tests { .preload_nodes( &storage_manager, &azks_element_set, - PreloadParallelism::Disabled, + 
AzksParallelismConfig::disabled(), ) .await .expect("Failed to preload nodes"); @@ -1867,8 +1891,13 @@ mod tests { let database = AsyncInMemoryDatabase::new(); let db = StorageManager::new_no_cache(database); let mut azks = Azks::new::(&db).await?; - azks.batch_insert_nodes::(&db, perm, InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + perm, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; // Recursively traverse the tree and check that the sibling of each node is correct let root_node = TreeNode::get_from_storage(&db, &NodeKey(NodeLabel::root()), 1).await?; @@ -1925,8 +1954,13 @@ mod tests { let database = AsyncInMemoryDatabase::new(); let db = StorageManager::new_no_cache(database); let mut azks = Azks::new::(&db).await?; - azks.batch_insert_nodes::(&db, azks_element_set.clone(), InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set.clone(), + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let proof = azks .get_membership_proof::(&db, azks_element_set[0].label) @@ -1956,8 +1990,13 @@ mod tests { let database = AsyncInMemoryDatabase::new(); let db = StorageManager::new_no_cache(database); let mut azks = Azks::new::(&db).await?; - azks.batch_insert_nodes::(&db, azks_element_set.clone(), InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set.clone(), + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let proof = azks .get_membership_proof::(&db, azks_element_set[0].label) @@ -1983,8 +2022,13 @@ mod tests { let database = AsyncInMemoryDatabase::new(); let db = StorageManager::new_no_cache(database); let mut azks = Azks::new::(&db).await?; - azks.batch_insert_nodes::(&db, azks_element_set.clone(), InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set.clone(), + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let mut proof = azks .get_membership_proof::(&db, azks_element_set[0].label) @@ -2033,8 +2077,13 @@ mod tests { ]; let mut azks = Azks::new::(&db).await?; - azks.batch_insert_nodes::(&db, azks_element_set, InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let search_label = NodeLabel::new(byte_arr_from_u64(0b1111 << 60), 64); let proof = azks .get_non_membership_proof::(&db, search_label) @@ -2077,6 +2126,7 @@ mod tests { &db, azks_element_set.clone()[1..2].to_vec(), InsertMode::Directory, + AzksParallelismConfig::default(), ) .await?; let proof = azks @@ -2104,6 +2154,7 @@ mod tests { &db, azks_element_set.clone()[0..num_nodes - 1].to_vec(), InsertMode::Directory, + AzksParallelismConfig::default(), ) .await?; let proof = azks @@ -2129,6 +2180,7 @@ mod tests { &db, azks_element_set.clone()[0..num_nodes - 1].to_vec(), InsertMode::Directory, + AzksParallelismConfig::default(), ) .await?; let proof = azks @@ -2150,8 +2202,13 @@ mod tests { label: NodeLabel::new(byte_arr_from_u64(0b0), 64), value: AzksValue(EMPTY_DIGEST), }]; - azks.batch_insert_nodes::(&db, azks_element_set_1, InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set_1, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let start_hash = azks.get_root_hash::(&db).await?; let azks_element_set_2: Vec = vec![AzksElement { @@ -2159,11 +2216,18 @@ mod tests { value: AzksValue(EMPTY_DIGEST), }]; - azks.batch_insert_nodes::(&db, 
azks_element_set_2, InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set_2, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let end_hash = azks.get_root_hash::(&db).await?; - let proof = azks.get_append_only_proof::(&db, 1, 2).await?; + let proof = azks + .get_append_only_proof::(&db, 1, 2, AzksParallelismConfig::default()) + .await?; audit_verify::(vec![start_hash, end_hash], proof).await?; Ok(()) @@ -2186,8 +2250,13 @@ mod tests { }, ]; - azks.batch_insert_nodes::(&db, azks_element_set_1, InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set_1, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let start_hash = azks.get_root_hash::(&db).await?; let azks_element_set_2: Vec = vec![ @@ -2201,11 +2270,18 @@ mod tests { }, ]; - azks.batch_insert_nodes::(&db, azks_element_set_2, InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set_2, + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let end_hash = azks.get_root_hash::(&db).await?; - let proof = azks.get_append_only_proof::(&db, 1, 2).await?; + let proof = azks + .get_append_only_proof::(&db, 1, 2, AzksParallelismConfig::default()) + .await?; audit_verify::(vec![start_hash, end_hash], proof).await?; Ok(()) } @@ -2220,24 +2296,41 @@ mod tests { let database = AsyncInMemoryDatabase::new(); let db = StorageManager::new_no_cache(database); let mut azks = Azks::new::(&db).await?; - azks.batch_insert_nodes::(&db, azks_element_set_1.clone(), InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set_1.clone(), + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let start_hash = azks.get_root_hash::(&db).await?; let azks_element_set_2 = gen_random_elements(num_nodes, &mut rng); - azks.batch_insert_nodes::(&db, azks_element_set_2.clone(), InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set_2.clone(), + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let middle_hash = azks.get_root_hash::(&db).await?; let azks_element_set_3: Vec = gen_random_elements(num_nodes, &mut rng); - azks.batch_insert_nodes::(&db, azks_element_set_3.clone(), InsertMode::Directory) - .await?; + azks.batch_insert_nodes::( + &db, + azks_element_set_3.clone(), + InsertMode::Directory, + AzksParallelismConfig::default(), + ) + .await?; let end_hash = azks.get_root_hash::(&db).await?; - let proof = azks.get_append_only_proof::(&db, 1, 3).await?; + let proof = azks + .get_append_only_proof::(&db, 1, 3, AzksParallelismConfig::default()) + .await?; let hashes = vec![start_hash, middle_hash, end_hash]; audit_verify::(hashes, proof).await?; diff --git a/akd/src/auditor.rs b/akd/src/auditor.rs index 7cdb6ffd..b6c85bff 100644 --- a/akd/src/auditor.rs +++ b/akd/src/auditor.rs @@ -9,6 +9,7 @@ use akd_core::configuration::Configuration; +use crate::append_only_zks::AzksParallelismConfig; use crate::AzksValue; use crate::{ append_only_zks::InsertMode, @@ -65,8 +66,13 @@ pub async fn verify_consecutive_append_only( let manager = StorageManager::new_no_cache(db); let mut azks = Azks::new::(&manager).await?; - azks.batch_insert_nodes::(&manager, proof.unchanged_nodes.clone(), InsertMode::Auditor) - .await?; + azks.batch_insert_nodes::( + &manager, + proof.unchanged_nodes.clone(), + InsertMode::Auditor, + AzksParallelismConfig::default(), + ) + .await?; let computed_start_root_hash: Digest = 
azks.get_root_hash::(&manager).await?; let mut verified = computed_start_root_hash == start_hash; azks.latest_epoch = end_epoch - 1; @@ -79,8 +85,13 @@ pub async fn verify_consecutive_append_only( y }) .collect(); - azks.batch_insert_nodes::(&manager, updated_inserted, InsertMode::Auditor) - .await?; + azks.batch_insert_nodes::( + &manager, + updated_inserted, + InsertMode::Auditor, + AzksParallelismConfig::default(), + ) + .await?; let computed_end_root_hash: Digest = azks.get_root_hash::(&manager).await?; verified = verified && (computed_end_root_hash == end_hash); if !verified { diff --git a/akd/src/directory.rs b/akd/src/directory.rs index a58c688c..ccdc41c6 100644 --- a/akd/src/directory.rs +++ b/akd/src/directory.rs @@ -7,7 +7,7 @@ //! Implementation of an auditable key directory -use crate::append_only_zks::{Azks, InsertMode}; +use crate::append_only_zks::{Azks, AzksParallelismConfig, InsertMode}; use crate::ecvrf::{VRFKeyStorage, VRFPublicKey}; use crate::errors::{AkdError, DirectoryError, StorageError}; use crate::helper_structs::LookupInfo; @@ -35,6 +35,7 @@ use tracing::Instrument; pub struct Directory { storage: StorageManager, vrf: V, + parallelism_config: AzksParallelismConfig, /// The cache lock guarantees that the cache is not /// flushed mid-proof generation. We allow multiple proof generations /// to occur (RwLock.read() operations can have multiple) but we want @@ -51,6 +52,7 @@ impl Clone for Directory { Self { storage: self.storage.clone(), vrf: self.vrf.clone(), + parallelism_config: self.parallelism_config, cache_lock: self.cache_lock.clone(), tc: PhantomData, } @@ -67,7 +69,11 @@ where /// Takes as input a pointer to the storage being used for this instance. /// The state is stored in the storage. #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] - pub async fn new(storage: StorageManager, vrf: V) -> Result { + pub async fn new( + storage: StorageManager, + vrf: V, + parallelism_config: AzksParallelismConfig, + ) -> Result { let azks = Directory::::get_azks_from_storage(&storage, false).await; if let Err(AkdError::Storage(StorageError::NotFound(e))) = azks { @@ -83,8 +89,9 @@ where Ok(Directory { storage, - cache_lock: Arc::new(RwLock::new(())), vrf, + parallelism_config, + cache_lock: Arc::new(RwLock::new(())), tc: PhantomData, }) } @@ -215,7 +222,12 @@ where info!("Starting inserting new leaves"); if let Err(err) = current_azks - .batch_insert_nodes::(&self.storage, update_set, InsertMode::Directory) + .batch_insert_nodes::( + &self.storage, + update_set, + InsertMode::Directory, + self.parallelism_config, + ) .await { // If we fail to do the batch-leaf insert, we should rollback the transaction so we can try again cleanly. @@ -705,7 +717,12 @@ where } else { self.storage.disable_cache_cleaning(); let result = current_azks - .get_append_only_proof::(&self.storage, audit_start_ep, audit_end_ep) + .get_append_only_proof::( + &self.storage, + audit_start_ep, + audit_end_ep, + self.parallelism_config, + ) .await; self.storage.enable_cache_cleaning(); result @@ -847,7 +864,11 @@ where /// Constructs a new instance of [ReadOnlyDirectory]. In the event that an [Azks] /// does not exist in the storage, or we're unable to retrieve it from storage, then /// a [DirectoryError] will be returned. 
- pub async fn new(storage: StorageManager, vrf: V) -> Result { + pub async fn new( + storage: StorageManager, + vrf: V, + parallelism_config: AzksParallelismConfig, + ) -> Result { let azks = Directory::::get_azks_from_storage(&storage, false).await; if azks.is_err() { @@ -861,8 +882,9 @@ where Ok(Self(Directory { storage, - cache_lock: Arc::new(RwLock::new(())), vrf, + parallelism_config, + cache_lock: Arc::new(RwLock::new(())), tc: PhantomData, })) } @@ -1091,7 +1113,12 @@ impl Directory(&self.storage, azks_element_set, InsertMode::Directory) + .batch_insert_nodes::( + &self.storage, + azks_element_set, + InsertMode::Directory, + self.parallelism_config, + ) .await?; // batch all the inserts into a single transactional write to storage diff --git a/akd/src/lib.rs b/akd/src/lib.rs index e99f384b..8ab27e7e 100644 --- a/akd/src/lib.rs +++ b/akd/src/lib.rs @@ -51,6 +51,7 @@ //! [`storage::memory::AsyncInMemoryDatabase`] as in-memory storage, and [`ecvrf::HardCodedAkdVRF`] as the VRF. //! The [`directory::ReadOnlyDirectory`] creates a read-only directory which cannot be updated. //! ``` +//! use akd::append_only_zks::AzksParallelismConfig; //! use akd::storage::StorageManager; //! use akd::storage::memory::AsyncInMemoryDatabase; //! use akd::ecvrf::HardCodedAkdVRF; @@ -63,7 +64,7 @@ //! let vrf = HardCodedAkdVRF{}; //! //! # tokio_test::block_on(async { -//! let mut akd = Directory::::new(storage_manager, vrf) +//! let mut akd = Directory::::new(storage_manager, vrf, AzksParallelismConfig::default()) //! .await //! .expect("Could not create a new directory"); //! # }); @@ -76,6 +77,7 @@ //! with a list of the pairs. In the following example, we derive the labels and values from strings. After publishing, //! the new epoch number and root hash are returned. //! ``` +//! # use akd::append_only_zks::AzksParallelismConfig; //! # use akd::storage::StorageManager; //! # use akd::storage::memory::AsyncInMemoryDatabase; //! # use akd::ecvrf::HardCodedAkdVRF; @@ -99,7 +101,11 @@ //! //! # tokio_test::block_on(async { //! # let vrf = HardCodedAkdVRF{}; -//! # let mut akd = Directory::::new(storage_manager, vrf).await.unwrap(); +//! # let mut akd = Directory::::new( +//! # storage_manager, +//! # vrf, +//! # AzksParallelismConfig::default() +//! # ).await.unwrap(); //! let EpochHash(epoch, root_hash) = akd.publish(entries) //! .await.expect("Error with publishing"); //! println!("Published epoch {} with root hash: {}", epoch, hex::encode(root_hash)); @@ -112,6 +118,7 @@ //! We can call [`Directory::lookup`] to generate a [`LookupProof`] that proves the correctness //! of a client lookup for an existing entry. //! ``` +//! # use akd::append_only_zks::AzksParallelismConfig; //! # use akd::storage::StorageManager; //! # use akd::storage::memory::AsyncInMemoryDatabase; //! # use akd::ecvrf::HardCodedAkdVRF; @@ -135,7 +142,11 @@ //! # //! # tokio_test::block_on(async { //! # let vrf = HardCodedAkdVRF{}; -//! # let mut akd = Directory::::new(storage_manager, vrf).await.unwrap(); +//! # let mut akd = Directory::::new( +//! # storage_manager, +//! # vrf, +//! # AzksParallelismConfig::default() +//! # ).await.unwrap(); //! # let EpochHash(epoch, root_hash) = akd.publish(entries) //! # .await.expect("Error with publishing"); //! let (lookup_proof, epoch_hash) = akd.lookup( @@ -147,6 +158,7 @@ //! To verify a valid proof, we call [`client::lookup_verify`], with respect to the root hash and //! the server's public key. //! ``` +//! # use akd::append_only_zks::AzksParallelismConfig; //! 
# use akd::storage::StorageManager; //! # use akd::storage::memory::AsyncInMemoryDatabase; //! # use akd::ecvrf::HardCodedAkdVRF; @@ -170,7 +182,7 @@ //! # //! # tokio_test::block_on(async { //! # let vrf = HardCodedAkdVRF{}; -//! # let mut akd = Directory::::new(storage_manager, vrf).await.unwrap(); +//! # let mut akd = Directory::::new(storage_manager, vrf, AzksParallelismConfig::default()).await.unwrap(); //! # let _ = akd.publish(entries) //! # .await.expect("Error with publishing"); //! # let (lookup_proof, epoch_hash) = akd.lookup( @@ -207,6 +219,7 @@ //! example we default to a complete history. For more information on the parameters, see the //! [History Parameters](#history-parameters) section. //! ``` +//! # use akd::append_only_zks::AzksParallelismConfig; //! # use akd::storage::StorageManager; //! # use akd::storage::memory::AsyncInMemoryDatabase; //! # use akd::ecvrf::HardCodedAkdVRF; @@ -230,7 +243,7 @@ //! # //! # tokio_test::block_on(async { //! # let vrf = HardCodedAkdVRF{}; -//! # let mut akd = Directory::::new(storage_manager, vrf).await.unwrap(); +//! # let mut akd = Directory::::new(storage_manager, vrf, AzksParallelismConfig::default()).await.unwrap(); //! # let EpochHash(epoch, root_hash) = akd.publish(entries) //! # .await.expect("Error with publishing"); //! use akd::HistoryParams; @@ -271,6 +284,7 @@ //! # let db = AsyncInMemoryDatabase::new(); //! # let storage_manager = StorageManager::new_no_cache(db); //! # let vrf = HardCodedAkdVRF{}; +//! # use akd::append_only_zks::AzksParallelismConfig; //! # use akd::EpochHash; //! # use akd::HistoryParams; //! # use akd::{AkdLabel, AkdValue}; @@ -285,7 +299,11 @@ //! # //! # tokio_test::block_on(async { //! # let vrf = HardCodedAkdVRF{}; -//! # let mut akd = Directory::::new(storage_manager, vrf).await.unwrap(); +//! # let mut akd = Directory::::new( +//! # storage_manager, +//! # vrf, +//! # AzksParallelismConfig::default() +//! # ).await.unwrap(); //! # let _ = akd.publish(entries) //! # .await.expect("Error with publishing"); //! # let _ = akd.publish( @@ -337,6 +355,7 @@ //! # let db = AsyncInMemoryDatabase::new(); //! # let storage_manager = StorageManager::new_no_cache(db); //! # let vrf = HardCodedAkdVRF{}; +//! # use akd::append_only_zks::AzksParallelismConfig; //! # use akd::EpochHash; //! # use akd::{AkdLabel, AkdValue}; //! # use akd::Digest; @@ -350,7 +369,11 @@ //! # //! # tokio_test::block_on(async { //! # let vrf = HardCodedAkdVRF{}; -//! # let mut akd = Directory::::new(storage_manager, vrf).await.unwrap(); +//! # let mut akd = Directory::::new( +//! # storage_manager, +//! # vrf, +//! # AzksParallelismConfig::default() +//! # ).await.unwrap(); //! # let EpochHash(epoch, root_hash) = akd.publish(entries) //! # .await.expect("Error with publishing"); //! // Publish new entries into a second epoch @@ -368,6 +391,7 @@ //! ``` //! The auditor then verifies the above [`AppendOnlyProof`] using [`auditor::audit_verify`]. //! ``` +//! # use akd::append_only_zks::AzksParallelismConfig; //! # use akd::storage::StorageManager; //! # use akd::storage::memory::AsyncInMemoryDatabase; //! # use akd::ecvrf::HardCodedAkdVRF; @@ -391,7 +415,11 @@ //! # //! # tokio_test::block_on(async { //! # let vrf = HardCodedAkdVRF{}; -//! # let mut akd = Directory::::new(storage_manager, vrf).await.unwrap(); +//! # let mut akd = Directory::::new( +//! # storage_manager, +//! # vrf, +//! # AzksParallelismConfig::default() +//! # ).await.unwrap(); //! # let EpochHash(epoch, root_hash) = akd.publish(entries) //! 
# .await.expect("Error with publishing"); //! # // Publish new entries into a second epoch @@ -459,7 +487,6 @@ //! //! Performance optimizations: //! - `greedy_lookup_preload`: Greedy loading of lookup proof nodes -//! - `parallel_azks`: Enables nodes to be fetched and inserted via multiple threads during a publish operation //! - `parallel_vrf`: Enables the VRF computations to be run in parallel //! - `preload_history`: Enables pre-loading of nodes when generating history proofs //! @@ -526,7 +553,7 @@ pub use akd_core::{ mod utils; // ========== Type re-exports which are commonly used ========== // -pub use append_only_zks::Azks; +pub use append_only_zks::{Azks, AzksParallelismConfig, AzksParallelismOption}; pub use client::HistoryVerificationParams; pub use directory::Directory; pub use helper_structs::EpochHash; diff --git a/akd/src/tests/test_core_protocol.rs b/akd/src/tests/test_core_protocol.rs index ee2e2aaf..2da31d4a 100644 --- a/akd/src/tests/test_core_protocol.rs +++ b/akd/src/tests/test_core_protocol.rs @@ -12,6 +12,7 @@ use akd_core::{configuration::Configuration, hash::DIGEST_BYTES}; use rand::{rngs::StdRng, SeedableRng}; use crate::{ + append_only_zks::AzksParallelismConfig, auditor::{audit_verify, verify_consecutive_append_only}, client::{key_history_verify, lookup_verify}, directory::Directory, @@ -29,7 +30,7 @@ async fn test_empty_tree_root_hash() -> Result<(), AkdError> let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; let akd: Directory<_, AsyncInMemoryDatabase, HardCodedAkdVRF> = - Directory::::new(storage, vrf).await?; + Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; let hash = akd.get_epoch_hash().await?.1; @@ -48,7 +49,7 @@ async fn test_simple_publish() -> Result<(), AkdError> { let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; // Make sure you can publish and that something so simple // won't throw errors. 
akd.publish(vec![(AkdLabel::from("hello"), AkdValue::from("world"))]) @@ -62,7 +63,7 @@ async fn test_complex_publish() -> Result<(), AkdError> { let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; let num_entries = 10000; let mut entries = vec![]; @@ -84,7 +85,7 @@ async fn test_simple_lookup() -> Result<(), AkdError> { let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; // Add two labels and corresponding values to the akd akd.publish(vec![ (AkdLabel::from("hello"), AkdValue::from("world")), @@ -117,7 +118,7 @@ async fn test_small_key_history() -> Result<(), AkdError> { let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; // Publish the first value for the label "hello" // Epoch here will be 1 akd.publish(vec![(AkdLabel::from("hello"), AkdValue::from("world"))]) @@ -170,7 +171,7 @@ async fn test_simple_key_history() -> Result<(), AkdError> { let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; // Epoch 1: Add labels "hello" and "hello2" akd.publish(vec![ (AkdLabel::from("hello"), AkdValue::from("world")), @@ -316,7 +317,8 @@ async fn test_complex_verification_many_versions() -> Result< let storage_manager = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; // epoch 0 - let akd = Directory::::new(storage_manager, vrf).await?; + let akd = + Directory::::new(storage_manager, vrf, AzksParallelismConfig::default()).await?; let vrf_pk = akd.get_public_key().await?; let num_labels = 4; @@ -400,7 +402,8 @@ async fn test_limited_key_history() -> Result<(), AkdError> { let storage_manager = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; // epoch 0 - let akd = Directory::::new(storage_manager, vrf).await?; + let akd = + Directory::::new(storage_manager, vrf, AzksParallelismConfig::default()).await?; // epoch 1 akd.publish(vec![ @@ -507,7 +510,7 @@ async fn test_simple_audit() -> Result<(), AkdError> { let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; akd.publish(vec![ (AkdLabel::from("hello"), AkdValue::from("world")), @@ -682,7 +685,8 @@ async fn test_simple_lookup_for_small_tree() -> Result<(), Ak let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; // epoch 0 - let akd = Directory::::new(storage, vrf.clone()).await?; + let akd = + Directory::::new(storage, vrf.clone(), AzksParallelismConfig::default()).await?; // Create a set with 2 updates, (label, value) pairs // ("hello10", "hello10") @@ -734,7 +738,8 @@ async fn test_tombstoned_key_history() -> Result<(), AkdError let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; // epoch 0 
- let akd = Directory::::new(storage.clone(), vrf).await?; + let akd = + Directory::::new(storage.clone(), vrf, AzksParallelismConfig::default()).await?; // epoch 1 akd.publish(vec![(AkdLabel::from("hello"), AkdValue::from("world"))]) diff --git a/akd/src/tests/test_errors.rs b/akd/src/tests/test_errors.rs index 0366ab78..d598f18f 100644 --- a/akd/src/tests/test_errors.rs +++ b/akd/src/tests/test_errors.rs @@ -11,6 +11,7 @@ use akd_core::configuration::Configuration; use std::default::Default; +use crate::append_only_zks::AzksParallelismConfig; use crate::storage::types::KeyData; use crate::tree_node::TreeNodeWithPreviousValue; use crate::{ @@ -37,7 +38,12 @@ async fn test_directory_polling_azks_change() -> Result<(), A let storage = StorageManager::new(db, None, None, None); let vrf = HardCodedAkdVRF {}; // writer will write the AZKS record - let writer = Directory::::new(storage.clone(), vrf.clone()).await?; + let writer = Directory::::new( + storage.clone(), + vrf.clone(), + AzksParallelismConfig::default(), + ) + .await?; writer .publish(vec![ @@ -47,7 +53,8 @@ async fn test_directory_polling_azks_change() -> Result<(), A .await?; // reader will not write the AZKS but will be "polling" for AZKS changes - let reader = ReadOnlyDirectory::::new(storage, vrf).await?; + let reader = + ReadOnlyDirectory::::new(storage, vrf, AzksParallelismConfig::default()).await?; // start the poller let (tx, mut rx) = tokio::sync::mpsc::channel(10); @@ -98,7 +105,8 @@ async fn test_directory_azks_bootstrapping() -> Result<(), Ak mock_db.expect_set().times(0); let storage = StorageManager::new_no_cache(mock_db); - let maybe_akd = Directory::::new(storage, vrf.clone()).await; + let maybe_akd = + Directory::::new(storage, vrf.clone(), AzksParallelismConfig::default()).await; assert!(maybe_akd.is_err()); // Verify that an aZKS not found error results in one being created with the Directory @@ -110,7 +118,8 @@ async fn test_directory_azks_bootstrapping() -> Result<(), Ak setup_mocked_db(&mut mock_db, &test_db); let storage = StorageManager::new_no_cache(mock_db); - let maybe_akd = Directory::::new(storage, vrf).await; + let maybe_akd = + Directory::::new(storage, vrf, AzksParallelismConfig::default()).await; assert!(maybe_akd.is_ok()); let akd = maybe_akd.expect("Failed to get create a Directory!"); @@ -159,7 +168,7 @@ async fn test_key_history_dirty_reads() -> Result<(), AkdErro let storage = StorageManager::new_no_cache(mock_db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; // Ensure that we do not panic in this scenario, so we can just ignore the result. 
let _res = akd @@ -174,7 +183,7 @@ async fn test_read_during_publish() -> Result<(), AkdError> { let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db.clone()); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; // Publish once akd.publish(vec![ @@ -217,7 +226,7 @@ async fn test_read_during_publish() -> Result<(), AkdError> { // re-create the directory instance so it refreshes from storage let storage = StorageManager::new_no_cache(db.clone()); let vrf = HardCodedAkdVRF {}; - let akd = ReadOnlyDirectory::::new(storage, vrf) + let akd = ReadOnlyDirectory::::new(storage, vrf, AzksParallelismConfig::default()) .await .unwrap(); @@ -291,7 +300,8 @@ async fn test_directory_read_only_mode() -> Result<(), AkdErr let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; // There is no AZKS object in the storage layer, directory construction should fail - let akd = ReadOnlyDirectory::::new(storage, vrf).await; + let akd = + ReadOnlyDirectory::::new(storage, vrf, AzksParallelismConfig::default()).await; assert!(akd.is_err()); Ok(()) @@ -303,7 +313,8 @@ async fn test_publish_duplicate_entries() -> Result<(), AkdEr let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf.clone()).await?; + let akd = + Directory::::new(storage, vrf.clone(), AzksParallelismConfig::default()).await?; // Create a set of updates let mut updates = vec![]; @@ -336,7 +347,7 @@ async fn test_malicious_key_history() -> Result<(), AkdError> let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf).await?; + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()).await?; // Publish the first value for the label "hello" // Epoch here will be 1 akd.publish(vec![(AkdLabel::from("hello"), AkdValue::from("world"))]) @@ -401,7 +412,8 @@ async fn test_key_history_verify_malformed() -> Result<(), Ak let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf.clone()).await?; + let akd = + Directory::::new(storage, vrf.clone(), AzksParallelismConfig::default()).await?; let mut rng = rand::rngs::OsRng; for _ in 0..100 { @@ -536,7 +548,8 @@ async fn test_lookup_verify_invalid_version_number() -> Resul let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; // epoch 0 - let akd = Directory::::new(storage, vrf.clone()).await?; + let akd = + Directory::::new(storage, vrf.clone(), AzksParallelismConfig::default()).await?; // Create a set with 2 updates, (label, value) pairs // ("hello10", "hello10") diff --git a/akd/src/tests/test_preloads.rs b/akd/src/tests/test_preloads.rs index cf693be5..3ffca6b0 100644 --- a/akd/src/tests/test_preloads.rs +++ b/akd/src/tests/test_preloads.rs @@ -10,6 +10,7 @@ use akd_core::configuration::Configuration; use crate::{ + append_only_zks::AzksParallelismConfig, directory::Directory, ecvrf::HardCodedAkdVRF, errors::{AkdError, StorageError}, @@ -31,7 +32,7 @@ async fn test_publish_op_makes_no_get_requests() -> Result<() let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf) + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()) 
.await .expect("Failed to create directory"); @@ -61,7 +62,7 @@ async fn test_publish_op_makes_no_get_requests() -> Result<() let storage = StorageManager::new_no_cache(db2); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf) + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()) .await .expect("Failed to create directory"); diff --git a/akd_core/Cargo.toml b/akd_core/Cargo.toml index 88cbd706..1dc4c4b0 100644 --- a/akd_core/Cargo.toml +++ b/akd_core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "akd_core" -version = "0.12.0-pre.10" +version = "0.12.0-pre.11" authors = ["akd contributors"] description = "Core utilities for the akd crate" license = "MIT OR Apache-2.0" diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 217cd951..79517b12 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "examples" -version = "0.12.0-pre.10" +version = "0.12.0-pre.11" authors = ["akd contributors"] license = "MIT OR Apache-2.0" edition = "2021" diff --git a/examples/src/fixture_generator/examples/example_tests.rs b/examples/src/fixture_generator/examples/example_tests.rs index 03d1f78d..78a48fee 100644 --- a/examples/src/fixture_generator/examples/example_tests.rs +++ b/examples/src/fixture_generator/examples/example_tests.rs @@ -10,6 +10,7 @@ use std::fs::File; use akd::{ + append_only_zks::AzksParallelismConfig, directory::Directory, ecvrf::HardCodedAkdVRF, storage::{memory::AsyncInMemoryDatabase, Database, StorageManager, StorageUtil}, @@ -39,9 +40,13 @@ async fn test_use_fixture() { .unwrap(); let vrf = HardCodedAkdVRF {}; let storage_manager = StorageManager::new_no_cache(db); - let akd = Directory::::new(storage_manager.clone(), vrf) - .await - .unwrap(); + let akd = Directory::::new( + storage_manager.clone(), + vrf, + AzksParallelismConfig::default(), + ) + .await + .unwrap(); // publish delta updates let delta = reader.read_delta(epochs[1]).unwrap(); diff --git a/examples/src/fixture_generator/generator.rs b/examples/src/fixture_generator/generator.rs index 97a0e5f9..51271c5f 100644 --- a/examples/src/fixture_generator/generator.rs +++ b/examples/src/fixture_generator/generator.rs @@ -13,6 +13,7 @@ use std::env; use std::fs::File; use std::io::Write; +use akd::append_only_zks::AzksParallelismConfig; use akd::directory::Directory; use akd::storage::types::DbRecord; use akd::storage::{StorageManager, StorageUtil}; @@ -129,9 +130,13 @@ pub(crate) async fn generate(args: &Args let db = akd::storage::memory::AsyncInMemoryDatabase::new(); let vrf = akd::ecvrf::HardCodedAkdVRF {}; let storage_manager = StorageManager::new_no_cache(db); - let akd = Directory::::new(storage_manager.clone(), vrf) - .await - .unwrap(); + let akd = Directory::::new( + storage_manager.clone(), + vrf, + AzksParallelismConfig::default(), + ) + .await + .unwrap(); for epoch in 1..=args.epochs { // gather specified key updates diff --git a/examples/src/mysql_demo/mod.rs b/examples/src/mysql_demo/mod.rs index 4f26b0c4..5524b421 100644 --- a/examples/src/mysql_demo/mod.rs +++ b/examples/src/mysql_demo/mod.rs @@ -7,6 +7,7 @@ //! 
An example tool for running AKD backed by MySQL storage +use akd::append_only_zks::AzksParallelismConfig; use akd::ecvrf::HardCodedAkdVRF; use akd::storage::StorageManager; use akd::Directory; @@ -144,9 +145,10 @@ pub(crate) async fn render_cli(args: CliArgs) -> Result<()> { if cli.memory_db { let db = akd::storage::memory::AsyncInMemoryDatabase::new(); let storage_manager = StorageManager::new_no_cache(db); - let mut directory = Directory::::new(storage_manager, vrf) - .await - .unwrap(); + let mut directory = + Directory::::new(storage_manager, vrf, AzksParallelismConfig::default()) + .await + .unwrap(); if let Some(()) = pre_process_input(&cli, None).await { return Ok(()); } @@ -175,9 +177,13 @@ pub(crate) async fn render_cli(args: CliArgs) -> Result<()> { None, Some(Duration::from_secs(15)), ); - let mut directory = Directory::::new(storage_manager.clone(), vrf) - .await - .unwrap(); + let mut directory = Directory::::new( + storage_manager.clone(), + vrf, + AzksParallelismConfig::default(), + ) + .await + .unwrap(); tokio::spawn(async move { directory_host::init_host::(&mut rx, &mut directory).await }); diff --git a/examples/src/mysql_demo/tests/test_util.rs b/examples/src/mysql_demo/tests/test_util.rs index 4e359c9b..c617226e 100644 --- a/examples/src/mysql_demo/tests/test_util.rs +++ b/examples/src/mysql_demo/tests/test_util.rs @@ -7,6 +7,7 @@ extern crate thread_id; +use akd::append_only_zks::AzksParallelismConfig; use akd::configuration::Configuration; use akd::ecvrf::VRFKeyStorage; use akd::storage::{Database, StorageManager}; @@ -144,7 +145,12 @@ pub(crate) async fn test_lookups::new(mysql_db.clone(), vrf.clone()).await; + let maybe_dir = Directory::::new( + mysql_db.clone(), + vrf.clone(), + AzksParallelismConfig::default(), + ) + .await; match maybe_dir { Err(akd_error) => panic!("Error initializing directory: {:?}", akd_error), Ok(dir) => { @@ -279,7 +285,12 @@ pub(crate) async fn directory_test_suite< } let mut root_hashes = vec![]; // create & test the directory - let maybe_dir = Directory::::new(mysql_db.clone(), vrf.clone()).await; + let maybe_dir = Directory::::new( + mysql_db.clone(), + vrf.clone(), + AzksParallelismConfig::default(), + ) + .await; match maybe_dir { Err(akd_error) => panic!("Error initializing directory: {:?}", akd_error), Ok(dir) => { diff --git a/examples/src/test_vectors/mod.rs b/examples/src/test_vectors/mod.rs index 167c0fa6..2e8fcb7d 100644 --- a/examples/src/test_vectors/mod.rs +++ b/examples/src/test_vectors/mod.rs @@ -10,6 +10,7 @@ use crate::fixture_generator::writer::yaml::YamlWriter; use crate::fixture_generator::writer::Writer; +use akd::append_only_zks::AzksParallelismConfig; use akd::directory::Directory; use akd::ecvrf::HardCodedAkdVRF; use akd::hash::DIGEST_BYTES; @@ -146,7 +147,8 @@ async fn generate_impl() -> Result { let storage_manager = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; // epoch 0 - let akd = Directory::::new(storage_manager, vrf).await?; + let akd = + Directory::::new(storage_manager, vrf, AzksParallelismConfig::default()).await?; let vrf_pk = akd.get_public_key().await?; let num_labels = 5; diff --git a/examples/src/wasm_client/mod.rs b/examples/src/wasm_client/mod.rs index 23dd2c86..6fff6ae8 100644 --- a/examples/src/wasm_client/mod.rs +++ b/examples/src/wasm_client/mod.rs @@ -185,6 +185,7 @@ pub fn lookup_verify_experimental( pub mod tests { extern crate wasm_bindgen_test; + use akd::append_only_zks::AzksParallelismConfig; use akd::errors::AkdError; use akd::storage::memory::AsyncInMemoryDatabase; 
use akd::storage::StorageManager; @@ -216,7 +217,7 @@ pub mod tests { let db = AsyncInMemoryDatabase::new(); let storage = StorageManager::new_no_cache(db); let vrf = HardCodedAkdVRF {}; - let akd = Directory::::new(storage, vrf) + let akd = Directory::::new(storage, vrf, AzksParallelismConfig::default()) .await .expect("Failed to construct directory");
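Taken together, the call-site changes above all follow one pattern: every constructor or operation that previously depended on the `parallel_azks` feature now threads an explicit `AzksParallelismConfig`. A short end-to-end sketch patterned on the crate README's setup (in-memory storage, hard-coded VRF, `WhatsAppV1Configuration`), assuming a Tokio runtime with the `macros` feature; the specific parallelism values are illustrative:

```rust
use akd::append_only_zks::{AzksParallelismConfig, AzksParallelismOption};
use akd::directory::Directory;
use akd::ecvrf::HardCodedAkdVRF;
use akd::storage::memory::AsyncInMemoryDatabase;
use akd::storage::StorageManager;
use akd::{AkdLabel, AkdValue};

type Config = akd::WhatsAppV1Configuration;

#[tokio::main]
async fn main() {
    let db = AsyncInMemoryDatabase::new();
    let storage_manager = StorageManager::new_no_cache(db);
    let vrf = HardCodedAkdVRF {};

    // Previously fixed at compile time by the `parallel_azks` feature; now
    // chosen per directory. Here insertion is capped at a static 4 tasks
    // while preloads still scale with available cores.
    let parallelism = AzksParallelismConfig {
        insertion: AzksParallelismOption::Static(4),
        preload: AzksParallelismOption::AvailableOr(32),
    };

    let akd = Directory::<Config, _, _>::new(storage_manager, vrf, parallelism)
        .await
        .expect("Could not create a new directory");

    // The publish below, and any later audit proof generation, pick up the
    // directory's stored parallelism config; no feature flags are involved.
    akd.publish(vec![(AkdLabel::from("hello"), AkdValue::from("world"))])
        .await
        .expect("Error with publishing");
}
```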