From 07cbdd05c927a817a46ee50d9c2c49be947ed529 Mon Sep 17 00:00:00 2001 From: Pushkar Mishra Date: Sat, 25 May 2024 16:35:17 +0530 Subject: [PATCH] `fs-storage`: Implement simple conflict resolution using Monoids (#26) --------- Signed-off-by: Tarek Signed-off-by: Pushkar Mishra Co-authored-by: Kirill Taran Co-authored-by: Tarek --- fs-storage/src/base_storage.rs | 18 ++++- fs-storage/src/file_storage.rs | 140 +++++++++++++++++++++++---------- fs-storage/src/lib.rs | 1 + fs-storage/src/monoid.rs | 48 +++++++++++ 4 files changed, 163 insertions(+), 44 deletions(-) create mode 100644 fs-storage/src/monoid.rs diff --git a/fs-storage/src/base_storage.rs b/fs-storage/src/base_storage.rs index 87c7708c..ea8652af 100644 --- a/fs-storage/src/base_storage.rs +++ b/fs-storage/src/base_storage.rs @@ -8,10 +8,16 @@ pub trait BaseStorage: AsRef> { /// Remove an entry from the internal mapping. fn remove(&mut self, id: &K) -> Result<()>; - /// Check if the storage is up-to-date, - /// i.e. that the internal mapping is consistent - /// with the data in the filesystem. - fn is_storage_updated(&self) -> Result; + /// Determine if in-memory model + /// or the underlying storage requires syncing. + /// This is a quick method checking timestamps + /// of modification of both model and storage. + /// + /// Returns: + /// - `Ok(true)` if the on-disk data and in-memory data are not in sync. + /// - `Ok(false)` if the on-disk data and in-memory data are in sync. + /// - `Err(ArklibError::Storage)` in case of any error retrieving the file metadata. + fn needs_syncing(&self) -> Result; /// Scan and load the key-value mapping /// from pre-configured location in the filesystem. @@ -24,4 +30,8 @@ pub trait BaseStorage: AsRef> { /// Remove all persisted data /// by pre-configured location in the file-system. fn erase(&self) -> Result<()>; + + /// Merge two storages instances + /// and write the result to the filesystem. + fn merge_from(&mut self, other: impl AsRef>) -> Result<()>; } diff --git a/fs-storage/src/file_storage.rs b/fs-storage/src/file_storage.rs index d9096400..7c652064 100644 --- a/fs-storage/src/file_storage.rs +++ b/fs-storage/src/file_storage.rs @@ -8,6 +8,7 @@ use std::{ }; use crate::base_storage::BaseStorage; +use crate::monoid::Monoid; use crate::utils::read_version_2_fs; use data_error::{ArklibError, Result}; @@ -30,7 +31,7 @@ where { label: String, path: PathBuf, - timestamp: SystemTime, + modified: SystemTime, data: FileStorageData, } @@ -57,26 +58,20 @@ where V: Clone + serde::Serialize + serde::de::DeserializeOwned - + std::str::FromStr, + + std::str::FromStr + + Monoid, { /// Create a new file storage with a diagnostic label and file path pub fn new(label: String, path: &Path) -> Self { - let mut file_storage = Self { + Self { label, path: PathBuf::from(path), - timestamp: SystemTime::now(), + modified: SystemTime::now(), data: FileStorageData { version: STORAGE_VERSION, entries: BTreeMap::new(), }, - }; - - // Load the data from the file - file_storage.data.entries = match file_storage.read_fs() { - Ok(data) => data, - Err(_) => BTreeMap::new(), - }; - file_storage + } } } @@ -90,14 +85,13 @@ where V: Clone + serde::Serialize + serde::de::DeserializeOwned - + std::str::FromStr, + + std::str::FromStr + + Monoid, { /// Set a key-value pair in the storage fn set(&mut self, key: K, value: V) { self.data.entries.insert(key, value); - self.timestamp = std::time::SystemTime::now(); - self.write_fs() - .expect("Failed to write data to disk"); + self.modified = std::time::SystemTime::now(); } /// Remove a key-value pair from the storage given a key @@ -105,26 +99,34 @@ where self.data.entries.remove(id).ok_or_else(|| { ArklibError::Storage(self.label.clone(), "Key not found".to_owned()) })?; - self.timestamp = std::time::SystemTime::now(); + self.modified = std::time::SystemTime::now(); self.write_fs() .expect("Failed to remove data from disk"); Ok(()) } - /// Compare the timestamp of the storage file with the timestamp of the storage instance - /// to determine if the storage file has been updated. - fn is_storage_updated(&self) -> Result { - let file_timestamp = fs::metadata(&self.path)?.modified()?; - let file_time_secs = file_timestamp - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - let self_time_secs = self - .timestamp - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - Ok(file_time_secs > self_time_secs) + /// Compare the timestamp of the storage file + /// with the timestamp of the in-memory storage update + /// to determine if either of the two requires syncing. + fn needs_syncing(&self) -> Result { + match fs::metadata(&self.path) { + Ok(metadata) => { + let get_duration_since_epoch = |time: SystemTime| { + time.duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + }; + + let fs_modified = + get_duration_since_epoch(metadata.modified()?); + let self_modified = get_duration_since_epoch(self.modified); + + Ok(fs_modified != self_modified) + } + Err(e) => { + Err(ArklibError::Storage(self.label.clone(), e.to_string())) + } + } } /// Read the data from the storage file @@ -146,7 +148,7 @@ where "Version 2 storage format detected for {}", self.label ); - self.timestamp = fs::metadata(&self.path)?.modified()?; + self.modified = fs::metadata(&self.path)?.modified()?; return Ok(data); } Err(_) => { @@ -174,7 +176,7 @@ where ), )); } - self.timestamp = fs::metadata(&self.path)?.modified()?; + self.modified = fs::metadata(&self.path)?.modified()?; Ok(data.entries) } @@ -194,10 +196,10 @@ where writer.write_all(value_data.as_bytes())?; let new_timestamp = fs::metadata(&self.path)?.modified()?; - if new_timestamp == self.timestamp { + if new_timestamp == self.modified { return Err("Timestamp has not been updated".into()); } - self.timestamp = new_timestamp; + self.modified = new_timestamp; log::info!( "{} {} entries have been written", @@ -213,6 +215,24 @@ where ArklibError::Storage(self.label.clone(), err.to_string()) }) } + + /// Merge the data from another storage instance into this storage instance + fn merge_from(&mut self, other: impl AsRef>) -> Result<()> + where + V: Monoid, + { + let other_entries = other.as_ref(); + for (key, value) in other_entries { + if let Some(existing_value) = self.data.entries.get(key) { + let resolved_value = V::combine(existing_value, value); + self.set(key.clone(), resolved_value); + } else { + self.set(key.clone(), value.clone()) + } + } + self.modified = std::time::SystemTime::now(); + Ok(()) + } } impl AsRef> for FileStorage @@ -263,7 +283,7 @@ mod tests { file_storage.set("key1".to_string(), "value1".to_string()); file_storage.set("key1".to_string(), "value2".to_string()); - + assert!(file_storage.write_fs().is_ok()); assert_eq!(storage_path.exists(), true); if let Err(err) = file_storage.erase() { @@ -280,19 +300,59 @@ mod tests { let mut file_storage = FileStorage::new("TestStorage".to_string(), &storage_path); - + file_storage.write_fs().unwrap(); + assert_eq!(file_storage.needs_syncing().unwrap(), false); + std::thread::sleep(std::time::Duration::from_secs(1)); file_storage.set("key1".to_string(), "value1".to_string()); - assert_eq!(file_storage.is_storage_updated().unwrap(), false); + assert_eq!(file_storage.needs_syncing().unwrap(), true); + file_storage.write_fs().unwrap(); + assert_eq!(file_storage.needs_syncing().unwrap(), false); std::thread::sleep(std::time::Duration::from_secs(1)); // External data manipulation let mut mirror_storage = FileStorage::new("TestStorage".to_string(), &storage_path); + assert_eq!(mirror_storage.needs_syncing().unwrap(), true); + std::thread::sleep(std::time::Duration::from_secs(1)); + mirror_storage.read_fs().unwrap(); + assert_eq!(mirror_storage.needs_syncing().unwrap(), false); mirror_storage.set("key1".to_string(), "value3".to_string()); - assert_eq!(mirror_storage.is_storage_updated().unwrap(), false); + assert_eq!(mirror_storage.needs_syncing().unwrap(), true); + mirror_storage.write_fs().unwrap(); + assert_eq!(mirror_storage.needs_syncing().unwrap(), false); + + assert_eq!(file_storage.needs_syncing().unwrap(), true); + file_storage.read_fs().unwrap(); + assert_eq!(file_storage.needs_syncing().unwrap(), false); + assert_eq!(mirror_storage.needs_syncing().unwrap(), false); + } + + #[test] + fn test_monoid_combine() { + let temp_dir = + TempDir::new("tmp").expect("Failed to create temporary directory"); + let storage_path1 = temp_dir.path().join("teststorage1.txt"); + let storage_path2 = temp_dir.path().join("teststorage2.txt"); + + let mut file_storage_1 = + FileStorage::new("TestStorage1".to_string(), &storage_path1); + + let mut file_storage_2 = + FileStorage::new("TestStorage2".to_string(), &storage_path2); + + file_storage_1.set("key1".to_string(), 2); + file_storage_1.set("key2".to_string(), 6); + + file_storage_2.set("key1".to_string(), 3); + file_storage_2.set("key3".to_string(), 9); - assert_eq!(file_storage.is_storage_updated().unwrap(), true); + file_storage_1 + .merge_from(&file_storage_2) + .unwrap(); + assert_eq!(file_storage_1.as_ref().get("key1"), Some(&3)); + assert_eq!(file_storage_1.as_ref().get("key2"), Some(&6)); + assert_eq!(file_storage_1.as_ref().get("key3"), Some(&9)); } } diff --git a/fs-storage/src/lib.rs b/fs-storage/src/lib.rs index a6a6b03c..d31b27b6 100644 --- a/fs-storage/src/lib.rs +++ b/fs-storage/src/lib.rs @@ -1,5 +1,6 @@ pub mod base_storage; pub mod file_storage; +pub mod monoid; mod utils; pub const ARK_FOLDER: &str = ".ark"; diff --git a/fs-storage/src/monoid.rs b/fs-storage/src/monoid.rs new file mode 100644 index 00000000..b2acf546 --- /dev/null +++ b/fs-storage/src/monoid.rs @@ -0,0 +1,48 @@ +// Currently, we have three structures: Tags (HashSet), Properties (HashSet), Score (int). +// In fact, HashSet already implements a union function, +// so only a special function for integers is needed. +// CRDTs can be considered later when we need to add structures that require +// more powerful combine semantics. + +// Trait defining a Monoid, which represents a mathematical structure with an identity element and an associative binary operation. +pub trait Monoid { + // Returns the neutral element of the monoid. + fn neutral() -> V; + + // Combines two elements of the monoid into a single element. + fn combine(a: &V, b: &V) -> V; + + // Combines multiple elements of the monoid into a single element. + // Default implementation uses `neutral()` as the initial accumulator and `combine()` for folding. + fn combine_all>(values: I) -> V { + values + .into_iter() + .fold(Self::neutral(), |acc, val| Self::combine(&acc, &val)) + } +} + +impl Monoid for i32 { + fn neutral() -> i32 { + 0 + } + + fn combine(a: &i32, b: &i32) -> i32 { + if a > b { + *a + } else { + *b + } + } +} + +impl Monoid for String { + fn neutral() -> String { + String::new() + } + + fn combine(a: &String, b: &String) -> String { + let mut result = a.clone(); + result.push_str(b); + result + } +}