Skip to content

Commit

Permalink
WIP: explore the possibility of Path extraction
Browse files Browse the repository at this point in the history
Signed-off-by: Christos Katsakioris <[email protected]>
  • Loading branch information
ckatsak committed Jan 22, 2021
1 parent 7544ec9 commit f231f41
Showing 1 changed file with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
Expand Down Expand Up @@ -435,6 +438,33 @@ private void debuggingInspection(final JobGraph jobGraph) {
// ====================================================================================
// ^^^ User's lambda deserialization ^^^
// ====================================================================================

if (vertex instanceof InputOutputFormatVertex) {
inspectionMsg = "\n\nINPUTOUTPUTFORMATVERTEX FOUND!\n";
final InputOutputFormatVertex iofVertex = (InputOutputFormatVertex) vertex;

inspectionMsg += "\nOperatorIDPairs:\n";
for (OperatorIDPair operatorIDPair : iofVertex.getOperatorIDs()) {
inspectionMsg += "\n - (generated):\t\t" + operatorIDPair.getGeneratedOperatorID();
inspectionMsg += "\n - (user-defined):\t" + operatorIDPair.getUserDefinedOperatorID();
inspectionMsg += "\n";

inspectionMsg += "\t* " + iofVertex.getFormatDescription(operatorIDPair.getGeneratedOperatorID());
}
inspectionMsg += "\n";

for (JobVertex u : jobGraph.getVerticesSortedTopologicallyFromSources()) {
for (OperatorIDPair operatorIDPair : u.getOperatorIDs()) {
final OperatorID operatorID = operatorIDPair.getGeneratedOperatorID();
final String fd = iofVertex.getFormatDescription(operatorID);
if (null != fd) {
inspectionMsg += "FOUND AN ENTRY:\t" + operatorID.toString() + " --> " + fd + "\n";
}
}
}

logger.finest(inspectionMsg + "\n\n");
}
}
HaierExecutionGraph.logOffloadability(jobGraph);

Expand Down Expand Up @@ -487,6 +517,15 @@ private void debuggingInspection(final JobGraph jobGraph) {
// ========================================================================================
// ^^^ User's JAR inspection/manipulation ^^^
// ========================================================================================


// ========================================================================================
// vvv HDFS Path Extraction vvv
// ========================================================================================
// TODO?
// ========================================================================================
// ^^^ HDFS Path Extraction ^^^
// ========================================================================================
}

/*
Expand Down

0 comments on commit f231f41

Please sign in to comment.