From 36313cfd856a26a07be5fbfcbafdd385422d4b0b Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Thu, 19 Sep 2024 15:58:52 +0800 Subject: [PATCH] feat(interactive): Support optional expand in GIE Runtime (#4213) ## What do these changes do? As titled. Support optional edge expand, and support dealing with null value in expand, getv, unfold. ## Related issue number #3668 --- .../neo4j/supported_cypher.md | 8 + .../suite/simple/SimpleMatchQueries.java | 10 + .../integration/ldbc/SimpleMatchTest.java | 8 + .../src/apis/graph/element/path.rs | 42 ++- .../integrated/tests/optional_expand_test.rs | 342 ++++++++++++++++++ .../executor/ir/runtime/src/assembly.rs | 4 +- .../process/operator/flatmap/edge_expand.rs | 98 +++-- .../src/process/operator/flatmap/unfold.rs | 43 ++- .../runtime/src/process/operator/map/get_v.rs | 33 ++ 9 files changed, 530 insertions(+), 58 deletions(-) create mode 100644 interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs diff --git a/docs/interactive_engine/neo4j/supported_cypher.md b/docs/interactive_engine/neo4j/supported_cypher.md index faf0c0319c9f..fb1574d1b465 100644 --- a/docs/interactive_engine/neo4j/supported_cypher.md +++ b/docs/interactive_engine/neo4j/supported_cypher.md @@ -122,6 +122,14 @@ MATCH (a) -[]-> () -[]-> (b) # second MATCH clause RETURN a, b; ``` +Besides, we support `OPTIONAL MATCH`. For example, +the following query can be supported: +```Cypher +MATCH (a) -[]-> (b) +Optional MATCH (b) -[]-> (c) +RETURN a, b, c; +``` + | Keyword | Comments | Supported | Todo |:---|---|:---:|:---| | MATCH | only one Match clause is allowed | | diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java index bfedfda21f49..cb00ef88a020 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java @@ -209,4 +209,14 @@ public static QueryContext get_simple_match_query_16_test() { List expected = Arrays.asList("Record<{$f0: 325593}>"); return new QueryContext(query, expected); } + + public static QueryContext get_simple_match_query_17_test() { + String query = + "MATCH (person:PERSON {id: 26388279067534})<-[:HASCREATOR]-(message: POST |" + + " COMMENT)\n" + + "OPTIONAL MATCH (message: POST | COMMENT)<-[like:LIKES]-(liker:PERSON)\n" + + " Return count(person);"; + List expected = Arrays.asList("Record<{$f0: 851}>"); + return new QueryContext(query, expected); + } } diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java index 49b42038d1b6..aa503ae020cc 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java @@ -154,6 +154,14 @@ public void run_simple_match_16_test() { Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString()); } + @Test + public void run_simple_match_17_test() { + assumeTrue("pegasus".equals(System.getenv("ENGINE_TYPE"))); + QueryContext testQuery = SimpleMatchQueries.get_simple_match_query_17_test(); + Result result = session.run(testQuery.getQuery()); + Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString()); + } + @AfterClass public static void afterClass() { if (session != null) { diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs index 5a2a55eb2653..bb1c1a0d30f6 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs @@ -104,7 +104,7 @@ impl GraphPath { let entry = entry.into(); let id = entry.id(); GraphPath::SimpleEndV((entry, vec![id], 1)) - }, + } pb::path_expand::PathOpt::Trail => GraphPath::TrailAllPath(vec![entry.into()]), }, pb::path_expand::ResultOpt::AllV | pb::path_expand::ResultOpt::AllVE => match path_opt { @@ -167,21 +167,27 @@ impl GraphPath { pub fn get_path_start(&self) -> Option<&VertexOrEdge> { match self { - GraphPath::AllPath(ref p) | GraphPath::SimpleAllPath(ref p) | GraphPath::TrailAllPath(ref p) => p.first(), + GraphPath::AllPath(ref p) + | GraphPath::SimpleAllPath(ref p) + | GraphPath::TrailAllPath(ref p) => p.first(), GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => None, } } pub fn get_path_end(&self) -> &VertexOrEdge { match self { - GraphPath::AllPath(ref p) | GraphPath::SimpleAllPath(ref p) | GraphPath::TrailAllPath(ref p) => p.last().unwrap(), + GraphPath::AllPath(ref p) + | GraphPath::SimpleAllPath(ref p) + | GraphPath::TrailAllPath(ref p) => p.last().unwrap(), GraphPath::EndV((ref e, _)) | GraphPath::SimpleEndV((ref e, _, _)) => e, } } pub fn get_path_end_mut(&mut self) -> &mut VertexOrEdge { match self { - GraphPath::AllPath(ref mut p) | GraphPath::SimpleAllPath(ref mut p) | GraphPath::TrailAllPath(ref mut p) => p.last_mut().unwrap(), + GraphPath::AllPath(ref mut p) + | GraphPath::SimpleAllPath(ref mut p) + | GraphPath::TrailAllPath(ref mut p) => p.last_mut().unwrap(), GraphPath::EndV((ref mut e, _)) | GraphPath::SimpleEndV((ref mut e, _, _)) => e, } } @@ -233,7 +239,9 @@ impl GraphPath { // pop the last element from the path, and return the element. pub fn pop(&mut self) -> Option { match self { - GraphPath::AllPath(ref mut p) | GraphPath::SimpleAllPath(ref mut p) | GraphPath::TrailAllPath(ref mut p) => p.pop(), + GraphPath::AllPath(ref mut p) + | GraphPath::SimpleAllPath(ref mut p) + | GraphPath::TrailAllPath(ref mut p) => p.pop(), GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => None, } } @@ -241,7 +249,9 @@ impl GraphPath { // reverse the path. pub fn reverse(&mut self) { match self { - GraphPath::AllPath(ref mut p) | GraphPath::SimpleAllPath(ref mut p) | GraphPath::TrailAllPath(ref mut p) => { + GraphPath::AllPath(ref mut p) + | GraphPath::SimpleAllPath(ref mut p) + | GraphPath::TrailAllPath(ref mut p) => { p.reverse(); } GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => {} @@ -251,7 +261,9 @@ impl GraphPath { // get the element ids in the path, including both vertices and edges. pub fn get_elem_ids(&self) -> Vec { match self { - GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => p.iter().map(|e| e.id()).collect(), + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => { + p.iter().map(|e| e.id()).collect() + } GraphPath::EndV((e, _)) | GraphPath::SimpleEndV((e, _, _)) => vec![e.id()], } } @@ -259,7 +271,9 @@ impl GraphPath { // get the element labels in the path, including both vertices and edges. pub fn get_elem_labels(&self) -> Vec> { match self { - GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => p.iter().map(|e| e.label()).collect(), + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => { + p.iter().map(|e| e.label()).collect() + } GraphPath::EndV((e, _)) | GraphPath::SimpleEndV((e, _, _)) => vec![e.label()], } } @@ -409,10 +423,10 @@ impl PartialEq for GraphPath { | (GraphPath::AllPath(p1), GraphPath::SimpleAllPath(p2)) | (GraphPath::AllPath(p1), GraphPath::TrailAllPath(p2)) | (GraphPath::SimpleAllPath(p1), GraphPath::AllPath(p2)) - | (GraphPath::SimpleAllPath(p1), GraphPath::SimpleAllPath(p2)) - | (GraphPath::SimpleAllPath(p1), GraphPath::TrailAllPath(p2)) - | (GraphPath::TrailAllPath(p1), GraphPath::AllPath(p2)) - | (GraphPath::TrailAllPath(p1), GraphPath::SimpleAllPath(p2)) + | (GraphPath::SimpleAllPath(p1), GraphPath::SimpleAllPath(p2)) + | (GraphPath::SimpleAllPath(p1), GraphPath::TrailAllPath(p2)) + | (GraphPath::TrailAllPath(p1), GraphPath::AllPath(p2)) + | (GraphPath::TrailAllPath(p1), GraphPath::SimpleAllPath(p2)) | (GraphPath::TrailAllPath(p1), GraphPath::TrailAllPath(p2)) => p1.eq(p2), (GraphPath::EndV((p1, _)), GraphPath::EndV((p2, _))) | (GraphPath::EndV((p1, _)), GraphPath::SimpleEndV((p2, _, _))) @@ -571,7 +585,9 @@ impl TryFrom for GraphPath { impl Hash for GraphPath { fn hash(&self, state: &mut H) { match self { - GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => p.hash(state), + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) | GraphPath::TrailAllPath(p) => { + p.hash(state) + } GraphPath::EndV((e, _)) | GraphPath::SimpleEndV((e, _, _)) => e.hash(state), } } diff --git a/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs b/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs new file mode 100644 index 000000000000..95abc4fee9d4 --- /dev/null +++ b/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs @@ -0,0 +1,342 @@ +// +//! Copyright 2021 Alibaba Group Holding Limited. +//! +//! Licensed under the Apache License, Version 2.0 (the "License"); +//! you may not use this file except in compliance with the License. +//! You may obtain a copy of the License at +//! +//! http://www.apache.org/licenses/LICENSE-2.0 +//! +//! Unless required by applicable law or agreed to in writing, software +//! distributed under the License is distributed on an "AS IS" BASIS, +//! WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +//! See the License for the specific language governing permissions and +//! limitations under the License. +//! +//! + +mod common; + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use dyn_type::Object; + use graph_proxy::apis::{register_graph, GraphElement}; + use graph_proxy::create_exp_store; + use graph_store::ldbc::LDBCVertexParser; + use graph_store::prelude::DefaultId; + use ir_common::expr_parse::str_to_expr_pb; + use ir_common::generated::physical as pb; + use ir_common::KeyId; + use pegasus::api::{Map, Sink}; + use pegasus::result::ResultStream; + use pegasus::JobConf; + use runtime::process::entry::Entry; + use runtime::process::operator::flatmap::FlatMapFuncGen; + use runtime::process::operator::map::FilterMapFuncGen; + use runtime::process::operator::source::SourceOperator; + use runtime::process::record::Record; + + use crate::common::test::*; + + // g.V() + fn source_gen(alias: Option) -> Box + Send> { + source_gen_with_scan_opr(pb::Scan { + scan_opt: 0, + alias, + params: None, + idx_predicate: None, + is_count_only: false, + }) + } + + fn source_gen_with_scan_opr(scan_opr_pb: pb::Scan) -> Box + Send> { + let graph = create_exp_store(Arc::new(TestCluster {})); + register_graph(graph); + let source = SourceOperator::new(scan_opr_pb.into(), Arc::new(TestRouter::default())).unwrap(); + source.gen_source(0).unwrap() + } + + fn expand_test(expand: pb::EdgeExpand) -> ResultStream { + let conf = JobConf::new("expand_test"); + let result = pegasus::run(conf, || { + let expand = expand.clone(); + |input, output| { + let mut stream = input.input_from(source_gen(Some(TAG_A)))?; + let flatmap_func = expand.gen_flat_map().unwrap(); + stream = stream.flat_map(move |input| flatmap_func.exec(input))?; + stream.sink_into(output) + } + }) + .expect("build job failure"); + result + } + + // g.V().out() with optional out + // v1, v4, v6 have out-neighbors; while v2, v3, v5 do not + #[test] + fn optional_expand_outv_test() { + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: None, + expand_opt: 0, + alias: None, + is_optional: true, + }; + let mut result = expand_test(expand_opr_pb); + let mut result_ids = vec![]; + let mut none_cnt = 0; + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1); + let mut expected_ids = vec![v2, v3, v3, v3, v4, v5]; + while let Some(Ok(record)) = result.next() { + if let Some(element) = record.get(None).unwrap().as_vertex() { + result_ids.push(element.id() as usize) + } else if let Some(obj) = record.get(None).unwrap().as_object() { + assert_eq!(obj, &Object::None); + none_cnt += 1; + } + } + result_ids.sort(); + expected_ids.sort(); + assert_eq!(result_ids, expected_ids); + // v2, v3, v5 does not have out edges + assert_eq!(none_cnt, 3); + } + + // g.V().out('knows') with optional out + // v1 has out knows neighbors of v2, v4; while other vertices do not have out knows neighbors + #[test] + fn optional_expand_outv_test_2() { + let query_param = query_params(vec![KNOWS_LABEL.into()], vec![], None); + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: Some(query_param), + expand_opt: 0, + alias: None, + is_optional: true, + }; + let mut result = expand_test(expand_opr_pb); + let mut result_ids = vec![]; + let mut none_cnt = 0; + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let mut expected_ids = vec![v2, v4]; + while let Some(Ok(record)) = result.next() { + println!("record: {:?}", record); + if let Some(element) = record.get(None).unwrap().as_vertex() { + result_ids.push(element.id() as usize) + } else if let Some(obj) = record.get(None).unwrap().as_object() { + assert_eq!(obj, &Object::None); + none_cnt += 1; + } + } + result_ids.sort(); + expected_ids.sort(); + assert_eq!(result_ids, expected_ids); + assert_eq!(none_cnt, 5); + } + + // g.V().outE('knows', 'created') with optional out + #[test] + fn optional_expand_oute_with_many_labels_test() { + let query_param = query_params(vec![KNOWS_LABEL.into(), CREATED_LABEL.into()], vec![], None); + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: Some(query_param), + expand_opt: 1, + alias: None, + is_optional: true, + }; + let mut result = expand_test(expand_opr_pb); + let mut result_edges = vec![]; + let mut none_cnt = 0; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1); + let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0); + let mut expected_edges = vec![(v1, v2), (v1, v3), (v1, v4), (v4, v3), (v4, v5), (v6, v3)]; + expected_edges.sort(); + while let Some(Ok(record)) = result.next() { + if let Some(e) = record.get(None).unwrap().as_edge() { + result_edges.push((e.src_id as usize, e.dst_id as usize)); + } else if let Some(obj) = record.get(None).unwrap().as_object() { + assert_eq!(obj, &Object::None); + none_cnt += 1; + } + } + result_edges.sort(); + assert_eq!(result_edges, expected_edges); + assert_eq!(none_cnt, 3); + } + + // g.V().out('knows').where(@ isnull) with optional out + // in this case, for the vertices v2, v3, v4, v5, v6, that do not have out knows edges. + #[test] + fn optional_expand_outv_filter_test() { + let query_param = query_params(vec![KNOWS_LABEL.into()], vec![], None); + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: Some(query_param), + expand_opt: 0, + alias: None, + is_optional: true, + }; + let vertex_query_param = query_params(vec![], vec![], str_to_expr_pb("isnull @".to_string()).ok()); + let auxilia_opr_pb = pb::GetV { tag: None, opt: 4, params: Some(vertex_query_param), alias: None }; + + let conf = JobConf::new("optional_expand_outv_filter_test"); + let mut result = pegasus::run(conf, || { + let expand = expand_opr_pb.clone(); + let auxilia = auxilia_opr_pb.clone(); + |input, output| { + let mut stream = input.input_from(source_gen(Some(TAG_A)))?; + let flatmap_func = expand.gen_flat_map().unwrap(); + stream = stream.flat_map(move |input| flatmap_func.exec(input))?; + let filter_map_func = auxilia.gen_filter_map().unwrap(); + stream = stream.filter_map(move |input| filter_map_func.exec(input))?; + stream.sink_into(output) + } + }) + .expect("build job failure"); + + let mut result_ids: Vec = vec![]; + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1); + let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0); + let expected_ids = vec![v2, v3, v4, v5, v6]; + while let Some(Ok(record)) = result.next() { + let vertex = record + .get(Some(TAG_A)) + .unwrap() + .as_vertex() + .unwrap(); + result_ids.push(vertex.id() as usize); + } + assert_eq!(result_ids, expected_ids) + } + + // g.V().outE('knows').inV() with optional outE + // in this case, for the vertices, e.g., v2, v3, v4, v5, v6, that do not have out knows edges, the result of inV() would also be regarded as None. + #[test] + fn optional_expand_oute_inv_test() { + let expand_opr = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 1, + alias: None, + is_optional: true, + }; + + let getv_opr = pb::GetV { + tag: None, + opt: 1, // EndV + params: Some(query_params(vec![], vec![], None)), + alias: None, + }; + + let conf = JobConf::new("expand_oute_inv_test"); + let mut result = pegasus::run(conf, || { + let expand = expand_opr.clone(); + let getv = getv_opr.clone(); + |input, output| { + let mut stream = input.input_from(source_gen(None))?; + let flatmap_func = expand.gen_flat_map().unwrap(); + stream = stream.flat_map(move |input| flatmap_func.exec(input))?; + let filter_map_func = getv.gen_filter_map().unwrap(); + stream = stream.filter_map(move |input| filter_map_func.exec(input))?; + stream.sink_into(output) + } + }) + .expect("build job failure"); + + let expected_ids = vec![2, 4]; + let mut result_ids = vec![]; + let mut none_cnt = 0; + while let Some(Ok(record)) = result.next() { + if let Some(element) = record.get(None).unwrap().as_vertex() { + result_ids.push(element.id() as usize); + } else if let Some(obj) = record.get(None).unwrap().as_object() { + assert_eq!(obj, &Object::None); + none_cnt += 1; + } else { + unreachable!() + } + } + result_ids.sort(); + assert_eq!(result_ids, expected_ids); + assert_eq!(none_cnt, 5); + } + + // g.V().as(0).select(0).by(out().count().as(1)) with optional out + // in this case, the vertices that do not have out edges would also be taken into account in the count() + #[test] + fn optional_expand_out_degree_test() { + let conf = JobConf::new("expand_degree_fused_test"); + let expand_opr_pb = pb::EdgeExpand { + v_tag: None, + direction: 0, + params: None, + expand_opt: 2, + alias: Some(1.into()), + is_optional: true, + }; + let getv = pb::GetV { tag: None, opt: 4, params: None, alias: Some(TAG_A) }; + let project = pb::Project { + mappings: vec![pb::project::ExprAlias { + expr: str_to_expr_pb("@0".to_string()).ok(), + alias: None, + }], + is_append: true, + }; + + let mut pegasus_result = pegasus::run(conf, || { + let getv = getv.clone(); + let expand = expand_opr_pb.clone(); + let project = project.clone(); + |input, output| { + let mut stream = input.input_from(source_gen(None))?; + let filter_map_func = getv.gen_filter_map().unwrap(); + stream = stream.filter_map(move |input| filter_map_func.exec(input))?; + let flat_map_func = expand.gen_flat_map().unwrap(); + stream = stream.flat_map(move |input| flat_map_func.exec(input))?; + let filter_map_func = project.gen_filter_map().unwrap(); + stream = stream.filter_map(move |input| filter_map_func.exec(input))?; + stream.sink_into(output) + } + }) + .expect("build job failure"); + + let mut results = vec![]; + let v1: DefaultId = LDBCVertexParser::to_global_id(1, 0); + let v2: DefaultId = LDBCVertexParser::to_global_id(2, 0); + let v3: DefaultId = LDBCVertexParser::to_global_id(3, 1); + let v4: DefaultId = LDBCVertexParser::to_global_id(4, 0); + let v5: DefaultId = LDBCVertexParser::to_global_id(5, 1); + let v6: DefaultId = LDBCVertexParser::to_global_id(6, 0); + let mut expected_results = vec![(v1, 3), (v2, 0), (v3, 0), (v4, 2), (v5, 0), (v6, 1)]; + while let Some(Ok(record)) = pegasus_result.next() { + if let Some(v) = record.get(None).unwrap().as_vertex() { + if let Some(degree_obj) = record.get(Some(1)).unwrap().as_object() { + results.push((v.id() as DefaultId, degree_obj.as_u64().unwrap())); + } + } + } + results.sort(); + expected_results.sort(); + + assert_eq!(results, expected_results) + } +} diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index 9cef1c501579..d6ea8040a923 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -786,7 +786,9 @@ impl IRJobAssembly { base ))) })?; - if (pb::path_expand::ResultOpt::AllVE == unsafe { std::mem::transmute(path.result_opt) } || pb::path_expand::PathOpt::Trail == unsafe { std::mem::transmute(path.path_opt) }) + if (pb::path_expand::ResultOpt::AllVE + == unsafe { std::mem::transmute(path.result_opt) } + || pb::path_expand::PathOpt::Trail == unsafe { std::mem::transmute(path.path_opt) }) && pb::edge_expand::ExpandOpt::Vertex == unsafe { std::mem::transmute(edge_expand.expand_opt) } { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs index 8c5446f709e8..3b9104b38711 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs @@ -15,6 +15,7 @@ use std::convert::TryInto; +use dyn_type::Object; use graph_proxy::apis::{ get_graph, Direction, DynDetails, GraphElement, QueryParams, Statement, Vertex, ID, }; @@ -33,6 +34,7 @@ pub struct EdgeExpandOperator { alias: Option, stmt: Box>, expand_opt: ExpandOpt, + is_optional: bool, } impl FlatMapFunction for EdgeExpandOperator { @@ -43,32 +45,46 @@ impl FlatMapFunction for EdgeExpandOperator< match entry.get_type() { EntryType::Vertex => { let id = entry.id(); - let iter = self.stmt.exec(id)?; + let mut iter = self.stmt.exec(id)?.peekable(); match self.expand_opt { // the case of expand edge, and get end vertex; ExpandOpt::Vertex => { - let neighbors_iter = iter.map(|e| { - if let Some(e) = e.as_edge() { - Vertex::new( - e.get_other_id(), - e.get_other_label().cloned(), - DynDetails::default(), - ) - } else { - unreachable!() - } - }); - Ok(Box::new(RecordExpandIter::new( - input, - self.alias.as_ref(), - Box::new(neighbors_iter), - ))) + if self.is_optional && iter.peek().is_none() { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + let neighbors_iter = iter.map(|e| { + if let Some(e) = e.as_edge() { + Vertex::new( + e.get_other_id(), + e.get_other_label().cloned(), + DynDetails::default(), + ) + } else { + unreachable!() + } + }); + Ok(Box::new(RecordExpandIter::new( + input, + self.alias.as_ref(), + Box::new(neighbors_iter), + ))) + } } // the case of expand neighbors, including edges/vertices ExpandOpt::Edge => { - Ok(Box::new(RecordExpandIter::new(input, self.alias.as_ref(), iter))) + if self.is_optional && iter.peek().is_none() { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + Ok(Box::new(RecordExpandIter::new( + input, + self.alias.as_ref(), + Box::new(iter), + ))) + } } - // the case of get degree. TODO: this case should be a `Map` + // the case of get degree. ExpandOpt::Degree => { let degree = iter.count(); input.append(object!(degree), self.alias); @@ -77,12 +93,32 @@ impl FlatMapFunction for EdgeExpandOperator< } } EntryType::Path => { - let graph_path = entry - .as_graph_path() + if self.is_optional { + Err(FnExecError::unsupported_error( + "Have not supported Optional Edge Expand in Path entry yet", + ))? + } else { + let graph_path = entry + .as_graph_path() + .ok_or_else(|| FnExecError::Unreachable)?; + let iter = self.stmt.exec(graph_path.get_path_end().id())?; + let curr_path = graph_path.clone(); + Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter))) + } + } + EntryType::Object => { + let obj = entry + .as_object() .ok_or_else(|| FnExecError::Unreachable)?; - let iter = self.stmt.exec(graph_path.get_path_end().id())?; - let curr_path = graph_path.clone(); - Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter))) + if Object::None.eq(obj) { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + Err(FnExecError::unexpected_data_error(&format!( + "Cannot Expand from current entry {:?}", + entry + )))? + } } _ => Err(FnExecError::unexpected_data_error(&format!( "Cannot Expand from current entry {:?}", @@ -99,9 +135,6 @@ impl FlatMapFuncGen for pb::EdgeExpand { fn gen_flat_map( self, ) -> FnGenResult>>> { - if self.is_optional { - return Err(FnGenError::unsupported_error("optional edge expand in EdgeExpandOperator")); - } let graph = get_graph().ok_or_else(|| FnGenError::NullGraphError)?; let start_v_tag = self.v_tag; let edge_or_end_v_tag = self.alias; @@ -127,6 +160,7 @@ impl FlatMapFuncGen for pb::EdgeExpand { alias: edge_or_end_v_tag, stmt, expand_opt: ExpandOpt::Vertex, + is_optional: self.is_optional, }; Ok(Box::new(edge_expand_operator)) } else { @@ -137,6 +171,7 @@ impl FlatMapFuncGen for pb::EdgeExpand { alias: edge_or_end_v_tag, stmt, expand_opt: ExpandOpt::Edge, + is_optional: self.is_optional, }; Ok(Box::new(edge_expand_operator)) } @@ -144,8 +179,13 @@ impl FlatMapFuncGen for pb::EdgeExpand { _ => { // Expand edges or degree let stmt = graph.prepare_explore_edge(direction, &query_params)?; - let edge_expand_operator = - EdgeExpandOperator { start_v_tag, alias: edge_or_end_v_tag, stmt, expand_opt }; + let edge_expand_operator = EdgeExpandOperator { + start_v_tag, + alias: edge_or_end_v_tag, + stmt, + expand_opt, + is_optional: self.is_optional, + }; Ok(Box::new(edge_expand_operator)) } } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs index 400dac9e39aa..b8bd8b5bb6f3 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs @@ -13,6 +13,7 @@ //! See the License for the specific language governing permissions and //! limitations under the License. +use dyn_type::Object; use graph_proxy::apis::{DynDetails, Element, Vertex}; use ir_common::generated::physical as pb; use ir_common::KeyId; @@ -57,29 +58,41 @@ impl FlatMapFunction for UnfoldOperator { .as_any_ref() .downcast_ref::() { - let mut res = Vec::with_capacity(intersection.len()); - for item in intersection.iter().cloned() { - let mut new_entry = input.clone(); - new_entry.append(Vertex::new(item, None, DynDetails::default()), self.alias); - res.push(new_entry); + let len = intersection.len(); + if len == 0 { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + let mut res = Vec::with_capacity(len); + for item in intersection.iter().cloned() { + let mut new_entry = input.clone(); + new_entry.append(Vertex::new(item, None, DynDetails::default()), self.alias); + res.push(new_entry); + } + Ok(Box::new(res.into_iter())) } - Ok(Box::new(res.into_iter())) } else if let Some(general_intersection) = entry .as_any_ref() .downcast_ref::() { - let mut res = Vec::with_capacity(general_intersection.len()); - for (vid, matchings) in general_intersection.matchings_iter() { - for matching in matchings { - let mut new_entry = input.clone(); - for (column, tag) in matching { - new_entry.append(column.clone(), Some(tag)); + let len = general_intersection.len(); + if len == 0 { + input.append(Object::None, self.alias); + Ok(Box::new(vec![input].into_iter())) + } else { + let mut res = Vec::with_capacity(len); + for (vid, matchings) in general_intersection.matchings_iter() { + for matching in matchings { + let mut new_entry = input.clone(); + for (column, tag) in matching { + new_entry.append(column.clone(), Some(tag)); + } + new_entry.append(Vertex::new(vid, None, DynDetails::default()), self.alias); + res.push(new_entry); } - new_entry.append(Vertex::new(vid, None, DynDetails::default()), self.alias); - res.push(new_entry); } + Ok(Box::new(res.into_iter())) } - Ok(Box::new(res.into_iter())) } else { Err(FnExecError::unexpected_data_error( "downcast intersection entry in UnfoldOperator", diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index b07a23310bad..ef68632cab80 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -15,6 +15,7 @@ use std::convert::TryInto; +use dyn_type::Object; use graph_proxy::apis::GraphElement; use graph_proxy::apis::{get_graph, DynDetails, GraphPath, QueryParams, Vertex}; use graph_proxy::utils::expr::eval_pred::EvalPred; @@ -117,6 +118,16 @@ impl FilterMapFunction for GetVertexOperator { } else { Err(FnExecError::unexpected_data_error("unreachable path end entry in GetV"))? } + } else if let Some(obj) = entry.as_object() { + if Object::None.eq(obj) { + input.append(Object::None, self.alias); + Ok(Some(input)) + } else { + Err(FnExecError::unexpected_data_error(&format!( + "Can only apply `GetV` on an object that is not None. The entry is {:?}", + entry + )))? + } } else { Err(FnExecError::unexpected_data_error( &format!( "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", entry @@ -240,6 +251,28 @@ impl FilterMapFunction for AuxiliaOperator { } else { return Ok(None); } + } else if let Some(obj) = entry.as_object() { + if Object::None.eq(obj) { + if let Some(predicate) = &self.query_params.filter { + let res = predicate + .eval_bool(Some(&input)) + .map_err(|e| FnExecError::from(e))?; + if res { + input.append(Object::None, self.alias); + return Ok(Some(input)); + } else { + return Ok(None); + } + } else { + input.append(Object::None, self.alias); + return Ok(Some(input)); + } + } else { + Err(FnExecError::unexpected_data_error(&format!( + "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}", + entry + )))? + } } else { Err(FnExecError::unexpected_data_error(&format!( "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}",