fs-storage: Implement simple conflict resolution using Monoids (#26)
---------

Signed-off-by: Tarek <[email protected]>
Signed-off-by: Pushkar Mishra <[email protected]>
Co-authored-by: Kirill Taran <[email protected]>
Co-authored-by: Tarek <[email protected]>
3 people authored May 25, 2024
1 parent 2ce5ee7 commit 07cbdd0
Showing 4 changed files with 163 additions and 44 deletions.
18 changes: 14 additions & 4 deletions fs-storage/src/base_storage.rs
@@ -8,10 +8,16 @@ pub trait BaseStorage<K, V>: AsRef<BTreeMap<K, V>> {
/// Remove an entry from the internal mapping.
fn remove(&mut self, id: &K) -> Result<()>;

/// Check if the storage is up-to-date,
/// i.e. that the internal mapping is consistent
/// with the data in the filesystem.
fn is_storage_updated(&self) -> Result<bool>;
/// Determine if the in-memory model
/// or the underlying storage requires syncing.
/// This is a quick check comparing the modification
/// timestamps of both the model and the storage.
///
/// Returns:
/// - `Ok(true)` if the on-disk data and in-memory data are not in sync.
/// - `Ok(false)` if the on-disk data and in-memory data are in sync.
/// - `Err(ArklibError::Storage)` in case of any error retrieving the file metadata.
fn needs_syncing(&self) -> Result<bool>;

/// Scan and load the key-value mapping
/// from pre-configured location in the filesystem.
@@ -24,4 +30,8 @@ pub trait BaseStorage<K, V>: AsRef<BTreeMap<K, V>> {
/// Remove all persisted data
/// at the pre-configured location in the filesystem.
fn erase(&self) -> Result<()>;

/// Merge two storage instances
/// and write the result to the filesystem.
fn merge_from(&mut self, other: impl AsRef<BTreeMap<K, V>>) -> Result<()>;
}
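
For orientation, a minimal usage sketch of the new trait surface (not part of this diff). It assumes a FileStorage<String, String> at a hypothetical path and the crate paths implied by the changed files; the String value type satisfies the Monoid bound via the instance added in monoid.rs further down.

use std::path::Path;

use data_error::Result;
use fs_storage::file_storage::FileStorage;

// Hypothetical sketch: keep two handles over the same file in sync
// using the methods introduced above.
fn sync_sketch() -> Result<()> {
    let path = Path::new("/tmp/example.storage");
    let mut writer = FileStorage::new("writer".to_string(), path);
    let mut reader = FileStorage::new("reader".to_string(), path);

    writer.set("key".to_string(), "value".to_string());
    writer.write_fs()?; // persist the in-memory model to disk

    // needs_syncing compares modification timestamps (second granularity);
    // if the file holds newer data than this instance, reload it.
    if reader.needs_syncing()? {
        let _entries = reader.read_fs()?; // freshly loaded mapping
    }
    Ok(())
}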
140 changes: 100 additions & 40 deletions fs-storage/src/file_storage.rs
@@ -8,6 +8,7 @@ use std::{
};

use crate::base_storage::BaseStorage;
use crate::monoid::Monoid;
use crate::utils::read_version_2_fs;
use data_error::{ArklibError, Result};

@@ -30,7 +31,7 @@
{
label: String,
path: PathBuf,
timestamp: SystemTime,
modified: SystemTime,
data: FileStorageData<K, V>,
}

@@ -57,26 +58,20 @@
V: Clone
+ serde::Serialize
+ serde::de::DeserializeOwned
+ std::str::FromStr,
+ std::str::FromStr
+ Monoid<V>,
{
/// Create a new file storage with a diagnostic label and file path
pub fn new(label: String, path: &Path) -> Self {
let mut file_storage = Self {
Self {
label,
path: PathBuf::from(path),
timestamp: SystemTime::now(),
modified: SystemTime::now(),
data: FileStorageData {
version: STORAGE_VERSION,
entries: BTreeMap::new(),
},
};

// Load the data from the file
file_storage.data.entries = match file_storage.read_fs() {
Ok(data) => data,
Err(_) => BTreeMap::new(),
};
file_storage
}
}
}

@@ -90,41 +85,48 @@
V: Clone
+ serde::Serialize
+ serde::de::DeserializeOwned
+ std::str::FromStr,
+ std::str::FromStr
+ Monoid<V>,
{
/// Set a key-value pair in the storage
fn set(&mut self, key: K, value: V) {
self.data.entries.insert(key, value);
self.timestamp = std::time::SystemTime::now();
self.write_fs()
.expect("Failed to write data to disk");
self.modified = std::time::SystemTime::now();
}

/// Remove a key-value pair from the storage given a key
fn remove(&mut self, id: &K) -> Result<()> {
self.data.entries.remove(id).ok_or_else(|| {
ArklibError::Storage(self.label.clone(), "Key not found".to_owned())
})?;
self.timestamp = std::time::SystemTime::now();
self.modified = std::time::SystemTime::now();
self.write_fs()
.expect("Failed to remove data from disk");
Ok(())
}

/// Compare the timestamp of the storage file with the timestamp of the storage instance
/// to determine if the storage file has been updated.
fn is_storage_updated(&self) -> Result<bool> {
let file_timestamp = fs::metadata(&self.path)?.modified()?;
let file_time_secs = file_timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
let self_time_secs = self
.timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
Ok(file_time_secs > self_time_secs)
/// Compare the modification timestamp of the storage file
/// with the timestamp of the last in-memory update
/// to determine if the two are out of sync.
fn needs_syncing(&self) -> Result<bool> {
match fs::metadata(&self.path) {
Ok(metadata) => {
let get_duration_since_epoch = |time: SystemTime| {
time.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
};

let fs_modified =
get_duration_since_epoch(metadata.modified()?);
let self_modified = get_duration_since_epoch(self.modified);

Ok(fs_modified != self_modified)
}
Err(e) => {
Err(ArklibError::Storage(self.label.clone(), e.to_string()))
}
}
}

/// Read the data from the storage file
@@ -146,7 +148,7 @@
"Version 2 storage format detected for {}",
self.label
);
self.timestamp = fs::metadata(&self.path)?.modified()?;
self.modified = fs::metadata(&self.path)?.modified()?;
return Ok(data);
}
Err(_) => {
@@ -174,7 +176,7 @@
),
));
}
self.timestamp = fs::metadata(&self.path)?.modified()?;
self.modified = fs::metadata(&self.path)?.modified()?;

Ok(data.entries)
}
@@ -194,10 +196,10 @@
writer.write_all(value_data.as_bytes())?;

let new_timestamp = fs::metadata(&self.path)?.modified()?;
if new_timestamp == self.timestamp {
if new_timestamp == self.modified {
return Err("Timestamp has not been updated".into());
}
self.timestamp = new_timestamp;
self.modified = new_timestamp;

log::info!(
"{} {} entries have been written",
@@ -213,6 +215,24 @@
ArklibError::Storage(self.label.clone(), err.to_string())
})
}

/// Merge the data from another storage instance into this storage instance
fn merge_from(&mut self, other: impl AsRef<BTreeMap<K, V>>) -> Result<()>
where
V: Monoid<V>,
{
let other_entries = other.as_ref();
for (key, value) in other_entries {
if let Some(existing_value) = self.data.entries.get(key) {
let resolved_value = V::combine(existing_value, value);
self.set(key.clone(), resolved_value);
} else {
self.set(key.clone(), value.clone())
}
}
self.modified = std::time::SystemTime::now();
Ok(())
}
}

impl<K, V> AsRef<BTreeMap<K, V>> for FileStorage<K, V>
@@ -263,7 +283,7 @@ mod tests {

file_storage.set("key1".to_string(), "value1".to_string());
file_storage.set("key1".to_string(), "value2".to_string());

assert!(file_storage.write_fs().is_ok());
assert_eq!(storage_path.exists(), true);

if let Err(err) = file_storage.erase() {
@@ -280,19 +300,59 @@

let mut file_storage =
FileStorage::new("TestStorage".to_string(), &storage_path);

file_storage.write_fs().unwrap();
assert_eq!(file_storage.needs_syncing().unwrap(), false);
std::thread::sleep(std::time::Duration::from_secs(1));
file_storage.set("key1".to_string(), "value1".to_string());
assert_eq!(file_storage.is_storage_updated().unwrap(), false);
assert_eq!(file_storage.needs_syncing().unwrap(), true);
file_storage.write_fs().unwrap();
assert_eq!(file_storage.needs_syncing().unwrap(), false);

std::thread::sleep(std::time::Duration::from_secs(1));

// External data manipulation
let mut mirror_storage =
FileStorage::new("TestStorage".to_string(), &storage_path);
assert_eq!(mirror_storage.needs_syncing().unwrap(), true);
std::thread::sleep(std::time::Duration::from_secs(1));
mirror_storage.read_fs().unwrap();
assert_eq!(mirror_storage.needs_syncing().unwrap(), false);

mirror_storage.set("key1".to_string(), "value3".to_string());
assert_eq!(mirror_storage.is_storage_updated().unwrap(), false);
assert_eq!(mirror_storage.needs_syncing().unwrap(), true);
mirror_storage.write_fs().unwrap();
assert_eq!(mirror_storage.needs_syncing().unwrap(), false);

assert_eq!(file_storage.needs_syncing().unwrap(), true);
file_storage.read_fs().unwrap();
assert_eq!(file_storage.needs_syncing().unwrap(), false);
assert_eq!(mirror_storage.needs_syncing().unwrap(), false);
}

#[test]
fn test_monoid_combine() {
let temp_dir =
TempDir::new("tmp").expect("Failed to create temporary directory");
let storage_path1 = temp_dir.path().join("teststorage1.txt");
let storage_path2 = temp_dir.path().join("teststorage2.txt");

let mut file_storage_1 =
FileStorage::new("TestStorage1".to_string(), &storage_path1);

let mut file_storage_2 =
FileStorage::new("TestStorage2".to_string(), &storage_path2);

file_storage_1.set("key1".to_string(), 2);
file_storage_1.set("key2".to_string(), 6);

file_storage_2.set("key1".to_string(), 3);
file_storage_2.set("key3".to_string(), 9);

assert_eq!(file_storage.is_storage_updated().unwrap(), true);
file_storage_1
.merge_from(&file_storage_2)
.unwrap();
assert_eq!(file_storage_1.as_ref().get("key1"), Some(&3));
assert_eq!(file_storage_1.as_ref().get("key2"), Some(&6));
assert_eq!(file_storage_1.as_ref().get("key3"), Some(&9));
}
}
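
As a complement to test_monoid_combine above, which uses the i32 instance (the larger value wins), here is a hypothetical sketch, not part of this diff, of how merge_from resolves a conflicting key for String values, whose Monoid instance in monoid.rs below concatenates.

use std::path::Path;

// Hypothetical sketch: conflicting values for the same key are resolved
// by V::combine; for String that means concatenation.
fn merge_sketch() {
    let mut a = FileStorage::new("A".to_string(), Path::new("/tmp/a.storage"));
    let mut b = FileStorage::new("B".to_string(), Path::new("/tmp/b.storage"));

    a.set("key".to_string(), "foo".to_string());
    b.set("key".to_string(), "bar".to_string());

    a.merge_from(&b).unwrap();
    assert_eq!(a.as_ref().get("key"), Some(&"foobar".to_string()));
}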
1 change: 1 addition & 0 deletions fs-storage/src/lib.rs
@@ -1,5 +1,6 @@
pub mod base_storage;
pub mod file_storage;
pub mod monoid;
mod utils;
pub const ARK_FOLDER: &str = ".ark";

48 changes: 48 additions & 0 deletions fs-storage/src/monoid.rs
@@ -0,0 +1,48 @@
// Currently, we have three structures: Tags (HashSet), Properties (HashSet), Score (int).
// In fact, HashSet already implements a union function,
// so only a special function for integers is needed.
// CRDTs can be considered later when we need to add structures that require
// more powerful combine semantics.

// Trait defining a Monoid, which represents a mathematical structure with an identity element and an associative binary operation.
pub trait Monoid<V> {
// Returns the neutral element of the monoid.
fn neutral() -> V;

// Combines two elements of the monoid into a single element.
fn combine(a: &V, b: &V) -> V;

// Combines multiple elements of the monoid into a single element.
// Default implementation uses `neutral()` as the initial accumulator and `combine()` for folding.
fn combine_all<I: IntoIterator<Item = V>>(values: I) -> V {
values
.into_iter()
.fold(Self::neutral(), |acc, val| Self::combine(&acc, &val))
}
}

impl Monoid<i32> for i32 {
fn neutral() -> i32 {
0
}

fn combine(a: &i32, b: &i32) -> i32 {
if a > b {
*a
} else {
*b
}
}
}

impl Monoid<String> for String {
fn neutral() -> String {
String::new()
}

fn combine(a: &String, b: &String) -> String {
let mut result = a.clone();
result.push_str(b);
result
}
}
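
The module comment at the top of monoid.rs notes that Tags and Properties are HashSet-backed and that HashSet already provides union; a hypothetical instance along those lines (not included in this commit) could look like this:

use std::collections::HashSet;

// Hypothetical sketch: set-valued entries such as tags combine by union,
// as the module comment above suggests.
impl Monoid<HashSet<String>> for HashSet<String> {
    fn neutral() -> HashSet<String> {
        HashSet::new()
    }

    fn combine(a: &HashSet<String>, b: &HashSet<String>) -> HashSet<String> {
        a.union(b).cloned().collect()
    }
}

// For reference, the default combine_all folds with neutral() and combine();
// with the i32 instance above, i32::combine_all(vec![2, 6, 3]) yields 6.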
