Skip to content

Commit

Permalink
fix(interactive): Fix secondary catchup error by reopening secondary (#…
Browse files Browse the repository at this point in the history
…3567)

Also support calling compact and reopen from groot-client
  • Loading branch information
siyuan0322 authored Feb 21, 2024
1 parent 98420d3 commit 5829153
Show file tree
Hide file tree
Showing 42 changed files with 1,025 additions and 591 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ public class StoreConfig {
public static final Config<String> STORE_DATA_PATH =
Config.stringConfig("store.data.path", "./data");

public static final Config<String> STORE_DATA_DOWNLOAD_PATH =
Config.stringConfig("store.data.download.path", "");

public static final Config<Integer> STORE_WRITE_THREAD_COUNT =
Config.intConfig("store.write.thread.count", 1);

Expand All @@ -35,9 +38,15 @@ public class StoreConfig {
public static final Config<Long> STORE_GC_INTERVAL_MS =
Config.longConfig("store.gc.interval.ms", 5000L);

public static final Config<Long> STORE_CATCHUP_INTERVAL_MS =
Config.longConfig("store.catchup.interval.ms", 30000L);

// set by IS_SECONDARY_INSTANCE, used in graph.rs
public static final Config<String> STORE_STORAGE_ENGINE =
Config.stringConfig("store.storage.engine", "rocksdb");
public static final Config<String> STORE_SECONDARY_DATA_PATH =
Config.stringConfig("store.data.secondary.path", "./data_secondary");

public static final Config<String> STORE_WAL_DIR =
Config.stringConfig("store.rocksdb.wal.dir", "");
}
3 changes: 2 additions & 1 deletion interactive_engine/executor/assembly/groot/src/store/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ pub extern "C" fn OpenPartitionGraph(store_path: *const c_char) -> PartitionGrap
let store_path_str = str::from_utf8(slice).unwrap();
let mut config_builder = GraphConfigBuilder::new();
config_builder.set_storage_engine("rocksdb");
config_builder.add_storage_option("store.data.path", store_path_str);
let config = config_builder.build();
let graph_store = Arc::new(GraphStore::open(&config, store_path_str).unwrap());
let graph_store = Arc::new(GraphStore::open(&config).unwrap());
let partition_graph = WrapperPartitionGraph::new(graph_store);
Box::into_raw(Box::new(partition_graph)) as PartitionGraphHandle
}
Expand Down
98 changes: 70 additions & 28 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use std::ffi::CStr;
use std::os::raw::{c_char, c_void};
use std::str;
use std::sync::{Arc, Once};
use std::time::Duration;
use std::{str, thread};

use groot_store::db::api::multi_version_graph::MultiVersionGraph;
use groot_store::db::api::PropertyMap;
Expand Down Expand Up @@ -63,9 +64,6 @@ pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHa
config_builder.set_storage_engine(engine);
config_builder.set_storage_options(proto.get_configs().clone());
let config = config_builder.build();
let path = config
.get_storage_option("store.data.path")
.expect("invalid config, missing store.data.path");
INIT.call_once(|| {
if let Some(config_file) = config.get_storage_option("log4rs.config") {
log4rs::init_file(config_file, Default::default()).expect("init log4rs failed");
Expand All @@ -74,14 +72,14 @@ pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHa
println!("No valid log4rs.config, rust won't print logs");
}
});
let handle = Box::new(GraphStore::open(&config, path).unwrap());
let handle = Box::new(GraphStore::open(&config).unwrap());
let ret = Box::into_raw(handle);
ret as GraphHandle
}

