diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index d2bf8773dcc7..fb61c8accdaf 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -25,8 +25,8 @@ use ir_common::generated::physical as pb; use ir_common::generated::physical::physical_opr::operator::OpKind; use pegasus::api::function::*; use pegasus::api::{ - Collect, CorrelatedSubTask, Count, Dedup, EmitKind, Filter, Fold, FoldByKey, HasAny, IterCondition, - Iteration, Join, KeyBy, Limit, Map, Merge, Sink, SortBy, SortLimitBy, + Collect, CorrelatedSubTask, Count, Dedup, Filter, Fold, FoldByKey, HasAny, IterCondition, Iteration, + Join, KeyBy, Limit, Map, Merge, Sink, SortBy, SortLimitBy, }; use pegasus::stream::Stream; use pegasus::{BuildJobError, Worker}; @@ -604,8 +604,8 @@ impl IRJobAssembly { } let times = range.upper - range.lower - 1; if times > 0 { - let mut until = IterCondition::max_iters(times as u32); if let Some(condition) = path.condition.as_ref() { + let mut until = IterCondition::max_iters(times as u32); let func = self .udf_gen .gen_filter(algebra_pb::Select { predicate: Some(condition.clone()) })?; @@ -614,9 +614,14 @@ impl IRJobAssembly { stream = stream .iterate_until(until, |start| self.install(start, &base_expand_plan[..]))?; } else { - stream = stream.iterate_emit_until(until, EmitKind::Before, |start| { - self.install(start, &base_expand_plan[..]) - })?; + let (mut hop_stream, copied_stream) = stream.copied()?; + stream = copied_stream; + for _ in 0..times { + hop_stream = self.install(hop_stream, &base_expand_plan[..])?; + let copied = hop_stream.copied()?; + hop_stream = copied.0; + stream = stream.merge(copied.1)?; + } } } // path end to add path_alias if exists