diff --git a/fs-storage/Cargo.toml b/fs-storage/Cargo.toml
index c6ffdd18..de74b108 100644
--- a/fs-storage/Cargo.toml
+++ b/fs-storage/Cargo.toml
@@ -17,12 +17,13 @@
 serde_json = "1.0.82"
 serde = { version = "1.0.138", features = ["derive"] }
 jni = { version = "0.21.1", optional = true }
 jnix = { version = "0.5.1", features = ["derive"], optional = true }
-
 data-error = { path = "../data-error" }
 
 [dev-dependencies]
 anyhow = "1.0.81"
+quickcheck = { version = "1.0.3", features = ["use_logging"] }
+quickcheck_macros = "1.0.0"
 tempdir = "0.3.7"
 
 [features]
diff --git a/fs-storage/README.md b/fs-storage/README.md
index cfd68569..6401dce6 100644
--- a/fs-storage/README.md
+++ b/fs-storage/README.md
@@ -14,22 +14,26 @@ File system storage implementation for writing key value pairs to disk.
 }
 ```
 
+- Select between two storage options:
+  - file: Stores multiple key-value pairs in a single file.
+  - folder: Stores each value in a separate file within a folder.
+
 - Run Write Command
 
 ```bash
-cargo run --example cli write /tmp/z test.json
+cargo run --example cli [file|folder] write /tmp/z test.json
 ```
 
 Alternatively, you can directly provide the input data as a comma-separated list of key-value pairs
 
 ```bash
-cargo run --example cli write /tmp/z a:1,b:2,c:3
+cargo run --example cli [file|folder] write /tmp/z a:1,b:2,c:3
 ```
 
 - Run Read Command
 
 ```bash
-cargo run --example cli read /tmp/z key1,key2
+cargo run --example cli [file|folder] read /tmp/z key1,key2
 ```
 
 - Get Output
@@ -42,5 +46,5 @@ key2: value2
 - To get all key value pairs
 
 ```bash
-cargo run --example cli read /tmp/z
+cargo run --example cli [file|folder] read /tmp/z
 ```
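The two backends differ in their on-disk layout: `file` keeps every pair in the single file at the given path, while `folder` writes one JSON file per key under it. For instance (paths and values illustrative):

```bash
cargo run --example cli folder write /tmp/z a:1,b:2,c:3
ls /tmp/z
# a.json  b.json  c.json
```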
diff --git a/fs-storage/examples/cli.rs b/fs-storage/examples/cli.rs
index 8214f258..40d373a6 100644
--- a/fs-storage/examples/cli.rs
+++ b/fs-storage/examples/cli.rs
@@ -1,5 +1,8 @@
 use anyhow::{Context, Result};
-use fs_storage::{base_storage::BaseStorage, file_storage::FileStorage};
+use fs_storage::{
+    base_storage::BaseStorage, file_storage::FileStorage,
+    folder_storage::FolderStorage,
+};
 use serde_json::Value;
 use std::{env, fs, path::Path};
 
@@ -13,26 +16,41 @@
 fn run() -> Result<()> {
     let args: Vec<String> = env::args().collect();
-    if args.len() < 3 {
+    if args.len() < 4 {
         println!("Usage:");
-        println!("  cargo run --example cli write <PATH> [JSON_FILE_PATH | KEY_VALUE_PAIRS]");
-        println!("  cargo run --example cli read <PATH> <KEYS>");
+        println!("  cargo run --example cli [file|folder] write <PATH> [JSON_FILE_PATH | KEY_VALUE_PAIRS]");
+        println!("  cargo run --example cli [file|folder] read <PATH> <KEYS>");
         return Ok(());
     }
 
-    let command = &args[1];
-    let path = &args[2];
-    match command.as_str() {
-        "read" => read_command(&args, path),
-        "write" => write_command(&args, path),
+    let storage_type = &args[1];
+    let command = &args[2];
+    let path = &args[3];
+    match storage_type.as_str() {
+        "file" => match command.as_str() {
+            "read" => file_read_command(&args, path),
+            "write" => file_write_command(&args, path),
+            _ => {
+                eprintln!("Invalid command. Use 'read' or 'write'.");
+                Ok(())
+            }
+        },
+        "folder" => match command.as_str() {
+            "read" => folder_read_command(&args, path),
+            "write" => folder_write_command(&args, path),
+            _ => {
+                eprintln!("Invalid command. Use 'read' or 'write'.");
+                Ok(())
+            }
+        },
         _ => {
-            eprintln!("Invalid command. Use 'read' or 'write'.");
+            eprintln!("Invalid storage. Use 'file' or 'folder'.");
             Ok(())
         }
     }
 }
 
-fn read_command(args: &[String], path: &str) -> Result<()> {
-    let keys = if args.len() > 3 {
-        args[3]
+fn file_read_command(args: &[String], path: &str) -> Result<()> {
+    let keys = if args.len() > 4 {
+        args[4]
             .split(',')
             .map(|s| s.to_string())
             .collect::<Vec<String>>()
@@ -63,13 +81,13 @@ fn read_command(args: &[String], path: &str) -> Result<()> {
     Ok(())
 }
 
-fn write_command(args: &[String], path: &str) -> Result<()> {
-    if args.len() < 4 {
-        println!("Usage: cargo run --example cli write <PATH> [JSON_FILE_PATH | KEY_VALUE_PAIRS]");
+fn file_write_command(args: &[String], path: &str) -> Result<()> {
+    if args.len() < 5 {
+        println!("Usage: cargo run --example cli file write <PATH> [JSON_FILE_PATH | KEY_VALUE_PAIRS]");
         return Ok(());
     }
 
-    let content = &args[3];
+    let content = &args[4];
     // Check if the content is a JSON file path
     let content_json = Path::new(content)
         .extension()
@@ -107,5 +125,87 @@ fn write_command(args: &[String], path: &str) -> Result<()> {
             }
         }
     }
+    fs.write_fs().expect("Failed to write to file");
+    Ok(())
+}
+
+fn folder_read_command(args: &[String], path: &str) -> Result<()> {
+    let keys = if args.len() > 4 {
+        args[4]
+            .split(',')
+            .map(|s| s.to_string())
+            .collect::<Vec<String>>()
+    } else {
+        vec![]
+    };
+
+    let mut fs: FolderStorage<String, String> =
+        FolderStorage::new("cli".to_string(), Path::new(path))
+            .context("Failed to create FolderStorage")?;
+
+    let map = fs
+        .read_fs()
+        .expect("No data is present on this path");
+    if keys.is_empty() {
+        for (key, value) in map {
+            println!("{}: {}", key, value);
+        }
+    }
+    for key in &keys {
+        if let Some(value) = map.get(key) {
+            println!("{}: {}", key, value);
+        } else {
+            eprintln!("Key '{}' not found", key);
+        }
+    }
+
+    Ok(())
+}
+
+fn folder_write_command(args: &[String], path: &str) -> Result<()> {
+    if args.len() < 5 {
+        println!("Usage: cargo run --example cli folder write <PATH> [JSON_FILE_PATH | KEY_VALUE_PAIRS]");
+        return Ok(());
+    }
+
+    let content = &args[4];
+    // Check if the content is a JSON file path
+    let content_json = Path::new(content)
+        .extension()
+        .map_or(false, |ext| ext == "json");
+
+    let mut fs: FolderStorage<String, String> =
+        FolderStorage::new("cli".to_string(), Path::new(path))
+            .context("Failed to create FolderStorage")?;
+    if content_json {
+        let content =
+            fs::read_to_string(content).context("Failed to read JSON file")?;
+        let json: Value =
+            serde_json::from_str(&content).context("Failed to parse JSON")?;
+        if let Value::Object(object) = json {
+            for (key, value) in object {
+                if let Value::String(value_str) = value {
+                    fs.set(key, value_str);
+                } else {
+                    println!(
+                        "Warning: Skipping non-string value for key '{}'",
+                        key
+                    );
+                }
+            }
+        } else {
+            println!("JSON value is not an object");
+            return Ok(());
+        }
+    } else {
+        let pairs = content.split(',');
+        for pair in pairs {
+            let kv: Vec<&str> = pair.split(':').collect();
+            if kv.len() == 2 {
+                fs.set(kv[0].to_string(), kv[1].to_string());
+            }
+        }
+    }
+    fs.write_fs().expect("Failed to write to folder");
     Ok(())
 }
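Stripped of argument handling, the new `folder` branch of the example reduces to a few calls on the storage API; a condensed sketch of the code above (path and values illustrative):

```rust
use std::path::Path;

use anyhow::{Context, Result};
use fs_storage::{base_storage::BaseStorage, folder_storage::FolderStorage};

fn folder_demo() -> Result<()> {
    let mut fs: FolderStorage<String, String> =
        FolderStorage::new("cli".to_string(), Path::new("/tmp/z"))
            .context("Failed to create FolderStorage")?;
    fs.set("a".to_string(), "1".to_string());
    fs.set("b".to_string(), "2".to_string());
    fs.write_fs().context("Failed to write to folder")?;

    // Reading back returns the full mapping, one entry per file.
    for (key, value) in fs.read_fs().context("Failed to read from folder")? {
        println!("{}: {}", key, value);
    }
    Ok(())
}
```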
diff --git a/fs-storage/src/base_storage.rs b/fs-storage/src/base_storage.rs
index 4eba03b0..73db729f 100644
--- a/fs-storage/src/base_storage.rs
+++ b/fs-storage/src/base_storage.rs
@@ -54,7 +54,7 @@ pub trait BaseStorage<K, V>: AsRef<BTreeMap<K, V>> {
     fn remove(&mut self, id: &K) -> Result<()>;
 
     /// Get [`SyncStatus`] of the storage
-    fn sync_status(&self) -> Result<SyncStatus>;
+    fn sync_status(&mut self) -> Result<SyncStatus>;
 
     /// Sync the in-memory storage with the storage on disk
     fn sync(&mut self) -> Result<()>;
diff --git a/fs-storage/src/file_storage.rs b/fs-storage/src/file_storage.rs
index 9b762bbd..26a50969 100644
--- a/fs-storage/src/file_storage.rs
+++ b/fs-storage/src/file_storage.rs
@@ -191,7 +191,7 @@ where
     /// Compare the timestamp of the storage file
     /// with the timestamp of the in-memory storage and the last written
     /// to time to determine if either of the two requires syncing.
-    fn sync_status(&self) -> Result<SyncStatus> {
+    fn sync_status(&mut self) -> Result<SyncStatus> {
         let file_updated = fs::metadata(&self.path)?.modified()?;
 
         // Determine the synchronization status based on the modification times
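Both implementations follow the same decision table when syncing, and `FolderStorage::sync` below is exactly this shape. A generic sketch over the trait (hypothetical helper, mirroring the `sync` implementations):

```rust
use fs_storage::base_storage::{BaseStorage, SyncStatus};

// Hypothetical helper: push or pull changes depending on which side is stale.
fn sync_any<K, V>(storage: &mut impl BaseStorage<K, V>) -> data_error::Result<()> {
    match storage.sync_status()? {
        SyncStatus::InSync => {} // nothing to do
        SyncStatus::MappingStale => {
            storage.read_fs()?; // disk is newer: pull
        }
        SyncStatus::StorageStale => {
            storage.write_fs()?; // RAM is newer: push
        }
        SyncStatus::Diverge => {
            storage.sync()?; // both changed: let the implementation merge
        }
    }
    Ok(())
}
```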
diff --git a/fs-storage/src/folder_storage.rs b/fs-storage/src/folder_storage.rs
new file mode 100644
index 00000000..68892a49
--- /dev/null
+++ b/fs-storage/src/folder_storage.rs
@@ -0,0 +1,808 @@
+use std::collections::BTreeSet;
+use std::fs::{self, File};
+use std::io::Write;
+use std::time::SystemTime;
+use std::{
+    collections::BTreeMap,
+    path::{Path, PathBuf},
+};
+
+use crate::base_storage::{BaseStorage, SyncStatus};
+use crate::monoid::Monoid;
+use data_error::{ArklibError, Result};
+
+/// Represents a folder storage system that persists data to disk.
+pub struct FolderStorage<K, V> {
+    /// Label for logging
+    label: String,
+    /// Path to the underlying folder where data is persisted
+    path: PathBuf,
+    /// `timestamps.0` tracks the last time the value for a key was modified
+    /// in memory, keyed by the storage key that names the file inside the
+    /// directory.
+    ///
+    /// `timestamps.1` tracks the last time that file was written to or read
+    /// from disk, keyed the same way.
+    timestamps: BTreeMap<K, (SystemTime, SystemTime)>,
+    data: BTreeMap<K, V>,
+    /// Temporary store for deleted keys until storage is synced
+    deleted_keys: BTreeSet<K>,
+}
+
+impl<K, V> AsRef<BTreeMap<K, V>> for FolderStorage<K, V>
+where
+    K: Ord,
+{
+    fn as_ref(&self) -> &BTreeMap<K, V> {
+        &self.data
+    }
+}
+
+impl<K, V> FolderStorage<K, V>
+where
+    K: Ord
+        + Clone
+        + serde::Serialize
+        + serde::de::DeserializeOwned
+        + std::str::FromStr
+        + std::fmt::Display,
+    V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid<V>,
+{
+    /// Create a new folder storage with a diagnostic label and directory path.
+    /// Note: if the folder storage already exists, the data will be read from
+    /// the folder without overwriting it.
+    pub fn new(label: String, path: &Path) -> Result<Self> {
+        let mut storage = Self {
+            label,
+            path: PathBuf::from(path),
+            timestamps: BTreeMap::new(),
+            data: BTreeMap::new(),
+            deleted_keys: BTreeSet::new(),
+        };
+
+        if Path::exists(path) {
+            storage.read_fs()?;
+        }
+
+        Ok(storage)
+    }
+
+    /// Load mapping from folder storage
+    fn load_fs_data(&mut self) -> Result<()> {
+        if !self.path.exists() {
+            return Err(ArklibError::Storage(
+                self.label.clone(),
+                "Folder does not exist".to_owned(),
+            ));
+        }
+
+        if !self.path.is_dir() {
+            return Err(ArklibError::Storage(
+                self.label.clone(),
+                "Path is not a directory".to_owned(),
+            ));
+        }
+
+        let mut data = BTreeMap::new();
+
+        for entry in fs::read_dir(&self.path)? {
+            let entry = entry?;
+            let path = entry.path();
+            if path.is_file()
+                && path
+                    .extension()
+                    .map_or(false, |ext| ext == "json")
+            {
+                let key: K = extract_key_from_file_path(&self.label, &path)?;
+                let file = File::open(&path)?;
+                let value: V =
+                    serde_json::from_reader(file).map_err(|err| {
+                        ArklibError::Storage(
+                            self.label.clone(),
+                            err.to_string(),
+                        )
+                    })?;
+
+                data.insert(key.clone(), value);
+
+                let modified = fs::metadata(path)
+                    .map_err(|_| {
+                        ArklibError::Storage(
+                            self.label.clone(),
+                            "Failed to fetch metadata".to_owned(),
+                        )
+                    })?
+                    .modified()
+                    .map_err(|_| {
+                        ArklibError::Storage(
+                            self.label.clone(),
+                            "Failed to fetch modified time from metadata"
+                                .to_owned(),
+                        )
+                    })?;
+
+                self.timestamps.insert(key, (modified, modified));
+            }
+        }
+
+        self.data = data;
+        Ok(())
+    }
+
+    /// Resolve discrepancies between in-memory data and disk data by combining
+    /// or overwriting values based on which version is more recent, ensuring
+    /// consistency.
+    fn resolve_divergence(&mut self) -> Result<()> {
+        let new_data = FolderStorage::new("new_data".into(), &self.path)?;
+
+        for (key, new_value) in new_data.data.iter() {
+            if let Some(existing_value) = self.data.get(key) {
+                let existing_value_updated = self
+                    .timestamps
+                    .get(key)
+                    .map(|timestamp| timestamp.0 > timestamp.1)
+                    .unwrap_or(false);
+
+                // Use the monoid to combine the values for the given key
+                // if memory and disk have diverged
+                if existing_value_updated {
+                    let resolved_value = V::combine(existing_value, new_value);
+                    self.data.insert(key.clone(), resolved_value);
+                } else {
+                    self.data.insert(key.clone(), new_value.clone());
+                }
+            } else {
+                self.data.insert(key.clone(), new_value.clone());
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Remove files from disk for keys that have been deleted in memory
+    fn remove_files_not_in_ram(&mut self) -> Result<()> {
+        for key in self.deleted_keys.iter() {
+            let file_path = self.path.join(format!("{}.json", key));
+            if file_path.exists() {
+                fs::remove_file(&file_path).unwrap_or_else(|_| {
+                    panic!("Failed to delete file at {:?}", file_path)
+                })
+            }
+        }
+        Ok(())
+    }
+}
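+
+// `resolve_divergence` above leans on the crate's `Monoid` trait: when both
+// RAM and disk have changed since the last sync, the two values are merged
+// via `V::combine` instead of one side silently winning. Any value type can
+// opt in; a sketch of an instance (illustrative, mirroring the `Dummy` test
+// type below; here `combine` keeps the larger value):
+//
+//     struct Votes(u64);
+//     impl Monoid<Votes> for Votes {
+//         fn neutral() -> Votes { Votes(0) }
+//         fn combine(a: &Votes, b: &Votes) -> Votes { Votes(a.0.max(b.0)) }
+//     }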
+
+impl<K, V> BaseStorage<K, V> for FolderStorage<K, V>
+where
+    K: Ord
+        + Clone
+        + serde::Serialize
+        + serde::de::DeserializeOwned
+        + std::str::FromStr
+        + std::fmt::Display,
+    V: Clone + serde::Serialize + serde::de::DeserializeOwned + Monoid<V>,
+{
+    /// Set a key-value pair in the internal mapping
+    fn set(&mut self, key: K, value: V) {
+        self.data.insert(key.clone(), value);
+        self.deleted_keys.remove(&key);
+        self.timestamps
+            .entry(key)
+            .and_modify(|timestamp| {
+                timestamp.0 = SystemTime::now();
+            })
+            .or_insert((SystemTime::now(), SystemTime::now()));
+    }
+
+    /// Remove an entry from the internal mapping given a key
+    fn remove(&mut self, id: &K) -> Result<()> {
+        match self.data.remove(id) {
+            Some(_) => {
+                self.deleted_keys.insert(id.clone());
+                Ok(())
+            }
+            None => Err(ArklibError::Storage(
+                self.label.clone(),
+                "Key not found".to_owned(),
+            )),
+        }
+    }
+
+    /// Determine the synchronization status between RAM and disk by comparing
+    /// the in-memory modification timestamps against the modification
+    /// timestamps of the files on disk.
+    fn sync_status(&mut self) -> Result<SyncStatus> {
+        let mut ram_newer = !self.deleted_keys.is_empty();
+        let mut disk_newer = false;
+
+        for key in self.data.keys() {
+            let file_path = self.path.join(format!("{}.json", key));
+            let ram_timestamp = self
+                .timestamps
+                .get(key)
+                .map(|(ram_time, _)| ram_time)
+                .expect("Data entry key should have a RAM timestamp");
+
+            if let Ok(metadata) = fs::metadata(&file_path) {
+                if let Ok(disk_timestamp) = metadata.modified() {
+                    match ram_timestamp.cmp(&disk_timestamp) {
+                        std::cmp::Ordering::Greater => {
+                            ram_newer = true;
+                            log::debug!(
+                                "RAM newer: file {} is newer in RAM",
+                                file_path.display()
+                            );
+                        }
+                        std::cmp::Ordering::Less => {
+                            disk_newer = true;
+                            log::debug!(
+                                "Disk newer: file {} is newer on disk, ram: {}, disk: {}",
+                                file_path.display(),
+                                ram_timestamp.elapsed().unwrap().as_secs(),
+                                disk_timestamp.elapsed().unwrap().as_secs()
+                            );
+                        }
+                        std::cmp::Ordering::Equal => {}
+                    }
+                } else {
+                    // If we can't read the disk timestamp, assume RAM is newer
+                    ram_newer = true;
+                    log::debug!(
+                        "RAM newer: couldn't read disk timestamp for {}",
+                        file_path.display()
+                    );
+                }
+            } else {
+                // If the file doesn't exist on disk, RAM is newer
+                ram_newer = true;
+                log::debug!(
+                    "RAM newer: file {} doesn't exist on disk",
+                    file_path.display()
+                );
+            }
+
+            // If we've found both RAM and disk modifications, we can stop checking
+            if ram_newer && disk_newer {
+                log::debug!(
+                    "Both RAM and disk modifications found, stopping check"
+                );
+                break;
+            }
+        }
+
+        // Skip this check if the divergent condition has already been reached
+        if !(ram_newer && disk_newer) {
+            // Check for files on disk that aren't in RAM
+            for entry in fs::read_dir(&self.path)? {
+                let entry = entry?;
+                let path = entry.path();
+                if path.is_file()
+                    && path
+                        .extension()
+                        .map_or(false, |ext| ext == "json")
+                {
+                    let key = extract_key_from_file_path(&self.label, &path)?;
+                    if !self.data.contains_key(&key)
+                        && !self.deleted_keys.contains(&key)
+                    {
+                        disk_newer = true;
+                        break;
+                    }
+                }
+            }
+        }
+
+        let new_status = match (ram_newer, disk_newer) {
+            (false, false) => SyncStatus::InSync,
+            (true, false) => SyncStatus::StorageStale,
+            (false, true) => SyncStatus::MappingStale,
+            (true, true) => SyncStatus::Diverge,
+        };
+
+        log::info!("{} sync status is {}", self.label, new_status);
+        Ok(new_status)
+    }
+
+    /// Sync the in-memory storage with the storage on disk
+    fn sync(&mut self) -> Result<()> {
+        match self.sync_status()? {
+            SyncStatus::InSync => {
+                log::info!(
+                    "Memory is synchronized with the storage, {}",
+                    self.label
+                );
+            }
+            SyncStatus::MappingStale => {
+                self.read_fs()?;
+            }
+            SyncStatus::StorageStale => {
+                self.write_fs()?;
+            }
+            SyncStatus::Diverge => {
+                self.resolve_divergence()?;
+                self.write_fs()?;
+            }
+        };
+
+        self.deleted_keys.clear();
+        Ok(())
+    }
+
+    /// Read the data from folder storage
+    fn read_fs(&mut self) -> Result<&BTreeMap<K, V>> {
+        self.load_fs_data()?;
+        Ok(&self.data)
+    }
+
+    /// Get a value from the internal mapping
+    fn get(&self, id: &K) -> Option<&V> {
+        self.data.get(id)
+    }
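+
+    // Status lifecycle in practice (exercised by the tests below):
+    //   set()/remove()        -> StorageStale  (RAM is ahead of disk)
+    //   write_fs()            -> InSync
+    //   external file change  -> MappingStale  (disk is ahead of RAM)
+    //   read_fs()             -> InSync
+    //   both at once          -> Diverge, settled by resolve_divergence()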
+
+    /// Write the data to a folder.
+    ///
+    /// Sets each file's modified timestamp explicitly to sidestep OS timing
+    /// issues caused by limited file system timestamp precision: EXT3 offers
+    /// one-second precision, and EXT4 can be finer but is not guaranteed to
+    /// be. This is addressed by modifying the metadata and calling
+    /// `sync_all()` after file writes.
+    fn write_fs(&mut self) -> Result<()> {
+        fs::create_dir_all(&self.path)?;
+
+        for (key, value) in &self.data {
+            let file_path = self.path.join(format!("{}.json", key));
+            let mut file = File::create(&file_path)?;
+            file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?;
+            file.flush()?;
+
+            let new_timestamp = SystemTime::now();
+            file.set_modified(new_timestamp)?;
+            file.sync_all()?;
+
+            self.timestamps
+                .insert(key.clone(), (new_timestamp, new_timestamp));
+        }
+
+        // Delete files for previously deleted keys
+        self.deleted_keys.iter().for_each(|key| {
+            log::debug!("Deleting key: {}", key);
+            self.data.remove(key);
+            self.timestamps.remove(key);
+            let file_path = self.path.join(format!("{}.json", key));
+            if file_path.exists() {
+                fs::remove_file(&file_path).expect("Failed to delete file");
+            }
+        });
+        self.deleted_keys.clear();
+
+        // Remove files for keys that no longer exist
+        self.remove_files_not_in_ram()?;
+
+        log::info!(
+            "{} {} entries have been written",
+            self.label,
+            self.data.len()
+        );
+        Ok(())
+    }
+
+    /// Erase the folder from disk
+    fn erase(&self) -> Result<()> {
+        fs::remove_dir_all(&self.path).map_err(|err| {
+            ArklibError::Storage(self.label.clone(), err.to_string())
+        })
+    }
+
+    /// Merge the data from another folder storage instance into this one
+    fn merge_from(&mut self, other: impl AsRef<BTreeMap<K, V>>) -> Result<()>
+    where
+        V: Monoid<V>,
+    {
+        let other_entries = other.as_ref();
+        for (key, value) in other_entries {
+            if let Some(existing_value) = self.data.get(key) {
+                let resolved_value = V::combine(existing_value, value);
+                self.set(key.clone(), resolved_value);
+            } else {
+                self.set(key.clone(), value.clone())
+            }
+            self.timestamps
+                .entry(key.clone())
+                .and_modify(|timestamp| {
+                    timestamp.0 = SystemTime::now();
+                })
+                .or_insert((SystemTime::now(), SystemTime::now()));
+        }
+        Ok(())
+    }
+}
+
+fn extract_key_from_file_path<K>(label: &str, path: &Path) -> Result<K>
+where
+    K: std::str::FromStr,
+{
+    path.file_stem()
+        .ok_or_else(|| {
+            ArklibError::Storage(
+                label.to_owned(),
+                "Failed to extract file stem from filename".to_owned(),
+            )
+        })?
+        .to_str()
+        .ok_or_else(|| {
+            ArklibError::Storage(
+                label.to_owned(),
+                "Failed to convert file stem to string".to_owned(),
+            )
+        })?
+        .parse::<K>()
+        .map_err(|_| {
+            ArklibError::Storage(
+                label.to_owned(),
+                "Failed to parse key from filename".to_owned(),
+            )
+        })
+}
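+
+// Keys are encoded in file names as `{key}.json`, which is why `K` is bound
+// by both `Display` (writing) and `FromStr` (reading back). Any such key type
+// works; a sketch with numeric keys (illustrative):
+//
+//     let mut storage: FolderStorage<u32, String> =
+//         FolderStorage::new("demo".into(), Path::new("/tmp/nums"))?;
+//     storage.set(42, "answer".to_string()); // persisted as 42.json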
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        base_storage::{BaseStorage, SyncStatus},
+        folder_storage::FolderStorage,
+        monoid::Monoid,
+    };
+    use std::{
+        collections::BTreeSet,
+        fs::{self, File},
+        io::Write,
+        thread,
+        time::{Duration, SystemTime},
+    };
+
+    use data_error::Result;
+    use quickcheck_macros::quickcheck;
+    use serde::{Deserialize, Serialize};
+    use tempdir::TempDir;
+
+    #[test]
+    fn test_folder_storage_write_read() {
+        let temp_dir =
+            TempDir::new("tmp").expect("Failed to create temporary directory");
+        let mut storage =
+            FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap();
+
+        storage.set("key1".to_owned(), "value1".to_string());
+        storage.set("key2".to_owned(), "value2".to_string());
+
+        assert!(storage.remove(&"key1".to_string()).is_ok());
+        storage
+            .write_fs()
+            .expect("Failed to write data to disk");
+
+        let data_read = storage
+            .read_fs()
+            .expect("Failed to read data from disk");
+        assert_eq!(data_read.len(), 1);
+        assert_eq!(data_read.get("key2"), Some(&"value2".to_string()));
+    }
+
+    #[test]
+    fn test_folder_storage_auto_delete() {
+        let temp_dir =
+            TempDir::new("tmp").expect("Failed to create temporary directory");
+        let mut storage =
+            FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap();
+
+        storage.set("key1".to_string(), "value1".to_string());
+        storage.set("key1".to_string(), "value2".to_string());
+        assert!(storage.write_fs().is_ok());
+        assert_eq!(temp_dir.path().exists(), true);
+
+        if let Err(err) = storage.erase() {
+            panic!("Failed to delete folder: {:?}", err);
+        }
+        assert!(!temp_dir.path().exists());
+    }
+
+    #[test]
+    fn test_folder_metadata_timestamp_updated() {
+        let temp_dir =
+            TempDir::new("tmp").expect("Failed to create temporary directory");
+        let mut storage =
+            FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap();
+        storage.write_fs().unwrap();
+
+        storage.set("key1".to_string(), "value1".to_string());
+        let before_write = fs::metadata(&temp_dir.path())
+            .unwrap()
+            .modified()
+            .unwrap();
+        thread::sleep(Duration::from_millis(10));
+        storage.write_fs().unwrap();
+        let after_write = fs::metadata(&temp_dir.path())
+            .unwrap()
+            .modified()
+            .unwrap();
+        println!(
+            "before_write: {:?}, after_write: {:?}",
+            before_write, after_write
+        );
+        assert!(before_write < after_write);
+    }
+
+    #[test]
+    fn test_folder_storage_is_storage_updated() {
+        let temp_dir =
+            TempDir::new("tmp").expect("Failed to create temporary directory");
+        let mut storage =
+            FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap();
+        storage.write_fs().unwrap();
+        assert_eq!(storage.sync_status().unwrap(), SyncStatus::InSync);
+
+        storage.set("key1".to_string(), "value1".to_string());
+        assert_eq!(storage.sync_status().unwrap(), SyncStatus::StorageStale);
+        storage.write_fs().unwrap();
+        assert_eq!(storage.sync_status().unwrap(), SyncStatus::InSync);
+
+        // External data manipulation
+        let mut mirror_storage = FolderStorage::new(
+            "MirrorTestStorage".to_string(),
+            temp_dir.path(),
+        )
+        .unwrap();
+        assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync);
+
+        mirror_storage.set("key1".to_string(), "value3".to_string());
+        assert_eq!(
+            mirror_storage.sync_status().unwrap(),
+            SyncStatus::StorageStale
+        );
+
+        mirror_storage.write_fs().unwrap();
+        assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync);
+
+        // Receive updates from the external data manipulation
+        assert_eq!(storage.sync_status().unwrap(), SyncStatus::MappingStale);
+        storage.read_fs().unwrap();
+        assert_eq!(storage.sync_status().unwrap(), SyncStatus::InSync);
+        assert_eq!(mirror_storage.sync_status().unwrap(), SyncStatus::InSync);
+    }
+
+    #[test]
+    fn test_monoid_combine() {
+        let temp_dir =
+            TempDir::new("tmp").expect("Failed to create temporary directory");
+        let mut storage1 =
+            FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap();
+        let mut storage2 =
+            FolderStorage::new("test".to_owned(), temp_dir.path()).unwrap();
+
+        storage1.set("key1".to_string(), 2);
+        storage1.set("key2".to_string(), 6);
+
+        storage2.set("key1".to_string(), 3);
+        storage2.set("key3".to_string(), 9);
+
+        storage1.merge_from(&storage2).unwrap();
+        assert_eq!(storage1.as_ref().get("key1"), Some(&3));
+        assert_eq!(storage1.as_ref().get("key2"), Some(&6));
+        assert_eq!(storage1.as_ref().get("key3"), Some(&9));
+    }
+
+    use quickcheck::{Arbitrary, Gen};
+    use std::collections::{BTreeMap, HashSet};
+
+    #[derive(Clone, Debug)]
+    enum StorageOperation {
+        Set(String),
+        Remove(String),
+        Sync,
+        ExternalModify(String),
+        ExternalSet(String),
+    }
+
+    #[derive(Clone, Debug)]
+    struct StorageOperationSequence(Vec<StorageOperation>);
+
+    impl Arbitrary for StorageOperationSequence {
+        fn arbitrary(g: &mut Gen) -> Self {
+            let mut existing_keys = HashSet::new();
+            let mut ops = Vec::new();
+            let size = usize::arbitrary(g) % 100 + 1; // Generate 1 to 100 operations
+
+            for _ in 0..size {
+                let op = match u8::arbitrary(g) % 9 {
+                    0 | 1 | 2 | 3 | 4 => {
+                        let key = u8::arbitrary(g).to_string();
+                        existing_keys.insert(key.clone());
+                        StorageOperation::Set(key)
+                    }
+                    5 if !existing_keys.is_empty() => {
+                        let key = g
+                            .choose(
+                                &existing_keys
+                                    .iter()
+                                    .cloned()
+                                    .collect::<Vec<_>>(),
+                            )
+                            .unwrap()
+                            .clone();
+                        existing_keys.remove(&key);
+                        StorageOperation::Remove(key)
+                    }
+                    6 => StorageOperation::Sync,
+                    7 if !existing_keys.is_empty() => {
+                        let key = g
+                            .choose(
+                                &existing_keys
+                                    .iter()
+                                    .cloned()
+                                    .collect::<Vec<_>>(),
+                            )
+                            .unwrap()
+                            .clone();
+                        StorageOperation::ExternalModify(key)
+                    }
+                    _ => {
+                        let key = u8::arbitrary(g).to_string();
+                        StorageOperation::ExternalSet(key)
+                    }
+                };
+                ops.push(op);
+            }
+
+            StorageOperationSequence(ops)
+        }
+    }
+
+    #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
+    struct Dummy;
+
+    impl Monoid<Dummy> for Dummy {
+        fn neutral() -> Dummy {
+            Dummy
+        }
+
+        fn combine(_a: &Dummy, _b: &Dummy) -> Dummy {
+            Dummy
+        }
+    }
+
+    // #[test_log::test]
+    #[quickcheck]
+    fn prop_folder_storage_correct(
+        StorageOperationSequence(operations): StorageOperationSequence,
+    ) {
+        let temp_dir =
+            TempDir::new("temp").expect("Failed to create temporary directory");
+        let path = temp_dir.path();
+
+        let mut storage =
+            FolderStorage::<String, Dummy>::new("test".to_string(), &path)
+                .unwrap();
+        let mut expected_data: BTreeMap<String, Dummy> = BTreeMap::new();
+        let mut pending_deletes = BTreeSet::new();
+        let mut pending_sets: BTreeMap<String, usize> = BTreeMap::new();
+        let mut pending_external: BTreeMap<String, usize> = BTreeMap::new();
+
+        // Check initial state
+        assert_eq!(
+            storage.sync_status().unwrap(),
+            SyncStatus::InSync,
+            "Storage should be InSync when created"
+        );
+
+        let v = Dummy;
+        for (i, op) in operations.into_iter().enumerate() {
+            match op {
+                StorageOperation::Set(k) => {
+                    storage.set(k.clone(), v);
+                    pending_sets.insert(k.clone(), i);
+                    pending_deletes.remove(&k);
+
+                    let status = storage.sync_status().unwrap();
+                    let expected_status = expected_status(
+                        &pending_external,
+                        &pending_sets,
+                        &pending_deletes,
+                    );
+                    assert_eq!(status, expected_status);
+                }
+                StorageOperation::Remove(k) => {
+                    storage.remove(&k).unwrap();
+                    pending_sets.remove(&k);
+                    pending_deletes.insert(k.clone());
+
+                    let status = storage.sync_status().unwrap();
+                    let expected_status = expected_status(
+                        &pending_external,
+                        &pending_sets,
+                        &pending_deletes,
+                    );
+                    assert_eq!(status, expected_status);
+                }
+                StorageOperation::Sync => {
+                    storage.sync().unwrap();
+
+                    // Note: Concurrent deletes are overridden by sets,
+                    // hence deletes are weak. Also, for values where
+                    // monoidal combination is relevant, this logic will
+                    // have to be updated.
+                    pending_sets
+                        .keys()
+                        .chain(pending_external.keys())
+                        .for_each(|k| {
+                            expected_data.insert(k.clone(), v);
+                        });
+                    pending_deletes.iter().for_each(|key| {
+                        expected_data.remove(key);
+                    });
+
+                    pending_sets.clear();
+                    pending_external.clear();
+                    pending_deletes.clear();
+
+                    let status = storage.sync_status().unwrap();
+                    assert_eq!(status, SyncStatus::InSync);
+                    assert_eq!(storage.data, expected_data);
+                }
+                StorageOperation::ExternalModify(k)
+                | StorageOperation::ExternalSet(k) => {
+                    perform_external_modification(path, &k, v).unwrap();
+                    pending_external.insert(k.clone(), i);
+
+                    let status = storage.sync_status().unwrap();
+                    let expected_status = expected_status(
+                        &pending_external,
+                        &pending_sets,
+                        &pending_deletes,
+                    );
+                    assert_eq!(status, expected_status);
+                }
+            }
+
+            assert!(
+                pending_sets
+                    .keys()
+                    .filter(|key| pending_deletes.contains(*key))
+                    .count()
+                    == 0
+            );
+        }
+    }
+
+    fn perform_external_modification(
+        path: &std::path::Path,
+        key: &str,
+        value: Dummy,
+    ) -> Result<()> {
+        let mut file = File::create(path.join(format!("{}.json", key)))?;
+        file.write_all(serde_json::to_string_pretty(&value)?.as_bytes())?;
+        file.flush()?;
+        let time = SystemTime::now();
+        file.set_modified(time).unwrap();
+        file.sync_all()?;
+        Ok(())
+    }
+
+    fn expected_status(
+        pending_external: &BTreeMap<String, usize>,
+        pending_sets: &BTreeMap<String, usize>,
+        pending_deletes: &BTreeSet<String>,
+    ) -> SyncStatus {
+        let ram_newer = !pending_deletes.is_empty();
+        let ram_newer = ram_newer
+            || pending_sets
+                .iter()
+                .any(|(k, v)| pending_external.get(k).map_or(true, |e| v > e));
+        let disk_newer = pending_external
+            .iter()
+            .filter(|(k, _)| !pending_deletes.contains(*k))
+            .any(|(k, v)| pending_sets.get(k).map_or(true, |s| v > s));
+
+        match (ram_newer, disk_newer) {
+            (false, false) => SyncStatus::InSync,
+            (true, false) => SyncStatus::StorageStale,
+            (false, true) => SyncStatus::MappingStale,
+            (true, true) => SyncStatus::Diverge,
+        }
+    }
+}
diff --git a/fs-storage/src/lib.rs b/fs-storage/src/lib.rs
index bc1442b7..83e3868c 100644
--- a/fs-storage/src/lib.rs
+++ b/fs-storage/src/lib.rs
@@ -1,6 +1,7 @@
 pub mod base_storage;
 pub mod btreemap_iter;
 pub mod file_storage;
+pub mod folder_storage;
 #[cfg(feature = "jni-bindings")]
 pub mod jni;
 pub mod monoid;
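Taken together, the synchronization model lets several instances share one folder. A condensed sketch of the mirror scenario from the tests above (labels and path illustrative):

```rust
use std::path::Path;

use data_error::Result;
use fs_storage::base_storage::{BaseStorage, SyncStatus};
use fs_storage::folder_storage::FolderStorage;

fn mirror_demo(path: &Path) -> Result<()> {
    let mut storage: FolderStorage<String, String> =
        FolderStorage::new("main".to_string(), path)?;
    storage.set("key1".to_string(), "value1".to_string());
    storage.write_fs()?;

    // A second instance over the same folder writes a newer value...
    let mut mirror: FolderStorage<String, String> =
        FolderStorage::new("mirror".to_string(), path)?;
    mirror.set("key1".to_string(), "value3".to_string());
    mirror.write_fs()?;

    // ...so the first instance sees its mapping as stale and re-reads.
    assert_eq!(storage.sync_status()?, SyncStatus::MappingStale);
    storage.read_fs()?;
    assert_eq!(storage.sync_status()?, SyncStatus::InSync);
    Ok(())
}
```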