Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): support project properties of a path #3213

Merged
merged 7 commits into from
Sep 13, 2023
11 changes: 10 additions & 1 deletion docs/interactive_engine/tinkerpop/supported_gremlin_steps.md
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ The following steps are extended to denote more complex situations.
In graph querying, expanding a multiple-hops path from a starting point is called `PathExpand`, which is commonly used in graph scenarios. Different scenarios require different expanding strategies, e.g., outputting only simple paths, or outputting all vertices explored along the expanded path. We introduce the with()-step to configure the corresponding behaviors of the `PathExpand`-step.

#### out()
Expand a multiple-hops path along the outgoing edges, which length is within the given range.
Expand a multiple-hops path along the outgoing edges, whose length is within the given range.

Parameters: </br>
lengthRange - the lower and the upper bounds of the path length, </br> edgeLabels - the edge labels to traverse.
Expand All @@ -603,6 +603,9 @@ g.V().out("1..10", "knows")
# expand hops within the range of [1, 10) along the outgoing edges whose label is `knows` or `created`,
# vertices can be duplicated and only the end vertex should be kept
g.V().out("1..10", "knows", "created")
# expand hops within the range of [1, 10) along the outgoing edges,
# and project the property "name" of every vertex along the path
g.V().out("1..10").with('RESULT_OPT', 'ALL_V').values("name")
```
Running Example:
```bash
Expand All @@ -615,6 +618,12 @@ gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V_E')
gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'END_V').endV()
==>v[2]
==>v[4]
gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V').values("name")
==>[marko, vadas]
==>[marko, josh]
gremlin> g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V').valueMap("id","name")
==>{id=[[1, 2]], name=[[marko, vadas]]}
==>{id=[[1, 4]], name=[[marko, josh]]}
```
#### in()
Expand a multiple-hops path along the incoming edges, whose length is within the given range.
Expand Down
4 changes: 0 additions & 4 deletions interactive_engine/executor/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1295,10 +1295,6 @@ impl AsLogical for pb::PathExpand {
let tag_id = get_or_set_tag_id(alias, plan_meta)?;
plan_meta.set_tag_nodes(tag_id, vec![plan_meta.get_curr_node()]);
}
// PathExpand would never require adding columns
plan_meta
.curr_node_meta_mut()
.set_columns_opt(ColumnsOpt::None);

Ok(())
}
Expand Down
230 changes: 224 additions & 6 deletions interactive_engine/executor/ir/core/src/plan/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,16 @@ impl AsPhysical for pb::PathExpand {
if range.upper <= range.lower || range.lower < 0 || range.upper <= 0 {
Err(IrError::InvalidRange(range.lower, range.upper))?
}
// post_process for path_expand, including add repartition, and the properties need to cache, if necessary.
let mut path_expand = self.clone();
path_expand.post_process(builder, plan_meta)?;
// PathExpand includes cases of:
// 1) EdgeExpand(Opt=Edge) + GetV(NoFilter),
// 1) EdgeExpand(Opt=Edge) + GetV(NoFilterNorColumn),
// This would be translated into EdgeExpand(Opt=Vertex);
// 2) EdgeExpand(Opt=Edge) + GetV(WithFilter),
// 2) EdgeExpand(Opt=Edge) + GetV(WithFilterOrColumn),
// This would be translated into EdgeExpand(Opt=Vertex) + GetV(Opt=Self);
// 3) EdgeExpand(Opt=Vertex) + GetV(WithFilter and Opt=Self) TODO: would this case exist after match?
// This would remain unchanged.
let mut path_expand = self.clone();
if let Some(expand_base) = path_expand.base.as_mut() {
let edge_expand = expand_base.edge_expand.as_mut();
let getv = expand_base.get_v.as_mut();
Expand Down Expand Up @@ -286,8 +288,6 @@ impl AsPhysical for pb::PathExpand {
edge_expand, getv
)));
}

path_expand.post_process(builder, plan_meta)?;
builder.path_expand(path_expand);

Ok(())
Expand All @@ -297,9 +297,142 @@ impl AsPhysical for pb::PathExpand {
}

/// Post-processes this `PathExpand` before it is appended to the physical plan.
///
/// `post_process_vars` is always run first. The remaining steps only apply when the plan is
/// partitioned:
/// 1. a shuffle on `start_tag` is added;
/// 2. if the current node requires columns (or all columns), a `GetV` (opt = ItSelf) carrying
///    those columns is added on the start tag, to cache the properties of the path start vertex;
/// 3. depending on `result_opt`, the same columns are written into the expand base so that the
///    properties are cached while the path is being expanded:
///    - `EndV`: nothing more is done;
///    - `AllV`: the columns are set on the base `GetV` (an ItSelf `GetV` is created if absent);
///    - `AllVE`: as `AllV`, and the columns are additionally set on the base `EdgeExpand`.
///
/// For example, `g.V().out("1..3").with("RESULT_OPT", "ALL_V").values("name")` needs the
/// property "name" cached on every vertex of the path. With "ALL_V_E", the properties of the
/// edges in the path are assumed to be needed as well.
///
/// # Errors
/// - `IrError::MissingData` if `base` or `base.edge_expand` is missing;
/// - `IrError::ParsePbError` if `result_opt` is not a known `ResultOpt` value, or if the base has
///   no `GetV` while its `EdgeExpand` does not expand to vertices.
fn post_process(&mut self, builder: &mut PlanBuilder, plan_meta: &mut PlanMeta) -> IrResult<()> {
    // Discriminants of `ResultOpt`, matched as plain integers so that an unexpected value coming
    // from the protobuf is reported as an error instead of being transmuted into the enum.
    const END_V: i32 = pb::path_expand::ResultOpt::EndV as i32;
    const ALL_V: i32 = pb::path_expand::ResultOpt::AllV as i32;
    const ALL_V_E: i32 = pb::path_expand::ResultOpt::AllVE as i32;

    post_process_vars(builder, plan_meta, false)?;
    if !plan_meta.is_partition() {
        return Ok(());
    }
    builder.shuffle(self.start_tag.clone());

    let node_meta = match plan_meta.get_curr_node_meta() {
        Some(node_meta) => node_meta,
        None => return Ok(()),
    };
    let columns = node_meta.get_columns();
    let is_all_columns = node_meta.is_all_columns();
    if columns.is_empty() && !is_all_columns {
        // No property of the path is required, hence nothing to cache.
        return Ok(());
    }

    let new_params = pb::QueryParams {
        tables: vec![],
        columns: columns
            .clone()
            .into_iter()
            .map(|column| column.into())
            .collect(),
        is_all_columns,
        limit: None,
        predicate: None,
        sample_ratio: 1.0,
        extra: Default::default(),
    };
    // Overwrites the columns of already existing params (keeping their other fields), or
    // installs `new_params` when the operator has no params yet.
    let cache_columns = |params: &mut Option<pb::QueryParams>| {
        if let Some(existing) = params.as_mut() {
            existing.columns = new_params.columns.clone();
            existing.is_all_columns = is_all_columns;
        } else {
            *params = Some(new_params.clone());
        }
    };

    // First, cache properties on the path start vertex.
    let start_auxilia = pb::GetV {
        tag: self.start_tag.clone(),
        opt: 4, //ItSelf
        params: Some(new_params.clone()),
        alias: self.start_tag.clone(),
        meta_data: None,
    };
    builder.get_v(start_auxilia);

    // Then, cache properties during the path expanding.
    let result_opt = self.result_opt;
    let expand_base = self
        .base
        .as_mut()
        .ok_or_else(|| IrError::MissingData("PathExpand::base".to_string()))?;
    let getv = expand_base.get_v.as_mut();
    let edge_expand = expand_base
        .edge_expand
        .as_mut()
        .ok_or_else(|| IrError::MissingData("PathExpand::base.edge_expand".to_string()))?;

    // ALL_V caches the properties of the vertices in the path; ALL_V_E additionally caches the
    // properties of the edges; END_V needs nothing more.
    let cache_edge_columns = match result_opt {
        END_V => return Ok(()),
        ALL_V => false,
        ALL_V_E => true,
        unknown => {
            return Err(IrError::ParsePbError(
                format!("Unexpected ResultOpt {:?} in PathExpand", unknown).into(),
            ))
        }
    };

    if let Some(getv) = getv {
        // case 1: expand (edge) + getv, then cache vertex properties in getv
        // (and edge properties in expand, if required).
        cache_columns(&mut getv.params);
        if cache_edge_columns {
            cache_columns(&mut edge_expand.params);
        }
    } else {
        // case 2: expand (vertex) + no getv, then cache vertex properties with an extra
        // getv (self) (and edge properties in expand, if required).
        if edge_expand.expand_opt != pb::edge_expand::ExpandOpt::Vertex as i32 {
            return Err(IrError::ParsePbError(
                format!("Unexpected ExpandBase in PathExpand {:?}", expand_base).into(),
            ));
        }
        if cache_edge_columns {
            cache_columns(&mut edge_expand.params);
        }
        let auxilia = pb::GetV {
            tag: None,
            opt: 4, //ItSelf
            params: Some(new_params.clone()),
            alias: edge_expand.alias.clone(),
            meta_data: edge_expand.meta_data.clone(),
        };
        expand_base.get_v = Some(auxilia);
    }
    Ok(())
}
Expand Down Expand Up @@ -2917,4 +3050,89 @@ mod test {

assert_eq!(builder, expected_builder);
}

/// Checks the physical plan generated for
/// `Scan(person) -> PathExpand(out "knows", hops [1, 4), ALL_V) -> Project(@.name)`,
/// both without and with partitioning.
#[test]
fn path_expand_project_as_physical() {
    let scan = pb::Scan {
        scan_opt: 0,
        alias: None,
        params: Some(query_params(vec!["person".into()], vec![])),
        idx_predicate: None,
        meta_data: None,
    };
    let knows_expand = pb::EdgeExpand {
        v_tag: None,
        direction: 0,
        params: Some(query_params(vec!["knows".into()], vec![])),
        expand_opt: 0, // vertex
        alias: None,
        meta_data: None,
    };
    let path = pb::PathExpand {
        base: Some(knows_expand.into()),
        start_tag: None,
        alias: None,
        hop_range: Some(pb::Range { lower: 1, upper: 4 }),
        path_opt: 0,   // ARBITRARY
        result_opt: 1, // ALL_V
        condition: None,
    };
    let project_name = pb::Project {
        mappings: vec![ExprAlias {
            expr: Some(str_to_expr_pb("@.name".to_string()).unwrap()),
            alias: None,
        }],
        is_append: true,
        meta_data: vec![],
    };

    // node 0: scan, node 1: path expand, node 2: project
    let mut logical_plan = LogicalPlan::with_node(Node::new(0, scan.clone().into()));
    logical_plan
        .append_operator_as_node(path.clone().into(), vec![0])
        .unwrap();
    logical_plan
        .append_operator_as_node(project_name.clone().into(), vec![1])
        .unwrap();

    // Without partition: the operators are emitted unchanged.
    let mut expected = PlanBuilder::default();
    expected.add_scan_source(scan.clone());
    expected.path_expand(path.clone());
    expected.project(project_name.clone());

    let mut actual = PlanBuilder::default();
    let mut meta = logical_plan.get_meta().clone();
    logical_plan
        .add_job_builder(&mut actual, &mut meta)
        .unwrap();
    assert_eq!(actual, expected);

    // With partition: `PathExpand(out("knows"))` is translated into
    // `auxilia("name") + PathExpand()` whose ExpandBase is `out("knows") + auxilia("name")`.
    let name_auxilia = build_auxilia_with_tag_alias_columns(None, None, vec!["name".into()]);
    let mut path_with_columns = path;
    path_with_columns.base.as_mut().unwrap().get_v = Some(name_auxilia.clone());

    let mut expected = PlanBuilder::default();
    expected.add_scan_source(scan);
    // post process for path expand: 1. shuffle and cache properties of the path start vertex;
    // 2. cache properties in the ExpandBase while expanding.
    expected.shuffle(None);
    expected.get_v(name_auxilia);
    expected.path_expand(path_with_columns);
    // post process for project
    expected.shuffle(None);
    expected.get_v(build_auxilia_with_tag_alias_columns(None, None, vec![]));
    expected.project(project_name);

    let mut actual = PlanBuilder::default();
    let mut meta = logical_plan.get_meta().clone().with_partition();
    logical_plan
        .add_job_builder(&mut actual, &mut meta)
        .unwrap();
    assert_eq!(actual, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use pegasus_common::downcast::AsAny;
use pegasus_common::impl_as_any;

use crate::apis::{Edge, Element, GraphElement, PropertyValue, Vertex, ID};
use crate::utils::expr::eval::Context;

#[derive(Clone, Debug, Hash, PartialEq, PartialOrd)]
pub enum VertexOrEdge {
Expand Down Expand Up @@ -219,6 +220,12 @@ impl GraphElement for VertexOrEdge {
}
}

/// A `VertexOrEdge` serves as its own evaluation context.
impl Context<VertexOrEdge> for VertexOrEdge {
    /// Always yields this element itself; `_tag` is ignored.
    fn get(&self, _tag: Option<&NameOrId>) -> Option<&VertexOrEdge> {
        Some(self)
    }
}

impl Element for GraphPath {
fn as_graph_element(&self) -> Option<&dyn GraphElement> {
Some(self)
Expand Down Expand Up @@ -256,11 +263,34 @@ impl GraphElement for GraphPath {
}

/// Looks up the property `key` on this path.
///
/// - For `AllPath` / `SimpleAllPath`, the property is read from every element of the path, in
///   path order, and the values are returned as one owned `Object::Vector`. Elements that do
///   not have the property are skipped, so the vector may be shorter than the path (and is
///   empty when no element has it).
/// - For `EndV` / `SimpleEndV`, the lookup is delegated to the single stored element.
///
/// # Panics
/// Panics if `try_to_owned` fails for a property value found on a path element.
fn get_property(&self, key: &NameOrId) -> Option<PropertyValue> {
    match self {
        GraphPath::AllPath(path) | GraphPath::SimpleAllPath(path) => {
            let properties: Vec<_> = path
                .iter()
                .filter_map(|v_or_e| v_or_e.get_property(key))
                .map(|p| p.try_to_owned().unwrap())
                .collect();
            Some(PropertyValue::Owned(Object::Vector(properties)))
        }
        GraphPath::EndV((v_or_e, _)) | GraphPath::SimpleEndV((v_or_e, _, _)) => {
            v_or_e.get_property(key)
        }
    }
}

/// Returns all properties of this path.
///
/// - For `AllPath` / `SimpleAllPath` this is not supported yet and `None` is returned.
/// - For `EndV` / `SimpleEndV`, the call is delegated to the single stored element.
fn get_all_properties(&self) -> Option<HashMap<NameOrId, Object>> {
    match self {
        // not supported yet.
        GraphPath::AllPath(_) | GraphPath::SimpleAllPath(_) => None,
        GraphPath::EndV((v_or_e, _)) | GraphPath::SimpleEndV((v_or_e, _, _)) => {
            v_or_e.get_all_properties()
        }
    }
}
}

Expand Down
Loading