Skip to content

Commit

Permalink
fix in parallel scan
Browse files Browse the repository at this point in the history
Committed-by: bingqing.lbq from Dev container
  • Loading branch information
BingqingLyu committed Oct 27, 2023
1 parent 1942a67 commit 8085661
Showing 1 changed file with 4 additions and 9 deletions.
13 changes: 4 additions & 9 deletions interactive_engine/executor/ir/runtime/src/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -685,8 +685,7 @@ impl<P: PartitionInfo, C: ClusterInfo> IRJobAssembly<P, C> {
}
}
OpKind::Root(_) => {
// this would be processed in assemble, and can not be reached when install.
Err(FnGenError::unsupported_error("unreachable root in install"))?
// do nothing, as it is a dummy node
}
OpKind::Sink(_) => {
// this would be processed in assemble, and can not be reached when install.
Expand All @@ -704,17 +703,13 @@ impl<P: PartitionInfo, C: ClusterInfo> JobAssembly<Record> for IRJobAssembly<P,
fn assemble(&self, plan: &JobDesc, worker: &mut Worker<Record, Vec<u8>>) -> Result<(), BuildJobError> {
worker.dataflow(move |input, output| {
let physical_plan = decode::<pb::PhysicalPlan>(&plan.plan)?;
let source_opr = physical_plan
.plan
.first()
.ok_or(FnGenError::from(ParsePbError::EmptyFieldError("empty job plan".to_string())))?;
let source_iter = self.udf_gen.gen_source(source_opr.clone())?;
let source = input.input_from(source_iter)?;
if log_enabled!(log::Level::Debug) && pegasus::get_current_worker().index == 0 {
debug!("{:#?}", physical_plan);
}
// input from a dummy record to trigger the computation
let source = input.input_from(vec![Record::default()])?;
let plan_len = physical_plan.plan.len();
let stream = self.install(source, &physical_plan.plan[1..plan_len - 1])?;
let stream = self.install(source, &physical_plan.plan[0..plan_len - 1])?;
let sink_opr = physical_plan
.plan
.last()
Expand Down

0 comments on commit 8085661

Please sign in to comment.