#[no_mangle]
pub extern "C" fn closeGraphStore(handle: GraphHandle) -> bool {
trace!("closeGraphStore");
info!("closeGraphStore");
let graph_store_ptr = handle as *mut GraphStore;
unsafe {
drop(Box::from_raw(graph_store_ptr));
Expand Down Expand Up @@ -134,22 +132,21 @@ pub extern "C" fn writeBatch(
ptr: GraphHandle, snapshot_id: i64, data: *const u8, len: usize,
) -> Box<JnaResponse> {
trace!("writeBatch");
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
let buf = ::std::slice::from_raw_parts(data, len);
let ret = match do_write_batch(graph_store_ptr, snapshot_id, buf) {
Ok(has_ddl) => {
let mut response = JnaResponse::new_success();
response.has_ddl(has_ddl);
response
}
Err(e) => {
let err_msg = format!("{:?}", e);
JnaResponse::new_error(&err_msg)
}
};
return ret;
}

let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };
let buf = unsafe { ::std::slice::from_raw_parts(data, len) };
let ret = match do_write_batch(graph_store_ptr, snapshot_id, buf) {
Ok((has_ddl)) => {
let mut response = JnaResponse::new_success();
response.has_ddl(has_ddl);
response
}
Err(e) => {
let err_msg = format!("{:?}", e);
JnaResponse::new_error(&err_msg)
}
};
return ret;
}

