diff --git a/analytical_engine/core/launcher.h b/analytical_engine/core/launcher.h index 38351ddc1142..5e8437ef6133 100644 --- a/analytical_engine/core/launcher.h +++ b/analytical_engine/core/launcher.h @@ -19,7 +19,7 @@ #include #include -#include "boost/process/detail/child_decl.hpp" +#include "boost/process.hpp" #include "core/flags.h" #include "grape/worker/comm_spec.h" diff --git a/analytical_engine/core/object/fragment_wrapper.h b/analytical_engine/core/object/fragment_wrapper.h index da51a2ca3cc2..ab29fa63c8b1 100644 --- a/analytical_engine/core/object/fragment_wrapper.h +++ b/analytical_engine/core/object/fragment_wrapper.h @@ -35,6 +35,7 @@ #include "boost/leaf/error.hpp" #include "boost/leaf/result.hpp" #include "grape/fragment/immutable_edgecut_fragment.h" +// #include "grape/fragment/immutable_vertexcut_fragment.h" #include "grape/serialization/in_archive.h" #include "grape/worker/comm_spec.h" #include "vineyard/client/client.h" @@ -1094,6 +1095,74 @@ class FragmentWrapper< std::shared_ptr fragment_; }; +/* +template +class FragmentWrapper< + grape::ImmutableVertexcutFragment> + : public IFragmentWrapper { + using fragment_t = + grape::ImmutableVertexcutFragment; + + public: + FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def, + std::shared_ptr fragment) + : IFragmentWrapper(id), + graph_def_(std::move(graph_def)), + fragment_(std::move(fragment)) { + CHECK_EQ(graph_def_.graph_type(), rpc::graph::IMMUTABLE_EDGECUT); + } + + std::shared_ptr fragment() const override { + return std::static_pointer_cast(fragment_); + } + + const rpc::graph::GraphDefPb& graph_def() const override { + return graph_def_; + } + + rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; } + + bl::result> CopyGraph( + const grape::CommSpec& comm_spec, const std::string& dst_graph_name, + const std::string& copy_type) override { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "Cannot copy the ArrowProjectedFragment"); + } + + bl::result> ReportGraph( + const grape::CommSpec& comm_spec, const rpc::GSParams& params) override { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "Not implemented."); + } + + bl::result> ToDirected( + const grape::CommSpec& comm_spec, + const std::string& dst_graph_name) override { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "Cannot convert to the directed DynamicProjectedFragment"); + } + + bl::result> ToUndirected( + const grape::CommSpec& comm_spec, + const std::string& dst_graph_name) override { + RETURN_GS_ERROR( + vineyard::ErrorCode::kInvalidOperationError, + "Cannot convert to the undirected DynamicProjectedFragment"); + } + + bl::result> CreateGraphView( + const grape::CommSpec& comm_spec, const std::string& dst_graph_name, + const std::string& copy_type) override { + RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError, + "Cannot generate a view over the ArrowProjectedFragment"); + } + + private: + rpc::graph::GraphDefPb graph_def_; + std::shared_ptr fragment_; +}; +*/ + #ifdef NETWORKX /** * @brief A specialized FragmentWrapper for DynamicFragment. diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 0e16bd2078ed..7fdc708474da 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -89,6 +89,7 @@ data: secondary.instance.enabled={{ .Values.secondary.enabled }} store.data.secondary.path={{ .Values.secondary.storeDataPath }} store.gc.interval.ms={{ .Values.storeGcIntervalMs }} + store.catchup.interval.ms={{ .Values.storeCatchupIntervalMs }} write.ha.enabled={{ .Values.backup.enabled }} diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index a3cfcde1f709..b3eec69bbcff 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -528,7 +528,7 @@ discoveryMode: "file" ## Coordinator Config snapshotIncreaseIntervalMs: 1000 -offsetsPersistIntervalMs: 3000 +offsetsPersistIntervalMs: 1000 fileMetaStorePath: "/etc/groot/my.meta" logRecycleEnable: true logRecycleOffsetReserve: 86400 @@ -538,9 +538,10 @@ storeDataPath: "/var/lib/graphscope-store" storeDataDownloadPath: "/var/lib/graphscope-store/download" storeDataSecondaryPath: "/home/graphscope/secondary" storeWriteThreadCount: 1 -storeQueueBufferSize: "1024000" +storeQueueBufferSize: "102400" storeGcIntervalMs: 5000 +storeCatchupIntervalMs: 5000 ## Kafka Config ## diff --git a/interactive_engine/assembly/src/conf/groot/logback.xml b/interactive_engine/assembly/src/conf/groot/logback.xml index 05953484c7ef..23563a882c9d 100644 --- a/interactive_engine/assembly/src/conf/groot/logback.xml +++ b/interactive_engine/assembly/src/conf/groot/logback.xml @@ -36,6 +36,7 @@ + diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java index a0d093d7bb87..2dcbbaf704f2 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CoordinatorConfig.java @@ -20,7 +20,7 @@ public class CoordinatorConfig { Config.longConfig("snapshot.increase.interval.ms", 1000L); public static final Config OFFSETS_PERSIST_INTERVAL_MS = - Config.longConfig("offsets.persist.interval.ms", 3000L); + Config.longConfig("offsets.persist.interval.ms", 1000L); public static final Config LOG_RECYCLE_ENABLE = Config.boolConfig("log.recycle.enable", false); diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java index db83389a63f1..35ba45a24570 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/StoreConfig.java @@ -24,7 +24,7 @@ public class StoreConfig { Config.intConfig("store.write.thread.count", 1); public static final Config STORE_QUEUE_BUFFER_SIZE = - Config.intConfig("store.queue.buffer.size", 1024000); + Config.intConfig("store.queue.buffer.size", 102400); public static final Config STORE_QUEUE_WAIT_MS = Config.longConfig("store.queue.wait.ms", 3000L); diff --git a/interactive_engine/executor/assembly/groot/src/store/graph.rs b/interactive_engine/executor/assembly/groot/src/store/graph.rs index 4a91c6e34df9..ce85f33a57d0 100644 --- a/interactive_engine/executor/assembly/groot/src/store/graph.rs +++ b/interactive_engine/executor/assembly/groot/src/store/graph.rs @@ -40,7 +40,7 @@ static INIT: Once = Once::new(); #[no_mangle] pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHandle { - trace!("openGraphStore"); + debug!("openGraphStore"); let buf = unsafe { ::std::slice::from_raw_parts(config_bytes, len) }; let proto = parse_pb::(buf).expect("parse config pb failed"); let mut config_builder = GraphConfigBuilder::new(); @@ -77,7 +77,7 @@ pub extern "C" fn closeGraphStore(handle: GraphHandle) -> bool { #[no_mangle] pub extern "C" fn getGraphDefBlob(ptr: GraphHandle) -> Box { - trace!("getGraphDefBlob"); + debug!("getGraphDefBlob"); unsafe { let graph_store_ptr = &*(ptr as *const GraphStore); match graph_store_ptr.get_graph_def_blob() { @@ -100,7 +100,7 @@ pub extern "C" fn getGraphDefBlob(ptr: GraphHandle) -> Box { #[no_mangle] pub extern "C" fn ingestData(ptr: GraphHandle, path: *const c_char) -> Box { - trace!("ingestData"); + debug!("ingestData"); unsafe { let graph_store_ptr = &*(ptr as *const GraphStore); let slice = CStr::from_ptr(path).to_bytes(); @@ -119,7 +119,7 @@ pub extern "C" fn ingestData(ptr: GraphHandle, path: *const c_char) -> Box Box { - trace!("writeBatch"); + debug!("writeBatch"); let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) }; let buf = unsafe { ::std::slice::from_raw_parts(data, len) }; @@ -139,7 +139,7 @@ pub extern "C" fn writeBatch( #[no_mangle] pub extern "C" fn getGraphStatistics(ptr: GraphHandle, snapshot_id: i64) -> Box { - trace!("getGraphStatistics"); + debug!("getGraphStatistics"); unsafe { let graph_store_ptr = &*(ptr as *const GraphStore); match graph_store_ptr.get_graph_statistics_blob(snapshot_id) { @@ -163,7 +163,7 @@ pub extern "C" fn getGraphStatistics(ptr: GraphHandle, snapshot_id: i64) -> Box< fn do_write_batch( graph: &G, snapshot_id: SnapshotId, buf: &[u8], ) -> GraphResult { - trace!("do_write_batch"); + debug!("do_write_batch"); let proto = parse_pb::(buf)?; let mut has_ddl = false; let operations = proto.get_operations(); @@ -241,7 +241,7 @@ fn do_write_batch( fn commit_data_load( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("commit_data_load"); + info!("commit_data_load"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let commit_data_load_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -256,7 +256,7 @@ fn commit_data_load( fn prepare_data_load( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("prepare_data_load"); + info!("prepare_data_load"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let prepare_data_load_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -269,7 +269,7 @@ fn prepare_data_load( fn create_vertex_type( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("create_vertex_type"); + info!("create_vertex_type"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let create_vertex_type_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -283,7 +283,7 @@ fn create_vertex_type( fn add_vertex_type_properties( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("add_vertex_type_properties"); + info!("add_vertex_type_properties"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let add_vertex_type_properties_pb = @@ -298,7 +298,7 @@ fn add_vertex_type_properties( fn drop_vertex_type( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("drop_vertex_type"); + info!("drop_vertex_type"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let label_id_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -309,7 +309,7 @@ fn drop_vertex_type( fn create_edge_type( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("create_edge_type"); + info!("create_edge_type"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let typedef_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -321,7 +321,7 @@ fn create_edge_type( fn add_edge_type_properties( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("add_edge_type_properties"); + info!("add_edge_type_properties"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let typedef_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -333,7 +333,7 @@ fn add_edge_type_properties( fn drop_edge_type( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("drop_edge_type"); + info!("drop_edge_type"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let label_id_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -342,7 +342,7 @@ fn drop_edge_type( } fn add_edge_kind(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult { - trace!("add_edge_kind"); + info!("add_edge_kind"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let add_edge_kind_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -355,7 +355,7 @@ fn add_edge_kind(graph: &G, snapshot_id: i64, op: &Operati fn remove_edge_kind( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult { - trace!("remove_edge_kind"); + info!("remove_edge_kind"); let ddl_operation_pb = parse_pb::(op.get_dataBytes())?; let schema_version = ddl_operation_pb.get_schemaVersion(); let edge_kind_pb = parse_pb::(ddl_operation_pb.get_ddlBlob())?; @@ -366,7 +366,7 @@ fn remove_edge_kind( fn overwrite_vertex( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult<()> { - trace!("overwrite_vertex"); + debug!("overwrite_vertex"); let data_operation_pb = parse_pb::(op.get_dataBytes())?; let vertex_id_pb = parse_pb::(data_operation_pb.get_keyBlob())?; @@ -380,7 +380,7 @@ fn overwrite_vertex( } fn update_vertex(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> { - trace!("update_vertex"); + debug!("update_vertex"); let data_operation_pb = parse_pb::(op.get_dataBytes())?; let vertex_id_pb = parse_pb::(data_operation_pb.get_keyBlob())?; @@ -396,7 +396,7 @@ fn update_vertex(graph: &G, snapshot_id: i64, op: &Operati fn clear_vertex_properties( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult<()> { - trace!("clear vertex properties"); + debug!("clear vertex properties"); let data_operation_pb = parse_pb::(op.get_dataBytes())?; let vertex_id_pb = parse_pb::(data_operation_pb.get_keyBlob())?; @@ -409,7 +409,7 @@ fn clear_vertex_properties( } fn delete_vertex(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> { - trace!("delete_vertex"); + debug!("delete_vertex"); let data_operation_pb = parse_pb::(op.get_dataBytes())?; let vertex_id_pb = parse_pb::(data_operation_pb.get_keyBlob())?; diff --git a/interactive_engine/executor/store/groot/src/db/graph/meta.rs b/interactive_engine/executor/store/groot/src/db/graph/meta.rs index 6f184d123b24..11ba598d8f1a 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/meta.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/meta.rs @@ -32,6 +32,7 @@ impl Meta { } pub fn recover(&self) -> GraphResult<(VertexTypeManager, EdgeTypeManager)> { + debug!("meta graph recover start"); { let mut graph_def_val = self.graph_def_lock.lock()?; *graph_def_val = GraphDef::new( @@ -237,6 +238,7 @@ impl Meta { } } } + debug!("meta graph recovered"); Ok((vertex_manager_builder.build(), edge_manager_builder.build())) } diff --git a/interactive_engine/executor/store/groot/src/db/graph/store.rs b/interactive_engine/executor/store/groot/src/db/graph/store.rs index e34e9bed3a3e..fe9d0f8776af 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/store.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/store.rs @@ -249,7 +249,7 @@ impl MultiVersionGraph for GraphStore { fn create_vertex_type( &self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, table_id: i64, ) -> GraphResult { - debug!("create_vertex_type"); + info!("create_vertex_type"); let _guard = res_unwrap!(self.lock.lock(), create_vertex_type)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -274,7 +274,7 @@ impl MultiVersionGraph for GraphStore { fn add_vertex_type_properties( &self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, table_id: i64, ) -> GraphResult { - debug!("add_vertex_type_properties"); + info!("add_vertex_type_properties"); let _guard = res_unwrap!(self.lock.lock(), add_vertex_type_properties)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -299,7 +299,7 @@ impl MultiVersionGraph for GraphStore { fn create_edge_type( &self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, ) -> GraphResult { - debug!("create_edge_type"); + info!("create_edge_type"); let _guard = res_unwrap!(self.lock.lock(), create_edge_type)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -323,7 +323,7 @@ impl MultiVersionGraph for GraphStore { fn add_edge_type_properties( &self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, ) -> GraphResult { - debug!("add_edge_type_properties"); + info!("add_edge_type_properties"); let _guard = res_unwrap!(self.lock.lock(), add_edge_type_properties)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -347,7 +347,7 @@ impl MultiVersionGraph for GraphStore { fn add_edge_kind( &self, si: i64, schema_version: i64, edge_kind: &EdgeKind, table_id: i64, ) -> GraphResult { - debug!("add_edge_kind"); + info!("add_edge_kind"); let _guard = res_unwrap!(self.lock.lock(), add_edge_kind)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -373,7 +373,7 @@ impl MultiVersionGraph for GraphStore { } fn drop_vertex_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult { - debug!("drop_vertex_type"); + info!("drop_vertex_type"); let _guard = res_unwrap!(self.lock.lock(), drop_vertex_type, si, label_id)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -387,7 +387,7 @@ impl MultiVersionGraph for GraphStore { } fn drop_edge_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult { - debug!("drop_edge_type"); + info!("drop_edge_type"); let _guard = res_unwrap!(self.lock.lock(), drop_edge_type, si, label_id)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -401,7 +401,7 @@ impl MultiVersionGraph for GraphStore { } fn remove_edge_kind(&self, si: i64, schema_version: i64, edge_kind: &EdgeKind) -> GraphResult { - debug!("remove_edge_kind"); + info!("remove_edge_kind"); let _guard = res_unwrap!(self.lock.lock(), remove_edge_kind, si, edge_kind)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -630,6 +630,7 @@ impl MultiVersionGraph for GraphStore { } fn get_graph_def_blob(&self) -> GraphResult> { + debug!("get graphdef blob"); let graph_def = self.meta.get_graph_def().lock()?; let pb = graph_def.to_proto()?; pb.write_to_bytes() @@ -639,6 +640,7 @@ impl MultiVersionGraph for GraphStore { fn prepare_data_load( &self, si: i64, schema_version: i64, target: &DataLoadTarget, table_id: i64, ) -> GraphResult { + info!("prepare data load"); let _guard = res_unwrap!(self.lock.lock(), prepare_data_load)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -653,6 +655,7 @@ impl MultiVersionGraph for GraphStore { &self, si: i64, schema_version: i64, target: &DataLoadTarget, table_id: i64, partition_id: i32, unique_path: &str, ) -> GraphResult { + info!("commit data load"); let _guard = res_unwrap!(self.lock.lock(), prepare_data_load)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { diff --git a/interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs b/interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs index 080f213d8293..a6f5c0e09dfc 100644 --- a/interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs +++ b/interactive_engine/executor/store/groot/src/db/storage/rocksdb.rs @@ -6,7 +6,7 @@ use std::time::Duration; use ::rocksdb::backup::{BackupEngine, BackupEngineOptions, RestoreOptions}; use ::rocksdb::{DBRawIterator, Env, IngestExternalFileOptions, Options, ReadOptions, DB}; -use crossbeam_epoch::{self as epoch, Atomic, Guard, Owned, Shared}; +use crossbeam_epoch::{self as epoch, Atomic, Owned}; use rocksdb::WriteBatch; use super::{StorageIter, StorageRes}; @@ -58,7 +58,7 @@ impl RocksDB { .clone(); if reopen { while Path::new(&sec_path).exists() { - sec_path = format!("{}_1", sec_path); + sec_path = increment_path_string(&sec_path); } } let opts = init_secondary_options(options); @@ -66,55 +66,40 @@ impl RocksDB { DB::open_as_secondary(&opts, path, &sec_path) } - fn get_db<'g>(&self, guard: &'g Guard) -> Shared<'g, Arc> { - self.db.load(Ordering::Acquire, guard) + pub fn get_db(&self) -> GraphResult> { + let guard = &epoch::pin(); + let db_shared = self.db.load(Ordering::SeqCst, guard); + if let Some(db) = unsafe { db_shared.as_ref() } { + Ok(db.clone()) + } else { + let msg = format!("rocksdb.scan_from failed because the acquired db is `None`"); + let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); + Err(err) + } } fn replace_db(&self, db: DB) { - // To force any deferred work to run, we need the epoch to move forward two times. - epoch::pin().flush(); - epoch::pin().flush(); - let guard = epoch::pin(); + let guard = &epoch::pin(); let new_db = Arc::new(db); - let new_db_shared = Owned::new(new_db).into_shared(&guard); - let old_db_shared = self - .db - .swap(new_db_shared, Ordering::Release, &guard); + let cur = Owned::new(new_db).into_shared(guard); - let default = "".to_string(); - let path = self - .options - .get("store.data.path") - .unwrap_or(&default); - // Use Crossbeam's 'defer' mechanism to safely drop the old Arc + let prev = self.db.swap(cur, Ordering::Release, guard); unsafe { - // Convert 'Shared' back to 'Arc' for deferred dropping - // guard.defer_destroy(old_db_shared) - guard.defer_unchecked(move || { - info!("Dropped RocksDB {:}", path); - drop(old_db_shared.into_owned()) - }) + drop(prev.into_owned()); } - info!("RocksDB {:} replaced", path); + info!("RocksDB replaced"); } pub fn get(&self, key: &[u8]) -> GraphResult> { - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - match db.get(key) { - Ok(Some(v)) => Ok(Some(StorageRes::RocksDB(v))), - Ok(None) => Ok(None), - Err(e) => { - let msg = format!("rocksdb.get failed because {}", e.into_string()); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + match db.get(key) { + Ok(Some(v)) => Ok(Some(StorageRes::RocksDB(v))), + Ok(None) => Ok(None), + Err(e) => { + let msg = format!("rocksdb.get failed because {}", e.into_string()); + let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); + Err(err) } - } else { - let msg = format!("rocksdb.get failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) } } @@ -123,18 +108,11 @@ impl RocksDB { info!("Cannot put in secondary instance"); return Ok(()); } - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - db.put(key, val).map_err(|e| { - let msg = format!("rocksdb.put failed because {}", e.into_string()); - gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) - }) - } else { - let msg = format!("rocksdb.put failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + db.put(key, val).map_err(|e| { + let msg = format!("rocksdb.put failed because {}", e.into_string()); + gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) + }) } pub fn delete(&self, key: &[u8]) -> GraphResult<()> { @@ -142,54 +120,26 @@ impl RocksDB { info!("Cannot delete in secondary instance"); return Ok(()); } - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - db.delete(key).map_err(|e| { - let msg = format!("rocksdb.delete failed because {}", e.into_string()); - gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) - }) - } else { - let msg = format!("rocksdb.delete failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + db.delete(key).map_err(|e| { + let msg = format!("rocksdb.delete failed because {}", e.into_string()); + gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) + }) } pub fn scan_prefix(&self, prefix: &[u8]) -> GraphResult { - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - Ok(StorageIter::RocksDB(RocksDBIter::new_prefix(db.clone(), prefix, guard))) - } else { - let msg = format!("rocksdb.scan_prefix failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + Ok(StorageIter::RocksDB(RocksDBIter::new_prefix(db, prefix))) } pub fn scan_from(&self, start: &[u8]) -> GraphResult { - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - Ok(StorageIter::RocksDB(RocksDBIter::new_start(db.clone(), start, guard))) - } else { - let msg = format!("rocksdb.scan_from failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + Ok(StorageIter::RocksDB(RocksDBIter::new_start(db, start))) } pub fn scan_range(&self, start: &[u8], end: &[u8]) -> GraphResult { - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - Ok(StorageIter::RocksDB(RocksDBIter::new_range(db.clone(), start, end, guard))) - } else { - let msg = format!("rocksdb.new_range failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + Ok(StorageIter::RocksDB(RocksDBIter::new_range(db, start, end))) } pub fn delete_range(&self, start: &[u8], end: &[u8]) -> GraphResult<()> { @@ -198,31 +148,24 @@ impl RocksDB { return Ok(()); } let mut batch = WriteBatch::default(); - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - // db.delete_file_in_range(start, end); - batch.delete_range(start, end); - db.write(batch).map_err(|e| { - let msg = format!("rocksdb.delete_range failed because {}", e.into_string()); - gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) - })?; - let mut val = false; - if let Some(conf_str) = self - .options - .get("store.rocksdb.disable.auto.compactions") - { - val = conf_str.parse::().unwrap(); - } - if !val { - db.compact_range(Option::Some(start), Option::Some(end)) - } - Ok(()) - } else { - let msg = format!("rocksdb.delete_range failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) + let db = self.get_db()?; + let _ = db.delete_file_in_range(start, end); + batch.delete_range(start, end); + db.write(batch).map_err(|e| { + let msg = format!("rocksdb.delete_range failed because {}", e.into_string()); + gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) + })?; + let mut val = false; + if let Some(conf_str) = self + .options + .get("store.rocksdb.disable.auto.compactions") + { + val = conf_str.parse::().unwrap(); + } + if !val { + db.compact_range(Option::Some(start), Option::Some(end)) } + Ok(()) } pub fn compact(&self) -> GraphResult<()> { @@ -231,18 +174,10 @@ impl RocksDB { info!("Cannot compact in secondary instance"); return Ok(()); } - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - - if let Some(db) = unsafe { db_shared.as_ref() } { - db.compact_range(None::<&[u8]>, None::<&[u8]>); - info!("compacted rocksdb"); - Ok(()) - } else { - let msg = format!("rocksdb.compact failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + db.compact_range(None::<&[u8]>, None::<&[u8]>); + info!("compacted rocksdb"); + Ok(()) } pub fn load(&self, files: &[&str]) -> GraphResult<()> { @@ -252,19 +187,12 @@ impl RocksDB { } let mut options = IngestExternalFileOptions::default(); options.set_move_files(true); - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - db.ingest_external_file_opts(&options, files.to_vec()) - .map_err(|e| { - let msg = format!("rocksdb.load file {:?} failed because {}", files, e.into_string()); - gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) - }) - } else { - let msg = format!("rocksdb.load failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + db.ingest_external_file_opts(&options, files.to_vec()) + .map_err(|e| { + let msg = format!("rocksdb.load file {:?} failed because {}", files, e.into_string()); + gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) + }) } pub fn open_backup_engine(&self, backup_path: &str) -> GraphResult> { @@ -288,46 +216,24 @@ impl RocksDB { ); gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) })?; - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - let ret = RocksDBBackupEngine { db: db.clone(), backup_engine }; - Ok(Box::from(ret)) - } else { - let msg = format!("open rocksdb backup engine failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + Ok(Box::from(RocksDBBackupEngine { db, backup_engine })) } pub fn new_scan(&self, prefix: &[u8]) -> GraphResult + Send>> { - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - Ok(Box::new(Scan::new(db.clone(), prefix, guard))) - } else { - let msg = format!("rocksdb.new_scan failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + Ok(Box::new(Scan::new(db, prefix))) } pub fn try_catch_up_with_primary(&self) -> GraphResult<()> { if !self.is_secondary { return Ok(()); } - let guard = epoch::pin(); - let db_shared = self.get_db(&guard); - if let Some(db) = unsafe { db_shared.as_ref() } { - db.try_catch_up_with_primary().map_err(|e| { - let msg = format!("rocksdb.try_catch_up_with_primary failed because {:?}", e); - gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) - }) - } else { - let msg = format!("rocksdb.try_catch_up_with_primary failed because the acquired db is `None`"); - let err = gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg); - Err(err) - } + let db = self.get_db()?; + db.try_catch_up_with_primary().map_err(|e| { + let msg = format!("rocksdb.try_catch_up_with_primary failed because {:?}", e); + gen_graph_err!(ErrorCode::EXTERNAL_STORAGE_ERROR, msg) + }) } pub fn reopen(&self, wait_sec: u64) -> GraphResult<()> { @@ -353,13 +259,26 @@ impl RocksDB { } } +fn increment_path_string(string: &str) -> String { + let file_parts: Vec<&str> = string.rsplitn(2, "/").collect(); + let mut parts: Vec<&str> = file_parts[0].rsplitn(2, '_').collect(); + parts.reverse(); + let last = parts[parts.len() - 1]; + let number: i64 = if parts.len() == 1 || !last.chars().all(|c| c.is_digit(10)) { + 0 + } else { + last.parse::().unwrap() + 1 + }; + format!("{}/{}_{}", file_parts[1], parts[0], number) +} + pub struct Scan<'a> { inner_iter: RocksDBIter<'a>, } impl<'a> Scan<'a> { - pub fn new(db: Arc, prefix: &[u8], guard: Guard) -> Self { - Scan { inner_iter: RocksDBIter::new_prefix(db, prefix, guard) } + pub fn new(db: Arc, prefix: &[u8]) -> Self { + Scan { inner_iter: RocksDBIter::new_prefix(db, prefix) } } } @@ -501,82 +420,78 @@ fn init_options(options: &HashMap) -> Options { } pub struct RocksDBIter<'a> { - _db: Arc, - inner: Option>, + _db: *const DB, + inner: DBRawIterator<'a>, just_seeked: bool, - _guard: Guard, } unsafe impl Send for RocksDBIter<'_> {} impl<'a> RocksDBIter<'a> { - fn new_prefix(db: Arc, prefix: &[u8], guard: Guard) -> Self { - let db_ptr = Arc::into_raw(db.clone()) as *const DB; - let mut db_iter = Self { _db: db, inner: None, just_seeked: true, _guard: guard }; - let db_ref = unsafe { &*db_ptr }; - let mut iter = match bytes_upper_bound(prefix) { + fn new_prefix(db: Arc, prefix: &[u8]) -> Self { + let opt = match bytes_upper_bound(prefix) { Some(upper) => { let mut option = ReadOptions::default(); option.set_iterate_upper_bound(upper); - db_ref.raw_iterator_opt(option) + option } - None => db_ref.raw_iterator(), + None => ReadOptions::default(), }; + let db_ptr = Arc::into_raw(db.clone()) as *const DB; + let db_ref = unsafe { &*db_ptr }; + let mut iter = db_ref.raw_iterator_opt(opt); iter.seek(prefix); - - db_iter.inner = Some(iter); - + let db_iter = Self { _db: db_ptr, inner: iter, just_seeked: true }; db_iter } - fn new_start(db: Arc, start: &[u8], guard: Guard) -> Self { + fn new_start(db: Arc, start: &[u8]) -> Self { let db_ptr = Arc::into_raw(db.clone()) as *const DB; - let mut db_iter = Self { _db: db, inner: None, just_seeked: true, _guard: guard }; let db_ref = unsafe { &*db_ptr }; let mut iter = db_ref.raw_iterator(); iter.seek(start); - db_iter.inner = Some(iter); - + let db_iter = Self { _db: db_ptr, inner: iter, just_seeked: true }; db_iter } - fn new_range(db: Arc, start: &[u8], end: &[u8], guard: Guard) -> Self { + fn new_range(db: Arc, start: &[u8], end: &[u8]) -> Self { let db_ptr = Arc::into_raw(db.clone()) as *const DB; - let mut db_iter = Self { _db: db, inner: None, just_seeked: true, _guard: guard }; let db_ref = unsafe { &*db_ptr }; let mut option = ReadOptions::default(); option.set_iterate_upper_bound(end.to_vec()); let mut iter = db_ref.raw_iterator_opt(option); iter.seek(start); - - db_iter.inner = Some(iter); - + let db_iter = Self { _db: db_ptr, inner: iter, just_seeked: true }; db_iter } pub fn next(&mut self) -> Option<(&[u8], &[u8])> { - if let Some(inner) = &mut self.inner { - if !inner.valid() { - return None; - } + if !self.inner.valid() { + return None; + } - if self.just_seeked { - self.just_seeked = false; - } else { - inner.next(); - } + if self.just_seeked { + self.just_seeked = false; + } else { + self.inner.next(); + } - if inner.valid() { - Some((inner.key().unwrap(), inner.value().unwrap())) - } else { - None - } + if self.inner.valid() { + Some((self.inner.key().unwrap(), self.inner.value().unwrap())) } else { None } } } +impl<'a> Drop for RocksDBIter<'a> { + fn drop(&mut self) { + unsafe { + Arc::from_raw(self._db); + } + } +} + fn bytes_upper_bound(bytes: &[u8]) -> Option> { for i in (0..bytes.len()).rev() { if bytes[i] != u8::MAX { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java index b3dce2844287..b42dc7ce3126 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaManager.java @@ -212,6 +212,7 @@ private void recover() { } private void recoverInternal() throws IOException, ExecutionException, InterruptedException { + logger.debug("Start to recover SchemaManager"); long snapshotId = this.snapshotManager.increaseWriteSnapshotId(); CompletableFuture future = new CompletableFuture<>(); this.snapshotManager.addSnapshotListener(snapshotId, () -> future.complete(null)); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaService.java index 579003af4e01..d74450e70880 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/coordinator/SchemaService.java @@ -22,7 +22,11 @@ import io.grpc.stub.StreamObserver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class SchemaService extends SchemaGrpc.SchemaImplBase { + private static final Logger logger = LoggerFactory.getLogger(SchemaService.class); private final SchemaManager schemaManager; @@ -37,6 +41,7 @@ public void submitBatchDdl( String requestId = request.getRequestId(); String sessionId = request.getSessionId(); DdlRequestBatch ddlRequestBatch = DdlRequestBatch.parseProto(request.getDdlRequests()); + logger.info("submitBatchDdl {}", ddlRequestBatch.toProto()); this.schemaManager.submitBatchDdl( requestId, sessionId, diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java index f858bb5da13c..d99e97c2a31b 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/KafkaProcessor.java @@ -269,6 +269,9 @@ public void replayWAL() throws IOException { while ((record = logReader.readNextRecord()) != null) { queue.put(record); replayCount++; + if (replayCount % 10000 == 0) { + logger.info("replayed {} records", replayCount); + } } } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSchemaService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSchemaService.java index 1f21cecbad05..c58ae8ddc6ee 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSchemaService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreSchemaService.java @@ -35,6 +35,7 @@ public StoreSchemaService(StoreService storeService) { @Override public void fetchSchema( FetchSchemaRequest request, StreamObserver responseObserver) { + logger.debug("received fetch schema request"); try { GraphDefPb graphDefBlob = this.storeService.getGraphDefBlob(); responseObserver.onNext( @@ -50,6 +51,7 @@ public void fetchSchema( public void fetchStatistics( FetchStatisticsRequest request, StreamObserver responseObserver) { + logger.debug("received fetch statistics request"); try { Map map = this.storeService.getGraphStatisticsBlob(request.getSnapshotId()); diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java index d537032f1082..edde700c84d9 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/WriterAgent.java @@ -136,7 +136,7 @@ public boolean writeStore(StoreDataBatch storeDataBatch) throws InterruptedExcep // logger.info("writeStore {}", storeDataBatch.toProto()); // int queueId = storeDataBatch.getQueueId(); boolean suc = this.bufferQueue.offerQueue(0, storeDataBatch); - logger.debug("Buffer queue: {}, {}", suc, this.bufferQueue.innerQueueSizes()); + // logger.debug("Buffer queue: {}, {}", suc, this.bufferQueue.innerQueueSizes()); return suc; } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java index 554c0c0e1998..3d18e111f9c9 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/jna/JnaGraphStore.java @@ -98,6 +98,7 @@ public void close() throws IOException { @Override public boolean writeBatch(long snapshotId, OperationBatch operationBatch) throws IOException { + logger.debug("write batch {}", operationBatch.toProto()); byte[] dataBytes = operationBatch.toProto().toByteArray(); try (JnaResponse response = GraphLibrary.INSTANCE.writeBatch( @@ -112,6 +113,7 @@ public boolean writeBatch(long snapshotId, OperationBatch operationBatch) throws @Override public GraphDefPb getGraphDefBlob() throws IOException { + logger.debug("getGraphDefBlob"); try (JnaResponse jnaResponse = GraphLibrary.INSTANCE.getGraphDefBlob(this.pointer)) { if (!jnaResponse.success()) { String errMsg = jnaResponse.getErrMsg(); @@ -123,6 +125,7 @@ public GraphDefPb getGraphDefBlob() throws IOException { @Override public Statistics getGraphStatisticsBlob(long si) throws IOException { + logger.debug("getGraphStatisticsBlob"); try (JnaResponse jnaResponse = GraphLibrary.INSTANCE.getGraphStatistics(this.pointer, si)) { if (!jnaResponse.success()) { String errMsg = jnaResponse.getErrMsg(); @@ -134,6 +137,7 @@ public Statistics getGraphStatisticsBlob(long si) throws IOException { @Override public void ingestExternalFile(ExternalStorage storage, String sstPath) throws IOException { + logger.debug("ingestExternalFile"); String[] items = sstPath.split("/"); String unique_path = items[items.length - 2]; String sstName = sstPath.substring(sstPath.lastIndexOf('/') + 1); @@ -184,6 +188,7 @@ public void reopenSecondary(long wait_sec) throws IOException { @Override public void compact() throws IOException { + logger.debug("compact"); ensurePointer(); try (JnaResponse response = GraphLibrary.INSTANCE.compact(this.pointer)) { if (!response.success()) { @@ -192,7 +197,7 @@ public void compact() throws IOException { } } - private void ensurePointer() throws IOException { + private void ensurePointer() { if (this.pointer == null) { throw new ExternalStorageErrorException("JNA pointer is null"); } diff --git a/python/graphscope/gsctl/scripts/install_deps.sh b/python/graphscope/gsctl/scripts/install_deps.sh index ed737c0f2dd8..e04cf97b6dac 100755 --- a/python/graphscope/gsctl/scripts/install_deps.sh +++ b/python/graphscope/gsctl/scripts/install_deps.sh @@ -860,8 +860,8 @@ install_interactive_dependencies() { if ! command -v rustup &>/dev/null; then curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y source $HOME/.cargo/env - rustup install 1.71.0 - rustup default 1.71.0 + rustup install 1.76.0 + rustup default 1.76.0 rustc --version fi # opentelemetry