From 9f012b7f0d9dac19c9ee6357aee6bb81f097bca2 Mon Sep 17 00:00:00 2001 From: Siyuan Zhang Date: Thu, 21 Sep 2023 13:43:48 +0800 Subject: [PATCH] fix(interactive): Not requiring eid in update/delete edges in groot (#3240) - Add API for writing vertices and edges in one batch - Add eid to the Edge class in client - Update an not existing edge will create a new edge. - Update and Delete when eid is not given will try to search for it by src and dst vertex first. --- .../assembly/src/conf/groot/logback.xml | 13 +- .../assembly/groot/src/store/graph.rs | 8 +- .../executor/common/dyn_type/src/serde.rs | 2 +- .../executor/ir/core/src/plan/logical.rs | 5 +- .../executor/store/groot/src/api/mod.rs | 2 +- .../executor/store/groot/src/db/graph/iter.rs | 5 +- .../store/groot/src/db/graph/store.rs | 204 ++++++++++++++---- .../store/groot/src/db/graph/types/edge.rs | 12 +- .../graphscope/groot/sdk/GrootClient.java | 44 ++++ .../graphscope/groot/sdk/schema/Edge.java | 7 + .../groot/frontend/ClientWriteService.java | 2 +- .../groot/frontend/write/GraphWriter.java | 6 +- .../graphscope/groot/operation/EdgeId.java | 6 +- 13 files changed, 246 insertions(+), 70 deletions(-) diff --git a/interactive_engine/assembly/src/conf/groot/logback.xml b/interactive_engine/assembly/src/conf/groot/logback.xml index 0f9e071dd532..9b5b2c7eb66b 100644 --- a/interactive_engine/assembly/src/conf/groot/logback.xml +++ b/interactive_engine/assembly/src/conf/groot/logback.xml @@ -1,4 +1,5 @@ + @@ -6,16 +7,16 @@ ${log_dir}/${log_name}.%d{yyyy-MM-dd}.%i.log 100MB - 60 - 10GB + 10 + 1GB - [%d{ISO8601}][%p][%t][%c:%L] %m%n + [%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n - [%d{ISO8601}][%p][%t][%c:%L] %m%n + [%d{ISO8601}][%p][%t][${hostname}][%c:%L] %m%n @@ -23,8 +24,8 @@ ${log_dir}/metric.log.%d{yyyy-MM-dd}.%i.log 100MB - 60 - 10GB + 10 + 1GB %m%n diff --git a/interactive_engine/executor/assembly/groot/src/store/graph.rs b/interactive_engine/executor/assembly/groot/src/store/graph.rs index f9416fff28ef..cd6bc5d35241 100644 --- a/interactive_engine/executor/assembly/groot/src/store/graph.rs +++ b/interactive_engine/executor/assembly/groot/src/store/graph.rs @@ -368,7 +368,7 @@ fn delete_vertex(graph: &G, snapshot_id: i64, op: &Operati } fn overwrite_edge(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> { - trace!("overwrite_edge"); + debug!("overwrite_edge"); let data_operation_pb = parse_pb::(op.get_dataBytes())?; let edge_id_pb = parse_pb::(data_operation_pb.get_keyBlob())?; @@ -388,7 +388,7 @@ fn overwrite_edge(graph: &G, snapshot_id: i64, op: &Operat } fn update_edge(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> { - trace!("update_edge"); + debug!("update_edge"); let data_operation_pb = parse_pb::(op.get_dataBytes())?; let edge_id_pb = parse_pb::(data_operation_pb.get_keyBlob())?; @@ -405,7 +405,7 @@ fn update_edge(graph: &G, snapshot_id: i64, op: &Operation fn clear_edge_properties( graph: &G, snapshot_id: i64, op: &OperationPb, ) -> GraphResult<()> { - trace!("update_edge"); + debug!("clear_edge_properties"); let data_operation_pb = parse_pb::(op.get_dataBytes())?; let edge_id_pb = parse_pb::(data_operation_pb.get_keyBlob())?; @@ -420,7 +420,7 @@ fn clear_edge_properties( } fn delete_edge(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> { - trace!("delete_edge"); + debug!("delete_edge"); let data_operation_pb = parse_pb::(op.get_dataBytes())?; let edge_id_pb = parse_pb::(data_operation_pb.get_keyBlob())?; diff --git a/interactive_engine/executor/common/dyn_type/src/serde.rs b/interactive_engine/executor/common/dyn_type/src/serde.rs index 1d34005ab57c..c396246bccaf 100644 --- a/interactive_engine/executor/common/dyn_type/src/serde.rs +++ b/interactive_engine/executor/common/dyn_type/src/serde.rs @@ -17,10 +17,10 @@ use std::any::TypeId; use std::collections::BTreeMap; use std::io; +use chrono::{Datelike, Timelike}; use pegasus_common::codec::{Decode, Encode, ReadExt, WriteExt}; use crate::{de_dyn_obj, DateTimeFormats, Object, Primitives}; -use chrono::{Datelike, Timelike}; impl Encode for Primitives { fn write_to(&self, writer: &mut W) -> io::Result<()> { diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index 1f6e140fdb11..433eaabdade6 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -20,13 +20,12 @@ use std::fmt; use std::iter::FromIterator; use std::rc::Rc; +use fraction::Fraction; use ir_common::error::ParsePbError; use ir_common::generated::algebra as pb; use ir_common::generated::algebra::pattern::binder::Item; use ir_common::generated::common as common_pb; use ir_common::{KeyId, LabelId, NameOrId}; - -use fraction::Fraction; use vec_map::VecMap; use crate::error::{IrError, IrResult}; @@ -1580,7 +1579,7 @@ impl AsLogical for pb::Unfold { let curr_node = plan_meta.get_curr_node(); let tag_id = get_or_set_tag_id(tag, plan_meta)?; // plan_meta.set_tag_nodes(tag_id, plan_meta.get_curr_referred_nodes().to_vec()); - + let tag_nodes = plan_meta.get_tag_nodes(tag_id).to_vec(); plan_meta.refer_to_nodes(curr_node, tag_nodes.clone()); } diff --git a/interactive_engine/executor/store/groot/src/api/mod.rs b/interactive_engine/executor/store/groot/src/api/mod.rs index 6ce4b159f9ef..c6cddf618133 100644 --- a/interactive_engine/executor/store/groot/src/api/mod.rs +++ b/interactive_engine/executor/store/groot/src/api/mod.rs @@ -14,7 +14,7 @@ //! limitations under the License. pub mod condition; -mod elem; +pub mod elem; mod filter; mod multi_version; pub mod prelude; diff --git a/interactive_engine/executor/store/groot/src/db/graph/iter.rs b/interactive_engine/executor/store/groot/src/db/graph/iter.rs index 29c885486335..8db5d9ef549c 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/iter.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/iter.rs @@ -182,12 +182,13 @@ impl IntoIterator for EdgeKindScan { { Ok(decoder) => { let edge_kind = self.edge_kind_info.get_type(); - Some(Ok(RocksEdgeImpl::new( + let edge = RocksEdgeImpl::new( edge_id.into(), edge_kind.into(), Some(decoder), raw_val, - ))) + ); + Some(Ok(edge)) } Err(e) => Some(Err(e.into())), } diff --git a/interactive_engine/executor/store/groot/src/db/graph/store.rs b/interactive_engine/executor/store/groot/src/db/graph/store.rs index dfeecfdecf0e..2a36a0335528 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/store.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/store.rs @@ -11,10 +11,12 @@ use super::bin::*; use super::codec::*; use super::meta::*; use super::types::*; +use crate::api::elem::Edge; use crate::api::Condition; use crate::api::ElemFilter; use crate::api::PropId; use crate::db::api::multi_version_graph::{GraphBackup, MultiVersionGraph}; +use crate::db::api::types::RocksEdge; use crate::db::api::GraphErrorCode::{InvalidData, TypeNotFound}; use crate::db::api::*; use crate::db::common::bytes::transform; @@ -72,7 +74,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("get_vertex"); + debug!("get_vertex {:?}, {:?}, {:?}", vertex_id, label_id, property_ids); if let Some(label_id) = label_id { self.get_vertex_from_label(snapshot_id, vertex_id, label_id, property_ids) } else { @@ -95,7 +97,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: Option<&EdgeKind>, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("get_edge"); + debug!("get_edge {:?}", edge_id); if let Some(relation) = edge_relation { self.get_edge_from_relation(snapshot_id, edge_id, relation, property_ids) } else { @@ -124,7 +126,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("scan_vertex"); + debug!("scan_vertex {:?}, {:?}", label_id, condition); let mut iter = match label_id { Some(label_id) => { match self @@ -178,7 +180,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("scan_edge"); + debug!("scan_edge {:?}", label_id); self.query_edges(snapshot_id, None, EdgeDirection::Both, label_id, condition, property_ids) } @@ -186,7 +188,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("get_out_edges"); + debug!("get_out_edges {:?}, {:?}", vertex_id, label_id); self.query_edges( snapshot_id, Some(vertex_id), @@ -201,14 +203,14 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("get_in_edges"); + debug!("get_in_edges {:?}, {:?}", vertex_id, label_id); self.query_edges(snapshot_id, Some(vertex_id), EdgeDirection::In, label_id, condition, property_ids) } fn get_out_degree( &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, ) -> GraphResult { - trace!("get_out_degree"); + debug!("get_out_degree {:?}, {:?}", vertex_id, label_id); let edges_iter = self.get_out_edges(snapshot_id, vertex_id, label_id, None, Some(vec![]).as_ref())?; Ok(edges_iter.count()) @@ -217,7 +219,7 @@ impl MultiVersionGraph for GraphStore { fn get_in_degree( &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: Option, ) -> GraphResult { - trace!("get_in_degree"); + debug!("get_in_degree {:?}, {:?}", vertex_id, label_id); let edges_iter = self.get_in_edges(snapshot_id, vertex_id, label_id, None, Some(vec![]).as_ref())?; Ok(edges_iter.count()) @@ -227,7 +229,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, edge_relation: &EdgeKind, k: SerialId, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("get_kth_out_edge"); + debug!("get_kth_out_edge"); let mut edges_iter = self.get_out_edges( snapshot_id, vertex_id, @@ -242,7 +244,7 @@ impl MultiVersionGraph for GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, edge_relation: &EdgeKind, k: SerialId, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("get_kth_in_edge"); + debug!("get_kth_in_edge"); let mut edges_iter = self.get_in_edges( snapshot_id, vertex_id, @@ -256,7 +258,7 @@ impl MultiVersionGraph for GraphStore { fn create_vertex_type( &self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, table_id: i64, ) -> GraphResult { - trace!("create_vertex_type"); + debug!("create_vertex_type"); let _guard = res_unwrap!(self.lock.lock(), create_vertex_type)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -281,7 +283,7 @@ impl MultiVersionGraph for GraphStore { fn create_edge_type( &self, si: i64, schema_version: i64, label_id: LabelId, type_def: &TypeDef, ) -> GraphResult { - trace!("create_edge_type"); + debug!("create_edge_type"); let _guard = res_unwrap!(self.lock.lock(), create_edge_type)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -305,7 +307,7 @@ impl MultiVersionGraph for GraphStore { fn add_edge_kind( &self, si: i64, schema_version: i64, edge_kind: &EdgeKind, table_id: i64, ) -> GraphResult { - trace!("add_edge_kind"); + debug!("add_edge_kind"); let _guard = res_unwrap!(self.lock.lock(), add_edge_kind)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -331,7 +333,7 @@ impl MultiVersionGraph for GraphStore { } fn drop_vertex_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult { - trace!("drop_vertex_type"); + debug!("drop_vertex_type"); let _guard = res_unwrap!(self.lock.lock(), drop_vertex_type, si, label_id)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -345,7 +347,7 @@ impl MultiVersionGraph for GraphStore { } fn drop_edge_type(&self, si: i64, schema_version: i64, label_id: LabelId) -> GraphResult { - trace!("drop_edge_type"); + debug!("drop_edge_type"); let _guard = res_unwrap!(self.lock.lock(), drop_edge_type, si, label_id)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -359,7 +361,7 @@ impl MultiVersionGraph for GraphStore { } fn remove_edge_kind(&self, si: i64, schema_version: i64, edge_kind: &EdgeKind) -> GraphResult { - trace!("remove_edge_kind"); + debug!("remove_edge_kind"); let _guard = res_unwrap!(self.lock.lock(), remove_edge_kind, si, edge_kind)?; self.check_si_guard(si)?; if let Err(_) = self.meta.check_version(schema_version) { @@ -378,7 +380,7 @@ impl MultiVersionGraph for GraphStore { fn insert_overwrite_vertex( &self, si: SnapshotId, id: VertexId, label: LabelId, properties: &dyn PropertyMap, ) -> GraphResult<()> { - trace!("insert_overwrite_vertex"); + debug!("insert_overwrite_vertex"); self.check_si_guard(si)?; let res = self .vertex_manager @@ -391,7 +393,7 @@ impl MultiVersionGraph for GraphStore { fn insert_update_vertex( &self, si: i64, id: i64, label: LabelId, properties: &dyn PropertyMap, ) -> GraphResult<()> { - trace!("insert_update_vertex"); + debug!("insert_update_vertex"); self.check_si_guard(si)?; let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?; match res_unwrap!(self.get_vertex_data(si, id, &info), insert_update_vertex, si, id, label)? { @@ -417,7 +419,7 @@ impl MultiVersionGraph for GraphStore { fn clear_vertex_properties( &self, si: i64, id: i64, label: LabelId, prop_ids: &[PropertyId], ) -> GraphResult<()> { - trace!("clear_vertex_properties"); + debug!("clear_vertex_properties"); self.check_si_guard(si)?; let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?; if let Some(data) = self.get_vertex_data(si, id, &info)? { @@ -434,7 +436,7 @@ impl MultiVersionGraph for GraphStore { } fn delete_vertex(&self, si: i64, id: i64, label: LabelId) -> GraphResult<()> { - trace!("delete_vertex"); + debug!("delete_vertex"); self.check_si_guard(si)?; let info = res_unwrap!(self.vertex_manager.get_type(si, label), si, id, label)?; if let Some(table) = info.get_table(si) { @@ -450,7 +452,7 @@ impl MultiVersionGraph for GraphStore { fn insert_overwrite_edge( &self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, properties: &dyn PropertyMap, ) -> GraphResult<()> { - trace!("insert_overwrite_edge"); + debug!("insert_overwrite_edge"); self.check_si_guard(si)?; let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In }; let res = self @@ -464,8 +466,11 @@ impl MultiVersionGraph for GraphStore { fn insert_update_edge( &self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, properties: &dyn PropertyMap, ) -> GraphResult<()> { - trace!("insert_update_edge"); + debug!("insert_update_edge, {:?}, {:?}, {}", id, edge_kind, forward); self.check_si_guard(si)?; + + // if edge id is not 0, it may be existed edge id, or next edge id to be created. + let info = res_unwrap!( self.edge_manager.get_edge_kind(si, edge_kind), insert_update_edge, @@ -474,7 +479,30 @@ impl MultiVersionGraph for GraphStore { edge_kind )?; let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In }; - let data_res = self.get_edge_data(si, id, &info, direction)?; + + let mut data_res = self.get_edge_data(si, id, &info, direction)?; + let mut complete_id = id; + match data_res { + Some(_) => { + debug!("Edge exists"); + } + None => { + // inner_id != 0 && no edge found using this edge id. it may be next edge id, or an useless edge id to be discarded. + let edge_id = + self.get_eid_by_vertex(si, edge_kind.edge_label_id, id.src_id, id.dst_id, forward); + match edge_id { + Some(edge_id) => { + complete_id = edge_id; + data_res = self.get_edge_data(si, edge_id, &info, direction)?; + } + None => { + // will create new edge + warn!("Edge doesn't exists, will create new edge"); + } + } + } + } + match data_res { Some(data) => { let version = get_codec_version(data); @@ -482,13 +510,13 @@ impl MultiVersionGraph for GraphStore { let mut old = decoder.decode_all(data); merge_updates(&mut old, properties); let res = self - .do_insert_edge_data(si, id, info, direction, &old) + .do_insert_edge_data(si, complete_id, info, direction, &old) .map(|_| self.update_si_guard(si)); res_unwrap!(res, insert_update_edge, si, id, edge_kind) } None => { let res = self - .do_insert_edge_data(si, id, info, direction, properties) + .do_insert_edge_data(si, complete_id, info, direction, properties) .map(|_| self.update_si_guard(si)); res_unwrap!(res, insert_update_edge, si, id, edge_kind) } @@ -498,9 +526,23 @@ impl MultiVersionGraph for GraphStore { fn clear_edge_properties( &self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, prop_ids: &[PropertyId], ) -> GraphResult<()> { - trace!("clear_edge_properties"); - self.check_si_guard(si)?; + debug!("clear_edge_properties"); self.check_si_guard(si)?; + + let mut complete_id = id; + if id.inner_id == 0 { + let edge_id = + self.get_eid_by_vertex(si, edge_kind.edge_label_id, id.src_id, id.dst_id, forward); + match edge_id { + Some(edge_id) => { + complete_id = edge_id; + } + None => { + warn!("Skipped clearing edge properties"); + } + } + } + let info = res_unwrap!( self.edge_manager.get_edge_kind(si, edge_kind), insert_update_edge, @@ -509,31 +551,36 @@ impl MultiVersionGraph for GraphStore { edge_kind )?; let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In }; - if let Some(data) = self.get_edge_data(si, id, &info, direction)? { + if let Some(data) = self.get_edge_data(si, complete_id, &info, direction)? { let version = get_codec_version(data); let decoder = info.get_decoder(si, version)?; let mut old = decoder.decode_all(data); clear_props(&mut old, prop_ids); let res = self - .do_insert_edge_data(si, id, info, direction, &old) + .do_insert_edge_data(si, complete_id, info, direction, &old) .map(|_| self.update_si_guard(si)); - return res_unwrap!(res, clear_edge_properties, si, id, edge_kind); + return res_unwrap!(res, clear_edge_properties, si, complete_id, edge_kind); } Ok(()) } fn delete_edge(&self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool) -> GraphResult<()> { - trace!("delete_edge"); + trace!("delete_edge {:?}, {:?}, {}", id, edge_kind, forward); self.check_si_guard(si)?; - let info = res_unwrap!(self.edge_manager.get_edge_kind(si, edge_kind), si, id, edge_kind)?; - let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In }; - if let Some(table) = info.get_table(si) { - let ts = si - table.start_si; - let key = edge_key(table.id, id, direction, ts); - res_unwrap!(self.storage.put(&key, &[]), delete_edge, si, id, edge_kind)?; + let mut complete_id = id; + if id.inner_id == 0 { + let edge_id = + self.get_eid_by_vertex(si, edge_kind.edge_label_id, id.src_id, id.dst_id, forward); + match edge_id { + Some(edge_id) => { + complete_id = edge_id; + } + None => { + warn!("Skipped delete edge"); + } + } } - self.update_si_guard(si); - Ok(()) + self.delete_edge_impl(si, complete_id, edge_kind, forward) } fn gc(&self, si: i64) -> GraphResult<()> { @@ -652,7 +699,7 @@ impl GraphStore { fn get_vertex_data( &self, si: SnapshotId, id: VertexId, info: &VertexTypeInfoRef, ) -> GraphResult> { - trace!("get_vertex_data"); + debug!("get_vertex_data"); if let Some(table) = info.get_table(si) { let key = vertex_key(table.id, id, si - table.start_si); let mut iter = self.storage.scan_from(&key)?; @@ -669,7 +716,7 @@ impl GraphStore { fn get_edge_data( &self, si: SnapshotId, id: EdgeId, info: &EdgeKindInfoRef, direction: EdgeDirection, ) -> GraphResult> { - trace!("get_edge_data"); + debug!("get_edge_data"); if let Some(table) = info.get_table(si) { let ts = si - table.start_si; let key = edge_key(table.id, id, direction, ts); @@ -687,7 +734,7 @@ impl GraphStore { fn do_insert_vertex_data( &self, si: SnapshotId, info: VertexTypeInfoRef, id: VertexId, properties: &dyn PropertyMap, ) -> GraphResult<()> { - trace!("do_insert_vertex_data"); + debug!("do_insert_vertex_data"); if let Some(table) = info.get_table(si) { let encoder = res_unwrap!(info.get_encoder(si), do_insert_vertex_data)?; let mut buf = Vec::new(); @@ -708,7 +755,7 @@ impl GraphStore { &self, si: SnapshotId, edge_id: EdgeId, info: EdgeKindInfoRef, direction: EdgeDirection, properties: &dyn PropertyMap, ) -> GraphResult<()> { - trace!("do_insert_edge_data"); + debug!("do_insert_edge_data {:?} {:?}", edge_id, direction); if let Some(table) = info.get_table(si) { let encoder = res_unwrap!(info.get_encoder(si), do_insert_edge_data)?; let mut buf = Vec::new(); @@ -754,7 +801,7 @@ impl GraphStore { &self, snapshot_id: SnapshotId, vertex_id: VertexId, label_id: LabelId, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("get_vertex_from_label"); + debug!("get_vertex_from_label {:?}, {:?}, {:?}", vertex_id, label_id, property_ids); let snapshot_id = snapshot_id as i64; let vertex_type_info = self .vertex_manager @@ -785,7 +832,7 @@ impl GraphStore { &self, snapshot_id: SnapshotId, edge_id: EdgeId, edge_relation: &EdgeKind, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("get_edge_from_relation"); + debug!("get_edge_from_relation, {:?}, {:?}", edge_id, edge_relation); let snapshot_id = snapshot_id as i64; let info = self .edge_manager @@ -816,7 +863,7 @@ impl GraphStore { &self, snapshot_id: SnapshotId, vertex_id: Option, direction: EdgeDirection, label_id: Option, condition: Option<&Condition>, property_ids: Option<&Vec>, ) -> GraphResult> { - trace!("query_edges"); + debug!("query_edges {:?}, {:?}, {:?} {:?}", vertex_id, label_id, property_ids, direction); let mut iter = match label_id { Some(label_id) => { match self @@ -889,6 +936,71 @@ impl GraphStore { .collect() }) } + + fn delete_edge_impl( + &self, si: i64, id: EdgeId, edge_kind: &EdgeKind, forward: bool, + ) -> GraphResult<()> { + trace!("delete_edge impl {:?}, {:?}, {}", id, edge_kind, forward); + self.check_si_guard(si)?; + let info = res_unwrap!(self.edge_manager.get_edge_kind(si, edge_kind), si, id, edge_kind)?; + let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In }; + if let Some(table) = info.get_table(si) { + let ts = si - table.start_si; + let key = edge_key(table.id, id, direction, ts); + res_unwrap!(self.storage.put(&key, &[]), delete_edge, si, id, edge_kind)?; + } + self.update_si_guard(si); + Ok(()) + } + + fn get_eid_by_vertex( + &self, si: i64, label_id: LabelId, src_id: VertexId, dst_id: VertexId, forward: bool, + ) -> Option { + let direction = if forward { EdgeDirection::Out } else { EdgeDirection::In }; + let edge: GraphResult; + if forward { + edge = self.get_edge_by_vertex(si, label_id, src_id, dst_id, direction); + } else { + edge = self.get_edge_by_vertex(si, label_id, dst_id, src_id, direction); + } + match edge { + Ok(edge) => Some(*RocksEdge::get_edge_id(&edge)), + Err(_) => None, + } + } + + fn get_edge_by_vertex( + &self, si: i64, label_id: LabelId, src_id: VertexId, dst_id: VertexId, direction: EdgeDirection, + ) -> GraphResult { + let iter = self.query_edges(si, Some(src_id), direction, Some(label_id), None, None); + debug!("get_edge_by_vertex {:?}, {}, {}, {:?}", label_id, src_id, dst_id, direction); + let target_id = if direction == EdgeDirection::Out { dst_id } else { src_id }; + match iter { + Ok(mut iter) => { + while let Some(edge) = iter.next() { + match edge { + Ok(edge) => { + if edge.get_dst_id() == target_id { + return Ok(edge); + } else { + debug!("This edge doesn't match, continue") + } + } + Err(err) => { + return Err(err); + } + } + } + let msg = format!( + "edge not found. labelId {}, srcId {}, dstId {}, direction {:?}", + label_id, src_id, dst_id, direction + ); + error!("{}", msg); + Err(gen_graph_err!(GraphErrorCode::DataNotExists, msg, get_edge_by_vertex)) + } + Err(err) => Err(err), + } + } } fn merge_updates<'a>(old: &mut HashMap>, updates: &'a dyn PropertyMap) { diff --git a/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs b/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs index 55bc837d327d..e9f16c0404db 100644 --- a/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs +++ b/interactive_engine/executor/store/groot/src/db/graph/types/edge.rs @@ -152,6 +152,7 @@ pub struct EdgeInfoIter { impl EdgeInfoIter { pub fn next(&mut self) -> Option { + debug!("EdgeInfoIter::next"); loop { let info = self.inner.next()?.as_ref(); if info.is_alive_at(self.si) { @@ -161,6 +162,7 @@ impl EdgeInfoIter { } pub fn next_info(&mut self) -> Option> { + debug!("EdgeInfoIter::next_info"); loop { let info = self.inner.next()?; if info.is_alive_at(self.si) { @@ -204,6 +206,7 @@ impl EdgeTypeManager { } pub fn get_edge(&self, si: SnapshotId, label: LabelId) -> GraphResult { + debug!("EdgeTypeManager::get_edge"); let guard = epoch::pin(); let inner = self.get_inner(&guard); let info = res_unwrap!(inner.get_edge(si, label), get_edge, si, label)?; @@ -212,6 +215,7 @@ impl EdgeTypeManager { } pub fn get_edge_info(&self, si: SnapshotId, label: LabelId) -> GraphResult> { + debug!("EdgeTypeManager::get_edge_info"); let guard = &epoch::pin(); let inner = self.get_inner(guard); let ret = res_unwrap!(inner.get_edge_info(si, label), get_edge, si, label)?; @@ -219,6 +223,7 @@ impl EdgeTypeManager { } pub fn get_all_edges(&self, si: SnapshotId) -> EdgeInfoIter { + debug!("EdgeTypeManager::get_all_edges"); let guard = epoch::pin(); let inner = self.get_inner(&guard); let iter = inner.get_all_edges(); @@ -348,6 +353,7 @@ impl EdgeManagerInner { } fn get_edge(&self, si: SnapshotId, label: LabelId) -> GraphResult<&EdgeInfo> { + debug!("EdgeManagerInner::get_edge {:?}", label); if let Some(info) = self.info_map.get(&label) { if info.lifetime.is_alive_at(si) { return Ok(info.as_ref()); @@ -362,16 +368,17 @@ impl EdgeManagerInner { } fn get_edge_info(&self, si: SnapshotId, label: LabelId) -> GraphResult> { + debug!("EdgeManagerInner::get_edge_info {:?}", label); if let Some(info) = self.info_map.get(&label) { if info.lifetime.is_alive_at(si) { return Ok(info.clone()); } let msg = format!("edge#{} is not alive at {}", label, si); - let err = gen_graph_err!(GraphErrorCode::TypeNotFound, msg, get_edge, si, label); + let err = gen_graph_err!(GraphErrorCode::TypeNotFound, msg, get_edge_info, si, label); return Err(err); } let msg = format!("edge#{} not found", label); - let err = gen_graph_err!(GraphErrorCode::TypeNotFound, msg, get_edge, si, label); + let err = gen_graph_err!(GraphErrorCode::TypeNotFound, msg, get_edge_info, si, label); Err(err) } @@ -391,6 +398,7 @@ impl EdgeManagerInner { } fn get_all_edges(&self) -> Values> { + debug!("EdgeManagerInner::get_all_edges"); self.info_map.values() } diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java index 6596878c1bdf..f785f4404977 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/GrootClient.java @@ -129,6 +129,50 @@ private void modifyEdge( submit(requests, callback); } + private long modifyVerticesAndEdge( + List vertices, List edges, WriteTypePb writeType) { + List requests = getVertexWriteRequestPbs(vertices, writeType); + requests.addAll(getEdgeWriteRequestPbs(edges, writeType)); + return submit(requests); + } + + private void modifyVerticesAndEdge( + List vertices, + List edges, + StreamObserver callback, + WriteTypePb writeType) { + List requests = getVertexWriteRequestPbs(vertices, writeType); + requests.addAll(getEdgeWriteRequestPbs(edges, writeType)); + submit(requests, callback); + } + + public long addVerticesAndEdges(List vertices, List edges) { + return modifyVerticesAndEdge(vertices, edges, WriteTypePb.INSERT); + } + + public long updateVerticesAndEdges(List vertices, List edges) { + return modifyVerticesAndEdge(vertices, edges, WriteTypePb.UPDATE); + } + + public long deleteVerticesAndEdges(List vertices, List edges) { + return modifyVerticesAndEdge(vertices, edges, WriteTypePb.DELETE); + } + + public void addVerticesAndEdges( + List vertices, List edges, StreamObserver callback) { + modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.INSERT); + } + + public void updateVerticesAndEdges( + List vertices, List edges, StreamObserver callback) { + modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.UPDATE); + } + + public void deleteVerticesAndEdges( + List vertices, List edges, StreamObserver callback) { + modifyVerticesAndEdge(vertices, edges, callback, WriteTypePb.DELETE); + } + /** * Add vertex by realtime write * @param vertex vertex that contains label and pk properties and other properties diff --git a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java index 933d292c09b0..3115d210a156 100644 --- a/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java +++ b/interactive_engine/groot-client/src/main/java/com/alibaba/graphscope/groot/sdk/schema/Edge.java @@ -12,6 +12,8 @@ public class Edge { public Map dstPk; public Map properties; + public long eid; + /** * Construct an edge * @param label edge label @@ -59,6 +61,10 @@ public Edge(String label, Vertex src, Vertex dst) { this(label, src, dst, null); } + public void setEid(long eid) { + this.eid = eid; + } + public String getLabel() { return label; } @@ -88,6 +94,7 @@ public EdgeRecordKeyPb toEdgeRecordKey() { .setLabel(label) .setSrcVertexKey(toVertexRecordKey(srcLabel, srcPk)) .setDstVertexKey(toVertexRecordKey(dstLabel, dstPk)) + .setInnerId(eid) .build(); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java index 1c7bb6d98ef3..fa5c656c602f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/ClientWriteService.java @@ -42,7 +42,7 @@ public void batchWrite( String writeSession = request.getClientId(); int writeRequestsCount = request.getWriteRequestsCount(); List writeRequests = new ArrayList<>(writeRequestsCount); - logger.info( + logger.debug( "received batchWrite request. requestId [" + requestId + "] writeSession [" diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java index 7da92153507e..1fcee8edb33f 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/frontend/write/GraphWriter.java @@ -205,6 +205,11 @@ private void addDeleteEdgeOperation( private void addUpdateEdgeOperation( OperationBatch.Builder batchBuilder, GraphSchema schema, DataRecord dataRecord) { EdgeId edgeId = getEdgeId(schema, dataRecord, false); + if (edgeId.id == 0) { + // This is for update edge, if edgeInnerId is 0, generate new id, incase there isn't + // such a edge + edgeId.id = edgeIdGenerator.getNextId(); + } EdgeKind edgeKind = getEdgeKind(schema, dataRecord); GraphElement edgeDef = schema.getElement(edgeKind.getEdgeLabelId().getId()); @@ -356,7 +361,6 @@ private EdgeId getEdgeId(GraphSchema schema, DataRecord dataRecord, boolean over getPrimaryKeysHashId(dstVertexDef.getLabelId(), dstVertexPkVals, dstVertexDef); long edgeInnerId = overwrite ? edgeIdGenerator.getNextId() : edgeRecordKey.getEdgeInnerId(); - return new EdgeId( new VertexId(srcVertexHashId), new VertexId(dstVertexHashId), edgeInnerId); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/EdgeId.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/EdgeId.java index 9c6c1081de9d..2ab4b64136fe 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/EdgeId.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/operation/EdgeId.java @@ -17,9 +17,9 @@ public class EdgeId { - private VertexId srcId; - private VertexId dstId; - private long id; + public VertexId srcId; + public VertexId dstId; + public long id; public EdgeId(VertexId srcId, VertexId dstId, long id) { this.srcId = srcId;