fn do_write_batch<G: MultiVersionGraph>(
Expand All @@ -160,7 +157,7 @@ fn do_write_batch<G: MultiVersionGraph>(
let mut has_ddl = false;
let operations = proto.get_operations();
if operations.is_empty() {
return Ok(false);
return Ok(has_ddl);
}
for op in operations {
match op.get_opType() {
Expand Down Expand Up @@ -441,20 +438,65 @@ fn delete_edge<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operation
graph.delete_edge(snapshot_id, edge_id, &edge_kind, edge_location_pb.get_forward())
}

#[no_mangle]
pub extern "C" fn reopenSecondary(ptr: GraphHandle, wait_sec: i64) -> Box<JnaResponse> {
let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };
match graph_store_ptr.reopen(wait_sec as u64) {
Ok(_) => {
info!("Reopened store");
JnaResponse::new_success()
}
Err(e) => {
let msg = format!("Reopen failed: {:?}", e);
error!("{}", msg);
JnaResponse::new_error(&msg)
}
}
}

#[no_mangle]
pub extern "C" fn garbageCollectSnapshot(ptr: GraphHandle, snapshot_id: i64) -> Box<JnaResponse> {
let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };
// if snapshot_id % 60 != 0 {
// return JnaResponse::new_success();
// }
debug!("garbageCollectSnapshot si {}", snapshot_id);
match graph_store_ptr.gc(snapshot_id) {
Ok(_) => JnaResponse::new_success(),
Err(e) => {
let msg = format!("{:?}", e);
JnaResponse::new_error(&msg)
}
}
}

#[no_mangle]
pub extern "C" fn tryCatchUpWithPrimary(ptr: GraphHandle) -> Box<JnaResponse> {
let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };
match graph_store_ptr.try_catch_up_with_primary() {
Ok(_) => (),
Ok(_) => JnaResponse::new_success(),
Err(e) => {
error!("Error during catch up primary {:?}", e);
// sleep 2 min for the underlying storage catch latest changes.
match graph_store_ptr.reopen(120) {
Ok(_) => {
info!("Reopened store");
JnaResponse::new_success()
}
Err(e) => {
let msg = format!("Reopen failed: {:?}", e);
error!("{}", msg);
JnaResponse::new_error(&msg)
}
}
}
};
if snapshot_id % 3600 != 0 {
return JnaResponse::new_success();
}
match graph_store_ptr.gc(snapshot_id) {
}

#[no_mangle]
pub extern "C" fn compact(ptr: GraphHandle) -> Box<JnaResponse> {
let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };
match graph_store_ptr.compact() {
Ok(_) => JnaResponse::new_success(),
Err(e) => {
let msg = format!("{:?}", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ impl<D, F: BufferFactory<D>> Drop for BufferPool<D, F> {
}

impl<D, F: BufferFactory<D>> BufferFactory<D> for BufferPool<D, F> {
fn create(&mut self, batch_size: usize) -> Option<Buffer<D>> {
fn create(&mut self, _batch_size: usize) -> Option<Buffer<D>> {
// This method is only called by BufferPool, batch_size and self.batch_size are always same
// assert_eq!(batch_size, self.batch_size);
if let Some(inner) = self.fetch() {
Expand Down
18 changes: 8 additions & 10 deletions interactive_engine/executor/engine/pegasus/network/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,14 @@ impl SimpleServerDetector {

pub fn update_peer_view<Iter: Iterator<Item = (u64, ServerAddr)>>(&self, peer_view: Iter) {
let new_peers = peer_view
.map(|(id, server_addr)| {
loop {
let addr = server_addr.to_socket_addr();
if addr.is_ok() {
let addr = addr.unwrap();
return Server { id, addr }
} else {
error!("Cannot resolve address {:?}, retrying...", server_addr);
sleep(Duration::from_secs(3));
}
.map(|(id, server_addr)| loop {
let addr = server_addr.to_socket_addr();
if addr.is_ok() {
let addr = addr.unwrap();
return Server { id, addr };
} else {
error!("Cannot resolve address {:?}, retrying...", server_addr);
sleep(Duration::from_secs(3));
}
})
.collect::<Vec<Server>>();
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/executor/store/groot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ grpcio-sys = { version = "0.10", features = ["openssl"] }
# deactivation of bzip2 due to https://github.com/rust-rocksdb/rust-rocksdb/issues/609
# deactivation of zstd due to the 'hidden symbol "ZSTD_maxCLevel" is referenced by DSO' error
rocksdb = { version = "0.21.0", features = ["snappy", "lz4", "zlib"], default-features = false }
#rocksdb = { git = "https://github.com/siyuan0322/rust-rocksdb.git", rev = "c44ea2b", features = ["snappy", "lz4", "zlib"], default-features = false }
dyn_type = { path = "../../common/dyn_type" }
rustversion = "1.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ fn main() {
builder.add_storage_option("store.rocksdb.max.write.buffer.num", max_write_buffer_num);
builder.add_storage_option("store.rocksdb.level0.compaction.trigger", level_zero_compaction_trigger);
builder.add_storage_option("store.rocksdb.max.level.base.mb", max_level_base_mb);
builder.add_storage_option("store.data.path", &path);
let config = builder.build();
let store = Arc::new(GraphStore::open(&config, &path).unwrap());
let store = Arc::new(GraphStore::open(&config).unwrap());
println!("store opened.");
let mut type_def_builer = TypeDefBuilder::new();
type_def_builer.version(1);
Expand Down
17 changes: 8 additions & 9 deletions interactive_engine/executor/store/groot/src/db/graph/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ use crate::db::graph::bin::{
use crate::db::graph::codec::get_codec_version;
use crate::db::graph::entity::{RocksEdgeImpl, RocksVertexImpl};
use crate::db::graph::types::{EdgeInfo, EdgeKindInfo, VertexTypeInfo};
use crate::db::storage::ExternalStorage;
use crate::db::storage::rocksdb::RocksDB;

pub struct VertexTypeScan {
storage: Arc<dyn ExternalStorage>,
storage: Arc<RocksDB>,
si: SnapshotId,
vertex_type_info: Arc<VertexTypeInfo>,
with_prop: bool,
Expand All @@ -33,8 +33,7 @@ fn check_e(id: EdgeId, ts: SnapshotId, prev_id: Option<EdgeId>, data_ts: Snapsho

impl VertexTypeScan {
pub fn new(
storage: Arc<dyn ExternalStorage>, si: SnapshotId, vertex_type_info: Arc<VertexTypeInfo>,
with_prop: bool,
storage: Arc<RocksDB>, si: SnapshotId, vertex_type_info: Arc<VertexTypeInfo>, with_prop: bool,
) -> Self {
VertexTypeScan { storage, si, vertex_type_info, with_prop }
}
Expand Down Expand Up @@ -88,7 +87,7 @@ impl IntoIterator for VertexTypeScan {
}

pub struct EdgeTypeScan {
storage: Arc<dyn ExternalStorage>,
storage: Arc<RocksDB>,
si: SnapshotId,
edge_info: Arc<EdgeInfo>,
vertex_id: Option<VertexId>,
Expand All @@ -98,8 +97,8 @@ pub struct EdgeTypeScan {

impl EdgeTypeScan {
pub fn new(
storage: Arc<dyn ExternalStorage>, si: SnapshotId, edge_info: Arc<EdgeInfo>,
vertex_id: Option<VertexId>, direction: EdgeDirection, with_prop: bool,
storage: Arc<RocksDB>, si: SnapshotId, edge_info: Arc<EdgeInfo>, vertex_id: Option<VertexId>,
direction: EdgeDirection, with_prop: bool,
) -> Self {
EdgeTypeScan { storage, si, edge_info, vertex_id, direction, with_prop }
}
Expand Down Expand Up @@ -136,7 +135,7 @@ impl IntoIterator for EdgeTypeScan {
}

pub struct EdgeKindScan {
storage: Arc<dyn ExternalStorage>,
storage: Arc<RocksDB>,
si: SnapshotId,
edge_kind_info: Arc<EdgeKindInfo>,
vertex_id: Option<VertexId>,
Expand All @@ -146,7 +145,7 @@ pub struct EdgeKindScan {

impl EdgeKindScan {
pub fn new(
storage: Arc<dyn ExternalStorage>, si: SnapshotId, edge_kind_info: Arc<EdgeKindInfo>,
storage: Arc<RocksDB>, si: SnapshotId, edge_kind_info: Arc<EdgeKindInfo>,
vertex_id: Option<VertexId>, direction: EdgeDirection, with_prop: bool,
) -> Self {
EdgeKindScan { storage, si, edge_kind_info, vertex_id, direction, with_prop }
Expand Down
12 changes: 7 additions & 5 deletions interactive_engine/executor/store/groot/src/db/graph/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ use crate::db::common::bytes::util::parse_pb;
use crate::db::common::str::parse_str;
use crate::db::proto::model::DataLoadTargetPb;
use crate::db::proto::schema_common::EdgeKindPb;
use crate::db::storage::ExternalStorage;
use crate::db::storage::rocksdb::RocksDB;
use crate::db::util::lock::GraphMutexLock;

const META_TABLE_ID: TableId = i64::min_value();

pub struct Meta {
store: Arc<dyn ExternalStorage>,
store: Arc<RocksDB>,
graph_def_lock: GraphMutexLock<GraphDef>,
}

impl Meta {
pub fn new(store: Arc<dyn ExternalStorage>) -> Self {
pub fn new(store: Arc<RocksDB>) -> Self {
Meta { store, graph_def_lock: GraphMutexLock::new(GraphDef::default()) }
}

Expand Down Expand Up @@ -769,7 +769,7 @@ impl ItemCommon for RemoveEdgeKindItem {
}
}

fn get_items<I: ItemCommon>(store: &dyn ExternalStorage) -> GraphResult<Vec<I>> {
fn get_items<I: ItemCommon>(store: &RocksDB) -> GraphResult<Vec<I>> {
let mut ret = Vec::new();
let mut prefix = Vec::new();
let table_prefix = transform::i64_to_arr(META_TABLE_ID.to_be());
Expand Down Expand Up @@ -832,7 +832,9 @@ mod tests {
let path = "test_meta_normal";
fs::rmr(path).unwrap();
{
let db = RocksDB::open(&HashMap::new(), path).unwrap();
let mut config = HashMap::new();
config.insert("store.data.path".to_owned(), path.to_owned());
let db = RocksDB::open(&config).unwrap();
let store = Arc::new(db);
let meta = Meta::new(store.clone());
let mut schema_version = 1;
Expand Down
Loading

0 comments on commit 5829153

Please sign in to comment.