diff --git a/sources/api/datastore/src/error.rs b/sources/api/datastore/src/error.rs index d1c7207ae1a..86d386b52ad 100644 --- a/sources/api/datastore/src/error.rs +++ b/sources/api/datastore/src/error.rs @@ -59,6 +59,12 @@ pub enum Error { #[snafu(display("Key name beyond maximum length {}: {}", name, max))] KeyTooLong { name: String, max: usize }, + + #[snafu(display("Unable to serialize data: {}", source))] + Serialize { source: serde_json::Error }, + + #[snafu(display("Unable to de-serialize data: {}", source))] + DeSerialize { source: serde_json::Error }, } pub type Result = std::result::Result; diff --git a/sources/api/datastore/src/filesystem.rs b/sources/api/datastore/src/filesystem.rs index bf89eed783f..99bba948e5c 100644 --- a/sources/api/datastore/src/filesystem.rs +++ b/sources/api/datastore/src/filesystem.rs @@ -4,7 +4,7 @@ //! Data is kept in files with paths resembling the keys, e.g. a/b/c for a.b.c, and metadata is //! kept in a suffixed file next to the data, e.g. a/b/c.meta for metadata "meta" about a.b.c -use log::{debug, error, trace}; +use log::{debug, error, trace, warn}; use percent_encoding::{percent_decode_str, utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC}; use snafu::{ensure, OptionExt, ResultExt}; use std::collections::{HashMap, HashSet}; @@ -13,6 +13,8 @@ use std::io; use std::path::{self, Path, PathBuf}; use walkdir::{DirEntry, WalkDir}; +use crate::{deserialize_scalar, serialize_scalar, ScalarError}; + use super::key::{Key, KeyType}; use super::{error, Committed, DataStore, Result}; @@ -413,6 +415,7 @@ impl DataStore for FilesystemDataStore { fn list_populated_metadata( &self, prefix: S1, + committed: &Committed, metadata_key_name: &Option, ) -> Result>> where @@ -420,7 +423,7 @@ impl DataStore for FilesystemDataStore { S2: AsRef, { // Find metadata key paths on disk - let key_paths = find_populated_key_paths(self, KeyType::Meta, prefix, &Committed::Live)?; + let key_paths = find_populated_key_paths(self, KeyType::Meta, prefix, committed)?; // For each file on disk, check the user's conditions, and add it to our output let mut result = HashMap::new(); @@ -460,8 +463,13 @@ impl DataStore for FilesystemDataStore { self.delete_key_path(path, committed) } - fn get_metadata_raw(&self, metadata_key: &Key, data_key: &Key) -> Result> { - let path = self.metadata_path(metadata_key, data_key, &Committed::Live)?; + fn get_metadata_raw( + &self, + metadata_key: &Key, + data_key: &Key, + committed: &Committed, + ) -> Result> { + let path = self.metadata_path(metadata_key, data_key, committed)?; read_file_for_key(metadata_key, &path) } @@ -470,8 +478,9 @@ impl DataStore for FilesystemDataStore { metadata_key: &Key, data_key: &Key, value: S, + committed: &Committed, ) -> Result<()> { - let path = self.metadata_path(metadata_key, data_key, &Committed::Live)?; + let path = self.metadata_path(metadata_key, data_key, committed)?; write_file_mkdir(path, value) } @@ -489,6 +498,95 @@ impl DataStore for FilesystemDataStore { let pending = Committed::Pending { tx: transaction.into(), }; + + // We will first commit metadata to correctly check that if a setting exist and + // its strength is strong, we will not change it to weak. + // Get metadata from pending transaction + let transaction_metadata = + self.get_metadata_prefix("settings.", &pending, &None as &Option<&str>)?; + + trace!( + "commit_transaction: transaction_metadata: {:?}", + transaction_metadata + ); + + for (key, value) in transaction_metadata { + for (metadata_key, metadata_value) in value { + // For now we are only processing the strength metadata from pending + // transaction to live + if metadata_key.name() != "strength" { + continue; + } + + // strength in pending transaction + let pending_strength: String = + deserialize_scalar::<_, ScalarError>(&metadata_value.clone()) + .with_context(|_| error::DeSerializeSnafu {})?; + + // Get the setting strength in live + // get_metadata function returns Ok(None) in case strength does not exist + // We will consider this case as strength equals strong. + let committed_strength = + match self.get_metadata(&metadata_key, &key, &Committed::Live) { + Ok(Some(v)) => v, + Ok(None) => "strong".to_string(), + Err(_) => continue, + }; + + // The get key funtion returns Ok(None) in case if the path does not exist + // and error if some path exist and some error occurred in fetching + // Hence we we will return error in case of error + // from get key function and continue to add/change to weak key + // if the value is None. + let value = self.get_key(&key, &Committed::Live)?; + + trace!( + "commit_transaction: key: {:?}, metadata_key: {:?}, metadata_value: {:?}", + key.name(), + metadata_key.name(), + metadata_value + ); + + match (pending_strength.as_str(), committed_strength.as_str()) { + ("weak", "strong") => { + // Do not change from strong to weak if setting exists + // otherwise commit strength metadata with value as "weak" + if value.is_some() { + warn!("Trying to change the strength from strong to weak for key: {}, Operation ignored", key.name()); + continue; + } else { + let met_value = serialize_scalar::<_, ScalarError>(&pending_strength) + .with_context(|_| error::SerializeSnafu {})?; + + self.set_metadata(&metadata_key, &key, met_value, &Committed::Live)?; + } + } + ("strong", "weak") => { + let met_value = serialize_scalar::<_, ScalarError>(&pending_strength) + .with_context(|_| error::SerializeSnafu {})?; + self.set_metadata(&metadata_key, &key, met_value, &Committed::Live)?; + } + ("weak", "weak") => { + trace!("The strength for setting {} is already weak", key.name()); + continue; + } + ("strong", "strong") => { + trace!("The strength for setting {} is already strong", key.name()); + continue; + } + _ => { + warn!( + "The strength for setting {} is not one of weak or strong. Pending strength: {}, Committed strength: {} ", + key.name(), + pending_strength, + committed_strength + ); + continue; + } + }; + } + } + // Get data for changed keys let pending_data = self.get_prefix("settings.", &pending)?; diff --git a/sources/api/datastore/src/lib.rs b/sources/api/datastore/src/lib.rs index 9156a525a19..3d375d35cf4 100644 --- a/sources/api/datastore/src/lib.rs +++ b/sources/api/datastore/src/lib.rs @@ -72,6 +72,7 @@ pub trait DataStore { fn list_populated_metadata( &self, prefix: S1, + committed: &Committed, metadata_key_name: &Option, ) -> Result>> where @@ -89,7 +90,12 @@ pub trait DataStore { /// Retrieve the value for a single metadata key from the datastore. Values will inherit from /// earlier in the tree, if more specific values are not found later. - fn get_metadata(&self, metadata_key: &Key, data_key: &Key) -> Result> { + fn get_metadata( + &self, + metadata_key: &Key, + data_key: &Key, + committed: &Committed, + ) -> Result> { let mut result = Ok(None); let mut current_path = Vec::new(); @@ -101,7 +107,7 @@ pub trait DataStore { unreachable!("Prefix of Key failed to make Key: {:?}", current_path) }); - if let Some(md) = self.get_metadata_raw(metadata_key, &data_key)? { + if let Some(md) = self.get_metadata_raw(metadata_key, &data_key, committed)? { result = Ok(Some(md)); } } @@ -110,13 +116,19 @@ pub trait DataStore { /// Retrieve the value for a single metadata key from the datastore, without taking into /// account inheritance of metadata from earlier in the tree. - fn get_metadata_raw(&self, metadata_key: &Key, data_key: &Key) -> Result>; + fn get_metadata_raw( + &self, + metadata_key: &Key, + data_key: &Key, + committed: &Committed, + ) -> Result>; /// Set the value of a single metadata key in the datastore. fn set_metadata>( &mut self, metadata_key: &Key, data_key: &Key, value: S, + committed: &Committed, ) -> Result<()>; /// Removes the given metadata key from the given data key in the datastore. If we /// succeeded, we return Ok(()); if the data or metadata key didn't exist, we also return @@ -205,13 +217,14 @@ pub trait DataStore { fn get_metadata_prefix( &self, find_prefix: S1, + committed: &Committed, metadata_key_name: &Option, ) -> Result>> where S1: AsRef, S2: AsRef, { - let meta_map = self.list_populated_metadata(&find_prefix, metadata_key_name)?; + let meta_map = self.list_populated_metadata(&find_prefix, committed, metadata_key_name)?; trace!("Found populated metadata: {:?}", meta_map); if meta_map.is_empty() { return Ok(HashMap::new()); @@ -234,12 +247,12 @@ pub trait DataStore { meta_key, &data_key ); - let value = self.get_metadata(&meta_key, &data_key)?.context( - error::ListedMetaNotPresentSnafu { + let value = self + .get_metadata(&meta_key, &data_key, committed)? + .context(error::ListedMetaNotPresentSnafu { meta_key: meta_key.name(), data_key: data_key.name(), - }, - )?; + })?; // Insert a top-level map entry for the data key if we've found metadata. let data_entry = result.entry(data_key.clone()).or_insert_with(HashMap::new); @@ -336,14 +349,20 @@ mod test { let grandchild = Key::new(KeyType::Data, "a.b.c").unwrap(); // Set metadata on parent - m.set_metadata(&meta, &parent, "value").unwrap(); + m.set_metadata(&meta, &parent, "value", &Committed::Live) + .unwrap(); // Metadata shows up on grandchild... assert_eq!( - m.get_metadata(&meta, &grandchild).unwrap(), + m.get_metadata(&meta, &grandchild, &Committed::Live) + .unwrap(), Some("value".to_string()) ); // ...but only through inheritance, not directly. - assert_eq!(m.get_metadata_raw(&meta, &grandchild).unwrap(), None); + assert_eq!( + m.get_metadata_raw(&meta, &grandchild, &Committed::Live) + .unwrap(), + None + ); } #[test] @@ -379,20 +398,22 @@ mod test { let mk1 = Key::new(KeyType::Meta, "metatest1").unwrap(); let mk2 = Key::new(KeyType::Meta, "metatest2").unwrap(); let mk3 = Key::new(KeyType::Meta, "metatest3").unwrap(); - m.set_metadata(&mk1, &k1, "41").unwrap(); - m.set_metadata(&mk2, &k2, "42").unwrap(); - m.set_metadata(&mk3, &k3, "43").unwrap(); + m.set_metadata(&mk1, &k1, "41", &Committed::Live).unwrap(); + m.set_metadata(&mk2, &k2, "42", &Committed::Live).unwrap(); + m.set_metadata(&mk3, &k3, "43", &Committed::Live).unwrap(); // Check all metadata assert_eq!( - m.get_metadata_prefix("x.", &None as &Option<&str>).unwrap(), + m.get_metadata_prefix("x.", &Committed::Live, &None as &Option<&str>) + .unwrap(), hashmap!(k1 => hashmap!(mk1 => "41".to_string()), k2.clone() => hashmap!(mk2.clone() => "42".to_string())) ); // Check metadata matching a given name assert_eq!( - m.get_metadata_prefix("x.", &Some("metatest2")).unwrap(), + m.get_metadata_prefix("x.", &Committed::Live, &Some("metatest2")) + .unwrap(), hashmap!(k2 => hashmap!(mk2 => "42".to_string())) ); } diff --git a/sources/api/datastore/src/memory.rs b/sources/api/datastore/src/memory.rs index 4ffc397912f..812272d85eb 100644 --- a/sources/api/datastore/src/memory.rs +++ b/sources/api/datastore/src/memory.rs @@ -16,6 +16,9 @@ pub struct MemoryDataStore { // Map of data keys to their metadata, which in turn is a mapping of metadata keys to // arbitrary (string/serialized) values. metadata: HashMap>, + // Map of data keys to their metadata, which in turn is a mapping of metadata keys to + // arbitrary (string/serialized) values in pending transaction + pending_metadata: HashMap>, } impl MemoryDataStore { @@ -57,14 +60,20 @@ impl DataStore for MemoryDataStore { fn list_populated_metadata( &self, prefix: S1, + committed: &Committed, metadata_key_name: &Option, ) -> Result>> where S1: AsRef, S2: AsRef, { + let metadata_to_use = match committed { + Committed::Live => &self.metadata, + Committed::Pending { .. } => &self.pending_metadata, + }; + let mut result = HashMap::new(); - for (data_key, meta_map) in self.metadata.iter() { + for (data_key, meta_map) in metadata_to_use.iter() { // Confirm data key matches requested prefix. if !data_key.name().starts_with(prefix.as_ref()) { continue; @@ -112,8 +121,18 @@ impl DataStore for MemoryDataStore { Ok(dataset.contains_key(key)) } - fn get_metadata_raw(&self, metadata_key: &Key, data_key: &Key) -> Result> { - let metadata_for_data = self.metadata.get(data_key); + fn get_metadata_raw( + &self, + metadata_key: &Key, + data_key: &Key, + committed: &Committed, + ) -> Result> { + let metadata_to_use = match committed { + Committed::Live => &self.metadata, + Committed::Pending { .. } => &self.pending_metadata, + }; + + let metadata_for_data = metadata_to_use.get(data_key); // If we have a metadata entry for this data key, then we can try fetching the requested // metadata key, otherwise we'll return early with Ok(None). let result = metadata_for_data.and_then(|m| m.get(metadata_key)); @@ -125,17 +144,14 @@ impl DataStore for MemoryDataStore { metadata_key: &Key, data_key: &Key, value: S, + committed: &Committed, ) -> Result<()> { - // If we don't already have a metadata entry for this data key, insert one. - let metadata_for_data = self - .metadata - // Clone data key because we want the HashMap key type to be Key, not &Key, and we - // can't pass ownership because we only have a reference from our parameters. - .entry(data_key.clone()) - .or_default(); - - metadata_for_data.insert(metadata_key.clone(), value.as_ref().to_owned()); - Ok(()) + match committed { + Committed::Live => set_metadata_raw(&mut self.metadata, metadata_key, data_key, value), + Committed::Pending { .. } => { + set_metadata_raw(&mut self.pending_metadata, metadata_key, data_key, value) + } + } } fn unset_metadata(&mut self, metadata_key: &Key, data_key: &Key) -> Result<()> { @@ -179,6 +195,23 @@ impl DataStore for MemoryDataStore { } } +fn set_metadata_raw>( + metadata_to_use: &mut HashMap>, + metadata_key: &Key, + data_key: &Key, + value: S, +) -> Result<()> { + // If we don't already have a metadata entry for this data key, insert one. + let metadata_for_data = metadata_to_use + // Clone data key because we want the HashMap key type to be Key, not &Key, and we + // can't pass ownership because we only have a reference from our parameters. + .entry(data_key.clone()) + .or_default(); + + metadata_for_data.insert(metadata_key.clone(), value.as_ref().to_owned()); + Ok(()) +} + #[cfg(test)] mod test { use super::super::{Committed, DataStore, Key, KeyType}; @@ -198,14 +231,38 @@ mod test { let mdkey = Key::new(KeyType::Meta, "testmd").unwrap(); let md = "mdval"; - m.set_metadata(&mdkey, &k, md).unwrap(); + m.set_metadata(&mdkey, &k, md, &Committed::Live).unwrap(); assert_eq!( - m.get_metadata_raw(&mdkey, &k).unwrap(), + m.get_metadata_raw(&mdkey, &k, &Committed::Live).unwrap(), + Some(md.to_string()) + ); + + m.set_metadata( + &mdkey, + &k, + md, + &Committed::Pending { + tx: "test".to_owned(), + }, + ) + .unwrap(); + assert_eq!( + m.get_metadata_raw( + &mdkey, + &k, + &Committed::Pending { + tx: "test".to_owned() + } + ) + .unwrap(), Some(md.to_string()) ); m.unset_metadata(&mdkey, &k).unwrap(); - assert_eq!(m.get_metadata_raw(&mdkey, &k).unwrap(), None); + assert_eq!( + m.get_metadata_raw(&mdkey, &k, &Committed::Live).unwrap(), + None + ); m.unset_key(&k, &Committed::Live).unwrap(); assert_eq!(m.get_key(&k, &Committed::Live).unwrap(), None);