Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
arjun4084346 committed Sep 11, 2024
1 parent 43d53e5 commit fc6f745
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ public static Set<DagNode<JobExecutionPlan>> getNext(Dag<JobExecutionPlan> dag)
DagNode<JobExecutionPlan> node = nodesToExpand.poll();
ExecutionStatus executionStatus = getExecutionStatus(node);
boolean addFlag = true;
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME ||
executionStatus == SKIPPED) {
if (executionStatus == PENDING || executionStatus == PENDING_RETRY || executionStatus == PENDING_RESUME) {
//Add a node to be executed next, only if all of its parent nodes are COMPLETE.
List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
Expand Down Expand Up @@ -183,7 +184,7 @@ public static void cancelDagNode(Dag.DagNode<JobExecutionPlan> dagNodeToCancel)
}

/**
* Emits JOB_SKIPPED GTE for each of the dependent job.
* Emits JOB_SKIPPED GTE for each of the dependent jobs.
*/
public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, Dag.DagNode<JobExecutionPlan> node) {
Set<Dag.DagNode<JobExecutionPlan>> dependentJobs = new HashSet<>();
Expand All @@ -195,11 +196,11 @@ public static void sendSkippedEventForDependentJobs(Dag<JobExecutionPlan> dag, D
}

private static void findDependentJobs(Dag<JobExecutionPlan> dag,
Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> dependentJobs) {
Dag.DagNode<JobExecutionPlan> node, Set<Dag.DagNode<JobExecutionPlan>> result) {
for (Dag.DagNode<JobExecutionPlan> child : dag.getChildren(node)) {
if (!dependentJobs.contains(child)) {
dependentJobs.add(child);
findDependentJobs(dag, child, dependentJobs);
if (!result.contains(child)) {
result.add(child);
findDependentJobs(dag, child, result);
}
}
}
Expand All @@ -223,7 +224,7 @@ private static void sendJobCancellationEvent(Dag.DagNode<JobExecutionPlan> dagNo
* Sets {@link Dag#flowEvent} and emits a {@link GobblinTrackingEvent} of the provided
* flow event type.
*/
public static void setAndEmitFlowEvent(Dag<JobExecutionPlan> dag, String flowEvent) {
public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter, Dag<JobExecutionPlan> dag, String flowEvent) {
if (!dag.isEmpty()) {
// Every dag node will contain the same flow metadata
Config config = DagUtils.getDagJobConfig(dag);
Expand All @@ -234,7 +235,7 @@ public static void setAndEmitFlowEvent(Dag<JobExecutionPlan> dag, String flowEve
flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
}

DagProc.eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
}

dag.get().setMessage("Flow killed by request");
DagProcUtils.setAndEmitFlowEvent(dag.get(), TimingEvent.FlowTimings.FLOW_CANCELLED);
DagProcUtils.setAndEmitFlowEvent(DagProc.eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_CANCELLED);

if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel = dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(), getDagId());
DagProcUtils.setAndEmitFlowEvent(dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
DagProcUtils.setAndEmitFlowEvent(DagProc.eventSubmitter, dag.get(), TimingEvent.FlowTimings.FLOW_RUNNING);
dagManagementStateStore.getDagManagerMetrics().conditionallyMarkFlowAsState(DagUtils.getFlowId(dag.get()),
Dag.FlowState.RUNNING);
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore, getDagTask().getDagAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Pair<Optiona
} else if (DagProcUtils.isDagFinished(dag)) {
String flowEvent = DagProcUtils.calcFlowStatus(dag);
dag.setFlowEvent(flowEvent);
DagProcUtils.setAndEmitFlowEvent(dag, flowEvent);
DagProcUtils.setAndEmitFlowEvent(DagProc.eventSubmitter, dag, flowEvent);
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
// todo - verify if work from PR#3641 is required
dagManagementStateStore.deleteDag(getDagId());
Expand Down Expand Up @@ -167,7 +167,6 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
case CANCELLED:
case SKIPPED:
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
DagProcUtils.sendSkippedEventForDependentJobs(dag, dagNode);
break;
Expand All @@ -177,6 +176,9 @@ private void onJobFinish(DagManagementStateStore dagManagementStateStore, Dag.Da
DagProcUtils.submitNextNodes(dagManagementStateStore, dag, getDagId());
}
break;
case SKIPPED:
// no action needed for a skipped job
break;
default:
log.warn("It should not reach here. Job status {} is unexpected.", executionStatus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
long flowResumeTime = System.currentTimeMillis();

// Set the flow and its failed or cancelled nodes to PENDING_RESUME so that the flow will be resumed from the point before it failed
DagProcUtils.setAndEmitFlowEvent(failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
DagProcUtils.setAndEmitFlowEvent(DagProc.eventSubmitter, failedDag.get(), TimingEvent.FlowTimings.FLOW_PENDING_RESUME);

for (Dag.DagNode<JobExecutionPlan> node : failedDag.get().getNodes()) {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
Expand Down

0 comments on commit fc6f745

Please sign in to comment.