Add debugging logging
Signed-off-by: Christos Katsakioris <[email protected]>
ckatsak committed Oct 8, 2020
1 parent 3794444 commit e680366
Showing 1 changed file with 66 additions and 0 deletions.
@@ -9,7 +9,12 @@
import gr.ntua.ece.cslab.e2datascheduler.util.HaierLogHandler;
import gr.ntua.ece.cslab.e2datascheduler.util.SelectionQueue;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
@@ -26,9 +31,12 @@

import com.google.gson.GsonBuilder;

import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.concurrent.BlockingQueue;
import java.util.logging.Logger;
@@ -242,6 +250,8 @@ public Response flink_schedule(
"}" +
"}";

debuggingInspection(jobGraph); // FIXME(ckatsak) debugging logging

final E2dScheduler.SchedulingResult result =
this.scheduler.schedule(jobGraph, OptimizationPolicy.parseJSON(policyStr));
if (null == result) {
@@ -283,5 +293,61 @@ private static void persistSerializedJobGraphFile(
out.close();
}


// --------------------------------------------------------------------------------------------


private static void debuggingInspection(final JobGraph jobGraph) {
/// for (JobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
/// String str = "JobVertex: " + vertex.getID().toString() + ", " + vertex.getName() + "\n";
/// HashMap confData = (HashMap) vertex.getConfiguration().toMap();
/// if (confData.containsKey("driver.class")) {
/// str += "CKATSAK: driver.class is \"" + confData.get("driver.class").toString() + "\"\n";
/// }
/// if (confData.containsKey("udf")) {
/// str += "CKATSAK: found a UDF -- (getClass(): " + confData.get("udf").getClass() + ")\n";
/// //if (confData.get("udf") instanceof byte[]) {
/// // byte[] udf = (byte[]) confData.get("udf");
/// // str += "CKATSAK: (udf size == " + udf.length + " bytes)\n";
/// //}
/// str += "CKATSAK: (UDF = " + confData.get("udf") + ")\n";
/// }
/// logger.finest(str);
/// }
for (JobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
// General information about the JobVertex at hand:
String inspectionMsg = "JobVertex: " + vertex.getID().toString() + "\n";
inspectionMsg += new GsonBuilder().setPrettyPrinting().create().toJson(vertex.getID()) + "\n";
inspectionMsg += "name: \"" + vertex.getName() + "\"\n";
final SlotSharingGroup ssg = vertex.getSlotSharingGroup();
final CoLocationGroup clg = vertex.getCoLocationGroup();
inspectionMsg += "SlotSharingGroup: " + (ssg != null ? ssg.toString() : null) +
";\nCoLocationGroup: " + (clg != null ? clg.toString() : null) + "\n";
// Create the `ConfigOption` objects to query the `Configuration` object:
final ConfigOption<String> driverClassOption = ConfigOptions
.key("driver.class")
.noDefaultValue();
// final ConfigOption<byte[]> udfOption = ConfigOptions
// .key("udf")
// .defaultValue(null);
// Use the `ConfigOption`s to query the `Configuration`:
if (vertex.getConfiguration().contains(driverClassOption)) {
final String driverClass = vertex.getConfiguration().getValue(driverClassOption);
inspectionMsg += "CKATSAK: driver.class is \"" + driverClass + "\"\n";
}
final byte[] udf = vertex.getConfiguration().getBytes("udf", null);
inspectionMsg += "CKATSAK: found a UDF (size = " + (udf != null ? udf.length : 0) + "):\n" + udf;
logger.finest(inspectionMsg);

inspectionMsg = "";
final HashMap<String, String> confData = (HashMap<String, String>) vertex.getConfiguration().toMap();
for (Map.Entry<String, String> entry : confData.entrySet()) {
inspectionMsg += "\"" + entry.getKey() + "\" : \"" + entry.getValue() + "\"\n";
}
logger.finest("Whole configuration:\n" + inspectionMsg + "\n");
}
HaierExecutionGraph.logOffloadability(jobGraph);
}

}
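
The ConfigOption-based lookups in debuggingInspection() use Flink's plain Configuration API. As a rough, self-contained sketch of the same query pattern outside of a JobVertex (the class name, key values, and driver string below are made up purely for illustration), it could look like this:

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

import java.nio.charset.StandardCharsets;

public class ConfigOptionSketch {

    public static void main(String[] args) {
        final Configuration conf = new Configuration();
        // Populate the configuration the way a task's configuration might be (illustrative values).
        conf.setString("driver.class", "org.example.DummyDriver");
        conf.setBytes("udf", "dummy-udf-bytes".getBytes(StandardCharsets.UTF_8));

        // String entries are queried through a typed ConfigOption...
        final ConfigOption<String> driverClassOption = ConfigOptions
                .key("driver.class")
                .noDefaultValue();
        if (conf.contains(driverClassOption)) {
            System.out.println("driver.class = " + conf.getValue(driverClassOption));
        }

        // ...while raw byte[] entries are read directly by key, with a null default.
        final byte[] udf = conf.getBytes("udf", null);
        System.out.println("udf size = " + (udf != null ? udf.length : 0) + " bytes");
    }
}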
