diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java index a6cff80461b6..f027b644fcf1 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/cypher/integration/suite/simple/SimpleMatchQueries.java @@ -219,4 +219,15 @@ public static QueryContext get_simple_match_query_17_test() { List expected = Arrays.asList("Record<{$f0: 851}>"); return new QueryContext(query, expected); } + + public static QueryContext get_simple_match_query_18_test() { + String query = + "MATCH (country:PLACE {name:" + + " \"India\"})<-[:ISPARTOF]-(:PLACE)<-[:ISLOCATEDIN]-(zombie:PERSON)\n" + + "OPTIONAL MATCH (zombie)<-[:HASCREATOR]-(message)\n" + + "WHERE message.length < 100\n" + + " Return count(country);"; + List expected = Arrays.asList("Record<{$f0: 39783}>"); + return new QueryContext(query, expected); + } } diff --git a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java index aa503ae020cc..48b8e60fcfbf 100644 --- a/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java +++ b/interactive_engine/compiler/src/test/java/com/alibaba/graphscope/cypher/integration/ldbc/SimpleMatchTest.java @@ -162,6 +162,13 @@ public void run_simple_match_17_test() { Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString()); } + @Test + public void run_simple_match_18_test() { + QueryContext testQuery = SimpleMatchQueries.get_simple_match_query_18_test(); + Result result = session.run(testQuery.getQuery()); + Assert.assertEquals(testQuery.getExpectedResult().toString(), result.list().toString()); + } + @AfterClass public static void afterClass() { if (session != null) { diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs index ecd0837e5c1e..fe58fee9bfdc 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs @@ -30,6 +30,8 @@ use crate::utils::expr::eval_pred::PEvaluator; pub mod element; pub type ID = i64; +// a special id for Null graph elements. +pub const NULL_ID: ID = ID::MAX; pub fn read_id(reader: &mut R) -> io::Result { reader.read_i64() diff --git a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs index 2b27f09c1751..7de63b7da164 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs @@ -833,7 +833,7 @@ impl InnerOpr { mod tests { use ahash::HashMap; use dyn_type::DateTimeFormats; - use ir_common::{expr_parse::str_to_expr_pb, generated::physical::physical_opr::operator}; + use ir_common::expr_parse::str_to_expr_pb; use super::*; use crate::apis::{DynDetails, Vertex}; diff --git a/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs b/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs index 95abc4fee9d4..81a064d20541 100644 --- a/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/optional_expand_test.rs @@ -21,7 +21,6 @@ mod common; mod test { use std::sync::Arc; - use dyn_type::Object; use graph_proxy::apis::{register_graph, GraphElement}; use graph_proxy::create_exp_store; use graph_store::ldbc::LDBCVertexParser; @@ -96,8 +95,7 @@ mod test { while let Some(Ok(record)) = result.next() { if let Some(element) = record.get(None).unwrap().as_vertex() { result_ids.push(element.id() as usize) - } else if let Some(obj) = record.get(None).unwrap().as_object() { - assert_eq!(obj, &Object::None); + } else if record.get(None).unwrap().is_none() { none_cnt += 1; } } @@ -131,8 +129,7 @@ mod test { println!("record: {:?}", record); if let Some(element) = record.get(None).unwrap().as_vertex() { result_ids.push(element.id() as usize) - } else if let Some(obj) = record.get(None).unwrap().as_object() { - assert_eq!(obj, &Object::None); + } else if record.get(None).unwrap().is_none() { none_cnt += 1; } } @@ -168,8 +165,7 @@ mod test { while let Some(Ok(record)) = result.next() { if let Some(e) = record.get(None).unwrap().as_edge() { result_edges.push((e.src_id as usize, e.dst_id as usize)); - } else if let Some(obj) = record.get(None).unwrap().as_object() { - assert_eq!(obj, &Object::None); + } else if record.get(None).unwrap().is_none() { none_cnt += 1; } } @@ -268,11 +264,8 @@ mod test { while let Some(Ok(record)) = result.next() { if let Some(element) = record.get(None).unwrap().as_vertex() { result_ids.push(element.id() as usize); - } else if let Some(obj) = record.get(None).unwrap().as_object() { - assert_eq!(obj, &Object::None); + } else if record.get(None).unwrap().is_none() { none_cnt += 1; - } else { - unreachable!() } } result_ids.sort(); diff --git a/interactive_engine/executor/ir/runtime/src/process/entry.rs b/interactive_engine/executor/ir/runtime/src/process/entry.rs index ef73406c2c9d..3cd47355ed69 100644 --- a/interactive_engine/executor/ir/runtime/src/process/entry.rs +++ b/interactive_engine/executor/ir/runtime/src/process/entry.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use ahash::HashMap; use dyn_type::{BorrowObject, Object}; +use graph_proxy::apis::graph::NULL_ID; use graph_proxy::apis::VertexOrEdge; use graph_proxy::apis::{Edge, Element, GraphElement, GraphPath, PropertyValue, Vertex, ID}; use ir_common::error::ParsePbError; @@ -51,6 +52,8 @@ pub enum EntryType { Intersection, /// Type of collection consisting of entries Collection, + /// A Null graph element entry type + Null, } pub trait Entry: Debug + Send + Sync + AsAny + Element { @@ -104,6 +107,7 @@ impl DynEntry { .as_object() .map(|obj| obj.eq(&Object::None)) .unwrap_or(false), + EntryType::Null => true, _ => false, } } @@ -184,6 +188,9 @@ impl Encode for DynEntry { .unwrap() .write_to(writer)?; } + EntryType::Null => { + writer.write_u8(9)?; + } } Ok(()) } @@ -225,6 +232,7 @@ impl Decode for DynEntry { let general_intersect = GeneralIntersectionEntry::read_from(reader)?; Ok(DynEntry::new(general_intersect)) } + 9 => Ok(DynEntry::new(NullEntry)), _ => unreachable!(), } } @@ -247,7 +255,7 @@ impl Element for DynEntry { impl GraphElement for DynEntry { fn id(&self) -> ID { match self.get_type() { - EntryType::Vertex | EntryType::Edge | EntryType::Path => { + EntryType::Vertex | EntryType::Edge | EntryType::Path | EntryType::Null => { self.inner.as_graph_element().unwrap().id() } _ => unreachable!(), @@ -256,7 +264,7 @@ impl GraphElement for DynEntry { fn label(&self) -> Option { match self.get_type() { - EntryType::Vertex | EntryType::Edge | EntryType::Path => { + EntryType::Vertex | EntryType::Edge | EntryType::Path | EntryType::Null => { self.inner.as_graph_element().unwrap().label() } _ => unreachable!(), @@ -265,7 +273,7 @@ impl GraphElement for DynEntry { fn get_property(&self, key: &NameOrId) -> Option { match self.get_type() { - EntryType::Vertex | EntryType::Edge | EntryType::Path => self + EntryType::Vertex | EntryType::Edge | EntryType::Path | EntryType::Null => self .inner .as_graph_element() .unwrap() @@ -276,7 +284,7 @@ impl GraphElement for DynEntry { fn get_all_properties(&self) -> Option> { match self.get_type() { - EntryType::Vertex | EntryType::Edge | EntryType::Path => self + EntryType::Vertex | EntryType::Edge | EntryType::Path | EntryType::Null => self .inner .as_graph_element() .unwrap() @@ -306,6 +314,7 @@ impl Hash for DynEntry { .as_any_ref() .downcast_ref::() .hash(state), + EntryType::Null => self.hash(state), } } } @@ -335,6 +344,7 @@ impl PartialEq for DynEntry { .as_any_ref() .downcast_ref::() .eq(&other.as_any_ref().downcast_ref::()), + EntryType::Null => other.get_type() == EntryType::Null, } } else { false @@ -373,6 +383,7 @@ impl PartialOrd for DynEntry { .as_any_ref() .downcast_ref::() .partial_cmp(&other.as_any_ref().downcast_ref::()), + EntryType::Null => None, } } else { None @@ -548,6 +559,50 @@ impl Decode for CollectionEntry { } } +// NullEntry represents a null graph element, e.g., a null vertex generated by optional edge_expand. +#[derive(Debug, Clone, Default, PartialEq, PartialOrd, Eq, Hash)] +pub struct NullEntry; + +impl_as_any!(NullEntry); + +impl Entry for NullEntry { + fn get_type(&self) -> EntryType { + EntryType::Null + } +} + +impl Element for NullEntry { + fn as_graph_element(&self) -> Option<&dyn GraphElement> { + Some(self) + } + + fn len(&self) -> usize { + 0 + } + + fn as_borrow_object(&self) -> BorrowObject { + BorrowObject::None + } +} + +impl GraphElement for NullEntry { + fn id(&self) -> ID { + NULL_ID + } + + fn label(&self) -> Option { + None + } + + fn get_property(&self, _key: &NameOrId) -> Option { + None + } + + fn get_all_properties(&self) -> Option> { + None + } +} + impl TryFrom for DynEntry { type Error = ParsePbError; fn try_from(e: result_pb::Element) -> Result { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs index 3b9104b38711..4675c99d596a 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs @@ -25,7 +25,7 @@ use ir_common::KeyId; use pegasus::api::function::{DynIter, FlatMapFunction, FnResult}; use crate::error::{FnExecError, FnGenError, FnGenResult}; -use crate::process::entry::{Entry, EntryType}; +use crate::process::entry::{Entry, EntryType, NullEntry}; use crate::process::operator::flatmap::FlatMapFuncGen; use crate::process::record::{Record, RecordExpandIter, RecordPathExpandIter}; @@ -50,7 +50,7 @@ impl FlatMapFunction for EdgeExpandOperator< // the case of expand edge, and get end vertex; ExpandOpt::Vertex => { if self.is_optional && iter.peek().is_none() { - input.append(Object::None, self.alias); + input.append(NullEntry, self.alias); Ok(Box::new(vec![input].into_iter())) } else { let neighbors_iter = iter.map(|e| { @@ -74,7 +74,7 @@ impl FlatMapFunction for EdgeExpandOperator< // the case of expand neighbors, including edges/vertices ExpandOpt::Edge => { if self.is_optional && iter.peek().is_none() { - input.append(Object::None, self.alias); + input.append(NullEntry, self.alias); Ok(Box::new(vec![input].into_iter())) } else { Ok(Box::new(RecordExpandIter::new( diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index ef68632cab80..5d44af1b7d40 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -15,7 +15,6 @@ use std::convert::TryInto; -use dyn_type::Object; use graph_proxy::apis::GraphElement; use graph_proxy::apis::{get_graph, DynDetails, GraphPath, QueryParams, Vertex}; use graph_proxy::utils::expr::eval_pred::EvalPred; @@ -26,7 +25,7 @@ use ir_common::{KeyId, LabelId}; use pegasus::api::function::{FilterMapFunction, FnResult}; use crate::error::{FnExecError, FnExecResult, FnGenError, FnGenResult}; -use crate::process::entry::{DynEntry, Entry}; +use crate::process::entry::{DynEntry, Entry, NullEntry}; use crate::process::operator::map::FilterMapFuncGen; use crate::process::record::Record; @@ -118,16 +117,9 @@ impl FilterMapFunction for GetVertexOperator { } else { Err(FnExecError::unexpected_data_error("unreachable path end entry in GetV"))? } - } else if let Some(obj) = entry.as_object() { - if Object::None.eq(obj) { - input.append(Object::None, self.alias); - Ok(Some(input)) - } else { - Err(FnExecError::unexpected_data_error(&format!( - "Can only apply `GetV` on an object that is not None. The entry is {:?}", - entry - )))? - } + } else if entry.is_none() { + input.append(NullEntry, self.alias); + Ok(Some(input)) } else { Err(FnExecError::unexpected_data_error( &format!( "Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", entry @@ -251,27 +243,20 @@ impl FilterMapFunction for AuxiliaOperator { } else { return Ok(None); } - } else if let Some(obj) = entry.as_object() { - if Object::None.eq(obj) { - if let Some(predicate) = &self.query_params.filter { - let res = predicate - .eval_bool(Some(&input)) - .map_err(|e| FnExecError::from(e))?; - if res { - input.append(Object::None, self.alias); - return Ok(Some(input)); - } else { - return Ok(None); - } - } else { - input.append(Object::None, self.alias); + } else if entry.is_none() { + if let Some(predicate) = &self.query_params.filter { + let res = predicate + .eval_bool(Some(&input)) + .map_err(|e| FnExecError::from(e))?; + if res { + input.append(NullEntry, self.alias); return Ok(Some(input)); + } else { + return Ok(None); } } else { - Err(FnExecError::unexpected_data_error(&format!( - "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}", - entry - )))? + input.append(NullEntry, self.alias); + return Ok(Some(input)); } } else { Err(FnExecError::unexpected_data_error(&format!( diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs index 426a2c2e782a..9fee81e91205 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs @@ -252,6 +252,7 @@ impl RecordSinkEncoder { EntryType::Pair => { unreachable!() } + EntryType::Null => Some(result_pb::element::Inner::Object(Object::None.into())), }; result_pb::Element { inner } }