Offload-ability, SchedulingResult, fixes, etc
* Offload-ability now relies on TornadoVM's supported Driver classes
(taking into account chained Flink operators within each JobVertex).

* Introduce "empty"/"non-empty"  E2dScheduler.SchedulingResult across
multiple classes.

* Add logging in HaierExecutionGraph.

* Respond to Flink with only the offload-able
SerializableScheduledJobVertex objects (possibly none) -- _actually_,
this time.

* Fix the multiGcd helper function of the exhaustive time evaluation
algorithm's implementation for the case of a single task.

Signed-off-by: Christos Katsakioris <[email protected]>
ckatsak committed Oct 8, 2020
1 parent e5f6a63 commit 3794444
Showing 4 changed files with 236 additions and 21 deletions.
67 changes: 65 additions & 2 deletions src/main/java/gr/ntua/ece/cslab/e2datascheduler/E2dScheduler.java
@@ -107,7 +107,70 @@ public static E2dScheduler getInstance() {
// --------------------------------------------------------------------------------------------


public HaierExecutionGraph schedule(final JobGraph jobGraph, final OptimizationPolicy policy) {
/**
* A scheduling result produced by HAIER. Unless it is "empty" (i.e., unless none of the JobVertex objects could
* be offloaded to one of the heterogeneous architectures supported by E2Data), it contains the optimized
* {@link HaierExecutionGraph} that has been selected among the execution plans that were generated.
*/
public static class SchedulingResult {
private final boolean empty;
private final HaierExecutionGraph result;

/**
* Create an "empty" scheduling result, i.e., one in which none of the
* {@link org.apache.flink.runtime.jobgraph.JobVertex} objects for a given {@link JobGraph} could be
* offloaded to an accelerator at all.
*/
public SchedulingResult() {
this.empty = true;
this.result = null;
}

/**
* Create a "non-empty" scheduling result that contains a {@link HaierExecutionGraph}. An "empty" scheduling
* result signifies that no {@link org.apache.flink.runtime.jobgraph.JobVertex} objects for a given
* {@link JobGraph} could be offloaded to an accelerator at all.
*
* @param haierExecutionGraph the {@link HaierExecutionGraph} selected as the scheduling result.
*/
public SchedulingResult(final HaierExecutionGraph haierExecutionGraph) {
this.empty = false;
this.result = haierExecutionGraph;
}

/**
* @return {@code true} if the scheduling result is "empty"; {@code false} otherwise
*/
public boolean isEmpty() {
return this.empty;
}

/**
* @return the result {@link HaierExecutionGraph} or {@code null} if the result is "empty"
*
* FIXME(ckatsak): Is it possible to return null for a "non-empty" result (e.g., in the case of an
* internal error)? Until this is thoroughly reviewed, always use isEmpty() before this.
*/
public HaierExecutionGraph getResult() {
return this.result;
}
}
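
// A minimal usage sketch (illustrative only; scheduler and deploy() below are
// hypothetical caller-side names, not part of this class). Callers should check
// isEmpty() before dereferencing the result:
//
//     final E2dScheduler.SchedulingResult res = scheduler.schedule(jobGraph, policy);
//     if (!res.isEmpty() && null != res.getResult()) {
//         deploy(res.getResult());
//     }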

/**
* Schedule the given {@link JobGraph} according to the given {@link OptimizationPolicy}.
*
* @param jobGraph the {@link JobGraph} to be scheduled
* @param policy the {@link OptimizationPolicy} to be taken into account
* @return the {@link SchedulingResult}, which is "empty" if none of the JobVertex objects could be offloaded
*/
public SchedulingResult schedule(final JobGraph jobGraph, final OptimizationPolicy policy) {
// If the given JobGraph does not contain any JobVertex that can be offloaded to an accelerator at all,
// return fast.
if (!HaierExecutionGraph.containsAnyComputationalJobVertex(jobGraph)) {
logger.info("Early exiting because none of the JobVertex objects can be offloaded to accelerators.");
return new SchedulingResult();
}

// If GUI is enabled, register the JobID to the SelectionQueue,
// to respond to GET /e2data/nsga2/{jobId}/plans appropriately.
if (GUI_ENABLED) {
@@ -128,7 +191,7 @@ public HaierExecutionGraph schedule(final JobGraph jobGraph, final OptimizationP
//return this.optimizer.optimize(jobGraph, policy, selectedModel);
final List<HaierExecutionGraph> paretoHaierExecutionGraphs =
this.optimizer.optimize(jobGraph, policy, selectedModel);
return this.pickPlan(jobGraph, policy, paretoHaierExecutionGraphs);
return new SchedulingResult(this.pickPlan(jobGraph, policy, paretoHaierExecutionGraphs));
}

/**
@@ -3,6 +3,9 @@
import gr.ntua.ece.cslab.e2datascheduler.beans.cluster.HwResource;
import gr.ntua.ece.cslab.e2datascheduler.beans.graph.SerializableScheduledJobVertex;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -18,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Logger;


/**
@@ -26,6 +30,8 @@
*/
public class HaierExecutionGraph {

private static final Logger logger = Logger.getLogger(HaierExecutionGraph.class.getCanonicalName());

private final JobGraph jobGraph;
private final JobVertex[] jobVertices;

@@ -159,13 +165,15 @@ private void auxSubDAGInclusiveRecursive(final ScheduledJobVertex scheduledJobVe
}};

/**
* Returns {@code true} if the given {@link JobVertex} represents a task that can be offloaded to some
* heterogeneous architecture supported by E2Data; {@code false} otherwise.
* Check whether the given {@link JobVertex} can be offloaded to some heterogeneous architecture supported by
* E2Data, based only on its name.
*
* @param jobVertex The given {@link JobVertex}.
* @return {@code true} if it can be offloaded; {@code false} otherwise.
* FIXME(ckatsak): This is probably wrong and obsolete, but cross-checking it against the new technique is
* still pending.
*
* @param jobVertex the {@link JobVertex} at hand
* @return {@code true} if it can be offloaded; {@code false} otherwise
*/
public static boolean isComputational(final JobVertex jobVertex) {
private static boolean isComputationalBasedOnName(final JobVertex jobVertex) {
for (String name : HaierExecutionGraph.NON_COMPUTATIONAL_OPERATOR_NAMES) {
if (jobVertex.getName().startsWith(name)) {
return false;
Expand All @@ -174,6 +182,18 @@ public static boolean isComputational(final JobVertex jobVertex) {
return true;
}

/**
* Check whether the given {@link JobVertex} can be offloaded to some heterogeneous architecture supported by
* E2Data.
*
* @param jobVertex the {@link JobVertex} at hand
* @return {@code true} if it can be offloaded; {@code false} otherwise
*/
public static boolean isComputational(final JobVertex jobVertex) {
return isComputationalBasedOnDriverClass(jobVertex);
// return isComputationalBasedOnName(jobVertex);
}

/**
* Returns {@code true} if the given {@link ScheduledJobVertex} represents a task that can be offloaded to some
* heterogeneous architecture supported by E2Data; {@code false} otherwise.
@@ -185,6 +205,126 @@ public static boolean isComputational(final ScheduledJobVertex scheduledJobVerte
return HaierExecutionGraph.isComputational(scheduledJobVertex.getJobVertex());
}

/**
* An exhaustive list of the Driver classes that are actually supported by TornadoVM at this time.
*/
private static final List<String> TORNADOVM_SUPPORTED_DRIVER_CLASSES = new ArrayList<String>() {{
add("ChainedMapDriver");
add("ChainedAllReduceDriver");
add("ChainedReduceCombineDriver");
add("MapDriver");
add("ReduceDriver");
}};

/**
* Check whether the given {@link JobVertex} can be offloaded to some heterogeneous architecture supported by
* E2Data, based on Flink Driver classes reported in it.
*
* @param jobVertex the {@link JobVertex} at hand
* @return {@code true} if it can be offloaded; {@code false} otherwise
*/
private static boolean isComputationalBasedOnDriverClass(final JobVertex jobVertex) {
final Configuration config = jobVertex.getConfiguration();
final ConfigOption<Integer> chainingNumOption = ConfigOptions
.key("chaining.num")
.defaultValue(0);
final ConfigOption<String> driverClassOption = ConfigOptions
.key("driver.class")
.noDefaultValue();

final int chainingNum = config.getInteger(chainingNumOption);

// If the JobVertex does not contain chained operators, then just look for the value of the entry under the
// "driver.class" key (note that it may be absent, in which case it is probably a "non-acceleratable" vertex)
// to check whether it is one of the driver classes supported by TornadoVM.
if (chainingNum == 0) {
if (!config.contains(driverClassOption)) {
return false;
}
final String driverClassCanonicalName = config.getString(driverClassOption);
final String[] splitDriverClass = driverClassCanonicalName.split("\\.");
final String driverClassBaseName = splitDriverClass[splitDriverClass.length - 1];
return TORNADOVM_SUPPORTED_DRIVER_CLASSES.contains(driverClassBaseName);
}
//
// Now, if the JobVertex at hand contains chained operators:
//
// Check whether the first operator in the chain has a Driver class. If it does not, then the whole JobVertex
// cannot be offloaded to an accelerator.
// FIXME(ckatsak): Make sure that the assumption in the comment above is true. E.g., is a JobVertex like
// `CHAIN DataSource -> Map -> Map` really non-acceleratable?
if (!config.contains(driverClassOption)) {
return false;
}
// If the first operator in the chain does have a Driver class, then if this Driver class is not supported by
// TornadoVM, the whole JobVertex is non-acceleratable. FIXME?(ckatsak): Related to the FIXME above.
{
final String driverClassCanonicalName = config.getString(driverClassOption);
final String[] splitDriverClass = driverClassCanonicalName.split("\\.");
final String driverClassBaseName = splitDriverClass[splitDriverClass.length - 1];
if (!TORNADOVM_SUPPORTED_DRIVER_CLASSES.contains(driverClassBaseName)) {
return false;
}
}
// Now, check the Driver classes of all the operators in the chain of the JobVertex one by one, and only
// conclude that the JobVertex is acceleratable if all of them are supported by TornadoVM.
for (int i = 0; i < chainingNum; i++) {
// If any chained operator's Driver class is missing, the JobVertex is not acceleratable.
// FIXME(ckatsak): ^^ Another similar assumption to cross-check ^^
if (!config.containsKey("chaining.task." + i)) {
return false;
}
final String driverClassCanonicalName = config.getString("chaining.task." + i, "");
final String[] splitDriverClass = driverClassCanonicalName.split("\\.");
final String driverClassBaseName = splitDriverClass[splitDriverClass.length - 1];
if (!TORNADOVM_SUPPORTED_DRIVER_CLASSES.contains(driverClassBaseName)) {
return false;
}
}

return true;
}
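
// For illustration only: an assumed shape of a JobVertex Configuration that the checks
// above would accept. The keys follow the convention read by this method; the concrete
// values are hypothetical for this vertex:
//
//     "driver.class"    -> "org.apache.flink.runtime.operators.MapDriver"
//     "chaining.num"    -> 2
//     "chaining.task.0" -> "org.apache.flink.runtime.operators.chaining.ChainedMapDriver"
//     "chaining.task.1" -> "org.apache.flink.runtime.operators.chaining.ChainedAllReduceDriver"
//
// Every base name above appears in TORNADOVM_SUPPORTED_DRIVER_CLASSES, so such a
// JobVertex would be deemed offload-able.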

/**
* Check whether the given {@link JobGraph} contains *any* {@link JobVertex} that can be offloaded to some
* heterogeneous architecture supported by E2Data.
*
* @param jobGraph the {@link JobGraph} at hand
* @return {@code true} if at least one of its {@link JobVertex} objects can be offloaded; {@code false} otherwise
*/
public static boolean containsAnyComputationalJobVertex(final JobGraph jobGraph) {
for (JobVertex jobVertex : jobGraph.getVertices()) {
if (HaierExecutionGraph.isComputational(jobVertex)) {
return true;
}
}
return false;
}

/**
* Go through all {@link JobVertex} objects in the given {@link JobGraph} and log for each of them whether it is
* offloadable to the heterogeneous architectures supported by E2Data or not.
*
* @param jobGraph the given {@link JobGraph} to examine
*/
public static void logOffloadability(final JobGraph jobGraph) {
final StringBuilder msg =
new StringBuilder("Checking \"offloadability\" of JobVertices in '" + jobGraph.toString() + "':\n");
int total = 0;
int acceleratable = 0;
for (JobVertex jobVertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
total++;
if (HaierExecutionGraph.isComputational(jobVertex)) {
acceleratable++;
msg.append(total).append(". JobVertex '").append(jobVertex.toString()).append("' is acceleratable\n");
} else {
msg.append(total).append(". JobVertex '").append(jobVertex.toString()).append("' is NOT acceleratable\n");
}
}
msg.append("Offload-ability Summary: ").append(acceleratable).append(" out of ").append(total)
.append(" JobVertices in ").append(jobGraph.toString()).append(" are acceleratable!");
logger.finest(msg.toString());
}
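
// For a hypothetical two-vertex JobGraph, the message assembled above would read:
//
//     Checking "offloadability" of JobVertices in 'JobGraph(jobId: ...)':
//     1. JobVertex 'CHAIN DataSource -> Map' is NOT acceleratable
//     2. JobVertex 'Reduce (Reduce at main())' is acceleratable
//     Offload-ability Summary: 1 out of 2 JobVertices in JobGraph(jobId: ...) are acceleratable!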


// --------------------------------------------------------------------------------------------

@@ -238,14 +378,11 @@ public boolean checkCoLocationConstraints() {
*/
public List<SerializableScheduledJobVertex> toSerializableScheduledJobVertexList() {
final List<Integer> schedulableIndices = new ArrayList<>();
outer:
for (int i = 0; i < this.jobVertices.length; i++) {
for (String name : HaierExecutionGraph.NON_COMPUTATIONAL_OPERATOR_NAMES) {
if (this.jobVertices[i].getName().startsWith(name)) {
continue outer;
}
for (int jobVertexIndex = 0; jobVertexIndex < this.jobVertices.length; jobVertexIndex++) {
if (!HaierExecutionGraph.isComputational(this.jobVertices[jobVertexIndex])) {
continue;
}
schedulableIndices.add(i);
schedulableIndices.add(jobVertexIndex);
}

final List<SerializableScheduledJobVertex> ret = new ArrayList<>(schedulableIndices.size());
@@ -88,6 +88,10 @@ private static int gcd(int x, int y) {
* @return the GCD of the given list of integers.
*/
private static int multiGcd(final List<Integer> durations) {
if (durations.size() == 1) {
return durations.get(0);
}

int totalGcd = gcd(durations.get(0), durations.get(1));
if (durations.size() > 2) {
for (int i = 2; i < durations.size(); i++) {
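For context, a self-contained sketch of the fixed helper pair follows; the gcd implementation is assumed (the diff elides it), while the multiGcd body matches the hunk above modulo the redundant size check:

    /** Iterative Euclidean algorithm (assumed implementation; elided in the diff above). */
    private static int gcd(int x, int y) {
        while (y != 0) {
            final int tmp = y;
            y = x % y;
            x = tmp;
        }
        return x;
    }

    /**
     * GCD of a non-empty list of durations. The new guard makes a single-task list legal,
     * which previously crashed on durations.get(1).
     */
    private static int multiGcd(final List<Integer> durations) {
        if (durations.size() == 1) {
            return durations.get(0);
        }
        int totalGcd = gcd(durations.get(0), durations.get(1));
        for (int i = 2; i < durations.size(); i++) {
            totalGcd = gcd(totalGcd, durations.get(i));
        }
        return totalGcd;
    }
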
@@ -29,9 +29,9 @@
import java.util.Base64;
import java.util.LinkedList;
import java.util.List;
import java.util.ResourceBundle;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
import java.util.ResourceBundle;

import java.io.File;
import java.io.FileInputStream;
@@ -222,8 +222,8 @@ public Response flink_schedule(
}
}

// FIXME(ckatsak): For now, a default OptimizationPolicy object is
// hardcoded here instead of being sent by Flink.
// TODO(ckatsak): For now, a default OptimizationPolicy object is
// hardcoded here instead of being sent by Flink.
//final String policyStr = "{\"policy\": {\"objectives\": [ {\"name\":\"execTime\", \"targetFunction\":\"MIN\", \"combineFunction\":\"MAX\"}, {\"name\":\"powerCons\", \"targetFunction\":\"MIN\", \"combineFunction\":\"SUM\"} ]}}";
final String policyStr = "{" +
"\"policy\": {" +
@@ -242,17 +242,28 @@
"}" +
"}";

final HaierExecutionGraph result = this.scheduler.schedule(jobGraph, OptimizationPolicy.parseJSON(policyStr));
final E2dScheduler.SchedulingResult result =
this.scheduler.schedule(jobGraph, OptimizationPolicy.parseJSON(policyStr));
if (null == result) {
logger.finer("scheduler.schedule() returned null!");
logger.severe("E2dScheduler.SchedulingResult is null; something serious is going on!");
return generateResponse(Response.Status.INTERNAL_SERVER_ERROR, "");
}
if (result.isEmpty()) {
logger.info("It looks like none of the JobVertices in '" + jobGraph.toString() +
"' can be offloaded to an accelerator.");
return generateResponse(Response.Status.OK, new ArrayList<>(0));
}
final HaierExecutionGraph resultGraph = result.getResult();
if (null == resultGraph) {
logger.severe("E2dScheduler.SchedulingResult.getResult() returned null!");
logger.severe("Error allocating resources for '" + jobGraph.toString() + "'");
return generateResponse(Response.Status.INTERNAL_SERVER_ERROR, "");
}

logger.info("Scheduling result for '" + jobGraph.toString() + "':\n" + result);

final List<SerializableScheduledJobVertex> responseBody = result.toSerializableScheduledJobVertexList();
logger.info("Response body returned to Flink for '" + jobGraph.toString() + "':\n" +
final List<SerializableScheduledJobVertex> responseBody = resultGraph.toSerializableScheduledJobVertexList();
logger.finest("Response body returned to Flink for '" + jobGraph.toString() + "':\n" +
new GsonBuilder().setPrettyPrinting().create().toJson(responseBody));
return generateResponse(Response.Status.OK, responseBody);
}
