diff --git a/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java b/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java index dc9082c841..827badf26b 100644 --- a/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java +++ b/digdag-core/src/main/java/io/digdag/core/workflow/TaskControl.java @@ -6,6 +6,7 @@ import java.util.HashSet; import java.util.ArrayList; import java.util.stream.Collectors; + import com.google.common.base.Optional; import com.google.common.base.Strings; import com.google.common.collect.*; @@ -182,6 +183,27 @@ private static long addTasks(TaskControlStore store, firstTask = false; } + Map taskNameAndIds = tasks.stream() + .collect(Collectors.toMap( + WorkflowTask::getFullName, + task -> indexToId.get(tasks.indexOf(task)) + )); + + resumingTasks + .stream() + .filter(resumingTask -> !taskNameAndIds.keySet().contains(resumingTask.getFullName()) + && resumingTask.getFullName().endsWith("^sub")) + .forEach(resumingSubtask -> { + String parentTaskName = resumingSubtask.getFullName().replaceAll("\\^sub$", ""); + + store.addResumedSubtask(attemptId, + taskNameAndIds.get(parentTaskName), + resumingSubtask.getTaskType(), + TaskStateCode.SUCCESS, + (isInitialTask ? TaskStateFlags.empty().withInitialTask() : TaskStateFlags.empty()), + resumingSubtask); + }); + return rootTaskId; }