diff --git a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java index 789fde8..7bbcb46 100644 --- a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java +++ b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java @@ -9,7 +9,12 @@ import gr.ntua.ece.cslab.e2datascheduler.util.HaierLogHandler; import gr.ntua.ece.cslab.e2datascheduler.util.SelectionQueue; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; @@ -26,9 +31,12 @@ import com.google.gson.GsonBuilder; +import java.util.ArrayList; import java.util.Base64; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.ResourceBundle; import java.util.concurrent.BlockingQueue; import java.util.logging.Logger; @@ -242,6 +250,8 @@ public Response flink_schedule( "}" + "}"; + debuggingInspection(jobGraph); // FIXME(ckatsak) debugging logging + final E2dScheduler.SchedulingResult result = this.scheduler.schedule(jobGraph, OptimizationPolicy.parseJSON(policyStr)); if (null == result) { @@ -283,5 +293,61 @@ private static void persistSerializedJobGraphFile( out.close(); } + + // -------------------------------------------------------------------------------------------- + + + private static void debuggingInspection(final JobGraph jobGraph) { +/// for (JobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) { +/// String str = "JobVertex: " + vertex.getID().toString() + ", " + vertex.getName() + "\n"; +/// HashMap confData = (HashMap) vertex.getConfiguration().toMap(); +/// if (confData.containsKey("driver.class")) { +/// str += "CKATSAK: driver.class is \"" + confData.get("driver.class").toString() + "\"\n"; +/// } +/// if (confData.containsKey("udf")) { +/// str += "CKATSAK: found a UDF -- (getClass(): " + confData.get("udf").getClass() + ")\n"; +/// //if (confData.get("udf") instanceof byte[]) { +/// // byte[] udf = (byte[]) confData.get("udf"); +/// // str += "CKATSAK: (udf size == " + udf.length + " bytes)\n"; +/// //} +/// str += "CKATSAK: (UDF = " + confData.get("udf") + ")\n"; +/// } +/// logger.finest(str); +/// } + for (JobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) { + // General information about the JobVertex at hand: + String inspectionMsg = "JobVertex: " + vertex.getID().toString() + "\n"; + inspectionMsg += new GsonBuilder().setPrettyPrinting().create().toJson(vertex.getID()) + "\n"; + inspectionMsg += "name: \"" + vertex.getName() + "\"\n"; + final SlotSharingGroup ssg = vertex.getSlotSharingGroup(); + final CoLocationGroup clg = vertex.getCoLocationGroup(); + inspectionMsg += "SlotSharingGroup: " + (ssg != null ? ssg.toString() : null) + + ";\nCoLocationGroup: " + (clg != null ? clg.toString() : null) + "\n"; + // Create the `ConfigOption` objects to query the `Configuration` object: + final ConfigOption driverClassOption = ConfigOptions + .key("driver.class") + .noDefaultValue(); +// final ConfigOption udfOption = ConfigOptions +// .key("udf") +// .defaultValue(null); + // Use the `ConfigOption`s to query the `Configuration`: + if (vertex.getConfiguration().contains(driverClassOption)) { + final String driverClass = vertex.getConfiguration().getValue(driverClassOption); + inspectionMsg += "CKATSAK: driver.class is \"" + driverClass + "\"\n"; + } + final byte[] udf = vertex.getConfiguration().getBytes("udf", null); + inspectionMsg += "CKATSAK: found a UDF (size = " + (udf != null ? udf.length : 0) + "):\n" + udf; + logger.finest(inspectionMsg); + + inspectionMsg = ""; + final HashMap confData = (HashMap) vertex.getConfiguration().toMap(); + for (Map.Entry entry : confData.entrySet()) { + inspectionMsg += "\"" + entry.getKey() + "\" : \"" + entry.getValue() + "\"\n"; + } + logger.finest("Whole configuration:\n" + inspectionMsg + "\n"); + } + HaierExecutionGraph.logOffloadability(jobGraph); + } + }