From ed6f38f47641c30f62bdc02cfe6ddf90f7958ffc Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Mon, 30 Dec 2024 21:03:19 +0200 Subject: [PATCH] Allow concurrent DB background operations for SSDs Full compactions can be done much faster when running with `db_parallelism=8`. --- internal/config_specification.toml | 6 ++++ src/config.rs | 2 ++ src/db.rs | 50 ++++++++++++++++++------------ src/tracker.rs | 1 + 4 files changed, 40 insertions(+), 19 deletions(-) diff --git a/internal/config_specification.toml b/internal/config_specification.toml index c0bc8e009..b21845e13 100644 --- a/internal/config_specification.toml +++ b/internal/config_specification.toml @@ -34,6 +34,12 @@ name = "db_log_dir" type = "std::path::PathBuf" doc = "Directory to store index database internal log (default: same as specified by `db_dir`)" +[[param]] +name = "db_parallelism" +type = "u8" +doc = "Max threads to use for DB background operations (flushes and compactions)" +default = "1" + [[param]] name = "daemon_dir" type = "std::path::PathBuf" diff --git a/src/config.rs b/src/config.rs index a553d1064..03b9e3086 100644 --- a/src/config.rs +++ b/src/config.rs @@ -127,6 +127,7 @@ pub struct Config { pub network: Network, pub db_path: PathBuf, pub db_log_dir: Option, + pub db_parallelism: u8, pub daemon_auth: SensitiveAuth, pub daemon_rpc_addr: SocketAddr, pub daemon_p2p_addr: SocketAddr, @@ -354,6 +355,7 @@ impl Config { network: config.network, db_path: config.db_dir, db_log_dir: config.db_log_dir, + db_parallelism: config.db_parallelism, daemon_auth, daemon_rpc_addr, daemon_p2p_addr, diff --git a/src/db.rs b/src/db.rs index 414504c5c..ec5492e4b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -95,11 +95,14 @@ impl Default for Config { } } -fn default_opts() -> rocksdb::Options { +fn default_opts(parallelism: u8) -> rocksdb::Options { let mut block_opts = rocksdb::BlockBasedOptions::default(); block_opts.set_checksum_type(rocksdb::ChecksumType::CRC32c); let mut opts = rocksdb::Options::default(); + opts.increase_parallelism(parallelism.into()); + opts.set_max_subcompactions(parallelism.into()); + opts.set_keep_log_file_num(10); opts.set_max_open_files(16); opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); @@ -114,23 +117,27 @@ fn default_opts() -> rocksdb::Options { } impl DBStore { - fn create_cf_descriptors() -> Vec { + fn create_cf_descriptors(parallelism: u8) -> Vec { COLUMN_FAMILIES .iter() - .map(|&name| rocksdb::ColumnFamilyDescriptor::new(name, default_opts())) + .map(|&name| rocksdb::ColumnFamilyDescriptor::new(name, default_opts(parallelism))) .collect() } - fn open_internal(path: &Path, log_dir: Option<&Path>) -> Result { - let mut db_opts = default_opts(); + fn open_internal(path: &Path, log_dir: Option<&Path>, parallelism: u8) -> Result { + let mut db_opts = default_opts(parallelism); db_opts.create_if_missing(true); db_opts.create_missing_column_families(true); if let Some(d) = log_dir { db_opts.set_db_log_dir(d); } - let db = rocksdb::DB::open_cf_descriptors(&db_opts, path, Self::create_cf_descriptors()) - .with_context(|| format!("failed to open DB: {}", path.display()))?; + let db = rocksdb::DB::open_cf_descriptors( + &db_opts, + path, + Self::create_cf_descriptors(parallelism), + ) + .with_context(|| format!("failed to open DB: {}", path.display()))?; let live_files = db.live_files()?; info!( "{:?}: {} SST files, {} GB, {} Grows", @@ -155,8 +162,13 @@ impl DBStore { } /// Opens a new RocksDB at the specified location. - pub fn open(path: &Path, log_dir: Option<&Path>, auto_reindex: bool) -> Result { - let mut store = Self::open_internal(path, log_dir)?; + pub fn open( + path: &Path, + log_dir: Option<&Path>, + auto_reindex: bool, + parallelism: u8, + ) -> Result { + let mut store = Self::open_internal(path, log_dir, parallelism)?; let config = store.get_config(); debug!("DB {:?}", config); let mut config = config.unwrap_or_default(); // use default config when DB is empty @@ -182,13 +194,13 @@ impl DBStore { ); // close DB before deletion drop(store); - rocksdb::DB::destroy(&default_opts(), path).with_context(|| { + rocksdb::DB::destroy(&default_opts(parallelism), path).with_context(|| { format!( "re-index required but the old database ({}) can not be deleted", path.display() ) })?; - store = Self::open_internal(path, log_dir)?; + store = Self::open_internal(path, log_dir, parallelism)?; config = Config::default(); // re-init config after dropping DB } if config.compacted { @@ -432,13 +444,13 @@ mod tests { fn test_reindex_new_format() { let dir = tempfile::tempdir().unwrap(); { - let store = DBStore::open(dir.path(), None, false).unwrap(); + let store = DBStore::open(dir.path(), None, false, 1).unwrap(); let mut config = store.get_config().unwrap(); config.format += 1; store.set_config(config); }; assert_eq!( - DBStore::open(dir.path(), None, false) + DBStore::open(dir.path(), None, false, 1) .err() .unwrap() .to_string(), @@ -449,7 +461,7 @@ mod tests { ) ); { - let store = DBStore::open(dir.path(), None, true).unwrap(); + let store = DBStore::open(dir.path(), None, true, 1).unwrap(); store.flush(); let config = store.get_config().unwrap(); assert_eq!(config.format, CURRENT_FORMAT); @@ -467,14 +479,14 @@ mod tests { db.put(b"F", b"").unwrap(); // insert legacy DB compaction marker (in 'default' column family) }; assert_eq!( - DBStore::open(dir.path(), None, false) + DBStore::open(dir.path(), None, false, 1) .err() .unwrap() .to_string(), format!("re-index required due to legacy format",) ); { - let store = DBStore::open(dir.path(), None, true).unwrap(); + let store = DBStore::open(dir.path(), None, true, 1).unwrap(); store.flush(); let config = store.get_config().unwrap(); assert_eq!(config.format, CURRENT_FORMAT); @@ -484,7 +496,7 @@ mod tests { #[test] fn test_db_prefix_scan() { let dir = tempfile::tempdir().unwrap(); - let store = DBStore::open(dir.path(), None, true).unwrap(); + let store = DBStore::open(dir.path(), None, true, 1).unwrap(); let items = [ *b"ab ", @@ -509,7 +521,7 @@ mod tests { #[test] fn test_db_log_in_same_dir() { let dir1 = tempfile::tempdir().unwrap(); - let _store = DBStore::open(dir1.path(), None, true).unwrap(); + let _store = DBStore::open(dir1.path(), None, true, 1).unwrap(); // LOG file is created in dir1 let dir_files = list_log_files(dir1.path()); @@ -517,7 +529,7 @@ mod tests { let dir2 = tempfile::tempdir().unwrap(); let dir3 = tempfile::tempdir().unwrap(); - let _store = DBStore::open(dir2.path(), Some(dir3.path()), true).unwrap(); + let _store = DBStore::open(dir2.path(), Some(dir3.path()), true, 1).unwrap(); // *_LOG file is not created in dir2, but in dir3 let dir_files = list_log_files(dir2.path()); diff --git a/src/tracker.rs b/src/tracker.rs index 1b07a8f0a..2abcf9d02 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -36,6 +36,7 @@ impl Tracker { &config.db_path, config.db_log_dir.as_deref(), config.auto_reindex, + config.db_parallelism, )?; let chain = Chain::new(config.network); Ok(Self {