From f9b257ecddf7d98d3f9862060fb402e9139db757 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Mon, 25 Mar 2024 19:22:31 +0800 Subject: [PATCH 01/17] [GIE Proto] support optional edge_expand and path_expand --- interactive_engine/executor/ir/proto/algebra.proto | 4 ++++ interactive_engine/executor/ir/proto/physical.proto | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/interactive_engine/executor/ir/proto/algebra.proto b/interactive_engine/executor/ir/proto/algebra.proto index f01e135ff94f..4a7dd61b7962 100644 --- a/interactive_engine/executor/ir/proto/algebra.proto +++ b/interactive_engine/executor/ir/proto/algebra.proto @@ -307,6 +307,8 @@ message EdgeExpand { // Expand option, i.e., expand vertices/edges/degree. ExpandOpt expand_opt = 5; MetaData meta_data = 6; + // Whether the expand is optional, if true, the expand will return a `None` if the edge does not exist + bool is_optional = 7; } message PathExpand { @@ -347,6 +349,8 @@ message PathExpand { ResultOpt result_opt = 6; // A condition formulated as an expression predicate common.Expression condition = 7; + // Whether the expand is optional, if true, the expand will return a `None` if the path does not exist + bool is_optional = 8; } /* diff --git a/interactive_engine/executor/ir/proto/physical.proto b/interactive_engine/executor/ir/proto/physical.proto index a67f14536337..4a3983c55941 100644 --- a/interactive_engine/executor/ir/proto/physical.proto +++ b/interactive_engine/executor/ir/proto/physical.proto @@ -205,6 +205,8 @@ message EdgeExpand { google.protobuf.Int32Value alias = 4; // Expand option, i.e., expand vertices/edges/degree. ExpandOpt expand_opt = 5; + // Whether the expand is optional, if true, the expand will return a `None` if the edge does not exist + bool is_optional = 6; } message PathExpand { @@ -245,6 +247,8 @@ message PathExpand { ResultOpt result_opt = 6; // A condition formulated as an expression predicate common.Expression condition = 7; + // Whether the path expand is optional, if true, the path expand will return a `None` if the path does not exist + bool is_optional = 8; } message Sink { From b364656583cc63a425aadc760806b8716a446501 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Mon, 25 Mar 2024 21:10:05 +0800 Subject: [PATCH 02/17] [GIE Runtime] support optional edge_expand --- .../executor/ir/common/src/utils.rs | 2 + .../executor/ir/core/src/plan/ffi.rs | 3 + .../process/operator/flatmap/edge_expand.rs | 139 ++++++++++++++++-- 3 files changed, 128 insertions(+), 16 deletions(-) diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index ea646339e706..7090f763acb4 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -903,6 +903,7 @@ impl From for physical_pb::EdgeExpand { params: edge.params, alias: edge.alias.map(|tag| tag.try_into().unwrap()), expand_opt: edge.expand_opt, + is_optional: edge.is_optional, } } } @@ -925,6 +926,7 @@ impl From for physical_pb::PathExpand { path_opt: path.path_opt, result_opt: path.result_opt, condition: path.condition, + is_optional: path.is_optional, } } } diff --git a/interactive_engine/executor/ir/core/src/plan/ffi.rs b/interactive_engine/executor/ir/core/src/plan/ffi.rs index 5ee412489262..f622220aa164 100644 --- a/interactive_engine/executor/ir/core/src/plan/ffi.rs +++ b/interactive_engine/executor/ir/core/src/plan/ffi.rs @@ -2175,6 +2175,7 @@ mod graph { alias: None, expand_opt: unsafe { std::mem::transmute::(expand_opt) }, meta_data: None, + is_optional: false, }); Box::into_raw(edgexpd) as *const c_void @@ -2339,6 +2340,7 @@ mod graph { path_opt: unsafe { std::mem::transmute::(path_opt) }, result_opt: unsafe { std::mem::transmute::(result_opt) }, condition: None, + is_optional: false, }); Box::into_raw(pathxpd) as *const c_void @@ -2362,6 +2364,7 @@ mod graph { path_opt: unsafe { std::mem::transmute::(path_opt) }, result_opt: unsafe { std::mem::transmute::(result_opt) }, condition: None, + is_optional: false, }); Box::into_raw(pathxpd) as *const c_void diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs index 2220e5b21cd3..e7116cbc5098 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs @@ -15,6 +15,7 @@ use std::convert::TryInto; +use dyn_type::Object; use graph_proxy::apis::{ get_graph, Direction, DynDetails, GraphElement, QueryParams, Statement, Vertex, ID, }; @@ -95,6 +96,82 @@ impl FlatMapFunction for EdgeExpandOperator< } } +pub struct OptionalEdgeExpandOperator { + start_v_tag: Option, + alias: Option, + stmt: Box>, + expand_opt: ExpandOpt, +} + +impl FlatMapFunction for OptionalEdgeExpandOperator { + type Target = DynIter; + + fn exec(&self, mut input: Record) -> FnResult { + if let Some(entry) = input.get(self.start_v_tag) { + match entry.get_type() { + EntryType::Vertex => { + let id = entry.id(); + let mut iter = self.stmt.exec(id)?.peekable(); + match self.expand_opt { + // the case of expand edge, and get end vertex; + ExpandOpt::Vertex => { + if iter.peek().is_none() { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + let neighbors_iter = iter.map(|e| { + if let Some(e) = e.as_edge() { + Vertex::new( + e.get_other_id(), + e.get_other_label().cloned(), + DynDetails::default(), + ) + } else { + unreachable!() + } + }); + Ok(Box::new(RecordExpandIter::new( + input, + self.alias.as_ref(), + Box::new(neighbors_iter), + ))) + } + } + // the case of expand neighbors, including edges/vertices + ExpandOpt::Edge => { + if iter.peek().is_none() { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + Ok(Box::new(RecordExpandIter::new( + input, + self.alias.as_ref(), + Box::new(iter), + ))) + } + } + // the case of get degree. TODO: this case should be a `Map` + ExpandOpt::Degree => { + let degree = iter.count(); + input.append(object!(degree), self.alias); + Ok(Box::new(vec![input].into_iter())) + } + } + } + EntryType::Path => Err(FnExecError::unsupported_error( + "Do not support Optional Edge Expand in Path entry", + ))?, + _ => Err(FnExecError::unexpected_data_error(&format!( + "Cannot Expand from current entry {:?}", + entry + )))?, + } + } else { + Ok(Box::new(vec![].into_iter())) + } + } +} + impl FlatMapFuncGen for pb::EdgeExpand { fn gen_flat_map( self, @@ -119,32 +196,62 @@ impl FlatMapFuncGen for pb::EdgeExpand { // Expand vertices with filters on edges. // This can be regarded as a combination of EdgeExpand (with is_edge = true) + GetV let stmt = graph.prepare_explore_edge(direction, &query_params)?; - let edge_expand_operator = EdgeExpandOperator { - start_v_tag, - alias: edge_or_end_v_tag, - stmt, - expand_opt: ExpandOpt::Vertex, - }; - Ok(Box::new(edge_expand_operator)) + if self.is_optional { + let edge_expand_operator = OptionalEdgeExpandOperator { + start_v_tag, + alias: edge_or_end_v_tag, + stmt, + expand_opt: ExpandOpt::Vertex, + }; + Ok(Box::new(edge_expand_operator)) + } else { + let edge_expand_operator = EdgeExpandOperator { + start_v_tag, + alias: edge_or_end_v_tag, + stmt, + expand_opt: ExpandOpt::Vertex, + }; + Ok(Box::new(edge_expand_operator)) + } } else { // Expand vertices without any filters let stmt = graph.prepare_explore_vertex(direction, &query_params)?; - let edge_expand_operator = EdgeExpandOperator { + if self.is_optional { + let edge_expand_operator = OptionalEdgeExpandOperator { + start_v_tag, + alias: edge_or_end_v_tag, + stmt, + expand_opt: ExpandOpt::Edge, + }; + Ok(Box::new(edge_expand_operator)) + } else { + let edge_expand_operator = EdgeExpandOperator { + start_v_tag, + alias: edge_or_end_v_tag, + stmt, + expand_opt: ExpandOpt::Edge, + }; + Ok(Box::new(edge_expand_operator)) + } + } + } + _ => { + // Expand edges or degree + let stmt = graph.prepare_explore_edge(direction, &query_params)?; + if self.is_optional { + let edge_expand_operator = OptionalEdgeExpandOperator { start_v_tag, alias: edge_or_end_v_tag, stmt, - expand_opt: ExpandOpt::Edge, + expand_opt, }; Ok(Box::new(edge_expand_operator)) + } else { + let edge_expand_operator = + EdgeExpandOperator { start_v_tag, alias: edge_or_end_v_tag, stmt, expand_opt }; + Ok(Box::new(edge_expand_operator)) } } - _ => { - // Expand edges or degree - let stmt = graph.prepare_explore_edge(direction, &query_params)?; - let edge_expand_operator = - EdgeExpandOperator { start_v_tag, alias: edge_or_end_v_tag, stmt, expand_opt }; - Ok(Box::new(edge_expand_operator)) - } } } } From 4946a5200d7f796eef182333e0c1ba9ed70fb376 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 26 Mar 2024 15:06:58 +0800 Subject: [PATCH 03/17] fix in ci tests --- .../executor/ir/core/src/plan/logical.rs | 10 ++++++++ .../executor/ir/core/src/plan/patmat.rs | 8 +++++-- .../executor/ir/core/src/plan/physical.rs | 24 +++++++++++++++++++ 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index 83835af9182c..46f87d408ba6 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -2467,6 +2467,7 @@ mod test { expand_opt: 0, alias: Some("here".into()), meta_data: None, + is_optional: false, }; plan.append_operator_as_node(expand.into(), vec![1]) .unwrap(); @@ -2545,6 +2546,7 @@ mod test { expand_opt: 1, alias: Some("e".into()), meta_data: None, + is_optional: false, }; plan.append_operator_as_node(expand.into(), vec![1]) .unwrap(); @@ -2691,6 +2693,7 @@ mod test { expand_opt: 1, alias: Some("b".into()), meta_data: None, + is_optional: false, }; opr_id = plan .append_operator_as_node(expand.into(), vec![opr_id as NodeId]) @@ -2873,6 +2876,7 @@ mod test { expand_opt: 0, alias: Some("a".into()), meta_data: None, + is_optional: false, }; plan.append_operator_as_node(expand.into(), vec![1]) .unwrap(); @@ -2886,6 +2890,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; plan.append_operator_as_node(expand.into(), vec![2]) .unwrap(); @@ -2950,6 +2955,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let oprid = plan .append_operator_as_node(expand.into(), vec![0]) @@ -3182,6 +3188,7 @@ mod test { expand_opt: 1, alias: None, meta_data: None, + is_optional: false, }; let subtask = plan .append_operator_as_node(expand.into(), vec![]) @@ -3319,6 +3326,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let filter = pb::Select { predicate: Some(str_to_expr_pb("@.age > 10".to_string()).unwrap()) }; @@ -3419,6 +3427,7 @@ mod test { expand_opt: 0, alias: Some("o".into()), meta_data: None, + is_optional: false, }; let root_id = plan @@ -3497,6 +3506,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let root_id = plan .append_operator_as_node(expand.into(), vec![]) diff --git a/interactive_engine/executor/ir/core/src/plan/patmat.rs b/interactive_engine/executor/ir/core/src/plan/patmat.rs index c3e16392dfc2..c59cf4f8b59e 100644 --- a/interactive_engine/executor/ir/core/src/plan/patmat.rs +++ b/interactive_engine/executor/ir/core/src/plan/patmat.rs @@ -1242,6 +1242,7 @@ mod test { alias: None, expand_opt: if is_edge { 1 } else { 0 }, meta_data: None, + is_optional: false, })), }], end: y.and_then(|s| s.try_into().ok()), @@ -1265,6 +1266,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, })), }], end: y.and_then(|s| s.try_into().ok()), @@ -1567,7 +1569,8 @@ mod test { params: Some(query_params()), expand_opt: 0, alias: None, - meta_data: None + meta_data: None, + is_optional: false, } .into() ); @@ -1750,7 +1753,8 @@ mod test { params: Some(query_params()), expand_opt: 0, alias: None, - meta_data: None + meta_data: None, + is_optional: false, } .into() ); diff --git a/interactive_engine/executor/ir/core/src/plan/physical.rs b/interactive_engine/executor/ir/core/src/plan/physical.rs index 8363740c9706..e9dd0f68d042 100644 --- a/interactive_engine/executor/ir/core/src/plan/physical.rs +++ b/interactive_engine/executor/ir/core/src/plan/physical.rs @@ -1151,6 +1151,7 @@ mod test { alias, expand_opt, meta_data: None, + is_optional: false, } } @@ -1653,6 +1654,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let limit_opr = pb::Limit { range: Some(pb::Range { lower: 10, upper: 11 }) }; @@ -1749,6 +1751,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let path_opr = pb::PathExpand { @@ -1759,6 +1762,7 @@ mod test { path_opt: 0, result_opt: 0, condition: None, + is_optional: false, }; let mut logical_plan = LogicalPlan::with_node(Node::new(0, source_opr.clone().into())); @@ -1813,6 +1817,7 @@ mod test { expand_opt: 1, // expand edge alias: None, meta_data: None, + is_optional: false, }; let getv = pb::GetV { @@ -1831,6 +1836,7 @@ mod test { path_opt: 0, result_opt: 0, condition: None, + is_optional: false, }; let fused_edge_expand = pb::EdgeExpand { @@ -1840,6 +1846,7 @@ mod test { expand_opt: 0, // expand vertex alias: None, meta_data: None, + is_optional: false, }; let fused_path_opr = pb::PathExpand { base: Some(fused_edge_expand.into()), @@ -1849,6 +1856,7 @@ mod test { path_opt: 0, result_opt: 0, condition: None, + is_optional: false, }; let mut logical_plan = LogicalPlan::with_node(Node::new(0, source_opr.clone().into())); @@ -1903,6 +1911,7 @@ mod test { expand_opt: 1, // expand edge alias: None, meta_data: None, + is_optional: false, }; let getv = pb::GetV { @@ -1929,6 +1938,7 @@ mod test { path_opt: 0, result_opt: 0, condition: None, + is_optional: false, }; let fused_edge_expand = pb::EdgeExpand { @@ -1938,6 +1948,7 @@ mod test { expand_opt: 0, // expand vertex alias: None, meta_data: None, + is_optional: false, }; let fused_getv_with_filter = pb::GetV { tag: None, @@ -1962,6 +1973,7 @@ mod test { path_opt: 0, result_opt: 0, condition: None, + is_optional: false, }; let mut logical_plan = LogicalPlan::with_node(Node::new(0, source_opr.clone().into())); @@ -2054,6 +2066,7 @@ mod test { expand_opt: 0, alias: Some(1.into()), meta_data: None, + is_optional: false, }; let root_id = plan @@ -2275,6 +2288,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let join_opr = pb::Join { left_keys: vec![], right_keys: vec![], kind: 0 }; let limit_opr = pb::Limit { range: Some(pb::Range { lower: 10, upper: 11 }) }; @@ -2337,6 +2351,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_b = pb::GetV { @@ -2355,6 +2370,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let mut expand_ac_opr_vertex = expand_ac_opr_edge.clone(); @@ -2368,6 +2384,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let mut expand_bc_opr_vertex = expand_bc_opr_edge.clone(); @@ -2422,6 +2439,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Vertex as i32, alias: Some(1.into()), meta_data: None, + is_optional: false, }; let mut expected_builder = PlanBuilder::default(); expected_builder.add_scan_source(source_opr); @@ -2455,6 +2473,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_b = pb::GetV { @@ -2473,6 +2492,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let mut expand_ac_opr_vertex = expand_ac_opr_edge.clone(); @@ -2486,6 +2506,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let mut expand_bc_opr_vertex = expand_bc_opr_edge.clone(); @@ -2544,6 +2565,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Vertex as i32, alias: Some(1.into()), meta_data: None, + is_optional: false, }; let mut expected_builder = PlanBuilder::default(); expected_builder.add_scan_source(source_opr); @@ -3087,6 +3109,7 @@ mod test { expand_opt: 0, // vertex alias: None, meta_data: None, + is_optional: false, }; let path_opr = pb::PathExpand { @@ -3097,6 +3120,7 @@ mod test { path_opt: 0, // ARBITRARY result_opt: 1, // ALL_V condition: None, + is_optional: false, }; let project_opr = pb::Project { From 140396c81d972518b8f642b34c8d1514dcb6b1e7 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 26 Mar 2024 15:07:34 +0800 Subject: [PATCH 04/17] [GIE Runtime] support optional expand_intersect --- .../process/operator/map/expand_intersect.rs | 98 ++++++++++++++++++- 1 file changed, 93 insertions(+), 5 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs index b2dd3096398c..a242a79ae8e5 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs @@ -45,7 +45,7 @@ struct ExpandOrIntersect { /// An optimized entry implementation for intersection, which denotes a collection of vertices; /// Specifically, vertex_vec records the unique vertex ids in the collection, /// and count_vec records the number of the corresponding vertex, since duplicated vertices are allowed. -#[derive(Debug, Clone, Hash, PartialEq, PartialOrd)] +#[derive(Debug, Clone, Hash, PartialEq, PartialOrd, Default)] pub struct IntersectionEntry { vertex_vec: Vec, count_vec: Vec, @@ -71,6 +71,9 @@ impl IntersectionEntry { fn intersect>(&mut self, seeker: Iter) { let len = self.vertex_vec.len(); + if len == 0 { + return; + } let mut s = vec![0; len]; for vid in seeker { if let Ok(idx) = self @@ -204,6 +207,79 @@ impl FilterMapFunction for ExpandOrIntersect } } +/// An OptionalExpandOrIntersect operator to expand neighbor +struct OptionalExpandOrIntersect { + start_v_tag: Option, + edge_or_end_v_tag: KeyId, + stmt: Box>, +} + +impl FilterMapFunction for OptionalExpandOrIntersect { + fn exec(&self, mut input: Record) -> FnResult> { + let entry = input.get(self.start_v_tag).ok_or_else(|| { + FnExecError::get_tag_error(&format!( + "get start_v_tag {:?} from record in `ExpandOrIntersect` operator, the record is {:?}", + self.start_v_tag, input + )) + })?; + match entry.get_type() { + EntryType::Vertex => { + let id = entry.id(); + let mut iter = self + .stmt + .exec(id)? + .map(|e| { + if let Some(vertex) = e.as_vertex() { + vertex.id() as ID + } else if let Some(edge) = e.as_edge() { + edge.get_other_id() as ID + } else { + unreachable!() + } + }) + .peekable(); + if iter.peek().is_none() { + // if no neighbors found, append columns with an empty IntersectionEntry (without changing head) + let columns = input.get_columns_mut(); + columns.insert( + self.edge_or_end_v_tag as usize, + DynEntry::new(IntersectionEntry::default()), + ); + Ok(Some(input)) + } else if let Some(pre_entry) = input.get_mut(Some(self.edge_or_end_v_tag)) { + // the case of expansion and intersection + let pre_intersection = pre_entry + .as_any_mut() + .downcast_mut::() + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( + "entry is not a intersection in ExpandOrIntersect" + )) + })?; + pre_intersection.intersect(iter); + Ok(Some(input)) + } else { + // the case of expansion only + let neighbors_intersection = IntersectionEntry::from_iter(iter); + if neighbors_intersection.is_empty() { + Ok(None) + } else { + // append columns without changing head + let columns = input.get_columns_mut(); + columns + .insert(self.edge_or_end_v_tag as usize, DynEntry::new(neighbors_intersection)); + Ok(Some(input)) + } + } + } + _ => Err(FnExecError::unsupported_error(&format!( + "expand or intersect entry {:?} of tag {:?} failed in ExpandOrIntersect", + entry, self.edge_or_end_v_tag + )))?, + } + } +} + impl FilterMapFuncGen for pb::EdgeExpand { fn gen_filter_map(self) -> FnGenResult>> { let graph = graph_proxy::apis::get_graph().ok_or_else(|| FnGenError::NullGraphError)?; @@ -227,13 +303,25 @@ impl FilterMapFuncGen for pb::EdgeExpand { // Expand vertices with filters on edges. // This can be regarded as a combination of EdgeExpand (with expand_opt as Edge) + GetV let stmt = graph.prepare_explore_edge(direction, &query_params)?; - let edge_expand_operator = ExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; - Ok(Box::new(edge_expand_operator)) + if self.is_optional { + let edge_expand_operator = + OptionalExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; + Ok(Box::new(edge_expand_operator)) + } else { + let edge_expand_operator = ExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; + Ok(Box::new(edge_expand_operator)) + } } else { // Expand vertices without any filters let stmt = graph.prepare_explore_vertex(direction, &query_params)?; - let edge_expand_operator = ExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; - Ok(Box::new(edge_expand_operator)) + if self.is_optional { + let edge_expand_operator = + OptionalExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; + Ok(Box::new(edge_expand_operator)) + } else { + let edge_expand_operator = ExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; + Ok(Box::new(edge_expand_operator)) + } } } } From 31e7000d41d72c6a32eb7a5264a0c89cd599260e Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 26 Mar 2024 15:08:07 +0800 Subject: [PATCH 05/17] [GIE Runtime] deal with Object::None in other oprs --- .../process/operator/flatmap/edge_expand.rs | 30 ++++++++++++++++++- .../src/process/operator/flatmap/unfold.rs | 19 ++++++++---- 2 files changed, 42 insertions(+), 7 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs index e7116cbc5098..4f4c347309e2 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs @@ -85,6 +85,20 @@ impl FlatMapFunction for EdgeExpandOperator< let curr_path = graph_path.clone(); Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter))) } + EntryType::Object => { + let obj = entry + .as_object() + .ok_or_else(|| FnExecError::Unreachable)?; + if Object::None.eq(obj) { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + Err(FnExecError::unexpected_data_error(&format!( + "Cannot Expand from current entry {:?}", + entry + )))? + } + } _ => Err(FnExecError::unexpected_data_error(&format!( "Cannot Expand from current entry {:?}", entry @@ -159,8 +173,22 @@ impl FlatMapFunction for OptionalEdgeExpandO } } EntryType::Path => Err(FnExecError::unsupported_error( - "Do not support Optional Edge Expand in Path entry", + "Have not supported Optional Edge Expand in Path entry yet", ))?, + EntryType::Object => { + let obj = entry + .as_object() + .ok_or_else(|| FnExecError::Unreachable)?; + if Object::None.eq(obj) { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + Err(FnExecError::unexpected_data_error(&format!( + "Cannot Expand from current entry {:?}", + entry + )))? + } + } _ => Err(FnExecError::unexpected_data_error(&format!( "Cannot Expand from current entry {:?}", entry diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs index b7e73c355fb0..1f48e8e39dea 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs @@ -13,6 +13,7 @@ //! See the License for the specific language governing permissions and //! limitations under the License. +use dyn_type::Object; use graph_proxy::apis::{DynDetails, Element, Vertex}; use ir_common::generated::physical as pb; use ir_common::KeyId; @@ -59,13 +60,19 @@ impl FlatMapFunction for UnfoldOperator { .ok_or_else(|| { FnExecError::unexpected_data_error("downcast intersection entry in UnfoldOperator") })?; - let mut res = Vec::with_capacity(intersection.len()); - for item in intersection.iter().cloned() { - let mut new_entry = input.clone(); - new_entry.append(Vertex::new(item, None, DynDetails::default()), self.alias); - res.push(new_entry); + let len = intersection.len(); + if len == 0 { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + let mut res = Vec::with_capacity(len); + for item in intersection.iter().cloned() { + let mut new_entry = input.clone(); + new_entry.append(Vertex::new(item, None, DynDetails::default()), self.alias); + res.push(new_entry); + } + Ok(Box::new(res.into_iter())) } - Ok(Box::new(res.into_iter())) } EntryType::Collection => { let entry = input.get(self.tag).unwrap(); From 32be362eaa216263fb5e8c4c950b17394cf95a43 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 26 Mar 2024 15:21:24 +0800 Subject: [PATCH 06/17] minor: refine the impl of GetVertexOperator --- .../runtime/src/process/operator/map/get_v.rs | 45 +++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index 55a3613b5c85..4fb11c2bbd7e 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -57,8 +57,22 @@ impl GetVertexOperator { impl FilterMapFunction for GetVertexOperator { fn exec(&self, mut input: Record) -> FnResult> { - if let Some(entry) = input.get(self.start_tag) { - if let Some(e) = entry.as_edge() { + let entry_type = + if let Some(entry) = input.get(self.start_tag) { entry.get_type() } else { return Ok(None) }; + match entry_type { + EntryType::Edge => { + let e = input + .get(self.start_tag) + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!("get of Edge failed in {:?}", self)) + })? + .as_edge() + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( + "entry is not an edge in GetV {:?}", + self + )) + })?; let (id, label) = match self.opt { VOpt::Start => (e.src_id, e.get_src_label()), VOpt::End => (e.dst_id, e.get_dst_label()), @@ -72,7 +86,8 @@ impl FilterMapFunction for GetVertexOperator { } else { Ok(None) } - } else if let Some(graph_path) = entry.as_graph_path() { + } + EntryType::Path => { // we check VOpt here: // for `Other`, we treat it as to get_other_id() in the Edge within the Path (in which case is expanding the path with a adj vertex) // for `End`, we treat it as to get EndV() in the Path (in which case is getting the end vertex from the path) @@ -117,6 +132,18 @@ impl FilterMapFunction for GetVertexOperator { } } VOpt::End => { + let graph_path = input + .get(self.start_tag) + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( + "get_mut of GraphPath failed in {:?}", + self + )) + })? + .as_graph_path() + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!("entry is not a path in GetV")) + })?; let path_end_vertex = graph_path .get_path_end() .as_vertex() @@ -135,13 +162,13 @@ impl FilterMapFunction for GetVertexOperator { self.opt )))?, } - } else { - Err(FnExecError::unexpected_data_error( - "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry", - ))? } - } else { - Ok(None) + _ => Err(FnExecError::unexpected_data_error( + &format!( + "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", + entry_type + ) + ))?, } } } From 2b7f3c368930e74065e7c89542dd7b0f9b14aea7 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 26 Mar 2024 15:25:16 +0800 Subject: [PATCH 07/17] [GIE Runtime] deal with Object::None in other oprs --- .../runtime/src/process/operator/map/get_v.rs | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index 4fb11c2bbd7e..aafb73b5dd7e 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -15,6 +15,7 @@ use std::convert::TryInto; +use dyn_type::Object; use graph_proxy::apis::GraphElement; use graph_proxy::apis::{get_graph, DynDetails, GraphPath, QueryParams, Vertex}; use graph_proxy::utils::expr::eval_pred::EvalPred; @@ -64,7 +65,7 @@ impl FilterMapFunction for GetVertexOperator { let e = input .get(self.start_tag) .ok_or_else(|| { - FnExecError::unexpected_data_error(&format!("get of Edge failed in {:?}", self)) + FnExecError::Unreachable })? .as_edge() .ok_or_else(|| { @@ -135,10 +136,7 @@ impl FilterMapFunction for GetVertexOperator { let graph_path = input .get(self.start_tag) .ok_or_else(|| { - FnExecError::unexpected_data_error(&format!( - "get_mut of GraphPath failed in {:?}", - self - )) + FnExecError::Unreachable })? .as_graph_path() .ok_or_else(|| { @@ -163,6 +161,26 @@ impl FilterMapFunction for GetVertexOperator { )))?, } } + EntryType::Object => { + let obj = input + .get(self.start_tag) + .ok_or_else(|| { + FnExecError::Unreachable + })? + .as_object() + .ok_or_else(|| FnExecError::Unreachable)?; + if Object::None.eq(obj) { + input.append(Object::None, self.alias); + return Ok(Some(input)); + } else { + Err(FnExecError::unexpected_data_error( + &format!( + "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", + entry_type + ) + ))? + } + } _ => Err(FnExecError::unexpected_data_error( &format!( "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", @@ -266,6 +284,20 @@ impl FilterMapFunction for AuxiliaOperator { return Ok(None); } } + EntryType::Object => { + let obj = entry + .as_object() + .ok_or_else(|| FnExecError::Unreachable)?; + if Object::None.eq(obj) { + input.append(Object::None, self.alias); + return Ok(Some(input)); + } else { + Err(FnExecError::unexpected_data_error(&format!( + "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}", + entry + )))? + } + } _ => Err(FnExecError::unexpected_data_error(&format!( "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}", entry From bc37733166b5188aad32b9dfc6625fdedcd546e2 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 26 Mar 2024 15:47:23 +0800 Subject: [PATCH 08/17] fix in ci tests --- .../ir/core/tests/common/pattern_cases.rs | 16 ++++++++ .../ir/core/tests/test_multi_source.rs | 3 ++ .../ir/integrated/tests/apply_test.rs | 2 + .../ir/integrated/tests/auxilia_test.rs | 8 ++++ .../ir/integrated/tests/catalog_test.rs | 41 +++++++++++++++++++ .../ir/integrated/tests/expand_test.rs | 40 +++++++++++++++++- .../ir/integrated/tests/graph_query_test.rs | 2 + .../ir/integrated/tests/match_test.rs | 4 ++ .../ir/integrated/tests/nested_branch_test.rs | 24 ++++++++++- .../ir/integrated/tests/pathxd_test.rs | 12 ++++++ .../ir/integrated/tests/sample_test.rs | 1 + .../runtime/src/process/operator/map/get_v.rs | 12 ++---- 12 files changed, 152 insertions(+), 13 deletions(-) diff --git a/interactive_engine/executor/ir/core/tests/common/pattern_cases.rs b/interactive_engine/executor/ir/core/tests/common/pattern_cases.rs index f7a5c797e4b6..1bad1463d1cd 100644 --- a/interactive_engine/executor/ir/core/tests/common/pattern_cases.rs +++ b/interactive_engine/executor/ir/core/tests/common/pattern_cases.rs @@ -467,6 +467,7 @@ pub fn build_ldbc_pattern_from_pb_case1() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -517,6 +518,7 @@ pub fn build_ldbc_pattern_from_pb_case2() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr2 = pb::EdgeExpand { v_tag: None, @@ -525,6 +527,7 @@ pub fn build_ldbc_pattern_from_pb_case2() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr3 = pb::EdgeExpand { v_tag: None, @@ -533,6 +536,7 @@ pub fn build_ldbc_pattern_from_pb_case2() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -579,6 +583,7 @@ pub fn build_ldbc_pattern_from_pb_case3() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -655,6 +660,7 @@ pub fn build_ldbc_pattern_from_pb_case4() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr2 = pb::EdgeExpand { v_tag: None, @@ -663,6 +669,7 @@ pub fn build_ldbc_pattern_from_pb_case4() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr3 = pb::EdgeExpand { v_tag: None, @@ -671,6 +678,7 @@ pub fn build_ldbc_pattern_from_pb_case4() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr4 = pb::EdgeExpand { v_tag: None, @@ -679,6 +687,7 @@ pub fn build_ldbc_pattern_from_pb_case4() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -739,6 +748,7 @@ pub fn build_ldbc_pattern_from_pb_case5() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr1 = pb::EdgeExpand { v_tag: None, @@ -747,6 +757,7 @@ pub fn build_ldbc_pattern_from_pb_case5() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -806,6 +817,7 @@ pub fn build_ldbc_pattern_from_pb_case6() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr1 = pb::EdgeExpand { v_tag: None, @@ -814,6 +826,7 @@ pub fn build_ldbc_pattern_from_pb_case6() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr2 = pb::EdgeExpand { v_tag: None, @@ -822,6 +835,7 @@ pub fn build_ldbc_pattern_from_pb_case6() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr3 = pb::EdgeExpand { v_tag: None, @@ -830,6 +844,7 @@ pub fn build_ldbc_pattern_from_pb_case6() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr4 = pb::EdgeExpand { v_tag: None, @@ -838,6 +853,7 @@ pub fn build_ldbc_pattern_from_pb_case6() -> IrPatternResult { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ diff --git a/interactive_engine/executor/ir/core/tests/test_multi_source.rs b/interactive_engine/executor/ir/core/tests/test_multi_source.rs index 877609378fae..912c9c5c637c 100644 --- a/interactive_engine/executor/ir/core/tests/test_multi_source.rs +++ b/interactive_engine/executor/ir/core/tests/test_multi_source.rs @@ -55,6 +55,7 @@ mod tests { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr2 = expand_opr1.clone(); @@ -134,6 +135,7 @@ mod tests { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr2 = expand_opr1.clone(); @@ -217,6 +219,7 @@ mod tests { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; // build pattern 1: as('a').out().as('b') diff --git a/interactive_engine/executor/ir/integrated/tests/apply_test.rs b/interactive_engine/executor/ir/integrated/tests/apply_test.rs index 679e6a7d4bff..5a854aae3388 100644 --- a/interactive_engine/executor/ir/integrated/tests/apply_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/apply_test.rs @@ -49,6 +49,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let mut job_builder = JobBuilder::default(); @@ -160,6 +161,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let fold_opr = pb::GroupBy { diff --git a/interactive_engine/executor/ir/integrated/tests/auxilia_test.rs b/interactive_engine/executor/ir/integrated/tests/auxilia_test.rs index 81bc7616b925..6bfff6bbefac 100644 --- a/interactive_engine/executor/ir/integrated/tests/auxilia_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/auxilia_test.rs @@ -58,6 +58,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), expand_opt: 0, alias: None, + is_optional: false, }; let auxilia_opr = pb::GetV { @@ -102,6 +103,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), expand_opt: 0, alias: None, + is_optional: false, }; let auxilia_opr = pb::GetV { @@ -153,6 +155,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), expand_opt: 0, alias: None, + is_optional: false, }; let auxilia_opr = pb::GetV { @@ -204,6 +207,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), expand_opt: 0, alias: None, + is_optional: false, }; let auxilia_opr = pb::GetV { @@ -259,6 +263,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), expand_opt: 0, alias: None, + is_optional: false, }; let auxilia_opr = pb::GetV { @@ -422,6 +427,7 @@ mod test { params: Some(query_param), expand_opt: 0, alias: None, + is_optional: false, }; let auxilia_opr = pb::GetV { @@ -476,6 +482,7 @@ mod test { params: None, expand_opt: 0, alias: Some(TAG_B.into()), + is_optional: false, }; let project_opr = pb::Project { @@ -519,6 +526,7 @@ mod test { params: Some(query_params(vec![], vec!["weight".into()], None)), expand_opt: 1, // edge alias: None, + is_optional: false, }; let auxilia_opr = pb::GetV { diff --git a/interactive_engine/executor/ir/integrated/tests/catalog_test.rs b/interactive_engine/executor/ir/integrated/tests/catalog_test.rs index 2a276bd138cf..42ba5e4f2b48 100644 --- a/interactive_engine/executor/ir/integrated/tests/catalog_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/catalog_test.rs @@ -68,6 +68,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v = pb::GetV { tag: None, @@ -104,6 +105,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_b = pb::GetV { tag: None, @@ -151,6 +153,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v = pb::GetV { tag: None, @@ -204,6 +207,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_b = pb::GetV { tag: None, @@ -268,6 +272,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let expand_opr_in = pb::EdgeExpand { v_tag: None, @@ -276,6 +281,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_start = pb::GetV { tag: None, @@ -327,6 +333,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let expand_opr_in = pb::EdgeExpand { v_tag: None, @@ -335,6 +342,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_a = pb::GetV { tag: None, @@ -386,6 +394,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v = pb::GetV { tag: None, @@ -439,6 +448,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_b = pb::GetV { tag: None, @@ -503,6 +513,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v = pb::GetV { tag: None, @@ -571,6 +582,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let expand_opr_a_c = pb::EdgeExpand { v_tag: None, @@ -579,6 +591,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let expand_opr_b_c = pb::EdgeExpand { v_tag: None, @@ -587,6 +600,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_b = pb::GetV { tag: None, @@ -694,6 +708,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v = pb::GetV { tag: None, @@ -713,6 +728,7 @@ mod test { path_opt: pb::path_expand::PathOpt::Simple as i32, result_opt: pb::path_expand::ResultOpt::EndV as i32, condition: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![pb::pattern::Sentence { @@ -742,6 +758,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v = pb::GetV { tag: None, @@ -761,6 +778,7 @@ mod test { path_opt: pb::path_expand::PathOpt::Simple as i32, result_opt: pb::path_expand::ResultOpt::EndV as i32, condition: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -798,6 +816,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v = pb::GetV { tag: None, @@ -828,6 +847,7 @@ mod test { path_opt: pb::path_expand::PathOpt::Simple as i32, result_opt: pb::path_expand::ResultOpt::EndV as i32, condition: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -865,6 +885,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let expand_opr_in = pb::EdgeExpand { v_tag: None, @@ -873,6 +894,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_end = pb::GetV { tag: None, @@ -936,6 +958,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let expand_opr_in = pb::EdgeExpand { v_tag: None, @@ -944,6 +967,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_end = pb::GetV { tag: None, @@ -1024,6 +1048,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let expand_opr_in = pb::EdgeExpand { v_tag: None, @@ -1032,6 +1057,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_end = pb::GetV { tag: None, @@ -1105,6 +1131,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let expand_opr_in = pb::EdgeExpand { v_tag: None, @@ -1113,6 +1140,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_lop = pb::GetV { tag: None, @@ -1210,6 +1238,7 @@ mod test { expand_opt: pb::edge_expand::ExpandOpt::Edge as i32, alias: None, meta_data: None, + is_optional: false, }; let get_v_software = pb::GetV { tag: None, @@ -1250,6 +1279,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let select_person = pb::Select { predicate: Some(str_to_expr_pb("@.~label == 1".to_string()).unwrap()) }; @@ -1311,6 +1341,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr2 = pb::EdgeExpand { v_tag: None, @@ -1319,6 +1350,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr3 = pb::EdgeExpand { v_tag: None, @@ -1327,6 +1359,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let select_person = pb::Select { predicate: Some(str_to_expr_pb("@.~label == 1".to_string()).unwrap()) }; @@ -1379,6 +1412,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -1451,6 +1485,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr2 = pb::EdgeExpand { v_tag: None, @@ -1459,6 +1494,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr3 = pb::EdgeExpand { v_tag: None, @@ -1467,6 +1503,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr4 = pb::EdgeExpand { v_tag: None, @@ -1475,6 +1512,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ @@ -1525,6 +1563,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr1 = pb::EdgeExpand { v_tag: None, @@ -1533,6 +1572,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let expand_opr2 = pb::EdgeExpand { v_tag: None, @@ -1541,6 +1581,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let pattern = pb::Pattern { sentences: vec![ diff --git a/interactive_engine/executor/ir/integrated/tests/expand_test.rs b/interactive_engine/executor/ir/integrated/tests/expand_test.rs index 7c71e9b03f1f..f85343caf8c8 100644 --- a/interactive_engine/executor/ir/integrated/tests/expand_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/expand_test.rs @@ -125,8 +125,14 @@ mod test { // g.V().out() #[test] fn expand_outv_test() { - let expand_opr_pb = - pb::EdgeExpand { v_tag: None, direction: 0, params: None, expand_opt: 0, alias: None }; + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: None, + expand_opt: 0, + alias: None, + is_optional: false, + }; let mut result = expand_test(expand_opr_pb); let mut result_ids = vec![]; let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); @@ -154,6 +160,7 @@ mod test { params: Some(query_param), expand_opt: 1, alias: None, + is_optional: false, }; let mut result = expand_test(expand_opr_pb); let mut result_edges = vec![]; @@ -180,6 +187,7 @@ mod test { params: Some(query_param), expand_opt: 1, alias: None, + is_optional: false, }; let mut result = expand_test(expand_opr_pb); let mut result_edges = vec![]; @@ -210,6 +218,7 @@ mod test { params: Some(query_param), expand_opt: 1, alias: None, + is_optional: false, }; let mut result = expand_test(expand_opr_pb); let mut result_ids_with_prop = vec![]; @@ -241,6 +250,7 @@ mod test { params: Some(query_param), expand_opt: 0, alias: None, + is_optional: false, }; let mut result = expand_test(expand_opr_pb); let mut cnt = 0; @@ -261,6 +271,7 @@ mod test { params: Some(query_param), expand_opt: 0, alias: Some(TAG_B.into()), + is_optional: false, }; let mut result = expand_test_with_source_tag(TAG_A.into(), expand_opr_pb); let mut result_ids = vec![]; @@ -294,6 +305,7 @@ mod test { params: Some(query_param), expand_opt: 0, alias: None, + is_optional: false, }; let conf = JobConf::new("expand_test"); @@ -335,6 +347,7 @@ mod test { params: Some(edge_query_param), expand_opt: 0, alias: None, + is_optional: false, }; let vertex_query_param = query_params(vec![], vec![], str_to_expr_pb("@.id == 2".to_string()).ok()); let auxilia_opr_pb = pb::GetV { tag: None, opt: 4, params: Some(vertex_query_param), alias: None }; @@ -376,6 +389,7 @@ mod test { params: Some(query_param), expand_opt: 0, alias: None, + is_optional: false, }; let mut result = expand_test(expand_opr_pb); let mut result_ids = vec![]; @@ -398,6 +412,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), expand_opt: 1, alias: None, + is_optional: false, }; let getv_opr = pb::GetV { @@ -442,6 +457,7 @@ mod test { params: Some(query_params(vec![CREATED_LABEL.into()], vec![], None)), expand_opt: 1, alias: None, + is_optional: false, }; let getv_opr = pb::GetV { @@ -486,6 +502,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), expand_opt: 1, alias: None, + is_optional: false, }; let getv_opr = pb::GetV { @@ -530,6 +547,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), expand_opt: 1, alias: None, + is_optional: false, }; let getv_opr = pb::GetV { @@ -574,6 +592,7 @@ mod test { params: None, expand_opt: 2, alias: Some(1.into()), + is_optional: false, }; let mut pegasus_result = expand_degree_opt_test(expand_opr_pb); let mut results = vec![]; @@ -606,6 +625,7 @@ mod test { params: None, expand_opt: 2, alias: Some(1.into()), + is_optional: false, }; let mut pegasus_result = expand_degree_opt_test(expand_opr_pb); let mut results = vec![]; @@ -638,6 +658,7 @@ mod test { params: None, expand_opt: 2, alias: Some(1.into()), + is_optional: false, }; let mut pegasus_result = expand_degree_opt_test(expand_opr_pb); let mut results = vec![]; @@ -672,6 +693,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_B.into()), + is_optional: false, }; // marko (A) -> josh (C): expand C; @@ -681,6 +703,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; let conf = JobConf::new("expand_and_intersection_expand_test"); @@ -742,6 +765,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_B.into()), + is_optional: false, }; // marko (A) -> josh (C): expand C; @@ -751,6 +775,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; // lop (B) <- josh (C): expand C and intersect on C; @@ -760,6 +785,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; let conf = JobConf::new("expand_and_intersection_intersect_test"); @@ -820,6 +846,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_B.into()), + is_optional: false, }; // marko (A) -> josh (C): expand C; @@ -829,6 +856,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; // lop (B) <- josh (C): expand C and intersect on C; @@ -838,6 +866,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; // unfold tag C @@ -893,6 +922,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_B.into()), + is_optional: false, }; // A <-> C: expand C; @@ -902,6 +932,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; // B <-> C: expand C and intersect on C; @@ -911,6 +942,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; // unfold tag C @@ -963,6 +995,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_B.into()), + is_optional: false, }; // A <-> C: expand C; @@ -976,6 +1009,7 @@ mod test { )), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; // B <-> C: expand C and intersect on C; @@ -985,6 +1019,7 @@ mod test { params: Some(query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None)), expand_opt: 0, alias: Some(TAG_C.into()), + is_optional: false, }; // unfold tag C @@ -1041,6 +1076,7 @@ mod test { params: Some(query_params(vec![], vec![], None)), expand_opt: 1, alias: None, + is_optional: false, }; let getv_opr = pb::GetV { diff --git a/interactive_engine/executor/ir/integrated/tests/graph_query_test.rs b/interactive_engine/executor/ir/integrated/tests/graph_query_test.rs index 44291ad5eb22..bd8b84e3f567 100644 --- a/interactive_engine/executor/ir/integrated/tests/graph_query_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/graph_query_test.rs @@ -48,6 +48,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let mut job_builder = JobBuilder::default(); @@ -139,6 +140,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let project_opr = pb::Project { diff --git a/interactive_engine/executor/ir/integrated/tests/match_test.rs b/interactive_engine/executor/ir/integrated/tests/match_test.rs index dbe19c4e124c..5638fca06f77 100644 --- a/interactive_engine/executor/ir/integrated/tests/match_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/match_test.rs @@ -71,6 +71,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, } } @@ -82,6 +83,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, } } @@ -93,6 +95,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, } } @@ -108,6 +111,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, } } diff --git a/interactive_engine/executor/ir/integrated/tests/nested_branch_test.rs b/interactive_engine/executor/ir/integrated/tests/nested_branch_test.rs index a6f02ef44a92..472a1a4c2a22 100644 --- a/interactive_engine/executor/ir/integrated/tests/nested_branch_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/nested_branch_test.rs @@ -68,11 +68,27 @@ mod test { } fn get_out_edge(alias: Option) -> pb::EdgeExpand { - pb::EdgeExpand { v_tag: None, direction: 0, params: None, expand_opt: 0, alias, meta_data: None } + pb::EdgeExpand { + v_tag: None, + direction: 0, + params: None, + expand_opt: 0, + alias, + meta_data: None, + is_optional: false, + } } fn get_in_edge(alias: Option) -> pb::EdgeExpand { - pb::EdgeExpand { v_tag: None, direction: 1, params: None, expand_opt: 0, alias, meta_data: None } + pb::EdgeExpand { + v_tag: None, + direction: 1, + params: None, + expand_opt: 0, + alias, + meta_data: None, + is_optional: false, + } } fn get_out_knows(alias: Option) -> pb::EdgeExpand { @@ -83,6 +99,7 @@ mod test { expand_opt: 0, alias, meta_data: None, + is_optional: false, } } @@ -94,6 +111,7 @@ mod test { expand_opt: 0, alias, meta_data: None, + is_optional: false, } } @@ -105,6 +123,7 @@ mod test { expand_opt: 0, alias, meta_data: None, + is_optional: false, } } @@ -116,6 +135,7 @@ mod test { expand_opt: 0, alias, meta_data: None, + is_optional: false, } } diff --git a/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs b/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs index 773ddabcbefd..94f79d088c3f 100644 --- a/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs @@ -48,6 +48,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let path_expand_opr = pb::PathExpand { @@ -58,6 +59,7 @@ mod test { path_opt, result_opt, condition: None, + is_optional: false, }; let mut job_builder = JobBuilder::default(); @@ -98,6 +100,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let path_expand_opr = pb::PathExpand { @@ -108,6 +111,7 @@ mod test { path_opt: 0, result_opt: 1, condition: str_to_expr_pb("@.name == \"marko\"".to_string()).ok(), + is_optional: false, }; let mut job_builder = JobBuilder::default(); @@ -138,6 +142,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let getv = pb::GetV { @@ -156,6 +161,7 @@ mod test { path_opt: 0, result_opt: if is_whole_path { 1 } else { 0 }, condition: None, + is_optional: false, }; let mut job_builder = JobBuilder::default(); @@ -896,6 +902,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let path_expand_opr = pb::PathExpand { @@ -906,6 +913,7 @@ mod test { path_opt: 0, // Arbitrary result_opt, condition: None, + is_optional: false, }; let project_opr = pb::Project { @@ -978,6 +986,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let path_expand_opr = pb::PathExpand { @@ -988,6 +997,7 @@ mod test { path_opt: 0, // Arbitrary result_opt, condition: None, + is_optional: false, }; let unfold_opr = pb::Unfold { tag: None, alias: None, meta_data: None }; @@ -1098,6 +1108,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let path_expand_opr = pb::PathExpand { @@ -1108,6 +1119,7 @@ mod test { path_opt: 1, // Simple result_opt: 2, // AllVE condition: None, + is_optional: false, }; let path_end = pb::GetV { diff --git a/interactive_engine/executor/ir/integrated/tests/sample_test.rs b/interactive_engine/executor/ir/integrated/tests/sample_test.rs index b47f2876cd8a..e18d377691ee 100644 --- a/interactive_engine/executor/ir/integrated/tests/sample_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/sample_test.rs @@ -86,6 +86,7 @@ mod test { expand_opt: 0, alias: None, meta_data: None, + is_optional: false, }; let mut job_builder = JobBuilder::default(); diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index aafb73b5dd7e..285897f80c84 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -64,9 +64,7 @@ impl FilterMapFunction for GetVertexOperator { EntryType::Edge => { let e = input .get(self.start_tag) - .ok_or_else(|| { - FnExecError::Unreachable - })? + .ok_or_else(|| FnExecError::Unreachable)? .as_edge() .ok_or_else(|| { FnExecError::unexpected_data_error(&format!( @@ -135,9 +133,7 @@ impl FilterMapFunction for GetVertexOperator { VOpt::End => { let graph_path = input .get(self.start_tag) - .ok_or_else(|| { - FnExecError::Unreachable - })? + .ok_or_else(|| FnExecError::Unreachable )? .as_graph_path() .ok_or_else(|| { FnExecError::unexpected_data_error(&format!("entry is not a path in GetV")) @@ -164,9 +160,7 @@ impl FilterMapFunction for GetVertexOperator { EntryType::Object => { let obj = input .get(self.start_tag) - .ok_or_else(|| { - FnExecError::Unreachable - })? + .ok_or_else(|| FnExecError::Unreachable)? .as_object() .ok_or_else(|| FnExecError::Unreachable)?; if Object::None.eq(obj) { From cb73f398191e34f369467af230298a7ce96065a3 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 26 Mar 2024 19:58:07 +0800 Subject: [PATCH 09/17] [CI Tests] add ci tests for optional edge_expand --- .../integrated/tests/optional_expand_test.rs | 300 ++++++++++++++++++ .../runtime/src/process/operator/map/get_v.rs | 16 +- 2 files changed, 314 insertions(+), 2 deletions(-) create mode 100644 interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs diff --git a/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs b/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs new file mode 100644 index 000000000000..ac4a45324a51 --- /dev/null +++ b/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs @@ -0,0 +1,300 @@ +// +//! Copyright 2021 Alibaba Group Holding Limited. +//! +//! Licensed under the Apache License, Version 2.0 (the "License"); +//! you may not use this file except in compliance with the License. +//! You may obtain a copy of the License at +//! +//! http://www.apache.org/licenses/LICENSE-2.0 +//! +//! Unless required by applicable law or agreed to in writing, software +//! distributed under the License is distributed on an "AS IS" BASIS, +//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//! See the License for the specific language governing permissions and +//! limitations under the License. +//! +//! + +mod common; + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use dyn_type::Object; + use graph_proxy::apis::{register_graph, GraphElement}; + use graph_proxy::create_exp_store; + use graph_store::ldbc::LDBCVertexParser; + use graph_store::prelude::DefaultId; + use ir_common::expr_parse::str_to_expr_pb; + use ir_common::generated::physical as pb; + use ir_common::KeyId; + use pegasus::api::{Map, Sink}; + use pegasus::result::ResultStream; + use pegasus::JobConf; + use runtime::process::entry::Entry; + use runtime::process::operator::flatmap::FlatMapFuncGen; + use runtime::process::operator::map::FilterMapFuncGen; + use runtime::process::operator::source::SourceOperator; + use runtime::process::record::Record; + + use crate::common::test::*; + + // g.V() + fn source_gen(alias: Option) -> Box + Send> { + source_gen_with_scan_opr(pb::Scan { + scan_opt: 0, + alias, + params: None, + idx_predicate: None, + is_count_only: false, + }) + } + + fn source_gen_with_scan_opr(scan_opr_pb: pb::Scan) -> Box + Send> { + let graph = create_exp_store(Arc::new(TestCluster {})); + register_graph(graph); + let source = SourceOperator::new(scan_opr_pb.into(), Arc::new(TestRouter::default())).unwrap(); + source.gen_source(0).unwrap() + } + + fn expand_test(expand: pb::EdgeExpand) -> ResultStream { + let conf = JobConf::new("expand_test"); + let result = pegasus::run(conf, || { + let expand = expand.clone(); + |input, output| { + let mut stream = input.input_from(source_gen(None))?; + let flatmap_func = expand.gen_flat_map().unwrap(); + stream = stream.flat_map(move |input| flatmap_func.exec(input))?; + stream.sink_into(output) + } + }) + .expect("build job failure"); + result + } + + // g.V().out() with optional out + #[test] + fn optional_expand_outv_test() { + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: None, + expand_opt: 0, + alias: None, + is_optional: true, + }; + let mut result = expand_test(expand_opr_pb); + let mut result_ids = vec![]; + let mut none_cnt = 0; + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1); + let mut expected_ids = vec![v2, v3, v3, v3, v4, v5]; + while let Some(Ok(record)) = result.next() { + if let Some(element) = record.get(None).unwrap().as_vertex() { + result_ids.push(element.id() as usize) + } else if let Some(obj) = record.get(None).unwrap().as_object() { + assert_eq!(obj, &Object::None); + none_cnt += 1; + } + } + result_ids.sort(); + expected_ids.sort(); + assert_eq!(result_ids, expected_ids); + // v2, v3, v5 does not have out edges + assert_eq!(none_cnt, 3); + } + + // g.V().outE('knows', 'created') with optional out + #[test] + fn optional_expand_oute_with_many_labels_test() { + let query_param = query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None); + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: Some(query_param), + expand_opt: 1, + alias: None, + is_optional: true, + }; + let mut result = expand_test(expand_opr_pb); + let mut result_edges = vec![]; + let mut none_cnt = 0; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1); + let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0); + let mut expected_edges = vec![(v1, v2), (v1, v3), (v1, v4), (v4, v3), (v4, v5), (v6, v3)]; + expected_edges.sort(); + while let Some(Ok(record)) = result.next() { + if let Some(e) = record.get(None).unwrap().as_edge() { + result_edges.push((e.src_id as usize, e.dst_id as usize)); + } else if let Some(obj) = record.get(None).unwrap().as_object() { + assert_eq!(obj, &Object::None); + none_cnt += 1; + } + } + result_edges.sort(); + assert_eq!(result_edges, expected_edges); + assert_eq!(none_cnt, 3); + } + + // g.V().out('knows').has('id',2) with optional out + // in this case, for the vertices, e.g., v2, v3, v4, v5, v6, that do not have out knows edges, they would be filtered out. + #[test] + fn optional_expand_outv_filter_test() { + let edge_query_param = query_params(vec![KNOWS_LABEL.into()], vec![], None); + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: Some(edge_query_param), + expand_opt: 0, + alias: None, + is_optional: true, + }; + let vertex_query_param = query_params(vec![], vec![], str_to_expr_pb("@.id == 2".to_string()).ok()); + let auxilia_opr_pb = pb::GetV { tag: None, opt: 4, params: Some(vertex_query_param), alias: None }; + + let conf = JobConf::new("expand_getv_test"); + let mut result = pegasus::run(conf, || { + let expand = expand_opr_pb.clone(); + let auxilia = auxilia_opr_pb.clone(); + |input, output| { + let mut stream = input.input_from(source_gen(None))?; + let flatmap_func = expand.gen_flat_map().unwrap(); + stream = stream.flat_map(move |input| flatmap_func.exec(input))?; + let filter_map_func = auxilia.gen_filter_map().unwrap(); + stream = stream.filter_map(move |input| filter_map_func.exec(input))?; + stream.sink_into(output) + } + }) + .expect("build job failure"); + + let mut result_ids = vec![]; + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let expected_ids = vec![v2]; + while let Some(Ok(record)) = result.next() { + println!("record: {:?}", record); + let element = record.get(None).unwrap().as_vertex().unwrap(); + result_ids.push(element.id() as usize) + } + assert_eq!(result_ids, expected_ids) + } + + // g.V().outE('knows').inV() with optional outE + // in this case, for the vertices, e.g., v2, v3, v4, v5, v6, that do not have out knows edges, the result of inV() would also be regarded as None. + #[test] + fn optional_expand_oute_inv_test() { + let expand_opr = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 1, + alias: None, + is_optional: true, + }; + + let getv_opr = pb::GetV { + tag: None, + opt: 1, // EndV + params: Some(query_params(vec![], vec![], None)), + alias: None, + }; + + let conf = JobConf::new("expand_oute_inv_test"); + let mut result = pegasus::run(conf, || { + let expand = expand_opr.clone(); + let getv = getv_opr.clone(); + |input, output| { + let mut stream = input.input_from(source_gen(None))?; + let flatmap_func = expand.gen_flat_map().unwrap(); + stream = stream.flat_map(move |input| flatmap_func.exec(input))?; + let filter_map_func = getv.gen_filter_map().unwrap(); + stream = stream.filter_map(move |input| filter_map_func.exec(input))?; + stream.sink_into(output) + } + }) + .expect("build job failure"); + + let expected_ids = vec![2, 4]; + let mut result_ids = vec![]; + let mut none_cnt = 0; + while let Some(Ok(record)) = result.next() { + if let Some(element) = record.get(None).unwrap().as_vertex() { + result_ids.push(element.id() as usize); + } else if let Some(obj) = record.get(None).unwrap().as_object() { + assert_eq!(obj, &Object::None); + none_cnt += 1; + } else { + unreachable!() + } + } + result_ids.sort(); + assert_eq!(result_ids, expected_ids); + assert_eq!(none_cnt, 5); + } + + // g.V().as(0).select(0).by(out().count().as(1)) with optional out + // in this case, the vertices that do not have out edges would also be taken into account in the count() + #[test] + fn optional_expand_out_degree_test() { + let conf = JobConf::new("expand_degree_fused_test"); + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: None, + expand_opt: 2, + alias: Some(1.into()), + is_optional: true, + }; + let getv = pb::GetV { tag: None, opt: 4, params: None, alias: Some(TAG_A) }; + let project = pb::Project { + mappings: vec![pb::project::ExprAlias { + expr: str_to_expr_pb("@0".to_string()).ok(), + alias: None, + }], + is_append: true, + }; + + let mut pegasus_result = pegasus::run(conf, || { + let getv = getv.clone(); + let expand = expand_opr_pb.clone(); + let project = project.clone(); + |input, output| { + let mut stream = input.input_from(source_gen(None))?; + let filter_map_func = getv.gen_filter_map().unwrap(); + stream = stream.filter_map(move |input| filter_map_func.exec(input))?; + let flat_map_func = expand.gen_flat_map().unwrap(); + stream = stream.flat_map(move |input| flat_map_func.exec(input))?; + let filter_map_func = project.gen_filter_map().unwrap(); + stream = stream.filter_map(move |input| filter_map_func.exec(input))?; + stream.sink_into(output) + } + }) + .expect("build job failure"); + + let mut results = vec![]; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1); + let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0); + let mut expected_results = vec![(v1, 3), (v2, 0), (v3, 0), (v4, 2), (v5, 0), (v6, 1)]; + while let Some(Ok(record)) = pegasus_result.next() { + if let Some(v) = record.get(None).unwrap().as_vertex() { + if let Some(degree_obj) = record.get(Some(1)).unwrap().as_object() { + results.push((v.id() as DefaultId, degree_obj.as_u64().unwrap())); + } + } + } + results.sort(); + expected_results.sort(); + + assert_eq!(results, expected_results) + } +} diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index 285897f80c84..b5d3e16ad204 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -164,8 +164,12 @@ impl FilterMapFunction for GetVertexOperator { .as_object() .ok_or_else(|| FnExecError::Unreachable)?; if Object::None.eq(obj) { + if self.query_labels.is_empty() { input.append(Object::None, self.alias); return Ok(Some(input)); + } else { + return Ok(None); + } } else { Err(FnExecError::unexpected_data_error( &format!( @@ -283,8 +287,16 @@ impl FilterMapFunction for AuxiliaOperator { .as_object() .ok_or_else(|| FnExecError::Unreachable)?; if Object::None.eq(obj) { - input.append(Object::None, self.alias); - return Ok(Some(input)); + if let Some(_predicate) = &self.query_params.filter { + // TODO: eval by predicate instead of directly regarding it as false + // let res = predicate + // .eval_bool(Some(&input)) + // .map_err(|e| FnExecError::from(e))?; + return Ok(None); + } else { + input.append(Object::None, self.alias); + return Ok(Some(input)); + } } else { Err(FnExecError::unexpected_data_error(&format!( "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}", From c3f5f1cdd2fe00d3933bbeccc856d07ba532c770 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 26 Mar 2024 21:12:05 +0800 Subject: [PATCH 10/17] [GIE Runtime] refactor OptionalExpandOrIntersect impl, to support when all expanded edges are optional in intersection --- .../process/operator/map/expand_intersect.rs | 68 +++++++++---------- 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs index a242a79ae8e5..a94273c99dd0 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs @@ -208,6 +208,8 @@ impl FilterMapFunction for ExpandOrIntersect } /// An OptionalExpandOrIntersect operator to expand neighbor +/// e.g., based on a->b, we intersect optional edges of a-(opt)->c and b-(opt)->c, +/// then the results could be either matches of a->b->c + a->c if there exits matches of c, or just a->b, where the match for c is a Object::None. struct OptionalExpandOrIntersect { start_v_tag: Option, edge_or_end_v_tag: KeyId, @@ -225,29 +227,22 @@ impl FilterMapFunction for OptionalExpandOrI match entry.get_type() { EntryType::Vertex => { let id = entry.id(); - let mut iter = self - .stmt - .exec(id)? - .map(|e| { - if let Some(vertex) = e.as_vertex() { - vertex.id() as ID - } else if let Some(edge) = e.as_edge() { - edge.get_other_id() as ID - } else { - unreachable!() - } - }) - .peekable(); - if iter.peek().is_none() { - // if no neighbors found, append columns with an empty IntersectionEntry (without changing head) - let columns = input.get_columns_mut(); - columns.insert( - self.edge_or_end_v_tag as usize, - DynEntry::new(IntersectionEntry::default()), - ); - Ok(Some(input)) - } else if let Some(pre_entry) = input.get_mut(Some(self.edge_or_end_v_tag)) { + let iter = self.stmt.exec(id)?.map(|e| { + if let Some(vertex) = e.as_vertex() { + vertex.id() as ID + } else if let Some(edge) = e.as_edge() { + edge.get_other_id() as ID + } else { + unreachable!() + } + }); + if let Some(pre_entry) = input.get_mut(Some(self.edge_or_end_v_tag)) { // the case of expansion and intersection + // The behavior of intersection is to intersect with the previous intersection: + // 1. if the previous intersection is empty, return an record with the intersected vertex as an empty IntersectionEntry + // 2. if the previous intersection is not empty, intersect with the current neighbor_iter + // 2.1 if the intersected result is empty, return an record with the intersected vertex as an empty IntersectionEntry + // 2.2 if the intersected result is not empty, return an record with the intersected vertex let pre_intersection = pre_entry .as_any_mut() .downcast_mut::() @@ -256,20 +251,25 @@ impl FilterMapFunction for OptionalExpandOrI "entry is not a intersection in ExpandOrIntersect" )) })?; - pre_intersection.intersect(iter); - Ok(Some(input)) - } else { - // the case of expansion only - let neighbors_intersection = IntersectionEntry::from_iter(iter); - if neighbors_intersection.is_empty() { - Ok(None) - } else { - // append columns without changing head - let columns = input.get_columns_mut(); - columns - .insert(self.edge_or_end_v_tag as usize, DynEntry::new(neighbors_intersection)); + if pre_intersection.is_empty() { + // do nothing as the previous intersection is already an empty IntersectionEntry, denoting None Ok(Some(input)) + } else { + let mut pre_intersection_clone = pre_intersection.clone(); + pre_intersection_clone.intersect(iter); + if pre_intersection_clone.is_empty() { + Ok(Some(input)) + } else { + *pre_intersection = pre_intersection_clone; + Ok(Some(input)) + } } + } else { + let neighbors_intersection = IntersectionEntry::from_iter(iter); + // append columns without changing head + let columns = input.get_columns_mut(); + columns.insert(self.edge_or_end_v_tag as usize, DynEntry::new(neighbors_intersection)); + Ok(Some(input)) } } _ => Err(FnExecError::unsupported_error(&format!( From 2b15395355936b26f4a92e9d40effafb3ca4f0e5 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 27 Mar 2024 15:34:55 +0800 Subject: [PATCH 11/17] revert implementations in expand_intersect --- .../process/operator/map/expand_intersect.rs | 98 +------------------ 1 file changed, 5 insertions(+), 93 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs index a94273c99dd0..b2dd3096398c 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs @@ -45,7 +45,7 @@ struct ExpandOrIntersect { /// An optimized entry implementation for intersection, which denotes a collection of vertices; /// Specifically, vertex_vec records the unique vertex ids in the collection, /// and count_vec records the number of the corresponding vertex, since duplicated vertices are allowed. -#[derive(Debug, Clone, Hash, PartialEq, PartialOrd, Default)] +#[derive(Debug, Clone, Hash, PartialEq, PartialOrd)] pub struct IntersectionEntry { vertex_vec: Vec, count_vec: Vec, @@ -71,9 +71,6 @@ impl IntersectionEntry { fn intersect>(&mut self, seeker: Iter) { let len = self.vertex_vec.len(); - if len == 0 { - return; - } let mut s = vec![0; len]; for vid in seeker { if let Ok(idx) = self @@ -207,79 +204,6 @@ impl FilterMapFunction for ExpandOrIntersect } } -/// An OptionalExpandOrIntersect operator to expand neighbor -/// e.g., based on a->b, we intersect optional edges of a-(opt)->c and b-(opt)->c, -/// then the results could be either matches of a->b->c + a->c if there exits matches of c, or just a->b, where the match for c is a Object::None. -struct OptionalExpandOrIntersect { - start_v_tag: Option, - edge_or_end_v_tag: KeyId, - stmt: Box>, -} - -impl FilterMapFunction for OptionalExpandOrIntersect { - fn exec(&self, mut input: Record) -> FnResult> { - let entry = input.get(self.start_v_tag).ok_or_else(|| { - FnExecError::get_tag_error(&format!( - "get start_v_tag {:?} from record in `ExpandOrIntersect` operator, the record is {:?}", - self.start_v_tag, input - )) - })?; - match entry.get_type() { - EntryType::Vertex => { - let id = entry.id(); - let iter = self.stmt.exec(id)?.map(|e| { - if let Some(vertex) = e.as_vertex() { - vertex.id() as ID - } else if let Some(edge) = e.as_edge() { - edge.get_other_id() as ID - } else { - unreachable!() - } - }); - if let Some(pre_entry) = input.get_mut(Some(self.edge_or_end_v_tag)) { - // the case of expansion and intersection - // The behavior of intersection is to intersect with the previous intersection: - // 1. if the previous intersection is empty, return an record with the intersected vertex as an empty IntersectionEntry - // 2. if the previous intersection is not empty, intersect with the current neighbor_iter - // 2.1 if the intersected result is empty, return an record with the intersected vertex as an empty IntersectionEntry - // 2.2 if the intersected result is not empty, return an record with the intersected vertex - let pre_intersection = pre_entry - .as_any_mut() - .downcast_mut::() - .ok_or_else(|| { - FnExecError::unexpected_data_error(&format!( - "entry is not a intersection in ExpandOrIntersect" - )) - })?; - if pre_intersection.is_empty() { - // do nothing as the previous intersection is already an empty IntersectionEntry, denoting None - Ok(Some(input)) - } else { - let mut pre_intersection_clone = pre_intersection.clone(); - pre_intersection_clone.intersect(iter); - if pre_intersection_clone.is_empty() { - Ok(Some(input)) - } else { - *pre_intersection = pre_intersection_clone; - Ok(Some(input)) - } - } - } else { - let neighbors_intersection = IntersectionEntry::from_iter(iter); - // append columns without changing head - let columns = input.get_columns_mut(); - columns.insert(self.edge_or_end_v_tag as usize, DynEntry::new(neighbors_intersection)); - Ok(Some(input)) - } - } - _ => Err(FnExecError::unsupported_error(&format!( - "expand or intersect entry {:?} of tag {:?} failed in ExpandOrIntersect", - entry, self.edge_or_end_v_tag - )))?, - } - } -} - impl FilterMapFuncGen for pb::EdgeExpand { fn gen_filter_map(self) -> FnGenResult>> { let graph = graph_proxy::apis::get_graph().ok_or_else(|| FnGenError::NullGraphError)?; @@ -303,25 +227,13 @@ impl FilterMapFuncGen for pb::EdgeExpand { // Expand vertices with filters on edges. // This can be regarded as a combination of EdgeExpand (with expand_opt as Edge) + GetV let stmt = graph.prepare_explore_edge(direction, &query_params)?; - if self.is_optional { - let edge_expand_operator = - OptionalExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; - Ok(Box::new(edge_expand_operator)) - } else { - let edge_expand_operator = ExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; - Ok(Box::new(edge_expand_operator)) - } + let edge_expand_operator = ExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; + Ok(Box::new(edge_expand_operator)) } else { // Expand vertices without any filters let stmt = graph.prepare_explore_vertex(direction, &query_params)?; - if self.is_optional { - let edge_expand_operator = - OptionalExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; - Ok(Box::new(edge_expand_operator)) - } else { - let edge_expand_operator = ExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; - Ok(Box::new(edge_expand_operator)) - } + let edge_expand_operator = ExpandOrIntersect { start_v_tag, edge_or_end_v_tag, stmt }; + Ok(Box::new(edge_expand_operator)) } } } From 743b673fc06a758dbd771994d640666ec691ec2c Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 3 Sep 2024 14:38:35 +0800 Subject: [PATCH 12/17] fix dealing with None entry in getv and unfold --- .../src/process/operator/flatmap/unfold.rs | 36 +++++++------ .../runtime/src/process/operator/map/get_v.rs | 53 ++++++++++++++----- 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs index 8ba0d44f3a6c..b8bd8b5bb6f3 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs @@ -58,11 +58,11 @@ impl FlatMapFunction for UnfoldOperator { .as_any_ref() .downcast_ref::() { - let len = intersection.len(); - if len == 0 { - input.append(Object::None, self.alias); - Ok(Box::new(vec![input].into_iter())) - } else { + let len = intersection.len(); + if len == 0 { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { let mut res = Vec::with_capacity(len); for item in intersection.iter().cloned() { let mut new_entry = input.clone(); @@ -70,23 +70,29 @@ impl FlatMapFunction for UnfoldOperator { res.push(new_entry); } Ok(Box::new(res.into_iter())) - } + } } else if let Some(general_intersection) = entry .as_any_ref() .downcast_ref::() { - let mut res = Vec::with_capacity(general_intersection.len()); - for (vid, matchings) in general_intersection.matchings_iter() { - for matching in matchings { - let mut new_entry = input.clone(); - for (column, tag) in matching { - new_entry.append(column.clone(), Some(tag)); + let len = general_intersection.len(); + if len == 0 { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + let mut res = Vec::with_capacity(len); + for (vid, matchings) in general_intersection.matchings_iter() { + for matching in matchings { + let mut new_entry = input.clone(); + for (column, tag) in matching { + new_entry.append(column.clone(), Some(tag)); + } + new_entry.append(Vertex::new(vid, None, DynDetails::default()), self.alias); + res.push(new_entry); } - new_entry.append(Vertex::new(vid, None, DynDetails::default()), self.alias); - res.push(new_entry); } + Ok(Box::new(res.into_iter())) } - Ok(Box::new(res.into_iter())) } else { Err(FnExecError::unexpected_data_error( "downcast intersection entry in UnfoldOperator", diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index dd5ba4d608a0..a9e85c71b65f 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -58,20 +58,8 @@ impl GetVertexOperator { impl FilterMapFunction for GetVertexOperator { fn exec(&self, mut input: Record) -> FnResult> { - let entry_type = - if let Some(entry) = input.get(self.start_tag) { entry.get_type() } else { return Ok(None) }; - match entry_type { - EntryType::Edge => { - let e = input - .get(self.start_tag) - .ok_or_else(|| FnExecError::Unreachable)? - .as_edge() - .ok_or_else(|| { - FnExecError::unexpected_data_error(&format!( - "entry is not an edge in GetV {:?}", - self - )) - })?; + if let Some(entry) = input.get(self.start_tag) { + if let Some(e) = entry.as_edge() { let (id, label) = match self.opt { VOpt::Start => (e.src_id, e.get_src_label()), VOpt::End => (e.dst_id, e.get_dst_label()), @@ -130,6 +118,20 @@ impl FilterMapFunction for GetVertexOperator { } else { Err(FnExecError::unexpected_data_error("unreachable path end entry in GetV"))? } + } else if let Some(obj) = entry.as_object() { + if Object::None.eq(obj) { + if self.query_labels.is_empty() { + input.append(Object::None, self.alias); + return Ok(Some(input)); + } else { + return Ok(None); + } + } else { + Err(FnExecError::unexpected_data_error(&format!( + "Can only apply `GetV` on an object that is not None. The entry is {:?}", + entry + )))? + } } else { Err(FnExecError::unexpected_data_error( &format!( "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", entry @@ -253,6 +255,29 @@ impl FilterMapFunction for AuxiliaOperator { } else { return Ok(None); } + } else if let Some(obj) = entry.as_object() { + if Object::None.eq(obj) { + if let Some(predicate) = &self.query_params.filter { + // TODO: eval by predicate instead of directly regarding it as false + let res = predicate + .eval_bool(Some(&input)) + .map_err(|e| FnExecError::from(e))?; + if res { + input.append(Object::None, self.alias); + return Ok(Some(input)); + } else { + return Ok(None); + } + } else { + input.append(Object::None, self.alias); + return Ok(Some(input)); + } + } else { + Err(FnExecError::unexpected_data_error(&format!( + "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}", + entry + )))? + } } else { Err(FnExecError::unexpected_data_error(&format!( "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}", From 2980ce7a2d626fc472979954f562ab237560b7b0 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 3 Sep 2024 15:56:02 +0800 Subject: [PATCH 13/17] [ci tests] add e2e tests --- .../integration/suite/simple/SimpleMatchQueries.java | 10 ++++++++++ .../cypher/integration/ldbc/SimpleMatchTest.java | 7 +++++++ .../src/process/operator/flatmap/edge_expand.rs | 3 --- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java index bfedfda21f49..b5ed0797b1b0 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java @@ -209,4 +209,14 @@ public static QueryContext get_simple_match_query_16_test() { List expected = Arrays.asList("Record<{$f0: 325593}>"); return new QueryContext(query, expected); } + + public static QueryContext get_simple_match_query_17_test() { + String query = + "MATCH (person:PERSON {id: 26388279067534})<-[:HASCREATOR]-(message: POST |" + + " COMMENT)\n" + + "OPTIONAL MATCH (message: POST | COMMENT)<-[like:LIKES]-(liker:PERSON)\n" + + " Return count(person);"; + List expected = Arrays.asList("Record<{$f0: 111}>"); + return new QueryContext(query, expected); + } } diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java index 49b42038d1b6..8f29b1287824 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java @@ -154,6 +154,13 @@ public void run_simple_match_16_test() { Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString()); } + @Test + public void run_simple_match_17_test() { + QueryContext testQuery = SimpleMatchQueries.get_simple_match_query_17_test(); + Result result = session.run(testQuery.getQuery()); + Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString()); + } + @AfterClass public static void afterClass() { if (session != null) { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs index 6c66a90082fe..4f4c347309e2 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs @@ -204,9 +204,6 @@ impl FlatMapFuncGen for pb::EdgeExpand { fn gen_flat_map( self, ) -> FnGenResult>>> { - if self.is_optional { - return Err(FnGenError::unsupported_error("optional edge expand in EdgeExpandOperator")); - } let graph = get_graph().ok_or_else(|| FnGenError::NullGraphError)?; let start_v_tag = self.v_tag; let edge_or_end_v_tag = self.alias; From 49d0fec32f446931459ae784fb2403c436b3394a Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 3 Sep 2024 19:09:02 +0800 Subject: [PATCH 14/17] [Doc] add documents about optional match --- docs/interactive_engine/neo4j/supported_cypher.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/interactive_engine/neo4j/supported_cypher.md b/docs/interactive_engine/neo4j/supported_cypher.md index 0a00bfbe9c9d..ac450fa1eecf 100644 --- a/docs/interactive_engine/neo4j/supported_cypher.md +++ b/docs/interactive_engine/neo4j/supported_cypher.md @@ -122,6 +122,14 @@ MATCH (a) -[]-> () -[]-> (b) # second MATCH clause RETURN a, b; ``` +Besides, we support `OPTIONAL MATCH`. For example, +the following query can be supported: +```Cypher +MATCH (a) -[]-> (b) +Optional MATCH (b) -[]-> (c) +RETURN a, b, c; +``` + | Keyword | Comments | Supported | Todo |:---|---|:---:|:---| | MATCH | only one Match clause is allowed | | From fe73696e49b16c5936131b15322da0b4ed313373 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Thu, 5 Sep 2024 21:16:24 +0800 Subject: [PATCH 15/17] more tests --- .../integrated/tests/optional_expand_test.rs | 68 +++++++++++++++---- 1 file changed, 55 insertions(+), 13 deletions(-) diff --git a/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs b/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs index ac4a45324a51..95abc4fee9d4 100644 --- a/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs @@ -63,7 +63,7 @@ mod test { let result = pegasus::run(conf, || { let expand = expand.clone(); |input, output| { - let mut stream = input.input_from(source_gen(None))?; + let mut stream = input.input_from(source_gen(Some(TAG_A)))?; let flatmap_func = expand.gen_flat_map().unwrap(); stream = stream.flat_map(move |input| flatmap_func.exec(input))?; stream.sink_into(output) @@ -74,6 +74,7 @@ mod test { } // g.V().out() with optional out + // v1, v4, v6 have out-neighbors; while v2, v3, v5 do not #[test] fn optional_expand_outv_test() { let expand_opr_pb = pb::EdgeExpand { @@ -107,6 +108,40 @@ mod test { assert_eq!(none_cnt, 3); } + // g.V().out('knows') with optional out + // v1 has out knows neighbors of v2, v4; while other vertices do not have out knows neighbors + #[test] + fn optional_expand_outv_test_2() { + let query_param = query_params(vec![KNOWS_LABEL.into()], vec![], None); + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: Some(query_param), + expand_opt: 0, + alias: None, + is_optional: true, + }; + let mut result = expand_test(expand_opr_pb); + let mut result_ids = vec![]; + let mut none_cnt = 0; + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let mut expected_ids = vec![v2, v4]; + while let Some(Ok(record)) = result.next() { + println!("record: {:?}", record); + if let Some(element) = record.get(None).unwrap().as_vertex() { + result_ids.push(element.id() as usize) + } else if let Some(obj) = record.get(None).unwrap().as_object() { + assert_eq!(obj, &Object::None); + none_cnt += 1; + } + } + result_ids.sort(); + expected_ids.sort(); + assert_eq!(result_ids, expected_ids); + assert_eq!(none_cnt, 5); + } + // g.V().outE('knows', 'created') with optional out #[test] fn optional_expand_oute_with_many_labels_test() { @@ -143,28 +178,28 @@ mod test { assert_eq!(none_cnt, 3); } - // g.V().out('knows').has('id',2) with optional out - // in this case, for the vertices, e.g., v2, v3, v4, v5, v6, that do not have out knows edges, they would be filtered out. + // g.V().out('knows').where(@ isnull) with optional out + // in this case, for the vertices v2, v3, v4, v5, v6, that do not have out knows edges. #[test] fn optional_expand_outv_filter_test() { - let edge_query_param = query_params(vec![KNOWS_LABEL.into()], vec![], None); + let query_param = query_params(vec![KNOWS_LABEL.into()], vec![], None); let expand_opr_pb = pb::EdgeExpand { v_tag: None, direction: 0, - params: Some(edge_query_param), + params: Some(query_param), expand_opt: 0, alias: None, is_optional: true, }; - let vertex_query_param = query_params(vec![], vec![], str_to_expr_pb("@.id == 2".to_string()).ok()); + let vertex_query_param = query_params(vec![], vec![], str_to_expr_pb("isnull @".to_string()).ok()); let auxilia_opr_pb = pb::GetV { tag: None, opt: 4, params: Some(vertex_query_param), alias: None }; - let conf = JobConf::new("expand_getv_test"); + let conf = JobConf::new("optional_expand_outv_filter_test"); let mut result = pegasus::run(conf, || { let expand = expand_opr_pb.clone(); let auxilia = auxilia_opr_pb.clone(); |input, output| { - let mut stream = input.input_from(source_gen(None))?; + let mut stream = input.input_from(source_gen(Some(TAG_A)))?; let flatmap_func = expand.gen_flat_map().unwrap(); stream = stream.flat_map(move |input| flatmap_func.exec(input))?; let filter_map_func = auxilia.gen_filter_map().unwrap(); @@ -174,13 +209,20 @@ mod test { }) .expect("build job failure"); - let mut result_ids = vec![]; + let mut result_ids: Vec = vec![]; let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); - let expected_ids = vec![v2]; + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1); + let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0); + let expected_ids = vec![v2, v3, v4, v5, v6]; while let Some(Ok(record)) = result.next() { - println!("record: {:?}", record); - let element = record.get(None).unwrap().as_vertex().unwrap(); - result_ids.push(element.id() as usize) + let vertex = record + .get(Some(TAG_A)) + .unwrap() + .as_vertex() + .unwrap(); + result_ids.push(vertex.id() as usize); } assert_eq!(result_ids, expected_ids) } From 77714456f37ef2549e75fa9b478020ea734f55f7 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Fri, 6 Sep 2024 13:35:36 +0800 Subject: [PATCH 16/17] refine the impl --- .../process/operator/flatmap/edge_expand.rs | 174 +++++------------- .../runtime/src/process/operator/map/get_v.rs | 9 +- 2 files changed, 43 insertions(+), 140 deletions(-) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs index 4f4c347309e2..3b9104b38711 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs @@ -34,92 +34,12 @@ pub struct EdgeExpandOperator { alias: Option, stmt: Box>, expand_opt: ExpandOpt, + is_optional: bool, } impl FlatMapFunction for EdgeExpandOperator { type Target = DynIter; - fn exec(&self, mut input: Record) -> FnResult { - if let Some(entry) = input.get(self.start_v_tag) { - match entry.get_type() { - EntryType::Vertex => { - let id = entry.id(); - let iter = self.stmt.exec(id)?; - match self.expand_opt { - // the case of expand edge, and get end vertex; - ExpandOpt::Vertex => { - let neighbors_iter = iter.map(|e| { - if let Some(e) = e.as_edge() { - Vertex::new( - e.get_other_id(), - e.get_other_label().cloned(), - DynDetails::default(), - ) - } else { - unreachable!() - } - }); - Ok(Box::new(RecordExpandIter::new( - input, - self.alias.as_ref(), - Box::new(neighbors_iter), - ))) - } - // the case of expand neighbors, including edges/vertices - ExpandOpt::Edge => { - Ok(Box::new(RecordExpandIter::new(input, self.alias.as_ref(), iter))) - } - // the case of get degree. TODO: this case should be a `Map` - ExpandOpt::Degree => { - let degree = iter.count(); - input.append(object!(degree), self.alias); - Ok(Box::new(vec![input].into_iter())) - } - } - } - EntryType::Path => { - let graph_path = entry - .as_graph_path() - .ok_or_else(|| FnExecError::Unreachable)?; - let iter = self.stmt.exec(graph_path.get_path_end().id())?; - let curr_path = graph_path.clone(); - Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter))) - } - EntryType::Object => { - let obj = entry - .as_object() - .ok_or_else(|| FnExecError::Unreachable)?; - if Object::None.eq(obj) { - input.append(Object::None, self.alias); - Ok(Box::new(vec![input].into_iter())) - } else { - Err(FnExecError::unexpected_data_error(&format!( - "Cannot Expand from current entry {:?}", - entry - )))? - } - } - _ => Err(FnExecError::unexpected_data_error(&format!( - "Cannot Expand from current entry {:?}", - entry - )))?, - } - } else { - Ok(Box::new(vec![].into_iter())) - } - } -} - -pub struct OptionalEdgeExpandOperator { - start_v_tag: Option, - alias: Option, - stmt: Box>, - expand_opt: ExpandOpt, -} - -impl FlatMapFunction for OptionalEdgeExpandOperator { - type Target = DynIter; - fn exec(&self, mut input: Record) -> FnResult { if let Some(entry) = input.get(self.start_v_tag) { match entry.get_type() { @@ -129,7 +49,7 @@ impl FlatMapFunction for OptionalEdgeExpandO match self.expand_opt { // the case of expand edge, and get end vertex; ExpandOpt::Vertex => { - if iter.peek().is_none() { + if self.is_optional && iter.peek().is_none() { input.append(Object::None, self.alias); Ok(Box::new(vec![input].into_iter())) } else { @@ -153,7 +73,7 @@ impl FlatMapFunction for OptionalEdgeExpandO } // the case of expand neighbors, including edges/vertices ExpandOpt::Edge => { - if iter.peek().is_none() { + if self.is_optional && iter.peek().is_none() { input.append(Object::None, self.alias); Ok(Box::new(vec![input].into_iter())) } else { @@ -164,7 +84,7 @@ impl FlatMapFunction for OptionalEdgeExpandO ))) } } - // the case of get degree. TODO: this case should be a `Map` + // the case of get degree. ExpandOpt::Degree => { let degree = iter.count(); input.append(object!(degree), self.alias); @@ -172,9 +92,20 @@ impl FlatMapFunction for OptionalEdgeExpandO } } } - EntryType::Path => Err(FnExecError::unsupported_error( - "Have not supported Optional Edge Expand in Path entry yet", - ))?, + EntryType::Path => { + if self.is_optional { + Err(FnExecError::unsupported_error( + "Have not supported Optional Edge Expand in Path entry yet", + ))? + } else { + let graph_path = entry + .as_graph_path() + .ok_or_else(|| FnExecError::Unreachable)?; + let iter = self.stmt.exec(graph_path.get_path_end().id())?; + let curr_path = graph_path.clone(); + Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter))) + } + } EntryType::Object => { let obj = entry .as_object() @@ -224,62 +155,39 @@ impl FlatMapFuncGen for pb::EdgeExpand { // Expand vertices with filters on edges. // This can be regarded as a combination of EdgeExpand (with is_edge = true) + GetV let stmt = graph.prepare_explore_edge(direction, &query_params)?; - if self.is_optional { - let edge_expand_operator = OptionalEdgeExpandOperator { - start_v_tag, - alias: edge_or_end_v_tag, - stmt, - expand_opt: ExpandOpt::Vertex, - }; - Ok(Box::new(edge_expand_operator)) - } else { - let edge_expand_operator = EdgeExpandOperator { - start_v_tag, - alias: edge_or_end_v_tag, - stmt, - expand_opt: ExpandOpt::Vertex, - }; - Ok(Box::new(edge_expand_operator)) - } + let edge_expand_operator = EdgeExpandOperator { + start_v_tag, + alias: edge_or_end_v_tag, + stmt, + expand_opt: ExpandOpt::Vertex, + is_optional: self.is_optional, + }; + Ok(Box::new(edge_expand_operator)) } else { // Expand vertices without any filters let stmt = graph.prepare_explore_vertex(direction, &query_params)?; - if self.is_optional { - let edge_expand_operator = OptionalEdgeExpandOperator { - start_v_tag, - alias: edge_or_end_v_tag, - stmt, - expand_opt: ExpandOpt::Edge, - }; - Ok(Box::new(edge_expand_operator)) - } else { - let edge_expand_operator = EdgeExpandOperator { - start_v_tag, - alias: edge_or_end_v_tag, - stmt, - expand_opt: ExpandOpt::Edge, - }; - Ok(Box::new(edge_expand_operator)) - } - } - } - _ => { - // Expand edges or degree - let stmt = graph.prepare_explore_edge(direction, &query_params)?; - if self.is_optional { - let edge_expand_operator = OptionalEdgeExpandOperator { + let edge_expand_operator = EdgeExpandOperator { start_v_tag, alias: edge_or_end_v_tag, stmt, - expand_opt, + expand_opt: ExpandOpt::Edge, + is_optional: self.is_optional, }; Ok(Box::new(edge_expand_operator)) - } else { - let edge_expand_operator = - EdgeExpandOperator { start_v_tag, alias: edge_or_end_v_tag, stmt, expand_opt }; - Ok(Box::new(edge_expand_operator)) } } + _ => { + // Expand edges or degree + let stmt = graph.prepare_explore_edge(direction, &query_params)?; + let edge_expand_operator = EdgeExpandOperator { + start_v_tag, + alias: edge_or_end_v_tag, + stmt, + expand_opt, + is_optional: self.is_optional, + }; + Ok(Box::new(edge_expand_operator)) + } } } } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index a9e85c71b65f..ef68632cab80 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -120,12 +120,8 @@ impl FilterMapFunction for GetVertexOperator { } } else if let Some(obj) = entry.as_object() { if Object::None.eq(obj) { - if self.query_labels.is_empty() { - input.append(Object::None, self.alias); - return Ok(Some(input)); - } else { - return Ok(None); - } + input.append(Object::None, self.alias); + Ok(Some(input)) } else { Err(FnExecError::unexpected_data_error(&format!( "Can only apply `GetV` on an object that is not None. The entry is {:?}", @@ -258,7 +254,6 @@ impl FilterMapFunction for AuxiliaOperator { } else if let Some(obj) = entry.as_object() { if Object::None.eq(obj) { if let Some(predicate) = &self.query_params.filter { - // TODO: eval by predicate instead of directly regarding it as false let res = predicate .eval_bool(Some(&input)) .map_err(|e| FnExecError::from(e))?; From 063163fb6f685d50839ddce03dad7ecea20428b2 Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Wed, 18 Sep 2024 11:15:04 +0800 Subject: [PATCH 17/17] fix ci, and format --- .../suite/simple/SimpleMatchQueries.java | 2 +- .../integration/ldbc/SimpleMatchTest.java | 1 + .../src/apis/graph/element/path.rs | 42 +++++++++++++------ .../executor/ir/runtime/src/assembly.rs | 4 +- 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java index b5ed0797b1b0..cb00ef88a020 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java @@ -216,7 +216,7 @@ public static QueryContext get_simple_match_query_17_test() { + " COMMENT)\n" + "OPTIONAL MATCH (message: POST | COMMENT)<-[like:LIKES]-(liker:PERSON)\n" + " Return count(person);"; - List expected = Arrays.asList("Record<{$f0: 111}>"); + List expected = Arrays.asList("Record<{$f0: 851}>"); return new QueryContext(query, expected); } } diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java index 8f29b1287824..aa503ae020cc 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java @@ -156,6 +156,7 @@ public void run_simple_match_16_test() { @Test public void run_simple_match_17_test() { + assumeTrue("pegasus".equals(System.getenv("ENGINE_TYPE"))); QueryContext testQuery = SimpleMatchQueries.get_simple_match_query_17_test(); Result result = session.run(testQuery.getQuery()); Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString()); diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs index 5a2a55eb2653..bb1c1a0d30f6 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs @@ -104,7 +104,7 @@ impl GraphPath { let entry = entry.into(); let id = entry.id(); GraphPath::SimpleEndV((entry, vec![id], 1)) - }, + } pb::path_expand::PathOpt::Trail => GraphPath::TrailAllPath(vec![entry.into()]), }, pb::path_expand::ResultOpt::AllV | pb::path_expand::ResultOpt::AllVE => match path_opt { @@ -167,21 +167,27 @@ impl GraphPath { pub fn get_path_start(&self) -> Option<&VertexOrEdge> { match self { - GraphPath::AllPath(ref p) | GraphPath::SimpleAllPath(ref p) | GraphPath::TrailAllPath(ref p) => p.first(), + GraphPath::AllPath(ref p) + | GraphPath::SimpleAllPath(ref p) + | GraphPath::TrailAllPath(ref p) => p.first(), GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => None, } } pub fn get_path_end(&self) -> &VertexOrEdge { match self { - GraphPath::AllPath(ref p) | GraphPath::SimpleAllPath(ref p) | GraphPath::TrailAllPath(ref p) => p.last().unwrap(), + GraphPath::AllPath(ref p) + | GraphPath::SimpleAllPath(ref p) + | GraphPath::TrailAllPath(ref p) => p.last().unwrap(), GraphPath::EndV((ref e, _)) | GraphPath::SimpleEndV((ref e, _, _)) => e, } } pub fn get_path_end_mut(&mut self) -> &mut VertexOrEdge { match self { - GraphPath::AllPath(ref mut p) | GraphPath::SimpleAllPath(ref mut p) | GraphPath::TrailAllPath(ref mut p) => p.last_mut().unwrap(), + GraphPath::AllPath(ref mut p) + | GraphPath::SimpleAllPath(ref mut p) + | GraphPath::TrailAllPath(ref mut p) => p.last_mut().unwrap(), GraphPath::EndV((ref mut e, _)) | GraphPath::SimpleEndV((ref mut e, _, _)) => e, } } @@ -233,7 +239,9 @@ impl GraphPath { // pop the last element from the path, and return the element. pub fn pop(&mut self) -> Option { match self { - GraphPath::AllPath(ref mut p) | GraphPath::SimpleAllPath(ref mut p) | GraphPath::TrailAllPath(ref mut p) => p.pop(), + GraphPath::AllPath(ref mut p) + | GraphPath::SimpleAllPath(ref mut p) + | GraphPath::TrailAllPath(ref mut p) => p.pop(), GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => None, } } @@ -241,7 +249,9 @@ impl GraphPath { // reverse the path. pub fn reverse(&mut self) { match self { - GraphPath::AllPath(ref mut p) | GraphPath::SimpleAllPath(ref mut p) | GraphPath::TrailAllPath(ref mut p) => { + GraphPath::AllPath(ref mut p) + | GraphPath::SimpleAllPath(ref mut p) + | GraphPath::TrailAllPath(ref mut p) => { p.reverse(); } GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => {} @@ -251,7 +261,9 @@ impl GraphPath { // get the element ids in the path, including both vertices and edges. pub fn get_elem_ids(&self) -> Vec { match self { - GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => p.iter().map(|e| e.id()).collect(), + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => { + p.iter().map(|e| e.id()).collect() + } GraphPath::EndV((e, _)) | GraphPath::SimpleEndV((e, _, _)) => vec![e.id()], } } @@ -259,7 +271,9 @@ impl GraphPath { // get the element labels in the path, including both vertices and edges. pub fn get_elem_labels(&self) -> Vec> { match self { - GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => p.iter().map(|e| e.label()).collect(), + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => { + p.iter().map(|e| e.label()).collect() + } GraphPath::EndV((e, _)) | GraphPath::SimpleEndV((e, _, _)) => vec![e.label()], } } @@ -409,10 +423,10 @@ impl PartialEq for GraphPath { | (GraphPath::AllPath(p1), GraphPath::SimpleAllPath(p2)) | (GraphPath::AllPath(p1), GraphPath::TrailAllPath(p2)) | (GraphPath::SimpleAllPath(p1), GraphPath::AllPath(p2)) - | (GraphPath::SimpleAllPath(p1), GraphPath::SimpleAllPath(p2)) - | (GraphPath::SimpleAllPath(p1), GraphPath::TrailAllPath(p2)) - | (GraphPath::TrailAllPath(p1), GraphPath::AllPath(p2)) - | (GraphPath::TrailAllPath(p1), GraphPath::SimpleAllPath(p2)) + | (GraphPath::SimpleAllPath(p1), GraphPath::SimpleAllPath(p2)) + | (GraphPath::SimpleAllPath(p1), GraphPath::TrailAllPath(p2)) + | (GraphPath::TrailAllPath(p1), GraphPath::AllPath(p2)) + | (GraphPath::TrailAllPath(p1), GraphPath::SimpleAllPath(p2)) | (GraphPath::TrailAllPath(p1), GraphPath::TrailAllPath(p2)) => p1.eq(p2), (GraphPath::EndV((p1, _)), GraphPath::EndV((p2, _))) | (GraphPath::EndV((p1, _)), GraphPath::SimpleEndV((p2, _, _))) @@ -571,7 +585,9 @@ impl TryFrom for GraphPath { impl Hash for GraphPath { fn hash(&self, state: &mut H) { match self { - GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => p.hash(state), + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => { + p.hash(state) + } GraphPath::EndV((e, _)) | GraphPath::SimpleEndV((e, _, _)) => e.hash(state), } } diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index 9cef1c501579..d6ea8040a923 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -786,7 +786,9 @@ impl IRJobAssembly { base ))) })?; - if (pb::path_expand::ResultOpt::AllVE == unsafe { std::mem::transmute(path.result_opt) } || pb::path_expand::PathOpt::Trail == unsafe { std::mem::transmute(path.path_opt) }) + if (pb::path_expand::ResultOpt::AllVE + == unsafe { std::mem::transmute(path.result_opt) } + || pb::path_expand::PathOpt::Trail == unsafe { std::mem::transmute(path.path_opt) }) && pb::edge_expand::ExpandOpt::Vertex == unsafe { std::mem::transmute(edge_expand.expand_opt) } {