diff --git a/docs/interactive_engine/tinkerpop/supported_gremlin_steps.md b/docs/interactive_engine/tinkerpop/supported_gremlin_steps.md index cb2bb60237c6..31e9ee94f1b7 100644 --- a/docs/interactive_engine/tinkerpop/supported_gremlin_steps.md +++ b/docs/interactive_engine/tinkerpop/supported_gremlin_steps.md @@ -571,6 +571,7 @@ The unfold()-step unrolls an iterator, iterable or map into a linear form. g.V().fold().unfold().values("id") g.V().fold().as("a").unfold().values("id") g.V().has("name", "marko").fold().as("a").select("a").unfold().values("id") +g.V().out("1..3", "knows").with('RESULT_OPT', 'ALL_V').unfold() ``` ## Syntactic Sugars The following steps are extended to denote more complex situations. diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/process/SinkOutputProcessor.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/process/SinkOutputProcessor.java index eb21db41bacc..28060558c1a4 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/process/SinkOutputProcessor.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/common/intermediate/process/SinkOutputProcessor.java @@ -64,7 +64,8 @@ private SinkArg getSinkArg(InterOpCollection opCollection) { continue; } else if (cur instanceof ExpandOp || cur instanceof ScanFusionOp - || cur instanceof GetVOp) { + || cur instanceof GetVOp + || cur instanceof UnfoldOp) { sinkArg.addColumnName(ArgUtils.asNoneNameOrId()); break; } else if (cur instanceof ProjectOp) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/step/PathExpandStep.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/step/PathExpandStep.java index 5e87bba24d28..c51d5c10d8fe 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/step/PathExpandStep.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/step/PathExpandStep.java @@ -103,7 +103,7 @@ public void configure(final Object... keyValues) { throw new ExtendGremlinStepException( "value " + originalVal - + " is invalid, use ALL_V, END_V, ALL_VE instead (case" + + " is invalid, use ALL_V, END_V, ALL_V_E instead (case" + " insensitive)"); } } else if (key.equals("Until")) { diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs index f123cfc1f582..298dd430c6a2 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs @@ -159,6 +159,13 @@ impl GraphPath { } } + pub fn get_path(&self) -> Option<&Vec> { + match self { + GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) => Some(p), + GraphPath::EndV(_) | GraphPath::SimpleEndV(_) => None, + } + } + pub fn take_path(self) -> Option> { match self { GraphPath::AllPath(p) | GraphPath::SimpleAllPath(p) => Some(p), @@ -360,6 +367,8 @@ impl Decode for VertexOrEdge { } } +impl_as_any!(VertexOrEdge); + impl Encode for GraphPath { fn write_to(&self, writer: &mut W) -> std::io::Result<()> { match self { diff --git a/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs b/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs index 60cf06645a97..532485542738 100644 --- a/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs +++ b/interactive_engine/executor/ir/integrated/tests/pathxd_test.rs @@ -955,4 +955,122 @@ mod test { result_collection.sort(); assert_eq!(result_collection, expected_result_paths); } + + // g.V().hasLabel("person").both("2..3", "knows").unfold() + fn init_path_expand_unfold_request(result_opt: i32) -> JobRequest { + let source_opr = pb::Scan { + scan_opt: 0, + alias: None, + params: Some(query_params(vec![PERSON_LABEL.into()], vec![], None)), + idx_predicate: None, + meta_data: None, + }; + + let edge_expand = pb::EdgeExpand { + v_tag: None, + direction: 2, + params: Some(query_params(vec![KNOWS_LABEL.into()], vec![], None)), + expand_opt: 0, + alias: None, + meta_data: None, + }; + + let path_expand_opr = pb::PathExpand { + base: Some(edge_expand.into()), + start_tag: None, + alias: None, + hop_range: Some(pb::Range { lower: 2, upper: 3 }), + path_opt: 0, // Arbitrary + result_opt, + condition: None, + }; + + let unfold_opr = pb::Unfold { tag: None, alias: None, meta_data: None }; + + let mut job_builder = JobBuilder::default(); + job_builder.add_scan_source(source_opr); + job_builder.shuffle(None); + job_builder.path_expand(path_expand_opr); + job_builder.unfold(unfold_opr); + job_builder.sink(default_sink_pb()); + + job_builder.build().unwrap() + } + + #[test] + fn path_expand_allv_unfold_test() { + initialize(); + let request = init_path_expand_unfold_request(1); // all v + let mut results = submit_query(request, 2); + + let mut expected_result_collection: Vec = vec![ + vec!["v1", "v2", "v1"], + vec!["v1", "v4", "v1"], + vec!["v2", "v1", "v2"], + vec!["v2", "v1", "v4"], + vec!["v4", "v1", "v2"], + vec!["v4", "v1", "v4"], + ] + .into_iter() + .flat_map(|ids| ids.into_iter().map(|id| id.to_string())) + .collect(); + let mut result_collection = vec![]; + + while let Some(result) = results.next() { + match result { + Ok(res) => { + let entry = parse_result(res).unwrap(); + if let Some(v) = entry.get(None).unwrap().as_vertex() { + result_collection.push(format!("v{}", v.id())); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + expected_result_collection.sort(); + result_collection.sort(); + assert_eq!(result_collection, expected_result_collection); + } + + #[test] + fn path_expand_allve_unfold_test() { + initialize(); + let request = init_path_expand_unfold_request(2); // all ve + let mut results = submit_query(request, 2); + + let mut expected_result_collection: Vec = vec![ + vec!["v1", "e[1->2]", "v2", "e[1->2]", "v1"], + vec!["v1", "e[1->4]", "v4", "e[1->4]", "v1"], + vec!["v2", "e[1->2]", "v1", "e[1->2]", "v2"], + vec!["v2", "e[1->2]", "v1", "e[1->4]", "v4"], + vec!["v4", "e[1->4]", "v1", "e[1->2]", "v2"], + vec!["v4", "e[1->4]", "v1", "e[1->4]", "v4"], + ] + .into_iter() + .flat_map(|ids| ids.into_iter().map(|id| id.to_string())) + .collect(); + + let mut result_collection = vec![]; + + while let Some(result) = results.next() { + match result { + Ok(res) => { + let entry = parse_result(res).unwrap(); + if let Some(v) = entry.get(None).unwrap().as_vertex() { + result_collection.push(format!("v{}", v.id())); + } else if let Some(e) = entry.get(None).unwrap().as_edge() { + result_collection.push(format!("e[{}->{}]", e.src_id, e.dst_id)); + } + } + Err(e) => { + panic!("err result {:?}", e); + } + } + } + expected_result_collection.sort(); + result_collection.sort(); + assert_eq!(result_collection, expected_result_collection); + } } diff --git a/interactive_engine/executor/ir/runtime/src/process/entry.rs b/interactive_engine/executor/ir/runtime/src/process/entry.rs index 48ffadd5e35d..659c88f7c907 100644 --- a/interactive_engine/executor/ir/runtime/src/process/entry.rs +++ b/interactive_engine/executor/ir/runtime/src/process/entry.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use ahash::HashMap; use dyn_type::{BorrowObject, Object}; +use graph_proxy::apis::VertexOrEdge; use graph_proxy::apis::{Edge, Element, GraphElement, GraphPath, PropertyValue, Vertex, ID}; use ir_common::error::ParsePbError; use ir_common::generated::results as result_pb; @@ -362,6 +363,21 @@ impl Entry for Edge { } } +impl Entry for VertexOrEdge { + fn get_type(&self) -> EntryType { + match self { + VertexOrEdge::V(_) => EntryType::Vertex, + VertexOrEdge::E(_) => EntryType::Edge, + } + } + fn as_vertex(&self) -> Option<&Vertex> { + self.as_vertex() + } + fn as_edge(&self) -> Option<&Edge> { + self.as_edge() + } +} + impl Entry for Object { fn get_type(&self) -> EntryType { EntryType::Object @@ -492,6 +508,12 @@ impl From for DynEntry { } } +impl From for DynEntry { + fn from(e: VertexOrEdge) -> Self { + DynEntry::new(e) + } +} + impl From for DynEntry { fn from(p: GraphPath) -> Self { DynEntry::new(p) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs index f1d3c9b72526..e47c5d6ce6e7 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs @@ -81,10 +81,24 @@ impl FlatMapFunction for UnfoldOperator { } Ok(Box::new(res.into_iter())) } - EntryType::Path => Err(FnExecError::unsupported_error(&format!( - "unfold path entry {:?} in UnfoldOperator", - input.get(self.tag), - )))?, + EntryType::Path => { + let entry = input.get(self.tag).unwrap(); + let path = entry + .as_graph_path() + .ok_or(FnExecError::unexpected_data_error("downcast path entry in UnfoldOperatro"))?; + let path_vec = if let Some(path) = path.get_path() { + path.clone() + } else { + vec![path.get_path_end().clone()] + }; + let mut res = Vec::with_capacity(path_vec.len()); + for item in path_vec { + let mut new_entry = input.clone(); + new_entry.append(item, self.alias); + res.push(new_entry) + } + Ok(Box::new(res.into_iter())) + } _ => Err(FnExecError::unexpected_data_error(&format!( "unfold entry {:?} in UnfoldOperator", input.get(self.tag) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs index 30c064c4fd0f..360706b21e5e 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs @@ -254,6 +254,7 @@ impl MapFunction> for RecordSinkEncoder { } let record_pb = result_pb::Record { columns: sink_columns }; + debug!("sink record_pb {:?}", record_pb); let results = result_pb::Results { inner: Some(result_pb::results::Inner::Record(record_pb)) }; Ok(results.encode_to_vec()) }