Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): support unfold path entry in GIE Runtime #3236

Merged
merged 10 commits into from
Sep 22, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,7 @@ The unfold()-step unrolls an iterator, iterable or map into a linear form.
g.V().fold().unfold().values("id")
g.V().fold().as("a").unfold().values("id")
g.V().has("name", "marko").fold().as("a").select("a").unfold().values("id")
g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V').unfold()
```
## Syntactic Sugars
The following steps are extended to denote more complex situations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ private SinkArg getSinkArg(InterOpCollection opCollection) {
continue;
} else if (cur instanceof ExpandOp
|| cur instanceof ScanFusionOp
|| cur instanceof GetVOp) {
|| cur instanceof GetVOp
|| cur instanceof UnfoldOp) {
sinkArg.addColumnName(ArgUtils.asNoneNameOrId());
break;
} else if (cur instanceof ProjectOp) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void configure(final Object... keyValues) {
throw new ExtendGremlinStepException(
"value "
+ originalVal
+ " is invalid, use ALL_V, END_V, ALL_VE instead (case"
+ " is invalid, use ALL_V, END_V, ALL_V_E instead (case"
+ " insensitive)");
}
} else if (key.equals("Until")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ impl GraphPath {
}
}

pub fn get_path(&self) -> Option<&Vec<VertexOrEdge>> {
match self {
GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) => Some(p),
GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => None,
}
}

pub fn take_path(self) -> Option<Vec<VertexOrEdge>> {
match self {
GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) => Some(p),
Expand Down Expand Up @@ -360,6 +367,8 @@ impl Decode for VertexOrEdge {
}
}

impl_as_any!(VertexOrEdge);

impl Encode for GraphPath {
fn write_to<W: WriteExt>(&self, writer: &mut W) -> std::io::Result<()> {
match self {
Expand Down
118 changes: 118 additions & 0 deletions interactive_engine/executor/ir/integrated/tests/pathxd_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,4 +955,122 @@ mod test {
result_collection.sort();
assert_eq!(result_collection, expected_result_paths);
}

// g.V().hasLabel("person").both("2..3", "knows").unfold()
fn init_path_expand_unfold_request(result_opt: i32) -> JobRequest {
let source_opr = pb::Scan {
scan_opt: 0,
alias: None,
params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)),
idx_predicate: None,
meta_data: None,
};

let edge_expand = pb::EdgeExpand {
v_tag: None,
direction: 2,
params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)),
expand_opt: 0,
alias: None,
meta_data: None,
};

let path_expand_opr = pb::PathExpand {
base: Some(edge_expand.into()),
start_tag: None,
alias: None,
hop_range: Some(pb::Range { lower: 2, upper: 3 }),
path_opt: 0, // Arbitrary
result_opt,
condition: None,
};

let unfold_opr = pb::Unfold { tag: None, alias: None, meta_data: None };

let mut job_builder = JobBuilder::default();
job_builder.add_scan_source(source_opr);
job_builder.shuffle(None);
job_builder.path_expand(path_expand_opr);
job_builder.unfold(unfold_opr);
job_builder.sink(default_sink_pb());

job_builder.build().unwrap()
}

#[test]
fn path_expand_allv_unfold_test() {
initialize();
let request = init_path_expand_unfold_request(1); // all v
let mut results = submit_query(request, 2);

let mut expected_result_collection: Vec<String> = vec![
vec!["v1", "v2", "v1"],
vec!["v1", "v4", "v1"],
vec!["v2", "v1", "v2"],
vec!["v2", "v1", "v4"],
vec!["v4", "v1", "v2"],
vec!["v4", "v1", "v4"],
]
.into_iter()
.flat_map(|ids| ids.into_iter().map(|id| id.to_string()))
.collect();
let mut result_collection = vec![];

while let Some(result) = results.next() {
match result {
Ok(res) => {
let entry = parse_result(res).unwrap();
if let Some(v) = entry.get(None).unwrap().as_vertex() {
result_collection.push(format!("v{}", v.id()));
}
}
Err(e) => {
panic!("err result {:?}", e);
}
}
}
expected_result_collection.sort();
result_collection.sort();
assert_eq!(result_collection, expected_result_collection);
}

#[test]
fn path_expand_allve_unfold_test() {
initialize();
let request = init_path_expand_unfold_request(2); // all ve
let mut results = submit_query(request, 2);

let mut expected_result_collection: Vec<String> = vec![
vec!["v1", "e[1->2]", "v2", "e[1->2]", "v1"],
vec!["v1", "e[1->4]", "v4", "e[1->4]", "v1"],
vec!["v2", "e[1->2]", "v1", "e[1->2]", "v2"],
vec!["v2", "e[1->2]", "v1", "e[1->4]", "v4"],
vec!["v4", "e[1->4]", "v1", "e[1->2]", "v2"],
vec!["v4", "e[1->4]", "v1", "e[1->4]", "v4"],
]
.into_iter()
.flat_map(|ids| ids.into_iter().map(|id| id.to_string()))
.collect();

let mut result_collection = vec![];

while let Some(result) = results.next() {
match result {
Ok(res) => {
let entry = parse_result(res).unwrap();
if let Some(v) = entry.get(None).unwrap().as_vertex() {
result_collection.push(format!("v{}", v.id()));
} else if let Some(e) = entry.get(None).unwrap().as_edge() {
result_collection.push(format!("e[{}->{}]", e.src_id, e.dst_id));
}
}
Err(e) => {
panic!("err result {:?}", e);
}
}
}
expected_result_collection.sort();
result_collection.sort();
assert_eq!(result_collection, expected_result_collection);
}
}
22 changes: 22 additions & 0 deletions interactive_engine/executor/ir/runtime/src/process/entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use std::sync::Arc;

use ahash::HashMap;
use dyn_type::{BorrowObject, Object};
use graph_proxy::apis::VertexOrEdge;
use graph_proxy::apis::{Edge, Element, GraphElement, GraphPath, PropertyValue, Vertex, ID};
use ir_common::error::ParsePbError;
use ir_common::generated::results as result_pb;
Expand Down Expand Up @@ -362,6 +363,21 @@ impl Entry for Edge {
}
}

impl Entry for VertexOrEdge {
fn get_type(&self) -> EntryType {
match self {
VertexOrEdge::V(_) => EntryType::Vertex,
VertexOrEdge::E(_) => EntryType::Edge,
}
}
fn as_vertex(&self) -> Option<&Vertex> {
self.as_vertex()
}
fn as_edge(&self) -> Option<&Edge> {
self.as_edge()
}
}

impl Entry for Object {
fn get_type(&self) -> EntryType {
EntryType::Object
Expand Down Expand Up @@ -492,6 +508,12 @@ impl From<Edge> for DynEntry {
}
}

impl From<VertexOrEdge> for DynEntry {
fn from(e: VertexOrEdge) -> Self {
DynEntry::new(e)
}
}

impl From<GraphPath> for DynEntry {
fn from(p: GraphPath) -> Self {
DynEntry::new(p)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,24 @@ impl FlatMapFunction<Record, Record> for UnfoldOperator {
}
Ok(Box::new(res.into_iter()))
}
EntryType::Path => Err(FnExecError::unsupported_error(&format!(
"unfold path entry {:?} in UnfoldOperator",
input.get(self.tag),
)))?,
EntryType::Path => {
let entry = input.get(self.tag).unwrap();
let path = entry
.as_graph_path()
.ok_or(FnExecError::unexpected_data_error("downcast path entry in UnfoldOperatro"))?;
let path_vec = if let Some(path) = path.get_path() {
path.clone()
} else {
vec![path.get_path_end().clone()]
};
let mut res = Vec::with_capacity(path_vec.len());
for item in path_vec {
let mut new_entry = input.clone();
new_entry.append(item, self.alias);
res.push(new_entry)
}
Ok(Box::new(res.into_iter()))
}
_ => Err(FnExecError::unexpected_data_error(&format!(
"unfold entry {:?} in UnfoldOperator",
input.get(self.tag)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ impl MapFunction<Record, Vec<u8>> for RecordSinkEncoder {
}

let record_pb = result_pb::Record { columns: sink_columns };
debug!("sink record_pb {:?}", record_pb);
let results = result_pb::Results { inner: Some(result_pb::results::Inner::Record(record_pb)) };
Ok(results.encode_to_vec())
}
Expand Down