diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java index 29ffc485e02..5f6e73ebd6c 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flowgraph/Dag.java @@ -48,6 +48,8 @@ public class Dag { // Map to maintain parent to children mapping. private Map>> parentChildMap; private List> nodes; + @Setter + private boolean isFailedDag; @Setter @Deprecated // because this field is not persisted in mysql and contains information in very limited cases diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java index 139082545b2..e1979ee71c3 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementStateStore.java @@ -77,22 +77,11 @@ public interface DagManagementStateStore { /** * This marks the dag as a failed one. - * Failed dags are queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried. + * Failed dags are queried using {@link DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried. * @param dagId failing dag's dagId */ void markDagFailed(DagManager.DagId dagId) throws IOException; - /** - * Returns the failed dag. - * If the dag is not found because it was never marked as failed through - * {@link DagManagementStateStore#markDagFailed(org.apache.gobblin.service.modules.orchestration.DagManager.DagId)}, - * it returns Optional.absent. - * @param dagId dag id of the failed dag - */ - Optional> getFailedDag(DagManager.DagId dagId) throws IOException; - - void deleteFailedDag(DagManager.DagId dagId) throws IOException; - /** * Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store. * Note that a DagNode is a part of a Dag and must already be present in the store through diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java index 03aaf41520c..64c11f81e12 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagStateStoreWithDagNodes.java @@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore { * Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one * Refer */ - int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode) throws IOException; + int updateDagNode(DagManager.DagId dagId, Dag.DagNode dagNode, boolean isFailedDag) throws IOException; + /** * Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java index c0984f835b2..7ab6e8b34aa 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MySqlDagManagementStateStore.java @@ -61,15 +61,12 @@ @Slf4j @Singleton public class MySqlDagManagementStateStore implements DagManagementStateStore { - // todo - these two stores should merge private DagStateStoreWithDagNodes dagStateStore; - private DagStateStoreWithDagNodes failedDagStateStore; private final JobStatusRetriever jobStatusRetriever; private boolean dagStoresInitialized = false; private final UserQuotaManager quotaManager; Map topologySpecMap; private final Config config; - public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore"; public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass"; FlowCatalog flowCatalog; @Getter @@ -91,8 +88,6 @@ public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, User private synchronized void start() { if (!dagStoresInitialized) { this.dagStateStore = createDagStateStore(config, topologySpecMap); - this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config), - topologySpecMap); // This implementation does not need to update quota usage when the service restarts or when its leadership status // changes because quota usage are persisted in mysql table. For the same reason, there is no need to call getDags also. // Also, calling getDags during startUp may fail, because the topologies that are required to deserialize dags may @@ -134,10 +129,8 @@ public void addDag(Dag dag) throws IOException { @Override public void markDagFailed(DagManager.DagId dagId) throws IOException { Dag dag = this.dagStateStore.getDag(dagId); - this.failedDagStateStore.writeCheckpoint(dag); - this.dagStateStore.cleanUp(dagId); - // todo - updated failedDagStateStore iff cleanup returned 1 - // or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed` + dag.setFailedDag(true); + this.dagStateStore.writeCheckpoint(dag); log.info("Marked dag failed {}", dagId); } @@ -147,21 +140,10 @@ public void deleteDag(DagManager.DagId dagId) throws IOException { log.info("Deleted dag {}", dagId); } - @Override - public void deleteFailedDag(DagManager.DagId dagId) throws IOException { - this.failedDagStateStore.cleanUp(dagId); - log.info("Deleted failed dag {}", dagId); - } - - @Override - public Optional> getFailedDag(DagManager.DagId dagId) throws IOException { - return Optional.of(this.failedDagStateStore.getDag(dagId)); - } - @Override public synchronized void addDagNodeState(Dag.DagNode dagNode, DagManager.DagId dagId) throws IOException { - this.dagStateStore.updateDagNode(dagId, dagNode); + this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is set as false because addDagNodeState adds a new DagNode, doesn't update an existing dagNode as failed. } @Override diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java index 2692e20697a..afe24c740a2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStoreWithDagNodes.java @@ -77,19 +77,19 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes protected final GsonSerDe> serDe; private final JobExecutionPlanDagFactory jobExecPlanDagFactory; - // todo add a column that tells if it is a running dag or a failed dag - protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (" - + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " - + "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " - + "dag_node JSON NOT NULL, " - + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " - + "PRIMARY KEY (dag_node_id), " - + "UNIQUE INDEX dag_node_index (dag_node_id), " - + "INDEX dag_index (parent_dag_id))"; - - protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) " - + "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node"; - protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; + protected static final String CREATE_TABLE_STATEMENT = + "CREATE TABLE IF NOT EXISTS %s (" + "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR(" + + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, " + + "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, " + + "is_failed_dag TINYINT(1) DEFAULT 0, " + "PRIMARY KEY (dag_node_id), " + + "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))"; + + protected static final String INSERT_STATEMENT = + "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) " + + "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag"; + protected static final String GET_DAG_NODES_STATEMENT = + "SELECT dag_node FROM %s WHERE parent_dag_id = ?"; protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?"; protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?"; private final ContextAwareCounter totalDagCount; @@ -105,7 +105,8 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo DataSource dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker()); try (Connection connection = dataSource.getConnection(); - PreparedStatement createStatement = connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, tableName))) { + PreparedStatement createStatement = connection.prepareStatement( + String.format(CREATE_TABLE_STATEMENT, tableName))) { createStatement.executeUpdate(); connection.commit(); } catch (SQLException e) { @@ -126,12 +127,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map topo } @Override - public void writeCheckpoint(Dag dag) - throws IOException { + public void writeCheckpoint(Dag dag) throws IOException { DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); boolean newDag = false; for (Dag.DagNode dagNode : dag.getNodes()) { - if (updateDagNode(dagId, dagNode) == 1) { + if (updateDagNode(dagId, dagNode, dag.isFailedDag()) == 1) { newDag = true; } } @@ -167,7 +167,7 @@ public void cleanUp(String dagId) throws IOException { @Override public List> getDags() throws IOException { throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with " - + "the DagManager that is replaced by DagProcessingEngine"); } + + "the DagManager that is replaced by DagProcessingEngine");} @Override public Dag getDag(DagManager.DagId dagId) throws IOException { @@ -195,13 +195,14 @@ private Dag convertDagNodesIntoDag(Set dagNode) throws IOException { + public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode dagNode, boolean isFailedDag) throws IOException { String dagNodeId = dagNode.getValue().getId().toString(); return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> { try { insertStatement.setString(1, dagNodeId); insertStatement.setString(2, parentDagId.toString()); insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue()))); + insertStatement.setInt(4, isFailedDag ? 1 : 0); return insertStatement.executeUpdate(); } catch (SQLException e) { throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e); diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java index 8326d83f095..773b6a605e7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java @@ -54,7 +54,7 @@ public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) { @Override protected Optional> initialize(DagManagementStateStore dagManagementStateStore) throws IOException { - return dagManagementStateStore.getFailedDag(getDagId()); + return dagManagementStateStore.getDag(getDagId()); } @Override @@ -92,7 +92,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional originalDag1 = DagTestUtils.buildDag("random_1", 123L); Dag originalDag2 = DagTestUtils.buildDag("random_2", 456L); DagManager.DagId dagId1 = DagManagerUtils.generateDagId(originalDag1); @@ -137,4 +152,56 @@ public void testAddGetAndDeleteDag() throws Exception{ Assert.assertNull(this.dagStateStore.getDag(dagId1)); Assert.assertNull(this.dagStateStore.getDag(dagId2)); } + + @Test + public void testMarkDagAsFailed() throws Exception { + // Set up initial conditions + Dag dag = DagTestUtils.buildDag("test_dag", 789L); + DagManager.DagId dagId = DagManagerUtils.generateDagId(dag); + + this.dagStateStore.writeCheckpoint(dag); + + // Fetch all initial states into a list + List initialStates = fetchDagNodeStates(dagId.toString()); + + // Check Initial State + for (Boolean state : initialStates) { + Assert.assertFalse(state); + } + // Set the DAG as failed + dag.setFailedDag(true); + this.dagStateStore.writeCheckpoint(dag); + + // Fetch all states after marking the DAG as failed + List failedStates = fetchDagNodeStates(dagId.toString()); + + // Check if all states are now true (indicating failure) + for (Boolean state : failedStates) { + Assert.assertTrue(state); + } + dagStateStore.cleanUp(dagId); + Assert.assertNull(this.dagStateStore.getDag(dagId)); + } + + private List fetchDagNodeStates(String dagId) throws IOException { + List states = new ArrayList<>(); + + dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> { + + getStatement.setString(1, dagId.toString()); + + HashSet> dagNodes = new HashSet<>(); + + try (ResultSet rs = getStatement.executeQuery()) { + while (rs.next()) { + states.add(rs.getBoolean(2)); + } + return dagNodes; + } catch (SQLException e) { + throw new IOException(String.format("Failure get dag nodes for dag %s", dagId), e); + } + }, true); + + return states; + } } \ No newline at end of file