IRJobAssembly {
base
)))
})?;
- if (pb::path_expand::ResultOpt::AllVE == unsafe { std::mem::transmute(path.result_opt) } || pb::path_expand::PathOpt::Trail == unsafe { std::mem::transmute(path.path_opt) })
+ if (pb::path_expand::ResultOpt::AllVE
+ == unsafe { std::mem::transmute(path.result_opt) }
+ || pb::path_expand::PathOpt::Trail == unsafe { std::mem::transmute(path.path_opt) })
&& pb::edge_expand::ExpandOpt::Vertex
== unsafe { std::mem::transmute(edge_expand.expand_opt) }
{
diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs
index 8c5446f709e8..3b9104b38711 100644
--- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs
+++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs
@@ -15,6 +15,7 @@
use std::convert::TryInto;
+use dyn_type::Object;
use graph_proxy::apis::{
get_graph, Direction, DynDetails, GraphElement, QueryParams, Statement, Vertex, ID,
};
@@ -33,6 +34,7 @@ pub struct EdgeExpandOperator {
alias: Option,
stmt: Box>,
expand_opt: ExpandOpt,
+ is_optional: bool,
}
impl FlatMapFunction for EdgeExpandOperator {
@@ -43,32 +45,46 @@ impl FlatMapFunction for EdgeExpandOperator<
match entry.get_type() {
EntryType::Vertex => {
let id = entry.id();
- let iter = self.stmt.exec(id)?;
+ let mut iter = self.stmt.exec(id)?.peekable();
match self.expand_opt {
// the case of expand edge, and get end vertex;
ExpandOpt::Vertex => {
- let neighbors_iter = iter.map(|e| {
- if let Some(e) = e.as_edge() {
- Vertex::new(
- e.get_other_id(),
- e.get_other_label().cloned(),
- DynDetails::default(),
- )
- } else {
- unreachable!()
- }
- });
- Ok(Box::new(RecordExpandIter::new(
- input,
- self.alias.as_ref(),
- Box::new(neighbors_iter),
- )))
+ if self.is_optional && iter.peek().is_none() {
+ input.append(Object::None, self.alias);
+ Ok(Box::new(vec![input].into_iter()))
+ } else {
+ let neighbors_iter = iter.map(|e| {
+ if let Some(e) = e.as_edge() {
+ Vertex::new(
+ e.get_other_id(),
+ e.get_other_label().cloned(),
+ DynDetails::default(),
+ )
+ } else {
+ unreachable!()
+ }
+ });
+ Ok(Box::new(RecordExpandIter::new(
+ input,
+ self.alias.as_ref(),
+ Box::new(neighbors_iter),
+ )))
+ }
}
// the case of expand neighbors, including edges/vertices
ExpandOpt::Edge => {
- Ok(Box::new(RecordExpandIter::new(input, self.alias.as_ref(), iter)))
+ if self.is_optional && iter.peek().is_none() {
+ input.append(Object::None, self.alias);
+ Ok(Box::new(vec![input].into_iter()))
+ } else {
+ Ok(Box::new(RecordExpandIter::new(
+ input,
+ self.alias.as_ref(),
+ Box::new(iter),
+ )))
+ }
}
- // the case of get degree. TODO: this case should be a `Map`
+ // the case of get degree.
ExpandOpt::Degree => {
let degree = iter.count();
input.append(object!(degree), self.alias);
@@ -77,12 +93,32 @@ impl FlatMapFunction for EdgeExpandOperator<
}
}
EntryType::Path => {
- let graph_path = entry
- .as_graph_path()
+ if self.is_optional {
+ Err(FnExecError::unsupported_error(
+ "Have not supported Optional Edge Expand in Path entry yet",
+ ))?
+ } else {
+ let graph_path = entry
+ .as_graph_path()
+ .ok_or_else(|| FnExecError::Unreachable)?;
+ let iter = self.stmt.exec(graph_path.get_path_end().id())?;
+ let curr_path = graph_path.clone();
+ Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter)))
+ }
+ }
+ EntryType::Object => {
+ let obj = entry
+ .as_object()
.ok_or_else(|| FnExecError::Unreachable)?;
- let iter = self.stmt.exec(graph_path.get_path_end().id())?;
- let curr_path = graph_path.clone();
- Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter)))
+ if Object::None.eq(obj) {
+ input.append(Object::None, self.alias);
+ Ok(Box::new(vec![input].into_iter()))
+ } else {
+ Err(FnExecError::unexpected_data_error(&format!(
+ "Cannot Expand from current entry {:?}",
+ entry
+ )))?
+ }
}
_ => Err(FnExecError::unexpected_data_error(&format!(
"Cannot Expand from current entry {:?}",
@@ -99,9 +135,6 @@ impl FlatMapFuncGen for pb::EdgeExpand {
fn gen_flat_map(
self,
) -> FnGenResult>>> {
- if self.is_optional {
- return Err(FnGenError::unsupported_error("optional edge expand in EdgeExpandOperator"));
- }
let graph = get_graph().ok_or_else(|| FnGenError::NullGraphError)?;
let start_v_tag = self.v_tag;
let edge_or_end_v_tag = self.alias;
@@ -127,6 +160,7 @@ impl FlatMapFuncGen for pb::EdgeExpand {
alias: edge_or_end_v_tag,
stmt,
expand_opt: ExpandOpt::Vertex,
+ is_optional: self.is_optional,
};
Ok(Box::new(edge_expand_operator))
} else {
@@ -137,6 +171,7 @@ impl FlatMapFuncGen for pb::EdgeExpand {
alias: edge_or_end_v_tag,
stmt,
expand_opt: ExpandOpt::Edge,
+ is_optional: self.is_optional,
};
Ok(Box::new(edge_expand_operator))
}
@@ -144,8 +179,13 @@ impl FlatMapFuncGen for pb::EdgeExpand {
_ => {
// Expand edges or degree
let stmt = graph.prepare_explore_edge(direction, &query_params)?;
- let edge_expand_operator =
- EdgeExpandOperator { start_v_tag, alias: edge_or_end_v_tag, stmt, expand_opt };
+ let edge_expand_operator = EdgeExpandOperator {
+ start_v_tag,
+ alias: edge_or_end_v_tag,
+ stmt,
+ expand_opt,
+ is_optional: self.is_optional,
+ };
Ok(Box::new(edge_expand_operator))
}
}
diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs
index 400dac9e39aa..b8bd8b5bb6f3 100644
--- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs
+++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs
@@ -13,6 +13,7 @@
//! See the License for the specific language governing permissions and
//! limitations under the License.
+use dyn_type::Object;
use graph_proxy::apis::{DynDetails, Element, Vertex};
use ir_common::generated::physical as pb;
use ir_common::KeyId;
@@ -57,29 +58,41 @@ impl FlatMapFunction for UnfoldOperator {
.as_any_ref()
.downcast_ref::()
{
- let mut res = Vec::with_capacity(intersection.len());
- for item in intersection.iter().cloned() {
- let mut new_entry = input.clone();
- new_entry.append(Vertex::new(item, None, DynDetails::default()), self.alias);
- res.push(new_entry);
+ let len = intersection.len();
+ if len == 0 {
+ input.append(Object::None, self.alias);
+ Ok(Box::new(vec![input].into_iter()))
+ } else {
+ let mut res = Vec::with_capacity(len);
+ for item in intersection.iter().cloned() {
+ let mut new_entry = input.clone();
+ new_entry.append(Vertex::new(item, None, DynDetails::default()), self.alias);
+ res.push(new_entry);
+ }
+ Ok(Box::new(res.into_iter()))
}
- Ok(Box::new(res.into_iter()))
} else if let Some(general_intersection) = entry
.as_any_ref()
.downcast_ref::()
{
- let mut res = Vec::with_capacity(general_intersection.len());
- for (vid, matchings) in general_intersection.matchings_iter() {
- for matching in matchings {
- let mut new_entry = input.clone();
- for (column, tag) in matching {
- new_entry.append(column.clone(), Some(tag));
+ let len = general_intersection.len();
+ if len == 0 {
+ input.append(Object::None, self.alias);
+ Ok(Box::new(vec![input].into_iter()))
+ } else {
+ let mut res = Vec::with_capacity(len);
+ for (vid, matchings) in general_intersection.matchings_iter() {
+ for matching in matchings {
+ let mut new_entry = input.clone();
+ for (column, tag) in matching {
+ new_entry.append(column.clone(), Some(tag));
+ }
+ new_entry.append(Vertex::new(vid, None, DynDetails::default()), self.alias);
+ res.push(new_entry);
}
- new_entry.append(Vertex::new(vid, None, DynDetails::default()), self.alias);
- res.push(new_entry);
}
+ Ok(Box::new(res.into_iter()))
}
- Ok(Box::new(res.into_iter()))
} else {
Err(FnExecError::unexpected_data_error(
"downcast intersection entry in UnfoldOperator",
diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs
index b07a23310bad..ef68632cab80 100644
--- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs
+++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs
@@ -15,6 +15,7 @@
use std::convert::TryInto;
+use dyn_type::Object;
use graph_proxy::apis::GraphElement;
use graph_proxy::apis::{get_graph, DynDetails, GraphPath, QueryParams, Vertex};
use graph_proxy::utils::expr::eval_pred::EvalPred;
@@ -117,6 +118,16 @@ impl FilterMapFunction for GetVertexOperator {
} else {
Err(FnExecError::unexpected_data_error("unreachable path end entry in GetV"))?
}
+ } else if let Some(obj) = entry.as_object() {
+ if Object::None.eq(obj) {
+ input.append(Object::None, self.alias);
+ Ok(Some(input))
+ } else {
+ Err(FnExecError::unexpected_data_error(&format!(
+ "Can only apply `GetV` on an object that is not None. The entry is {:?}",
+ entry
+ )))?
+ }
} else {
Err(FnExecError::unexpected_data_error( &format!(
"Can only apply `GetV` (`Auxilia` instead) on an edge or path entry, while the entry is {:?}", entry
@@ -240,6 +251,28 @@ impl FilterMapFunction for AuxiliaOperator {
} else {
return Ok(None);
}
+ } else if let Some(obj) = entry.as_object() {
+ if Object::None.eq(obj) {
+ if let Some(predicate) = &self.query_params.filter {
+ let res = predicate
+ .eval_bool(Some(&input))
+ .map_err(|e| FnExecError::from(e))?;
+ if res {
+ input.append(Object::None, self.alias);
+ return Ok(Some(input));
+ } else {
+ return Ok(None);
+ }
+ } else {
+ input.append(Object::None, self.alias);
+ return Ok(Some(input));
+ }
+ } else {
+ Err(FnExecError::unexpected_data_error(&format!(
+ "neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}",
+ entry
+ )))?
+ }
} else {
Err(FnExecError::unexpected_data_error(&format!(
"neither Vertex nor Edge entry is accessed in `Auxilia` operator, the entry is {:?}",