diff --git a/akd/Cargo.toml b/akd/Cargo.toml index a628dd24..d04d1a22 100644 --- a/akd/Cargo.toml +++ b/akd/Cargo.toml @@ -14,8 +14,26 @@ readme = "../README.md" whatsapp_v1 = ["akd_core/whatsapp_v1"] experimental = ["akd_core/experimental"] -rand = ["dep:rand"] +# Default features mix (experimental + audit-proof protobuf mgmt support) +default = [ + "public_auditing", + "parallel_vrf", + "parallel_insert", + "preload_history", + "greedy_lookup_preload", + "experimental", +] + bench = ["experimental", "public_tests", "tokio/rt-multi-thread"] +# Greedy loading of lookup proof nodes +greedy_lookup_preload = [] +public_auditing = ["dep:protobuf", "akd_core/protobuf"] +# Parallelize node insertion during publish +parallel_insert = [] +# Parallelize VRF calculations during publish +parallel_vrf = ["akd_core/parallel_vrf"] +# Enable pre-loading of the nodes when generating history proofs +preload_history = [] public_tests = [ "rand", "dep:colored", @@ -25,30 +43,16 @@ public_tests = [ "akd_core/rand", "dep:paste", ] -public_auditing = ["dep:protobuf", "akd_core/protobuf"] -serde_serialization = ["dep:serde", "akd_core/serde_serialization"] +rand = ["dep:rand"] # Collect runtime metrics on db access calls + timing runtime_metrics = [] -# Parallelize VRF calculations during publish -parallel_vrf = ["akd_core/parallel_vrf"] -# Parallelize node insertion during publish -parallel_insert = [] -# Enable pre-loading of the nodes when generating history proofs -preload_history = [] +serde_serialization = ["dep:serde", "akd_core/serde_serialization"] # TESTING ONLY: Artifically slow the in-memory database (for benchmarking) slow_internal_db = [] -# Greedy loading of lookup proof nodes -greedy_lookup_preload = [] - -# Default features mix (experimental + audit-proof protobuf mgmt support) -default = [ - "public_auditing", - "parallel_vrf", - "parallel_insert", - "preload_history", - "greedy_lookup_preload", - "experimental", -] +# Tracing instrumentation +tracing = ["dep:tracing"] 
+# Tracing-based instrumentation +tracing_instrument = ["tracing/attributes"] [dependencies] ## Required dependencies ## @@ -63,12 +67,13 @@ log = { version = "0.4", features = ["kv_unstable"] } tokio = { version = "1", features = ["sync", "time", "rt"] } ## Optional dependencies ## -serde = { version = "1", features = ["derive"], optional = true } -rand = { version = "0.8", optional = true } colored = { version = "2", optional = true } once_cell = { version = "1", optional = true } -protobuf = { version = "3", optional = true } paste = { version = "1", optional = true } +protobuf = { version = "3", optional = true } +rand = { version = "0.8", optional = true } +serde = { version = "1", features = ["derive"], optional = true } +tracing = { version = "0.1.40", optional = true } [dev-dependencies] criterion = "0.5" diff --git a/akd/src/append_only_zks.rs b/akd/src/append_only_zks.rs index 72c077ba..214e2934 100644 --- a/akd/src/append_only_zks.rs +++ b/akd/src/append_only_zks.rs @@ -9,6 +9,7 @@ use crate::hash::EMPTY_DIGEST; use crate::helper_structs::LookupInfo; +use crate::log::{debug, info}; use crate::storage::manager::StorageManager; use crate::storage::types::StorageType; use crate::tree_node::{ @@ -22,8 +23,8 @@ use crate::{ AppendOnlyProof, AzksElement, AzksValue, Digest, Direction, MembershipProof, NodeLabel, NonMembershipProof, PrefixOrdering, SiblingProof, SingleAppendOnlyProof, SizeOf, ARITY, }; + use async_recursion::async_recursion; -use log::info; use std::cmp::Ordering; #[cfg(feature = "greedy_lookup_preload")] use std::collections::HashSet; @@ -528,7 +529,7 @@ impl Azks { } } - /// Builds all of the POSSIBLE paths along the route from root node to + /// Builds all the POSSIBLE paths along the route from root node to /// leaf node.
This will be grossly over-estimating the true size of the /// tree and the number of nodes required to be fetched, however /// it allows a single batch-get call in necessary scenarios @@ -564,7 +565,7 @@ impl Azks { Ok(results) } - /// Preload for a single lookup operation by loading all of the nodes along + /// Preload for a single lookup operation by loading all the nodes along /// the direct path, and the children of resolved nodes on the path. This /// minimizes the number of batch_get operations to the storage layer which are /// called @@ -676,13 +677,14 @@ impl Azks { .collect(); } - info!("Preload of tree ({} nodes) completed", load_count); + debug!("Preload of tree ({} nodes) completed", load_count); Ok(load_count) } /// Returns the Merkle membership proof for the trie as it stood at epoch // Assumes the verifier has access to the root at epoch + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn get_membership_proof( &self, storage: &StorageManager, @@ -697,6 +699,7 @@ impl Azks { /// In a compressed trie, the proof consists of the longest prefix /// of the label that is included in the trie, as well as its children, to show that /// none of the children is equal to the given label. 
+ #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn get_non_membership_proof( &self, storage: &StorageManager, @@ -759,6 +762,7 @@ impl Azks { /// **RESTRICTIONS**: Note that `start_epoch` and `end_epoch` are valid only when the following are true /// * `start_epoch` <= `end_epoch` /// * `start_epoch` and `end_epoch` are both existing epochs of this AZKS + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn get_append_only_proof( &self, storage: &StorageManager, @@ -802,7 +806,7 @@ impl Azks { load_count ); } - storage.log_metrics(log::Level::Info).await; + storage.log_metrics().await; let (unchanged, leaves) = Self::get_append_only_proof_helper::( latest_epoch, @@ -1027,6 +1031,7 @@ impl Azks { } /// Gets the root hash for this azks + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn get_root_hash( &self, storage: &StorageManager, @@ -1037,6 +1042,7 @@ impl Azks { /// Gets the root hash of the tree at the latest epoch if the passed epoch /// is equal to the latest epoch. Will return an error otherwise. + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub(crate) async fn get_root_hash_safe( &self, storage: &StorageManager, diff --git a/akd/src/auditor.rs b/akd/src/auditor.rs index 40b386b9..7cdb6ffd 100644 --- a/akd/src/auditor.rs +++ b/akd/src/auditor.rs @@ -18,6 +18,7 @@ use crate::{ }; /// Verifies an audit proof, given start and end hashes for a merkle patricia tree. +#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn audit_verify( hashes: Vec, proof: AppendOnlyProof, @@ -52,7 +53,8 @@ pub async fn audit_verify( Ok(()) } -/// Helper for audit, verifies an append-only proof +/// Helper for audit, verifies an append-only proof. 
+#[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn verify_consecutive_append_only( proof: &SingleAppendOnlyProof, start_hash: Digest, diff --git a/akd/src/directory.rs b/akd/src/directory.rs index daa7785f..a58c688c 100644 --- a/akd/src/directory.rs +++ b/akd/src/directory.rs @@ -11,6 +11,7 @@ use crate::append_only_zks::{Azks, InsertMode}; use crate::ecvrf::{VRFKeyStorage, VRFPublicKey}; use crate::errors::{AkdError, DirectoryError, StorageError}; use crate::helper_structs::LookupInfo; +use crate::log::{error, info}; use crate::storage::manager::StorageManager; use crate::storage::types::{DbRecord, ValueState, ValueStateRetrievalFlag}; use crate::storage::Database; @@ -23,11 +24,12 @@ use crate::VersionFreshness; use akd_core::configuration::Configuration; use akd_core::utils::get_marker_versions; use akd_core::verify::history::HistoryParams; -use log::{error, info}; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::sync::Arc; use tokio::sync::RwLock; +#[cfg(feature = "tracing_instrument")] +use tracing::Instrument; /// The representation of a auditable key directory pub struct Directory { @@ -64,6 +66,7 @@ where /// Creates a new (stateless) instance of a auditable key directory. /// Takes as input a pointer to the storage being used for this instance. /// The state is stored in the storage. + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn new(storage: StorageManager, vrf: V) -> Result { let azks = Directory::::get_azks_from_storage(&storage, false).await; @@ -90,8 +93,9 @@ where /// /// Note that the vector of label-value pairs should not contain any entries with duplicate labels. This /// condition is explicitly checked, and an error will be returned if this is the case. 
+ #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all, fields(num_updates = updates.len())))] pub async fn publish(&self, updates: Vec<(AkdLabel, AkdValue)>) -> Result { - // The guard will be dropped at the end of the publish + // The guard will be dropped at the end of the publish operation let _guard = self.cache_lock.read().await; // Check for duplicate labels and return an error if any are encountered @@ -254,6 +258,7 @@ where /// /// Returns [Ok((LookupProof, EpochHash))] upon successful generation for the latest version /// of the target label's state. [Err(_)] otherwise + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn lookup(&self, akd_label: AkdLabel) -> Result<(LookupProof, EpochHash), AkdError> { // The guard will be dropped at the end of the proof generation let _guard = self.cache_lock.read().await; @@ -281,6 +286,7 @@ where /// from bulk lookup proof generation, as it has its own preloading operation /// /// Returns [Ok(LookupProof)] if the proof generation succeeded, [Err(_)] otherwise + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] async fn lookup_with_info( &self, current_azks: &Azks, @@ -351,6 +357,7 @@ where // TODO(eoz): Call proof generations async /// Allows efficient batch lookups by preloading necessary nodes for the lookups. 
+ #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn batch_lookup( &self, akd_labels: &[AkdLabel], @@ -392,6 +399,7 @@ where Ok((lookup_proofs, root_hash)) } + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] async fn build_lookup_info(&self, latest_st: &ValueState) -> Result { let akd_label = &latest_st.username; // Need to account for the case where the latest state is @@ -419,6 +427,7 @@ where }) } + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] async fn get_lookup_info( &self, akd_label: AkdLabel, @@ -449,13 +458,21 @@ where /// this function returns all the values ever associated with it, /// and the epoch at which each value was first committed to the server state. /// It also returns the proof of the latest version being served at all times. + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn key_history( &self, akd_label: &AkdLabel, params: HistoryParams, ) -> Result<(HistoryProof, EpochHash), AkdError> { // The guard will be dropped at the end of the proof generation + #[cfg(not(feature = "tracing_instrument"))] let _guard = self.cache_lock.read().await; + #[cfg(feature = "tracing_instrument")] + let _guard = self + .cache_lock + .read() + .instrument(tracing::info_span!("cache_lock.read")) + .await; let current_azks = self.retrieve_azks().await?; let current_epoch = current_azks.get_latest_epoch(); @@ -616,9 +633,24 @@ where { // acquire a singleton lock prior to flushing the cache to assert that no // cache accesses are underway (i.e. 
publish/proof generations/etc) + #[cfg(not(feature = "tracing_instrument"))] let _guard = self.cache_lock.write().await; + #[cfg(feature = "tracing_instrument")] + let _guard = self + .cache_lock + .write() + .instrument(tracing::info_span!("cache_lock.write")) + .await; + // flush the cache in its entirety + #[cfg(not(feature = "tracing_instrument"))] self.storage.flush_cache().await; + #[cfg(feature = "tracing_instrument")] + self.storage + .flush_cache() + .instrument(tracing::info_span!("flush_cache")) + .await; + // re-fetch the azks to load it into cache so when we release the cache lock // others will see the new AZKS loaded up and ready last = @@ -643,13 +675,21 @@ where /// Returns an [AppendOnlyProof] for the leaves inserted into the underlying tree between /// the epochs `audit_start_ep` and `audit_end_ep`. + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all, fields(start_epoch = audit_start_ep, end_epoch = audit_end_ep)))] pub async fn audit( &self, audit_start_ep: u64, audit_end_ep: u64, ) -> Result { // The guard will be dropped at the end of the proof generation + #[cfg(not(feature = "tracing_instrument"))] let _guard = self.cache_lock.read().await; + #[cfg(feature = "tracing_instrument")] + let _guard = self + .cache_lock + .read() + .instrument(tracing::info_span!("cache_lock.read")) + .await; let current_azks = self.retrieve_azks().await?; let current_epoch = current_azks.get_latest_epoch(); @@ -673,10 +713,12 @@ where } /// Retrieves the [Azks] + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub(crate) async fn retrieve_azks(&self) -> Result { Directory::::get_azks_from_storage(&self.storage, false).await } + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all, fields(ignore_cache = ignore_cache)))] async fn get_azks_from_storage( storage: &StorageManager, ignore_cache: bool, @@ -704,10 +746,12 @@ where /// HELPERS /// /// Use this function to retrieve the [VRFPublicKey] for 
this AKD. + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn get_public_key(&self) -> Result { Ok(self.vrf.get_vrf_public_key().await?) } + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] async fn create_single_update_proof( &self, akd_label: &AkdLabel, @@ -770,6 +814,7 @@ where } /// Gets the root hash at the current epoch. + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn get_epoch_hash(&self) -> Result { let current_azks = self.retrieve_azks().await?; let latest_epoch = current_azks.get_latest_epoch(); @@ -823,11 +868,13 @@ where } /// Read-only access to [Directory::lookup](Directory::lookup). + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn lookup(&self, uname: AkdLabel) -> Result<(LookupProof, EpochHash), AkdError> { self.0.lookup(uname).await } /// Read-only access to [Directory::batch_lookup](Directory::batch_lookup). + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn batch_lookup( &self, unames: &[AkdLabel], @@ -836,6 +883,7 @@ where } /// Read-only access to [Directory::key_history](Directory::key_history). + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn key_history( &self, uname: &AkdLabel, @@ -845,6 +893,7 @@ where } /// Read-only access to [Directory::poll_for_azks_changes](Directory::poll_for_azks_changes). + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn poll_for_azks_changes( &self, period: tokio::time::Duration, @@ -854,6 +903,7 @@ where } /// Read-only access to [Directory::audit](Directory::audit). + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn audit( &self, audit_start_ep: u64, @@ -863,11 +913,13 @@ where } /// Read-only access to [Directory::get_epoch_hash]. 
+ #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn get_epoch_hash(&self) -> Result { self.0.get_epoch_hash().await } /// Read-only access to [Directory::get_public_key](Directory::get_public_key). + #[cfg_attr(feature = "tracing_instrument", tracing::instrument(skip_all))] pub async fn get_public_key(&self) -> Result { self.0.get_public_key().await } diff --git a/akd/src/lib.rs b/akd/src/lib.rs index 4a886272..df8aace0 100644 --- a/akd/src/lib.rs +++ b/akd/src/lib.rs @@ -254,7 +254,7 @@ //! First, it encodes a [`HistoryParams`]. Note that the same argument for [`HistoryParams`] that was used to generate //! the key history proof must also be used to verify the proof. Otherwise, verification may fail. //! -//! [`HistoryVerificationParams`] also allows the consumer to specify whether or not a "tombstoned" value should be +//! [`HistoryVerificationParams`] also allows the consumer to specify whether a "tombstoned" value should be //! accepted in place of a valid value for the corresponding entry. This is useful //! in scenarios where the consumer wishes to verify that a particular entry exists, //! but does not care about the value associated with it. The default behavior is to @@ -460,12 +460,16 @@ //! Performance optimizations: //! - `parallel_vrf`: Enables the VRF computations to be run in parallel //! - `parallel_insert`: Enables nodes to be inserted via multiple threads during a publish operation -//! - `preload_history`: Enable pre-loading of the nodes when generating history proofs +//! - `preload_history`: Enables pre-loading of nodes when generating history proofs //! - `greedy_lookup_preload`: Greedy loading of lookup proof nodes //! //! Benchmarking: //! - `bench`: Feature used when running benchmarks -//! - `slow_internal_db`: Artifically slow the in-memory database (for benchmarking) +//! - `slow_internal_db`: Artificially slow the in-memory database (for benchmarking) +//! +//! Tracing: +//! 
- `tracing`: Enables [tracing](https://docs.rs/tracing/latest/tracing/). If not set, then the library defaults to [log](https://docs.rs/log/latest/log/). +//! - `tracing_instrument`: Enables (tracing) instrumentation at the directory level. As a result, the library will generate spans for annotated functions. //! //! Utilities: //! - `public_auditing`: Enables the publishing of audit proofs @@ -473,9 +477,10 @@ //! in the event you wish to directly serialize the structures to transmit between library <-> storage layer or library <-> clients. If you're //! also utilizing VRFs (see (2.) below) it will additionally enable the _serde_ feature in the ed25519-dalek crate. //! - `runtime_metrics`: Collects metrics on the accesses to the storage layer -//! - `public_tests`: Will expose some internal sanity testing functionality, which is often helpful so you don't have to write all your own +//! - `public_tests`: Will expose some internal sanity testing functionality, which is often helpful, so you don't have to write all your own //! unit test cases when implementing a storage layer yourself. This helps guarantee the sanity of a given storage implementation. Should be -//! used only in unit testing scenarios by altering your Cargo.toml as such: +//! used only in unit testing scenarios by altering your Cargo.toml as such. +//! //! #![warn(missing_docs)] @@ -499,6 +504,16 @@ pub mod helper_structs; pub mod storage; pub mod tree_node; +/// Shim module to group logging-related macros more easily +/// when switching between [log](https://docs.rs/log/latest/log/) +/// and [tracing](https://docs.rs/tracing/latest/tracing/).
+pub mod log { + #[cfg(not(feature = "tracing"))] + pub use log::{debug, error, info, trace, warn}; + #[cfg(feature = "tracing")] + pub use tracing::{debug, error, info, trace, warn}; +} + #[cfg(feature = "public_auditing")] pub mod local_auditing; diff --git a/akd/src/local_auditing.rs b/akd/src/local_auditing.rs index 6e0794c0..3f0b0a82 100644 --- a/akd/src/local_auditing.rs +++ b/akd/src/local_auditing.rs @@ -8,7 +8,7 @@ //! This module contains all the type conversions between internal AKD & message types //! with the protobuf types //! -//! Additionally it supports the conversion between the output from the `Directory` to +//! Additionally, it supports the conversion between the output from the `Directory` to //! public-storage safe blob types encoded with Protobuf. Download and upload //! to the blob storage medium is left to the new application crate akd_local_auditor diff --git a/akd/src/storage/cache/high_parallelism.rs b/akd/src/storage/cache/high_parallelism.rs index 8990d2d6..315ac5a2 100644 --- a/akd/src/storage/cache/high_parallelism.rs +++ b/akd/src/storage/cache/high_parallelism.rs @@ -9,16 +9,12 @@ //! 
objects use super::{CachedItem, DEFAULT_CACHE_CLEAN_FREQUENCY_MS, DEFAULT_ITEM_LIFETIME_MS}; +use crate::log::{debug, info}; use crate::storage::DbRecord; use crate::storage::Storable; + use akd_core::SizeOf; use dashmap::DashMap; -#[cfg(not(feature = "runtime_metrics"))] -use log::debug; -use log::info; -#[cfg(feature = "runtime_metrics")] -use log::{debug, error, warn}; - #[cfg(feature = "runtime_metrics")] use std::sync::atomic::AtomicU64; use std::sync::atomic::{AtomicBool, Ordering}; @@ -44,20 +40,14 @@ pub struct TimedCache { impl TimedCache { /// Log cache access metrics along with size information - pub fn log_metrics(&self, _level: log::Level) { + pub fn log_metrics(&self) { #[cfg(feature = "runtime_metrics")] { let hit_count = self.hit_count.swap(0, Ordering::Relaxed); let cache_size = self.map.len(); let msg = format!("Cache hit since last: {hit_count}, cached size: {cache_size} items"); - match _level { - log::Level::Trace => println!("{msg}"), - log::Level::Debug => debug!("{}", msg), - log::Level::Info => info!("{}", msg), - log::Level::Warn => warn!("{}", msg), - _ => error!("{}", msg), - } + info!("{msg}"); } } } @@ -132,7 +122,7 @@ impl TimedCache { /// Create a new timed cache instance. You can supply an optional item lifetime parameter /// or take the default (30s) and an optional memory-pressure limit, where the cache will be - /// cleaned if too much memory is being utilized + /// cleaned if too much memory is being utilized. pub fn new( o_lifetime: Option, o_memory_limit_bytes: Option, @@ -160,7 +150,7 @@ impl TimedCache { } } - /// Perform a hit-test of the cache for a given key. If successful, Some(record) will be returned + /// Perform a hit-test of the cache for a given key. If successful, Some(record) will be returned. pub async fn hit_test(&self, key: &St::StorageKey) -> Option { self.clean().await; @@ -199,7 +189,7 @@ impl TimedCache { None } - /// Put an item into the cache + /// Put an item into the cache. 
pub async fn put(&self, record: &DbRecord) { self.clean().await; @@ -218,7 +208,7 @@ impl TimedCache { } } - /// Put a batch of items into the cache, utilizing a single write lock + /// Put a batch of items into the cache, utilizing a single write lock. pub async fn batch_put(&self, records: &[DbRecord]) { self.clean().await; @@ -237,13 +227,13 @@ impl TimedCache { } } - /// Flush the cache + /// Flush the cache. pub async fn flush(&self) { self.map.clear(); *(self.azks.write().await) = None; } - /// Retrieve all of the cached items + /// Retrieve all the cached items. pub async fn get_all(&self) -> Vec { self.clean().await; @@ -258,13 +248,13 @@ impl TimedCache { items } - /// Disable cache-cleaning (i.e. during a transaction) + /// Disable cache-cleaning (e.g. during a transaction). pub fn disable_clean(&self) { debug!("Disabling cache cleaning"); self.can_clean.store(false, Ordering::Relaxed); } - /// Re-enable cache cleaning (i.e. when a transaction is over) + /// Re-enable cache cleaning (e.g. when a transaction is over). pub fn enable_clean(&self) { debug!("Enabling cache cleaning"); self.can_clean.store(true, Ordering::Relaxed); diff --git a/akd/src/storage/manager/mod.rs b/akd/src/storage/manager/mod.rs index bc4f2dd1..7328f5c4 100644 --- a/akd/src/storage/manager/mod.rs +++ b/akd/src/storage/manager/mod.rs @@ -9,6 +9,9 @@ //! to manage interactions with the data layer to optimize things like caching and //! 
transaction management +use crate::log::debug; +#[cfg(feature = "runtime_metrics")] +use crate::log::info; use crate::storage::cache::TimedCache; use crate::storage::transaction::Transaction; use crate::storage::types::DbRecord; @@ -21,9 +24,6 @@ use crate::storage::StorageError; use crate::AkdLabel; use crate::AkdValue; -use log::debug; -#[cfg(feature = "runtime_metrics")] -use log::{error, info, warn}; use std::collections::HashMap; use std::collections::HashSet; #[cfg(feature = "runtime_metrics")] @@ -81,7 +81,7 @@ unsafe impl Sync for StorageManager {} unsafe impl Send for StorageManager {} impl StorageManager { - /// Create a new storage manager with NO CACHE + /// Create a new storage manager with NO CACHE. pub fn new_no_cache(db: Db) -> Self { Self { cache: None, @@ -92,7 +92,7 @@ impl StorageManager { } } - /// Create a new storage manager with a cache utilizing the options provided (or defaults) + /// Create a new storage manager with a cache utilizing the options provided (or defaults). pub fn new( db: Db, cache_item_lifetime: Option, @@ -112,7 +112,7 @@ impl StorageManager { } } - /// Retrieve a reference to the database implementation + /// Retrieve a reference to the database implementation. #[cfg(any(test, feature = "public_tests"))] pub fn get_db(&self) -> Arc { self.db.clone() @@ -123,13 +123,13 @@ impl StorageManager { self.cache.is_some() } - /// Log metrics from the storage manager (cache, transaction, and storage hit rates etc) - pub async fn log_metrics(&self, level: log::Level) { + /// Log metrics from the storage manager (cache, transaction, and storage hit rates etc.). 
+ pub async fn log_metrics(&self) { if let Some(cache) = &self.cache { - cache.log_metrics(level) + cache.log_metrics() } - self.transaction.log_metrics(level); + self.transaction.log_metrics(); #[cfg(feature = "runtime_metrics")] { @@ -169,19 +169,11 @@ impl StorageManager { snapshot[METRIC_WRITE_TIME] ); - match level { - // Currently logs cannot be captured unless they are - // println!. Normally Level::Trace should use the trace! macro. - log::Level::Trace => println!("{msg}"), - log::Level::Debug => debug!("{}", msg), - log::Level::Info => info!("{}", msg), - log::Level::Warn => warn!("{}", msg), - _ => error!("{}", msg), - } + info!("{msg}"); } } - /// Start an in-memory transaction of changes + /// Start an in-memory transaction of changes. pub fn begin_transaction(&self) -> bool { let started = self.transaction.begin_transaction(); @@ -194,7 +186,7 @@ impl StorageManager { started } - /// Commit a transaction in the database + /// Commit a transaction in the database. pub async fn commit_transaction(&self) -> Result { // this retrieves all the trans operations, and "de-activates" the transaction flag let records = self.transaction.commit_transaction()?; @@ -233,7 +225,7 @@ impl StorageManager { Ok(num_records as u64) } - /// Rollback a transaction + /// Rollback a transaction. pub fn rollback_transaction(&self) -> Result<(), StorageError> { self.transaction.rollback_transaction()?; // The transaction is being reverted and therefore we can re-enable @@ -244,26 +236,26 @@ impl StorageManager { Ok(()) } - /// Retrieve a flag determining if there is a transaction active + /// Retrieve a flag determining if there is a transaction active. pub fn is_transaction_active(&self) -> bool { self.transaction.is_transaction_active() } - /// Disable cache cleaning (if present) + /// Disable cache cleaning (if present). 
pub fn disable_cache_cleaning(&self) { if let Some(cache) = &self.cache { cache.disable_clean(); } } - /// Enable cache cleaning (if present) + /// Enable cache cleaning (if present). pub fn enable_cache_cleaning(&self) { if let Some(cache) = &self.cache { cache.enable_clean(); } } - /// Store a record in the database + /// Store a record in the database. pub async fn set(&self, record: DbRecord) -> Result<(), StorageError> { // we're in a transaction, set the item in the transaction if self.is_transaction_active() { @@ -282,7 +274,7 @@ impl StorageManager { Ok(()) } - /// Set a batch of records in the database + /// Set a batch of records in the database. pub async fn batch_set(&self, records: Vec) -> Result<(), StorageError> { if records.is_empty() { // nothing to do, save the cycles @@ -310,7 +302,7 @@ impl StorageManager { Ok(()) } - /// Retrieve a stored record directly from the data layer, ignoring any caching or transaction processes + /// Retrieve a stored record directly from the data layer, ignoring any caching or transaction processes. pub async fn get_direct( &self, id: &St::StorageKey, @@ -324,7 +316,7 @@ impl StorageManager { } /// Retrieve from the cache only, not falling through to the data-layer. Check's the transaction - /// if active + /// if active. pub async fn get_from_cache_only(&self, id: &St::StorageKey) -> Option { // we're in a transaction, meaning the object _might_ be newer and therefore we should try and read if from the transaction // log instead of the raw storage layer @@ -344,7 +336,7 @@ impl StorageManager { None } - /// Retrieve a stored record from the database + /// Retrieve a stored record from the database. pub async fn get(&self, id: &St::StorageKey) -> Result { if let Some(result) = self.get_from_cache_only::(id).await { return Ok(result); @@ -363,7 +355,7 @@ impl StorageManager { Ok(record) } - /// Retrieve a batch of records by id from the database + /// Retrieve a batch of records by id from the database. 
pub async fn batch_get( &self, ids: &[St::StorageKey], @@ -418,14 +410,14 @@ impl StorageManager { Ok(records) } - /// Flush the caching of objects (if present) + /// Flush the caching of objects (if present). pub async fn flush_cache(&self) { if let Some(cache) = &self.cache { cache.flush().await; } } - /// Tombstones all value states for a given AkdLabel, up to and including a given epoch + /// Tombstones all value states for a given AkdLabel, up to and including a given epoch. pub async fn tombstone_value_states( &self, username: &AkdLabel, @@ -453,7 +445,7 @@ impl StorageManager { Ok(()) } - /// Retrieve the specified user state object based on the retrieval flag from the database + /// Retrieve the specified user state object based on the retrieval flag from the database. pub async fn get_user_state( &self, username: &AkdLabel, @@ -501,7 +493,7 @@ impl StorageManager { } } - /// Retrieve all values states for a given user + /// Retrieve all values states for a given user. pub async fn get_user_data(&self, username: &AkdLabel) -> Result { let maybe_db_data = match self .tic_toc(METRIC_READ_TIME, self.db.get_user_data(username)) @@ -547,7 +539,7 @@ impl StorageManager { } } - /// Retrieve the user -> state version mapping in bulk. This is the same as get_user_state in a loop, but with less data retrieved from the storage layer + /// Retrieve the user -> state version mapping in bulk. This is the same as get_user_state in a loop, but with less data retrieved from the storage layer. pub async fn get_user_state_versions( &self, usernames: &[AkdLabel], diff --git a/akd/src/storage/transaction.rs b/akd/src/storage/transaction.rs index 34eb6830..66f62222 100644 --- a/akd/src/storage/transaction.rs +++ b/akd/src/storage/transaction.rs @@ -8,14 +8,14 @@ //! 
A simple in-memory transaction object to minimize data-layer operations use crate::errors::StorageError; +#[cfg(feature = "runtime_metrics")] +use crate::log::info; use crate::storage::types::DbRecord; use crate::storage::types::ValueState; use crate::storage::types::ValueStateRetrievalFlag; use crate::storage::Storable; use dashmap::DashMap; -#[cfg(feature = "runtime_metrics")] -use log::{debug, error, info, trace, warn}; use std::collections::HashMap; #[cfg(feature = "runtime_metrics")] use std::sync::atomic::AtomicU64; @@ -72,30 +72,23 @@ impl Transaction { } /// Log metrics about the current transaction instance. Metrics will be cleared after log call - pub fn log_metrics(&self, _level: log::Level) { + pub fn log_metrics(&self) { #[cfg(feature = "runtime_metrics")] { let r = self.num_reads.swap(0, Ordering::Relaxed); let w = self.num_writes.swap(0, Ordering::Relaxed); let msg = format!("Transaction writes: {w}, Transaction reads: {r}"); - - match _level { - log::Level::Trace => trace!("{}", msg), - log::Level::Debug => debug!("{}", msg), - log::Level::Info => info!("{}", msg), - log::Level::Warn => warn!("{}", msg), - _ => error!("{}", msg), - } + info!("{msg}"); } } - /// Start a transaction in the storage layer + /// Start a transaction in the storage layer. pub fn begin_transaction(&self) -> bool { !self.active.swap(true, Ordering::Relaxed) } - /// Commit a transaction in the storage layer + /// Commit a transaction in the storage layer. pub fn commit_transaction(&self) -> Result, StorageError> { if !self.active.load(Ordering::Relaxed) { return Err(StorageError::Transaction( @@ -120,7 +113,7 @@ impl Transaction { Ok(records) } - /// Rollback a transaction + /// Rollback a transaction. 
pub fn rollback_transaction(&self) -> Result<(), StorageError> { if !self.active.load(Ordering::Relaxed) { return Err(StorageError::Transaction( @@ -135,12 +128,12 @@ impl Transaction { Ok(()) } - /// Retrieve a flag determining if there is a transaction active + /// Retrieve a flag determining if there is a transaction active. pub fn is_transaction_active(&self) -> bool { self.active.load(Ordering::Relaxed) } - /// Hit test the current transaction to see if it is currently active + /// Hit test the current transaction to see if it is currently active. pub fn get(&self, key: &St::StorageKey) -> Option { let bin_id = St::get_full_binary_key_id(key); @@ -152,7 +145,7 @@ impl Transaction { out } - /// Set a batch of values into the cache + /// Set a batch of values into the cache. pub fn batch_set(&self, records: &[DbRecord]) { for record in records { self.mods @@ -165,7 +158,7 @@ impl Transaction { } } - /// Set a value in the transaction to be committed at transaction commit time + /// Set a value in the transaction to be committed at transaction commit time. pub fn set(&self, record: &DbRecord) { let bin_id = record.get_full_binary_id(); @@ -177,9 +170,9 @@ impl Transaction { } } - /// Retrieve all of the user data for a given username + /// Retrieve all the user data for a given username. /// - /// Note: This is a FULL SCAN operation of the entire transaction log + /// Note: This is a FULL SCAN operation of the entire transaction log. pub fn get_users_data( &self, usernames: &[crate::AkdLabel], @@ -215,9 +208,9 @@ impl Transaction { results } - /// Retrieve the user state given the specified value state retrieval mode + /// Retrieve the user state given the specified value state retrieval mode. /// - /// Note: This is a FULL SCAN operation of the entire transaction log + /// Note: This is a FULL SCAN operation of the entire transaction log. 
#[allow(clippy::let_and_return)] pub fn get_user_state( &self, @@ -236,9 +229,9 @@ impl Transaction { out } - /// Retrieve the batch of specified users user_state's based on the filtering flag provided + /// Retrieve the batch of specified users user_state's based on the filtering flag provided. /// - /// Note: This is a FULL SCAN operation of the entire transaction log + /// Note: This is a FULL SCAN operation of the entire transaction log. pub fn get_users_states( &self, usernames: &[crate::AkdLabel], @@ -260,7 +253,7 @@ impl Transaction { } /// Find the appropriate item of the cached value states for a given user. This assumes that the incoming vector - /// is already sorted in ascending epoch order + /// is already sorted in ascending epoch order. fn find_appropriate_item( intermediate: Vec, flag: ValueStateRetrievalFlag, diff --git a/akd/src/storage/types.rs b/akd/src/storage/types.rs index 9f6ed190..a5633316 100644 --- a/akd/src/storage/types.rs +++ b/akd/src/storage/types.rs @@ -147,7 +147,7 @@ pub enum ValueStateRetrievalFlag { // == New Data Retrieval Logic == // -/// This needs to be PUBLIC public, since anyone implementing a data-layer will need +/// This needs to be public, since anyone implementing a data-layer will need /// to be able to access this and all the internal types #[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] #[cfg_attr( diff --git a/examples/src/mysql_demo/mod.rs b/examples/src/mysql_demo/mod.rs index d4b6ec0e..4f26b0c4 100644 --- a/examples/src/mysql_demo/mod.rs +++ b/examples/src/mysql_demo/mod.rs @@ -248,7 +248,7 @@ async fn process_input( assert_eq!(Ok(()), storage.batch_set(data).await); let toc: Duration = Instant::now() - tic; println!("Insert batch of {} items in {} ms", len, toc.as_millis()); - storage.log_metrics(log::Level::Warn).await; + storage.log_metrics().await; } else { error!("Command available with MySQL db's only"); } diff --git a/examples/src/mysql_demo/tests/mysql_tests.rs 
b/examples/src/mysql_demo/tests/mysql_tests.rs index fb5b9cf5..992c59a8 100644 --- a/examples/src/mysql_demo/tests/mysql_tests.rs +++ b/examples/src/mysql_demo/tests/mysql_tests.rs @@ -54,7 +54,7 @@ async fn test_directory_operations() { let storage_manager = StorageManager::new_no_cache(mysql_db.clone()); directory_test_suite::(&storage_manager, 50, &vrf).await; - storage_manager.log_metrics(log::Level::Trace).await; + storage_manager.log_metrics().await; // clean the test infra if let Err(mysql_async::Error::Server(error)) = storage_manager.get_db().drop_tables().await @@ -111,7 +111,7 @@ async fn test_directory_operations_with_caching() { let storage_manager = StorageManager::new(mysql_db.clone(), None, None, None); directory_test_suite::(&storage_manager, 50, &vrf).await; - storage_manager.log_metrics(log::Level::Trace).await; + storage_manager.log_metrics().await; // clean the test infra if let Err(mysql_async::Error::Server(error)) = storage_manager.get_db().drop_tables().await diff --git a/examples/src/mysql_demo/tests/test_util.rs b/examples/src/mysql_demo/tests/test_util.rs index 74544607..4e359c9b 100644 --- a/examples/src/mysql_demo/tests/test_util.rs +++ b/examples/src/mysql_demo/tests/test_util.rs @@ -246,7 +246,7 @@ pub(crate) async fn test_lookups(mysql_db: &StorageManager) { - mysql_db.log_metrics(Level::Warn).await; + mysql_db.log_metrics().await; mysql_db.flush_cache().await; } @@ -343,13 +343,13 @@ pub(crate) async fn directory_test_suite< // Perform an audit proof from 1u64 -> 2u64 - mysql_db.log_metrics(log::Level::Info).await; + mysql_db.log_metrics().await; log::warn!("Beginning audit proof generation"); mysql_db.flush_cache().await; match dir.audit(1u64, 2u64).await { Err(error) => panic!("Error perform audit proof retrieval {:?}", error), Ok(proof) => { - mysql_db.log_metrics(log::Level::Info).await; + mysql_db.log_metrics().await; log::warn!("Done with audit proof generation"); let start_root_hash = root_hashes[0]; let end_root_hash = 
root_hashes[1];