Skip to content

Commit

Permalink
- Implement support for some edge-cases in GraphFusionTool
Browse files Browse the repository at this point in the history
- Adjust JobExecutor to use new GraphFusionTool

Signed-off-by: Benjamin Rögner <[email protected]>
  • Loading branch information
roegi committed Dec 13, 2024
1 parent 6d0dff8 commit 2a06771
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,15 @@ public StepGraph withParallel(boolean parallel) {
*/
@Override
public boolean isEquivalentTo(StepExecution other) {
if (other instanceof Step otherStep)
if (executions.size() == 1)
return executions.get(0).isEquivalentTo(otherStep);
else
return false;

if (!(other instanceof StepGraph otherGraph)
|| otherGraph.isParallel() != isParallel()
|| executions.size() != otherGraph.executions.size())
|| executions.size() != otherGraph.executions.size()
|| otherGraph.isParallel() != isParallel() && executions.size() > 1) //NOTE: For graphs with 0 or 1 executions, the parallelity has no effect
return false;

if (isParallel()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,17 @@ protected static StepGraph fuseGraphs(String newJobId, StepGraph newGraph, StepG
* @return A graph that is equivalent to the new step graph, but contains as many DelegateSteps as possible
*/
private static CompilationStepGraph replaceByDelegations(StepGraph newStepGraph, StepGraph oldStepGraph) {
//TODO: Handle different parallelities correctly? Also what about stepgraphs of size 1
if (newStepGraph.isParallel() != oldStepGraph.isParallel())
return null;
if (newStepGraph.isParallel() != oldStepGraph.isParallel()) {
if (newStepGraph.isParallel())
//Just wrap the sequential old graph into a parallel one and continue
oldStepGraph = new StepGraph().withParallel(true).withExecutions(List.of(oldStepGraph));
else
//Wrap the sequential new graph into a parallel one, do replacements parallel, and unwrap the result again
//TODO: Handle returned single-steps
return (CompilationStepGraph) unwrap(replaceByDelegationsParallelly(new StepGraph().withParallel(true).withExecutions(List.of(newStepGraph)), oldStepGraph));
}
if (newStepGraph.isParallel() != oldStepGraph.isParallel() && !(newStepGraph.getExecutions().size() == 1 && oldStepGraph.getExecutions().size() == 1))
return (CompilationStepGraph) new CompilationStepGraph().withParallel(newStepGraph.isParallel()).withExecutions(newStepGraph.getExecutions());
return newStepGraph.isParallel()
? replaceByDelegationsParallelly(newStepGraph, oldStepGraph) : replaceByDelegationsSequentially(newStepGraph, oldStepGraph);
}
Expand All @@ -80,7 +88,6 @@ private static CompilationStepGraph replaceByDelegations(StepGraph newStepGraph,
*/
private static CompilationStepGraph replaceByDelegationsSequentially(StepGraph newStepGraph, StepGraph oldStepGraph) {
CompilationStepGraph result = new CompilationStepGraph();
//TODO: Take into account the length of the step graphs
for (int i = 0; i < Math.min(newStepGraph.getExecutions().size(), oldStepGraph.getExecutions().size()); i++) {
StepExecution newExecution = newStepGraph.getExecutions().get(i);
StepExecution oldExecution = oldStepGraph.getExecutions().get(i);
Expand All @@ -89,13 +96,18 @@ private static CompilationStepGraph replaceByDelegationsSequentially(StepGraph n
result.addExecution(replaceByDelegations(newSubGraph, oldSubGraph));
else if (oldExecution.isEquivalentTo(newExecution))
//Replace the new execution by a pseudo delegating step that has a reference to the old step
result.addExecution(delegateToOldStep((Step) newExecution, (Step) oldExecution));
//NOTE: Even if one of the executions is a step, it could be that the other one is a StepGraph containing one step
result.addExecution(replaceByDelegationsInBranch(newExecution, oldExecution));
else {
//Keep all further (remaining) new executions as they are
result.getExecutions().addAll(newStepGraph.getExecutions().subList(i, newStepGraph.getExecutions().size()));
break;
}
}
//In case the new graph was longer than the old graph, add the rest of the steps
if (result.getExecutions().size() < newStepGraph.getExecutions().size())
result.getExecutions().addAll(newStepGraph.getExecutions().subList(result.getExecutions().size(), newStepGraph.getExecutions().size()));

return result;
}

Expand All @@ -114,7 +126,7 @@ private static CompilationStepGraph replaceByDelegationsParallelly(StepGraph new
int maxMatchCount = 0;
StepExecution largestMatchingBranch = null;
for (StepExecution oldBranch : oldStepGraph.getExecutions()) {
StepExecution branchCandidate = replaceByDelegationsAsFarAsPossible(newBranch, oldBranch);
StepExecution branchCandidate = replaceByDelegationsInBranch(newBranch, oldBranch);
int matchCount = matchCount(branchCandidate);
if (matchCount > maxMatchCount) {
maxMatchCount = matchCount;
Expand All @@ -140,18 +152,30 @@ private static CompilationStepGraph replaceByDelegationsParallelly(StepGraph new
* @param oldBranch
* @return
*/
private static StepExecution replaceByDelegationsAsFarAsPossible(StepExecution newBranch, StepExecution oldBranch) {
if (newBranch instanceof StepGraph newBranchGraph && oldBranch instanceof StepGraph oldBranchGraph) {
private static StepExecution replaceByDelegationsInBranch(StepExecution newBranch, StepExecution oldBranch) {
if (newBranch instanceof StepGraph newBranchGraph && oldBranch instanceof StepGraph oldBranchGraph)
return replaceByDelegations(newBranchGraph, oldBranchGraph);
}
else if (newBranch instanceof Step newStep && oldBranch instanceof Step oldStep) {
else if (newBranch instanceof Step newStep && oldBranch instanceof Step oldStep)
return newStep.isEquivalentTo(oldStep) ? delegateToOldStep(newStep, oldStep) : newStep;
}
//TODO: Compare also graphs with only one step to one step
//NOTE: The following is an edge-case which applies when comparing a step to a graph with only one execution
else if (newBranch instanceof StepGraph newBranchGraph && newBranchGraph.getExecutions().size() == 1
|| oldBranch instanceof StepGraph oldBranchGraph && oldBranchGraph.getExecutions().size() == 1)
return replaceByDelegationsInBranch(unwrap(newBranch), unwrap(oldBranch));
else
return newBranch;
}

/**
 * Unwraps a graph that contains exactly one execution.
 * Executions that are not a {@link StepGraph} are passed through unchanged.
 *
 * @param oneExecution The execution to unwrap, either a plain execution or a graph holding exactly one execution
 * @return The only execution of the provided graph, or the provided execution itself if it is not a graph
 * @throws IllegalArgumentException If the provided execution is a graph with a number of executions different than 1
 */
private static StepExecution unwrap(StepExecution oneExecution) {
  //Anything that is not a graph has nothing to unwrap
  if (!(oneExecution instanceof StepGraph singletonCandidate))
    return oneExecution;

  //Only graphs with exactly one execution can be replaced by their content
  if (singletonCandidate.getExecutions().size() != 1)
    throw new IllegalArgumentException("Can not unwrap graphs with a number of executions different than 1.");

  return singletonCandidate.getExecutions().get(0);
}

/**
* Creates a {@link DelegateStep} to be used to replace a step from the new graph with one equivalent step from the old graph.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static com.here.xyz.jobs.RuntimeInfo.State.PENDING;
import static com.here.xyz.jobs.RuntimeInfo.State.RUNNING;
import static com.here.xyz.jobs.RuntimeInfo.State.SUCCEEDED;
import static com.here.xyz.jobs.steps.execution.GraphFusionTool.fuseGraphs;
import static java.util.Comparator.comparingLong;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand All @@ -41,7 +43,6 @@
import com.here.xyz.util.service.Initializable;
import io.vertx.core.Future;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -77,10 +78,16 @@ public final Future<Void> startExecution(Job job, String formerExecutionId) {
//TODO: Care about concurrency between nodes when it comes to resource-load calculation within this thread
return Future.succeededFuture()
.compose(v -> formerExecutionId == null ? reuseExistingJobIfPossible(job) : Future.succeededFuture())
.compose(v -> mayExecute(job))
.compose(executionAllowed -> {
if (!executionAllowed)
//The Job remains in PENDING state and will be checked later again if it may be executed
.compose(v -> needsExecution(job))
.compose(executionNeeded -> executionNeeded ? mayExecute(job) : Future.succeededFuture(false))
.compose(shouldExecute -> {
if (!shouldExecute)
/*
Two different cases:
1. The job is complete already because all its steps are re-using old steps. In that case, the job is succeeded already.
2. The job may currently not be executed because of short resources.
It remains in PENDING state and will be checked later again if it may be executed.
*/
return Future.succeededFuture();

//Update the job status atomically to RUNNING to make sure no other node will try to start it.
Expand Down Expand Up @@ -125,7 +132,7 @@ private void checkPendingJobs() {
.compose(pendingJobs -> {
Future<Void> taskChain = Future.succeededFuture();
try {
pendingJobs.sort(Comparator.comparingLong(Job::getCreatedAt));
pendingJobs.sort(comparingLong(Job::getCreatedAt));
logger.info("Checking {} PENDING jobs if they can be executed ...", pendingJobs.size());
if (stopRequested)
return Future.succeededFuture();
Expand Down Expand Up @@ -176,7 +183,7 @@ private Future<Void> updatePendingTimeEstimations(List<Job> pendingJobs) {

//NOTE: The following is an algorithm that only provides a very rough estimation of the start time
for (Job pendingJob : pendingJobs) {
Job earliestCompletingJob = activeJobs.stream().min(Comparator.comparingLong(job -> job.getStatus().getEstimatedEndTime())).get();
Job earliestCompletingJob = activeJobs.stream().min(comparingLong(job -> job.getStatus().getEstimatedEndTime())).get();
long estimatedStartTime = earliestCompletingJob.getStatus().getEstimatedEndTime();
pendingJob.getStatus()
.withEstimatedStartTime(estimatedStartTime)
Expand Down Expand Up @@ -316,6 +323,19 @@ private Future<Boolean> mayExecute(Job job) {
}));
}

/**
 * Checks whether the provided job still has to be executed.
 * If every step of the job is in state SUCCEEDED already, the job itself is set to SUCCEEDED,
 * its succeeded-steps count is set to its overall step count, and the job is stored.
 *
 * @param job The job to check
 * @return A future resolving to {@code true} if at least one step is not SUCCEEDED,
 *  or to {@code false} (after the job has been stored) if all steps are SUCCEEDED
 */
private Future<Boolean> needsExecution(Job job) {
  //NOTE: Running the check inside compose() turns exceptions thrown by it into a failed future
  return Future.succeededFuture().compose(ignored -> {
    boolean allStepsSucceeded = job.getSteps().stepStream()
        .allMatch(step -> step.getStatus().getState().equals(SUCCEEDED));

    if (!allStepsSucceeded)
      return Future.succeededFuture(true);

    //Nothing is left to be executed - mark the job as succeeded and persist that
    job.getStatus().setState(SUCCEEDED);
    job.getStatus().setSucceededSteps(job.getStatus().getOverallStepCount());
    return job.store().map(false);
  });
}

/**
* Tries to find other existing jobs, that are completed and that already performed parts of the
* tasks that the provided job would have to perform.
Expand All @@ -326,31 +346,23 @@ private Future<Boolean> mayExecute(Job job) {
* @return An empty future (NOTE: If the job's graph was adjusted / shrunk, it will be also stored)
*/
private static Future<Void> reuseExistingJobIfPossible(Job job) {
if(job.getResourceKey() == null)
if (job.getResourceKey() == null || job.getSteps().stepStream().anyMatch(step -> step instanceof DelegateStep))
return Future.succeededFuture();
return JobConfigClient.getInstance().loadJobs(job.getResourceKey(), job.getSecondaryResourceKey(), SUCCEEDED)
.compose(candidates -> shrinkGraphByReusingOtherGraph(job, candidates.stream()
.compose(candidates -> Future.succeededFuture(candidates.stream()
.filter(candidate -> !job.getId().equals(candidate.getId())) //Do not try to compare the job to itself
.map(candidate -> candidate.getSteps().findConnectedEquivalentSubGraph(job.getSteps()))
.filter(candidateGraph -> !candidateGraph.isEmpty())
.max(Comparator.comparingInt(candidateGraph -> candidateGraph.size())) //Take the candidate with the largest matching subgraph
.map(candidate -> fuseGraphs(job, candidate.getSteps()))
.max(comparingLong(candidateGraph -> candidateGraph.stepStream().filter(step -> step instanceof DelegateStep).count())) //Take the candidate with the largest matching subgraph
.orElse(null)))
.compose(shrunkGraph ->{
//Todo: move out
if(shrunkGraph != null){
boolean allStepsSucceeded = shrunkGraph.stepStream().allMatch(s -> s.getStatus().getState().equals(SUCCEEDED));

if(allStepsSucceeded) {
//All Steps are already succeeded - Reused Graph matches new Graph.
job.getStatus().setState(SUCCEEDED);
job.getStatus().setSucceededSteps(job.getStatus().getOverallStepCount());
}
return job.withSteps(shrunkGraph).store();
}
return Future.succeededFuture();
});
.compose(newGraphWithReusedSteps -> newGraphWithReusedSteps == null
? Future.succeededFuture()
: job.withSteps(newGraphWithReusedSteps).store());
}

//TODO: emrParamReplacement(shrunkGraph, collectDelegateReplacements(shrunkGraph)); ... Use InputSets instead?

//TODO: Use new fuseGraphs() method instead for the tests
@Deprecated
public static Future<StepGraph> shrinkGraphByReusingOtherGraph(Job job, StepGraph reusedGraph) {
if (reusedGraph == null || reusedGraph.isEmpty())
return Future.succeededFuture();
Expand All @@ -376,6 +388,7 @@ public static Future<StepGraph> shrinkGraphByReusingOtherGraph(Job job, StepGrap
* @param executions StepExecutions of current Graph
* @param reusedExecutions StepExecutions which should get reused
*/
@Deprecated
private static void replaceWithDelegateOutputSteps(List<StepExecution> executions, List<StepExecution> reusedExecutions) {
for (int i = 0; i < reusedExecutions.size(); i++) {
StepExecution execution = executions.get(i);
Expand Down Expand Up @@ -410,6 +423,7 @@ else if (execution instanceof Step step && reusedExecution instanceof Step reuse
* @return a list containing a single {@code StepGraph} object that represents the updated execution graph
* with reusable portions replaced and dependencies adjusted.
*/
@Deprecated
private static List<StepExecution> calculateShrunkForest(Job job, StepGraph reusedGraph) {
List<StepExecution> executions = job.getSteps().getExecutions();
//Replace all executions, which can get reused form reusedGraph, in current graph with DelegateOutputSteps.
Expand Down Expand Up @@ -452,6 +466,7 @@ private static List<StepExecution> calculateShrunkForest(Job job, StepGraph reus
*
* @param graph
*/
@Deprecated
static void resolveReusedInputs(StepGraph graph) {
graph.stepStream().forEach(step -> resolveReusedInputs(step, graph));
}
Expand All @@ -462,6 +477,7 @@ static void resolveReusedInputs(StepGraph graph) {
* @param step
* @param containingStepGraph
*/
@Deprecated
private static void resolveReusedInputs(Step step, StepGraph containingStepGraph) {
List<InputSet> newInputSets = new ArrayList<>();
for (InputSet compiledInputSet : (List<InputSet>) step.getInputSets()) {
Expand Down Expand Up @@ -493,6 +509,7 @@ private static void resolveReusedInputs(Step step, StepGraph containingStepGraph
* @param previousStepIdsOfFirstNewNode the set of previous step IDs from the first new execution node, used for replacement.
* @param stepIdsOfLastReusedNodes the set of step IDs from the leaf nodes of the reused graph, used to replace previous step IDs.
*/
@Deprecated
private static void updateExecutionDependencies(List<StepExecution> executions, Set<String> previousStepIdsOfFirstNewNode,
Set<String> stepIdsOfLastReusedNodes) {
for (StepExecution executionNode : executions) {
Expand All @@ -510,6 +527,7 @@ else if (executionNode instanceof Step<?> step) {
}
}

@Deprecated
private static List<String[]> collectDelegateReplacements(StepGraph stepGraph) {
List<String[]> replacements = new ArrayList<>();

Expand All @@ -526,25 +544,25 @@ private static List<String[]> collectDelegateReplacements(StepGraph stepGraph) {
return replacements;
}

protected static void emrParamReplacement(StepGraph stepGraph, List<String[]> replacements){
for (StepExecution stepExecution : stepGraph.getExecutions()) {
if (stepExecution instanceof StepGraph graph) {
emrParamReplacement(graph, replacements);
} else if (stepExecution instanceof RunEmrJob emr) {
List<String> scriptParams = emr.getScriptParams();
@Deprecated
protected static void emrParamReplacement(StepGraph stepGraph, List<String[]> replacements) {
stepGraph.stepStream().forEach(step -> {
if (step instanceof RunEmrJob emrJobStep) {
List<String> scriptParams = emrJobStep.getScriptParams();
for (int i = 0; i < scriptParams.size(); i++) {
String param = scriptParams.get(i);

for (String[] replacement : replacements) {
for (String[] replacement : replacements)
param = param.replaceAll(replacement[0], replacement[1]);
}

scriptParams.set(i, param);
}
emr.setScriptParams(scriptParams);
emrJobStep.setScriptParams(scriptParams);
}
}
});
}

@Deprecated
private static Step getFirstExecutionNode(StepExecution executionNode) {
if (executionNode instanceof StepGraph previousNodeGraph) {
//Get the first node of the graph
Expand All @@ -567,6 +585,7 @@ private static Step getFirstExecutionNode(StepExecution executionNode) {
* This may be a {@code Step} or a {@code StepGraph}.
* @return a list of {@code Step} objects representing all leaf nodes in the execution graph.
*/
@Deprecated
public static List<Step> getAllLeafExecutionNodes(StepExecution executionNode) {
List<Step> leafNodes = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,23 @@ public class GraphFusionTests {
public static final String SOME_CONSUMER = "SomeConsumer";
public static final String NEW_JOB_ID = "newJob";
public static final String OLD_JOB_ID = "oldJob";
private static final boolean printGraphs = true;

private static final boolean printGraphs = false;

static {
new Config();
Config.instance.PARALLEL_STEPS_SUPPORTED = true;
}

/*
TODO: Add edge case tests:
- compare a single step with a graph that only contains a single step (both directions)
- compare sequential graphs of different lengths (both directions)
- compare sequential branches of different lengths (both directions)
- compare graphs that have different parallelity (one parallel, the other one sequential)
*/

@Test
public void simpleSequentialGraphFullyReusable() {
Step oldProducer = new SimpleTestStepWithOutput(SOME_EXPORT);
Expand Down Expand Up @@ -101,8 +111,6 @@ public void simpleSequentialGraphPartiallyReusable() {
checkOutputs(fusedGraph, OLD_JOB_ID);
}

//TODO: FIXME forward the inputs of the producer to the correct consumer for all graphs in the following tests

@Test
public void simpleSequentialGraphNotReusable() {
SimpleTestStepWithOutput oldProducer = new SimpleTestStepWithOutput(SOME_EXPORT);
Expand Down
Loading

0 comments on commit 2a06771

Please sign in to comment.