Skip to content

Commit

Permalink
refactor(interactive): refactor the implementation of PathExpand (#…
Browse files Browse the repository at this point in the history
…3159)

<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?

<!-- Please give a short brief about these changes. -->

As titled. Reimplement `PathExpand` by union results of multi-hops,
instead of using the `iterate_emit_until()`. In this case, the example
query in #3158 won't get stuck.

## Related issue number

<!-- Are there any issues opened that will be resolved by merging this
change? -->

#3158

Co-authored-by: Longbin Lai <[email protected]>
  • Loading branch information
BingqingLyu and longbinlai authored Sep 1, 2023
1 parent 9de14a9 commit 20be508
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions interactive_engine/executor/ir/runtime/src/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use ir_common::generated::physical as pb;
use ir_common::generated::physical::physical_opr::operator::OpKind;
use pegasus::api::function::*;
use pegasus::api::{
Collect, CorrelatedSubTask, Count, Dedup, EmitKind, Filter, Fold, FoldByKey, HasAny, IterCondition,
Iteration, Join, KeyBy, Limit, Map, Merge, Sink, SortBy, SortLimitBy,
Collect, CorrelatedSubTask, Count, Dedup, Filter, Fold, FoldByKey, HasAny, IterCondition, Iteration,
Join, KeyBy, Limit, Map, Merge, Sink, SortBy, SortLimitBy,
};
use pegasus::stream::Stream;
use pegasus::{BuildJobError, Worker};
Expand Down Expand Up @@ -604,8 +604,8 @@ impl<P: PartitionInfo, C: ClusterInfo> IRJobAssembly<P, C> {
}
let times = range.upper - range.lower - 1;
if times > 0 {
let mut until = IterCondition::max_iters(times as u32);
if let Some(condition) = path.condition.as_ref() {
let mut until = IterCondition::max_iters(times as u32);
let func = self
.udf_gen
.gen_filter(algebra_pb::Select { predicate: Some(condition.clone()) })?;
Expand All @@ -614,9 +614,14 @@ impl<P: PartitionInfo, C: ClusterInfo> IRJobAssembly<P, C> {
stream = stream
.iterate_until(until, |start| self.install(start, &base_expand_plan[..]))?;
} else {
stream = stream.iterate_emit_until(until, EmitKind::Before, |start| {
self.install(start, &base_expand_plan[..])
})?;
let (mut hop_stream, copied_stream) = stream.copied()?;
stream = copied_stream;
for _ in 0..times {
hop_stream = self.install(hop_stream, &base_expand_plan[..])?;
let copied = hop_stream.copied()?;
hop_stream = copied.0;
stream = stream.merge(copied.1)?;
}
}
}
// path end to add path_alias if exists
Expand Down

0 comments on commit 20be508

Please sign in to comment.