From 3794444b8a28b1c7c576e57e8f8a652c5bc598c7 Mon Sep 17 00:00:00 2001
From: Christos Katsakioris
Date: Thu, 8 Oct 2020 19:59:45 +0300
Subject: [PATCH] Offload-ability, SchedulingResult, fixes, etc.

* Offload-ability now relies on TornadoVM's supported Driver classes
  (taking into account chained Flink operators within each JobVertex).
* Introduce "empty"/"non-empty" E2dScheduler.SchedulingResult across
  multiple classes.
* Add logging in HaierExecutionGraph.
* Respond to Flink with only the offload-able
  SerializableScheduledJobVertex objects (possibly none) -- _actually_,
  this time.
* Fix the multiGcd helper function of the exhaustive time evaluation
  algorithm's implementation for the case of a single task.

Signed-off-by: Christos Katsakioris
---
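As a quick orientation: this is how a caller is now expected to consume the
new result type (a minimal sketch; the scheduler, jobGraph and policy
variables are hypothetical stand-ins for the caller's state, mirroring what
SchedulerService does below):

    final E2dScheduler.SchedulingResult result = scheduler.schedule(jobGraph, policy);
    if (result.isEmpty()) {
        // None of the JobVertex objects could be offloaded to an accelerator:
        // respond to Flink with an empty list of scheduled vertices.
    } else {
        // Per the FIXME on getResult(): always check isEmpty() before calling it.
        final HaierExecutionGraph graph = result.getResult();
        // ... respond with graph.toSerializableScheduledJobVertexList() ...
    }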
 .../cslab/e2datascheduler/E2dScheduler.java   |  67 +++++++-
 .../graph/HaierExecutionGraph.java            | 161 ++++++++++++++++--
 .../nsga/exhaustivetimeevaluation/Graph.java  |   4 +
 .../e2datascheduler/ws/SchedulerService.java  |  25 ++-
 4 files changed, 236 insertions(+), 21 deletions(-)

diff --git a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/E2dScheduler.java b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/E2dScheduler.java
index b017495..58c9d17 100644
--- a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/E2dScheduler.java
+++ b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/E2dScheduler.java
@@ -107,7 +107,70 @@ public static E2dScheduler getInstance() {
 
     // --------------------------------------------------------------------------------------------
 
-    public HaierExecutionGraph schedule(final JobGraph jobGraph, final OptimizationPolicy policy) {
+    /**
+     * A scheduling result produced by HAIER. Unless it is "empty" (i.e., unless none of the JobVertex objects
+     * could be offloaded to one of the heterogeneous architectures supported by E2Data), it contains the
+     * optimized {@link HaierExecutionGraph} that has been selected among the generated execution plans.
+     */
+    public static class SchedulingResult {
+        private final boolean empty;
+        private final HaierExecutionGraph result;
+
+        /**
+         * Create an "empty" scheduling result, i.e., one in which none of the
+         * {@link org.apache.flink.runtime.jobgraph.JobVertex} objects for a given {@link JobGraph} could be
+         * offloaded to an accelerator at all.
+         */
+        public SchedulingResult() {
+            this.empty = true;
+            this.result = null;
+        }
+
+        /**
+         * Create a "non-empty" scheduling result that contains a {@link HaierExecutionGraph}. An "empty"
+         * scheduling result signifies that no {@link org.apache.flink.runtime.jobgraph.JobVertex} objects for
+         * a given {@link JobGraph} could be offloaded to an accelerator at all.
+         *
+         * @param haierExecutionGraph the {@link HaierExecutionGraph} selected by HAIER; to signify an "empty"
+         *                            result, use the no-argument constructor instead.
+         */
+        public SchedulingResult(final HaierExecutionGraph haierExecutionGraph) {
+            this.empty = false;
+            this.result = haierExecutionGraph;
+        }
+
+        /**
+         * @return {@code true} if the scheduling result is "empty"; {@code false} otherwise
+         */
+        public boolean isEmpty() {
+            return this.empty;
+        }
+
+        /**
+         * @return the result {@link HaierExecutionGraph}, or {@code null} if the result is "empty"
+         *
+         * FIXME(ckatsak): Is it possible to return null for a "non-empty" result (e.g., in the case of an
+         *  internal error)? Until this is thoroughly reviewed, always use isEmpty() before this.
+         */
+        public HaierExecutionGraph getResult() {
+            return this.result;
+        }
+    }
+
+    /**
+     * Schedule the given {@link JobGraph} according to the given {@link OptimizationPolicy}.
+     *
+     * @param jobGraph the {@link JobGraph} at hand
+     * @param policy the {@link OptimizationPolicy} to schedule against
+     * @return the {@link SchedulingResult}, which is "empty" if no JobVertex could be offloaded at all
+     */
+    public SchedulingResult schedule(final JobGraph jobGraph, final OptimizationPolicy policy) {
+        // If the given JobGraph does not contain any JobVertex that can be offloaded to an accelerator at all,
+        // return fast.
+        if (!HaierExecutionGraph.containsAnyComputationalJobVertex(jobGraph)) {
+            logger.info("Early exiting because none of the JobVertex objects can be offloaded to accelerators.");
+            return new SchedulingResult();
+        }
+
         // If GUI is enabled, register the JobID to the SelectionQueue,
         // to respond to GET /e2data/nsga2/{jobId}/plans appropriately.
         if (GUI_ENABLED) {
@@ -128,7 +191,7 @@ public HaierExecutionGraph schedule(final JobGraph jobGraph, final OptimizationP
         //return this.optimizer.optimize(jobGraph, policy, selectedModel);
         final List<HaierExecutionGraph> paretoHaierExecutionGraphs = this.optimizer.optimize(jobGraph, policy, selectedModel);
 
-        return this.pickPlan(jobGraph, policy, paretoHaierExecutionGraphs);
+        return new SchedulingResult(this.pickPlan(jobGraph, policy, paretoHaierExecutionGraphs));
     }
 
     /**
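The offload-ability check introduced in the next file probes each JobVertex's
Configuration through Flink's ConfigOptions API. A self-contained sketch of
the access pattern (the key names come from the code below; the driver class
value and the class name are hypothetical examples):

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;
    import org.apache.flink.configuration.Configuration;

    public final class DriverClassProbeSketch {
        public static void main(final String[] args) {
            // Normally obtained via jobVertex.getConfiguration(); built by hand here.
            final Configuration config = new Configuration();
            config.setString("driver.class", "org.apache.flink.runtime.operators.MapDriver");

            final ConfigOption<Integer> chainingNumOption =
                    ConfigOptions.key("chaining.num").defaultValue(0);
            final ConfigOption<String> driverClassOption =
                    ConfigOptions.key("driver.class").noDefaultValue();

            System.out.println(config.getInteger(chainingNumOption)); // 0: no chained operators
            System.out.println(config.contains(driverClassOption));   // true
            System.out.println(config.getString(driverClassOption));  // the canonical Driver class name
        }
    }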
diff --git a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/graph/HaierExecutionGraph.java b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/graph/HaierExecutionGraph.java
index ae6f611..3df57fb 100644
--- a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/graph/HaierExecutionGraph.java
+++ b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/graph/HaierExecutionGraph.java
@@ -3,6 +3,9 @@
 import gr.ntua.ece.cslab.e2datascheduler.beans.cluster.HwResource;
 import gr.ntua.ece.cslab.e2datascheduler.beans.graph.SerializableScheduledJobVertex;
 
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -18,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.logging.Logger;
 
 
 /**
@@ -26,6 +30,8 @@
  */
 public class HaierExecutionGraph {
 
+    private static final Logger logger = Logger.getLogger(HaierExecutionGraph.class.getCanonicalName());
+
     private final JobGraph jobGraph;
     private final JobVertex[] jobVertices;
 
@@ -159,13 +165,15 @@ private void auxSubDAGInclusiveRecursive(final ScheduledJobVertex scheduledJobVe
     }};
 
     /**
-     * Returns {@code true} if the given {@link JobVertex} represents a task that can be offloaded to some
-     * heterogeneous architecture supported by E2Data; {@code false} otherwise.
+     * Check whether the given {@link JobVertex} can be offloaded to some heterogeneous architecture supported by
+     * E2Data, based only on its name.
      *
-     * @param jobVertex The given {@link JobVertex}.
-     * @return {@code true} if it can be offloaded; {@code false} otherwise.
+     * FIXME(ckatsak): This is probably wrong and obsolete, but cross-checking it against the new technique is
+     *  still pending.
+     *
+     * @param jobVertex the {@link JobVertex} at hand
+     * @return {@code true} if it can be offloaded; {@code false} otherwise
      */
-    public static boolean isComputational(final JobVertex jobVertex) {
+    private static boolean isComputationalBasedOnName(final JobVertex jobVertex) {
         for (String name : HaierExecutionGraph.NON_COMPUTATIONAL_OPERATOR_NAMES) {
             if (jobVertex.getName().startsWith(name)) {
                 return false;
@@ -174,6 +182,18 @@ public static boolean isComputational(final JobVertex jobVertex) {
         return true;
     }
 
+    /**
+     * Check whether the given {@link JobVertex} can be offloaded to some heterogeneous architecture supported by
+     * E2Data.
+     *
+     * @param jobVertex the {@link JobVertex} at hand
+     * @return {@code true} if it can be offloaded; {@code false} otherwise
+     */
+    public static boolean isComputational(final JobVertex jobVertex) {
+        return isComputationalBasedOnDriverClass(jobVertex);
+//        return isComputationalBasedOnName(jobVertex);
+    }
+
     /**
      * Returns {@code true} if the given {@link ScheduledJobVertex} represents a task that can be offloaded to some
      * heterogeneous architecture supported by E2Data; {@code false} otherwise.
@@ -185,6 +205,126 @@ public static boolean isComputational(final ScheduledJobVertex scheduledJobVerte
         return HaierExecutionGraph.isComputational(scheduledJobVertex.getJobVertex());
     }
 
+    /**
+     * An exhaustive list of the Driver classes that are actually supported by TornadoVM at this time.
+     */
+    private static final List<String> TORNADOVM_SUPPORTED_DRIVER_CLASSES = new ArrayList<String>() {{
+        add("ChainedMapDriver");
+        add("ChainedAllReduceDriver");
+        add("ChainedReduceCombineDriver");
+        add("MapDriver");
+        add("ReduceDriver");
+    }};
+
+    /**
+     * Check whether the given {@link JobVertex} can be offloaded to some heterogeneous architecture supported by
+     * E2Data, based on the Flink Driver classes reported in it.
+     *
+     * @param jobVertex the {@link JobVertex} at hand
+     * @return {@code true} if it can be offloaded; {@code false} otherwise
+     */
+    private static boolean isComputationalBasedOnDriverClass(final JobVertex jobVertex) {
+        final Configuration config = jobVertex.getConfiguration();
+        final ConfigOption<Integer> chainingNumOption = ConfigOptions
+                .key("chaining.num")
+                .defaultValue(0);
+        final ConfigOption<String> driverClassOption = ConfigOptions
+                .key("driver.class")
+                .noDefaultValue();
+
+        final int chainingNum = config.getInteger(chainingNumOption);
+
+        // If the JobVertex does not contain chained operators, then just look for the value of the entry under
+        // the "driver.class" key (note that it may be absent, in which case it is probably a "non-acceleratable"
+        // vertex) to check whether it is one of the Driver classes supported by TornadoVM.
+        if (chainingNum == 0) {
+            if (!config.contains(driverClassOption)) {
+                return false;
+            }
+            final String driverClassCanonicalName = config.getString(driverClassOption);
+            final String[] splitDriverClass = driverClassCanonicalName.split("\\.");
+            final String driverClassBaseName = splitDriverClass[splitDriverClass.length - 1];
+            return TORNADOVM_SUPPORTED_DRIVER_CLASSES.contains(driverClassBaseName);
+        }
+        //
+        // Now, if the JobVertex at hand contains chained operators:
+        //
+        // Check whether the first operator in the chain has a Driver class. If it does not, then the whole
+        // JobVertex cannot be offloaded to an accelerator.
+        // FIXME(ckatsak): Make sure that the assumption in the comment above is true. E.g., is a JobVertex like
+        //  `CHAIN DataSource -> Map -> Map` really non-acceleratable?
+        if (!config.contains(driverClassOption)) {
+            return false;
+        }
+        // If the first operator in the chain does have a Driver class, then if this Driver class is not
+        // supported by TornadoVM, the whole JobVertex is non-acceleratable.
+        // FIXME?(ckatsak): Related to the FIXME above.
+        {
+            final String driverClassCanonicalName = config.getString(driverClassOption);
+            final String[] splitDriverClass = driverClassCanonicalName.split("\\.");
+            final String driverClassBaseName = splitDriverClass[splitDriverClass.length - 1];
+            if (!TORNADOVM_SUPPORTED_DRIVER_CLASSES.contains(driverClassBaseName)) {
+                return false;
+            }
+        }
+        // Now, check the Driver classes of all the operators in the chain of the JobVertex one by one, and only
+        // conclude that the JobVertex is acceleratable if all of them are supported by TornadoVM.
+        for (int i = 0; i < chainingNum; i++) {
+            // If a chained operator's Driver class is missing, the JobVertex is not acceleratable.
+            // FIXME(ckatsak): ^^ Another similar assumption to cross-check ^^
+            if (!config.containsKey("chaining.task." + i)) {
+                return false;
+            }
+            final String driverClassCanonicalName = config.getString("chaining.task." + i, "");
+            final String[] splitDriverClass = driverClassCanonicalName.split("\\.");
+            final String driverClassBaseName = splitDriverClass[splitDriverClass.length - 1];
+            if (!TORNADOVM_SUPPORTED_DRIVER_CLASSES.contains(driverClassBaseName)) {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
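Continuing the probe sketch from above for the chained case handled by
isComputationalBasedOnDriverClass() (values hypothetical; the key layout is
the one the method above reads):

    // A vertex chaining one extra operator after the head driver, e.g. `CHAIN Map -> Map`:
    final Configuration chained = new Configuration();
    chained.setInteger("chaining.num", 1);
    chained.setString("driver.class",
            "org.apache.flink.runtime.operators.MapDriver");                  // head of the chain
    chained.setString("chaining.task.0",
            "org.apache.flink.runtime.operators.chaining.ChainedMapDriver");  // first chained operator
    // Both base names ("MapDriver", "ChainedMapDriver") appear in
    // TORNADOVM_SUPPORTED_DRIVER_CLASSES, so this vertex would be deemed acceleratable.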
+    /**
+     * Check whether the given {@link JobGraph} contains *any* {@link JobVertex} that can be offloaded to some
+     * heterogeneous architecture supported by E2Data.
+     *
+     * @param jobGraph the {@link JobGraph} at hand
+     * @return {@code true} if any of its JobVertex objects can be offloaded; {@code false} otherwise
+     */
+    public static boolean containsAnyComputationalJobVertex(final JobGraph jobGraph) {
+        for (JobVertex jobVertex : jobGraph.getVertices()) {
+            if (HaierExecutionGraph.isComputational(jobVertex)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Go through all {@link JobVertex} objects in the given {@link JobGraph} and log, for each of them, whether
+     * it can be offloaded to the heterogeneous architectures supported by E2Data or not.
+     *
+     * @param jobGraph the given {@link JobGraph} to examine
+     */
+    public static void logOffloadability(final JobGraph jobGraph) {
+        String msg = "Checking \"offloadability\" of JobVertices in '" + jobGraph.toString() + "':\n";
+        int total = 0;
+        int acceleratable = 0;
+        for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
+            total++;
+            if (HaierExecutionGraph.isComputational(jobVertex)) {
+                acceleratable++;
+                msg += total + ". JobVertex '" + jobVertex.toString() + "' is acceleratable\n";
+            } else {
+                msg += total + ". JobVertex '" + jobVertex.toString() + "' is NOT acceleratable\n";
+            }
+        }
+        msg += "Offload-ability Summary: " + acceleratable + " out of " + total + " JobVertices in "
+                + jobGraph.toString() + " are acceleratable!";
+        logger.finest(msg);
+    }
+
 
     // --------------------------------------------------------------------------------------------
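For the record, the message assembled by logOffloadability() (emitted at
FINEST level) comes out like this; '<jobGraph>' stands for
jobGraph.toString() and the vertex names are hypothetical:

    Checking "offloadability" of JobVertices in '<jobGraph>':
    1. JobVertex 'CHAIN DataSource -> FlatMap' is NOT acceleratable
    2. JobVertex 'Map (Map at main(...))' is acceleratable
    3. JobVertex 'DataSink (...)' is NOT acceleratable
    Offload-ability Summary: 1 out of 3 JobVertices in <jobGraph> are acceleratable!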
@@ -238,14 +378,11 @@ public boolean checkCoLocationConstraints() {
      */
     public List<SerializableScheduledJobVertex> toSerializableScheduledJobVertexList() {
         final List<Integer> schedulableIndices = new ArrayList<>();
-        outer:
-        for (int i = 0; i < this.jobVertices.length; i++) {
-            for (String name : HaierExecutionGraph.NON_COMPUTATIONAL_OPERATOR_NAMES) {
-                if (this.jobVertices[i].getName().startsWith(name)) {
-                    continue outer;
-                }
+        for (int jobVertexIndex = 0; jobVertexIndex < this.jobVertices.length; jobVertexIndex++) {
+            if (!HaierExecutionGraph.isComputational(this.jobVertices[jobVertexIndex])) {
+                continue;
             }
-            schedulableIndices.add(i);
+            schedulableIndices.add(jobVertexIndex);
         }
 
         final List<SerializableScheduledJobVertex> ret = new ArrayList<>(schedulableIndices.size());
diff --git a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/optimizer/nsga/exhaustivetimeevaluation/Graph.java b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/optimizer/nsga/exhaustivetimeevaluation/Graph.java
index 538e1af..f3cd9af 100644
--- a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/optimizer/nsga/exhaustivetimeevaluation/Graph.java
+++ b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/optimizer/nsga/exhaustivetimeevaluation/Graph.java
@@ -88,6 +88,10 @@ private static int gcd(int x, int y) {
      * @return the GCD of the given list of integers.
      */
     private static int multiGcd(final List<Integer> durations) {
+        if (durations.size() == 1) {
+            return durations.get(0);
+        }
+
         int totalGcd = gcd(durations.get(0), durations.get(1));
         if (durations.size() > 2) {
             for (int i = 2; i < durations.size(); i++) {
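To make the fix above concrete: for a singleton list, multiGcd() used to
throw an IndexOutOfBoundsException at durations.get(1); with the new guard
the single-task case is handled directly. A sketch of the patched helper in
isolation (gcd is the Euclidean helper defined right above it; the loop body
is not shown in the hunk's context and is assumed here):

    private static int multiGcd(final List<Integer> durations) {
        if (durations.size() == 1) {
            return durations.get(0);    // e.g., multiGcd([7]) == 7 -- the single-task case
        }
        int totalGcd = gcd(durations.get(0), durations.get(1));
        if (durations.size() > 2) {
            for (int i = 2; i < durations.size(); i++) {
                totalGcd = gcd(totalGcd, durations.get(i));
            }
        }
        return totalGcd;                // e.g., multiGcd([12, 18, 24]) == 6
    }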
diff --git a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java
index 272f2a9..789fde8 100644
--- a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java
+++ b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java
@@ -29,9 +29,9 @@
 import java.util.Base64;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ResourceBundle;
 import java.util.concurrent.BlockingQueue;
 import java.util.logging.Logger;
-import java.util.ResourceBundle;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -222,8 +222,8 @@ public Response flink_schedule(
             }
         }
 
-        // FIXME(ckatsak): For now, a default OptimizationPolicy object is
-        //                 hardcoded here instead of being sent by Flink.
+        // TODO(ckatsak): For now, a default OptimizationPolicy object is
+        //                hardcoded here instead of being sent by Flink.
         //final String policyStr = "{\"policy\": {\"objectives\": [ {\"name\":\"execTime\", \"targetFunction\":\"MIN\", \"combineFunction\":\"MAX\"}, {\"name\":\"powerCons\", \"targetFunction\":\"MIN\", \"combineFunction\":\"SUM\"} ]}}";
         final String policyStr = "{" +
                 "\"policy\": {" +
                 "}" +
                 "}";
 
-        final HaierExecutionGraph result = this.scheduler.schedule(jobGraph, OptimizationPolicy.parseJSON(policyStr));
+        final E2dScheduler.SchedulingResult result =
+                this.scheduler.schedule(jobGraph, OptimizationPolicy.parseJSON(policyStr));
         if (null == result) {
-            logger.finer("scheduler.schedule() returned null!");
+            logger.severe("E2dScheduler.SchedulingResult is null; something serious is going on!");
+            return generateResponse(Response.Status.INTERNAL_SERVER_ERROR, "");
+        }
+        if (result.isEmpty()) {
+            logger.info("It looks like none of the JobVertices in '" + jobGraph.toString() +
+                    "' can be offloaded to an accelerator.");
+            return generateResponse(Response.Status.OK, new ArrayList<>(0));
+        }
+        final HaierExecutionGraph resultGraph = result.getResult();
+        if (null == resultGraph) {
+            logger.severe("E2dScheduler.SchedulingResult.getResult() returned null!");
             logger.severe("Error allocating resources for '" + jobGraph.toString() + "'");
             return generateResponse(Response.Status.INTERNAL_SERVER_ERROR, "");
         }
         logger.info("Scheduling result for '" + jobGraph.toString() + "':\n" + result);
 
-        final List<SerializableScheduledJobVertex> responseBody = result.toSerializableScheduledJobVertexList();
-        logger.info("Response body returned to Flink for '" + jobGraph.toString() + "':\n" +
+        final List<SerializableScheduledJobVertex> responseBody = resultGraph.toSerializableScheduledJobVertexList();
+        logger.finest("Response body returned to Flink for '" + jobGraph.toString() + "':\n" +
                 new GsonBuilder().setPrettyPrinting().create().toJson(responseBody));
 
         return generateResponse(Response.Status.OK, responseBody);
     }
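Note on the wire format: in the "empty" case the service now answers with
HTTP 200 and new ArrayList<>(0) as the body, which serializes to an empty
JSON array -- assuming the response body is serialized the same way as the
JSON logged above:

    new GsonBuilder().setPrettyPrinting().create().toJson(new ArrayList<>(0));  // "[]"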