diff --git a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java index e622503..c8b202c 100644 --- a/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java +++ b/src/main/java/gr/ntua/ece/cslab/e2datascheduler/ws/SchedulerService.java @@ -19,10 +19,13 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.OperatorIDPair; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -435,6 +438,33 @@ private void debuggingInspection(final JobGraph jobGraph) { // ==================================================================================== // ^^^ User's lambda deserialization ^^^ // ==================================================================================== + + if (vertex instanceof InputOutputFormatVertex) { + inspectionMsg = "\n\nINPUTOUTPUTFORMATVERTEX FOUND!\n"; + final InputOutputFormatVertex iofVertex = (InputOutputFormatVertex) vertex; + + inspectionMsg += "\nOperatorIDPairs:\n"; + for (OperatorIDPair operatorIDPair : iofVertex.getOperatorIDs()) { + inspectionMsg += "\n - (generated):\t\t" + operatorIDPair.getGeneratedOperatorID(); + inspectionMsg += "\n - (user-defined):\t" + operatorIDPair.getUserDefinedOperatorID(); + inspectionMsg += "\n"; + + inspectionMsg += "\t* " + iofVertex.getFormatDescription(operatorIDPair.getGeneratedOperatorID()); + } + inspectionMsg += "\n"; + + for (JobVertex u : jobGraph.getVerticesSortedTopologicallyFromSources()) { + for (OperatorIDPair operatorIDPair : u.getOperatorIDs()) { + final OperatorID operatorID = operatorIDPair.getGeneratedOperatorID(); + final String fd = iofVertex.getFormatDescription(operatorID); + if (null != fd) { + inspectionMsg += "FOUND AN ENTRY:\t" + operatorID.toString() + " --> " + fd + "\n"; + } + } + } + + logger.finest(inspectionMsg + "\n\n"); + } } HaierExecutionGraph.logOffloadability(jobGraph); @@ -487,6 +517,15 @@ private void debuggingInspection(final JobGraph jobGraph) { // ======================================================================================== // ^^^ User's JAR inspection/manipulation ^^^ // ======================================================================================== + + + // ======================================================================================== + // vvv HDFS Path Extraction vvv + // ======================================================================================== + // TODO? + // ======================================================================================== + // ^^^ HDFS Path Extraction ^^^ + // ======================================================================================== } /*