Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2137]merged dagNodeStateStore and failedDagNodeStateStore tables #4032

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class Dag<T> {
// Map to maintain parent to children mapping.
private Map<DagNode, List<DagNode<T>>> parentChildMap;
private List<DagNode<T>> nodes;
@Setter
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because we do not persist dag level field in mysql, adding fields to Dag will not be much useful and may lead to bugs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to have this field here as we didn't want to add additional parameters in all the methods to pass on is_failed value.

private boolean isFailedDag;

@Setter
@Deprecated // because this field is not persisted in mysql and contains information in very limited cases
Expand Down Expand Up @@ -259,11 +261,17 @@ public static class DagNode<T> {
private T value;
//List of parent Nodes that are dependencies of this Node.
private List<DagNode<T>> parentNodes;
@Setter
private boolean isFailedDag;

//Constructor
public DagNode(T value) {
this.value = value;
}
public DagNode(T value,boolean isFailedDag) {
this.value = value;
this.isFailedDag = isFailedDag;
}


public void addParentNode(DagNode<T> node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,11 @@ public interface DagManagementStateStore {

/**
* This marks the dag as a failed one.
* Failed dags are queried using {@link DagManagementStateStore#getFailedDag(DagManager.DagId)} ()} later to be retried.
* Failed dags are queried using {@link DagManagementStateStore#getDag(DagManager.DagId)} ()} later to be retried.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it remain useful to both retrieve the DAG while also asserting that it's failed?

* @param dagId failing dag's dagId
*/
void markDagFailed(DagManager.DagId dagId) throws IOException;

/**
* Returns the failed dag.
* If the dag is not found because it was never marked as failed through
* {@link DagManagementStateStore#markDagFailed(org.apache.gobblin.service.modules.orchestration.DagManager.DagId)},
* it returns Optional.absent.
* @param dagId dag id of the failed dag
*/
Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws IOException;

void deleteFailedDag(DagManager.DagId dagId) throws IOException;

/**
* Adds state of a {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode} to the store.
* Note that a DagNode is a part of a Dag and must already be present in the store through
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public interface DagStateStoreWithDagNodes extends DagStateStore {
* Returns 1 if the dag node is inserted as a new one, 2 if is updated, and 0 if new dag node is same as the existing one
* <a href="https://dev.mysql.com/doc/refman/8.4/en/insert-on-duplicate.html">Refer</a>
*/
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
int updateDagNode(DagManager.DagId dagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag) throws IOException;
pratapaditya04 marked this conversation as resolved.
Show resolved Hide resolved


/**
* Returns all the {@link org.apache.gobblin.service.modules.flowgraph.Dag.DagNode}s for the given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,12 @@
@Slf4j
@Singleton
public class MySqlDagManagementStateStore implements DagManagementStateStore {
// todo - these two stores should merge
private DagStateStoreWithDagNodes dagStateStore;
private DagStateStoreWithDagNodes failedDagStateStore;
private final JobStatusRetriever jobStatusRetriever;
private boolean dagStoresInitialized = false;
private final UserQuotaManager quotaManager;
Map<URI, TopologySpec> topologySpecMap;
private final Config config;
public static final String FAILED_DAG_STATESTORE_PREFIX = "failedDagStateStore";
public static final String DAG_STATESTORE_CLASS_KEY = DagManager.DAG_MANAGER_PREFIX + "dagStateStoreClass";
FlowCatalog flowCatalog;
@Getter
Expand All @@ -91,8 +88,6 @@ public MySqlDagManagementStateStore(Config config, FlowCatalog flowCatalog, User
private synchronized void start() {
if (!dagStoresInitialized) {
this.dagStateStore = createDagStateStore(config, topologySpecMap);
this.failedDagStateStore = createDagStateStore(ConfigUtils.getConfigOrEmpty(config, FAILED_DAG_STATESTORE_PREFIX).withFallback(config),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any ideas on handling migration when we roll this out (presuming the failed DagStateStore was not empty)?

topologySpecMap);
// This implementation does not need to update quota usage when the service restarts or when its leadership status
// changes because quota usage are persisted in mysql table. For the same reason, there is no need to call getDags also.
// Also, calling getDags during startUp may fail, because the topologies that are required to deserialize dags may
Expand Down Expand Up @@ -134,10 +129,8 @@ public void addDag(Dag<JobExecutionPlan> dag) throws IOException {
@Override
public void markDagFailed(DagManager.DagId dagId) throws IOException {
Dag<JobExecutionPlan> dag = this.dagStateStore.getDag(dagId);
this.failedDagStateStore.writeCheckpoint(dag);
this.dagStateStore.cleanUp(dagId);
// todo - updated failedDagStateStore iff cleanup returned 1
// or merge dagStateStore and failedDagStateStore and change the flag that marks a dag `failed`
dag.setFailedDag(true);
this.dagStateStore.writeCheckpoint(dag);
log.info("Marked dag failed {}", dagId);
}

Expand All @@ -147,21 +140,10 @@ public void deleteDag(DagManager.DagId dagId) throws IOException {
log.info("Deleted dag {}", dagId);
}

@Override
public void deleteFailedDag(DagManager.DagId dagId) throws IOException {
this.failedDagStateStore.cleanUp(dagId);
log.info("Deleted failed dag {}", dagId);
}

@Override
public Optional<Dag<JobExecutionPlan>> getFailedDag(DagManager.DagId dagId) throws IOException {
return Optional.of(this.failedDagStateStore.getDag(dagId));
}

@Override
public synchronized void addDagNodeState(Dag.DagNode<JobExecutionPlan> dagNode, DagManager.DagId dagId)
throws IOException {
this.dagStateStore.updateDagNode(dagId, dagNode);
this.dagStateStore.updateDagNode(dagId, dagNode, false);// isFailedDag is set as false because addDagNodeState adds a new DagNode, doesn't update an existing dagNode as failed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: space before starting a comment. also more brevity; e.g.:

// create all DagNodes as isFailedDag == false

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,18 @@ public class MysqlDagStateStoreWithDagNodes implements DagStateStoreWithDagNodes
protected final GsonSerDe<List<JobExecutionPlan>> serDe;
private final JobExecutionPlanDagFactory jobExecPlanDagFactory;

// todo add a column that tells if it is a running dag or a failed dag
protected static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH + ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, "
+ "parent_dag_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, "
+ "dag_node JSON NOT NULL, "
+ "dag_node_id VARCHAR(" + ServiceConfigKeys.MAX_DAG_NODE_ID_LENGTH
+ ") CHARACTER SET latin1 COLLATE latin1_bin NOT NULL, " + "parent_dag_id VARCHAR("
+ ServiceConfigKeys.MAX_DAG_ID_LENGTH + ") NOT NULL, " + "dag_node JSON NOT NULL, "
+ "modified_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, "
+ "PRIMARY KEY (dag_node_id), "
+ "UNIQUE INDEX dag_node_index (dag_node_id), "
+ "INDEX dag_index (parent_dag_id))";

protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node) "
+ "VALUES (?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node";
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node FROM %s WHERE dag_node_id = ?";
+ "is_failed_dag INT NOT NULL DEFAULT 0, " + "PRIMARY KEY (dag_node_id), "
pratapaditya04 marked this conversation as resolved.
Show resolved Hide resolved
+ "UNIQUE INDEX dag_node_index (dag_node_id), " + "INDEX dag_index (parent_dag_id))";

protected static final String INSERT_STATEMENT = "INSERT INTO %s (dag_node_id, parent_dag_id, dag_node, is_failed_dag) "
+ "VALUES (?, ?, ?, ?) AS new ON DUPLICATE KEY UPDATE dag_node = new.dag_node, is_failed_dag = new.is_failed_dag";
protected static final String GET_DAG_NODES_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE parent_dag_id = ?";
protected static final String GET_DAG_NODE_STATEMENT = "SELECT dag_node,is_failed_dag FROM %s WHERE dag_node_id = ?";
pratapaditya04 marked this conversation as resolved.
Show resolved Hide resolved
protected static final String DELETE_DAG_STATEMENT = "DELETE FROM %s WHERE parent_dag_id = ?";
private final ContextAwareCounter totalDagCount;

Expand Down Expand Up @@ -126,12 +124,11 @@ public MysqlDagStateStoreWithDagNodes(Config config, Map<URI, TopologySpec> topo
}

@Override
public void writeCheckpoint(Dag<JobExecutionPlan> dag)
throws IOException {
public void writeCheckpoint(Dag<JobExecutionPlan> dag) throws IOException {
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);
boolean newDag = false;
for (Dag.DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
if (updateDagNode(dagId, dagNode) == 1) {
if (updateDagNode(dagId, dagNode, dag.isFailedDag()) == 1) {
newDag = true;
}
}
Expand All @@ -153,7 +150,8 @@ public boolean cleanUp(DagManager.DagId dagId) throws IOException {
return deleteStatement.executeUpdate() != 0;
} catch (SQLException e) {
throw new IOException(String.format("Failure deleting dag for %s", dagId), e);
}}, true);
}
}, true);
this.totalDagCount.dec();
return true;
}
Expand All @@ -167,7 +165,7 @@ public void cleanUp(String dagId) throws IOException {
@Override
public List<Dag<JobExecutionPlan>> getDags() throws IOException {
throw new NotSupportedException(getClass().getSimpleName() + " does not need this legacy API that originated with "
+ "the DagManager that is replaced by DagProcessingEngine"); }
+ "the DagManager that is replaced by DagProcessingEngine"); }

@Override
public Dag<JobExecutionPlan> getDag(DagManager.DagId dagId) throws IOException {
Expand All @@ -191,37 +189,48 @@ private Dag<JobExecutionPlan> convertDagNodesIntoDag(Set<Dag.DagNode<JobExecutio
if (dagNodes.isEmpty()) {
return null;
}
return jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList()));
Dag<JobExecutionPlan> dag = jobExecPlanDagFactory.createDag(dagNodes.stream().map(Dag.DagNode::getValue).collect(Collectors.toList()));

// if any node of the dag is failed it means that the dag has been marked as failed, update the is_failed_dag field of the dag and it's nodes as true
if (dag.getNodes().stream().anyMatch(Dag.DagNode::isFailedDag)) {
dag.setFailedDag(true);
dag.getNodes().forEach(node -> node.setFailedDag(true));
}
return dag;
}

@Override
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
public int updateDagNode(DagManager.DagId parentDagId, Dag.DagNode<JobExecutionPlan> dagNode, boolean isFailedDag)
throws IOException {
String dagNodeId = dagNode.getValue().getId().toString();
return dbStatementExecutor.withPreparedStatement(String.format(INSERT_STATEMENT, tableName), insertStatement -> {
try {
insertStatement.setString(1, dagNodeId);
insertStatement.setString(2, parentDagId.toString());
insertStatement.setString(3, this.serDe.serialize(Collections.singletonList(dagNode.getValue())));
insertStatement.setInt(4, isFailedDag ? 1 : 0);
return insertStatement.executeUpdate();
} catch (SQLException e) {
throw new IOException(String.format("Failure adding dag node for %s", dagNodeId), e);
}}, true);
}
}, true);
}

@Override
public Set<Dag.DagNode<JobExecutionPlan>> getDagNodes(DagManager.DagId parentDagId) throws IOException {
return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName), getStatement -> {
getStatement.setString(1, parentDagId.toString());
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
try (ResultSet rs = getStatement.executeQuery()) {
while (rs.next()) {
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
}
return dagNodes;
} catch (SQLException e) {
throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
}
}, true);
return dbStatementExecutor.withPreparedStatement(String.format(GET_DAG_NODES_STATEMENT, tableName),
getStatement -> {
getStatement.setString(1, parentDagId.toString());
HashSet<Dag.DagNode<JobExecutionPlan>> dagNodes = new HashSet<>();
try (ResultSet rs = getStatement.executeQuery()) {
while (rs.next()) {
dagNodes.add(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2)));
}
return dagNodes;
} catch (SQLException e) {
throw new IOException(String.format("Failure get dag nodes for dag %s", parentDagId), e);
}
}, true);
}

@Override
Expand All @@ -230,7 +239,7 @@ public Optional<Dag.DagNode<JobExecutionPlan>> getDagNode(DagNodeId dagNodeId) t
getStatement.setString(1, dagNodeId.toString());
try (ResultSet rs = getStatement.executeQuery()) {
if (rs.next()) {
return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0)));
return Optional.of(new Dag.DagNode<>(this.serDe.deserialize(rs.getString(1)).get(0), rs.getBoolean(2)));
}
return Optional.empty();
} catch (SQLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public ResumeDagProc(ResumeDagTask resumeDagTask, Config config) {
@Override
protected Optional<Dag<JobExecutionPlan>> initialize(DagManagementStateStore dagManagementStateStore)
throws IOException {
return dagManagementStateStore.getFailedDag(getDagId());
return dagManagementStateStore.getDag(getDagId());
Copy link
Contributor

@phet phet Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we verify the one returned is actually failed?

}

@Override
Expand Down Expand Up @@ -92,7 +92,7 @@ protected void act(DagManagementStateStore dagManagementStateStore, Optional<Dag
dagManagementStateStore.addDag(failedDag.get());

// if it fails here, it will check point the failed dag in the (running) dag store again, which is idempotent
dagManagementStateStore.deleteFailedDag(getDagId());
dagManagementStateStore.deleteDag(getDagId());

DagProcUtils.submitNextNodes(dagManagementStateStore, failedDag.get(), getDagId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public static MySqlDagManagementStateStore getDummyDMSS(ITestMetastoreDatabase t
configBuilder.addPrimitive(MySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, MysqlDagStateStoreWithDagNodes.class.getName())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl())
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, "dag" + 1)
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER)
.addPrimitive(MySqlDagManagementStateStore.FAILED_DAG_STATESTORE_PREFIX
+ "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TEST_TABLE + 2);
.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
Config config = configBuilder.build();
JobStatusRetriever jobStatusRetriever = mock(JobStatusRetriever.class);
JobStatus dummyJobStatus = JobStatus.builder().flowName("fn").flowGroup("fg").jobGroup("fg").jobName("job0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,29 @@ public void testAddGetAndDeleteDag() throws Exception{
Assert.assertNull(this.dagStateStore.getDag(dagId1));
Assert.assertNull(this.dagStateStore.getDag(dagId2));
}

@Test
public void testMarkDagAsFailed() throws Exception {
//Set up initial conditions
Dag<JobExecutionPlan> dag = DagTestUtils.buildDag("test_dag", 789L);
DagManager.DagId dagId = DagManagerUtils.generateDagId(dag);

this.dagStateStore.writeCheckpoint(dag);
//Check Initial State
for (Dag.DagNode<JobExecutionPlan> node : dag.getNodes()) {
Assert.assertFalse(node.isFailedDag());
}
dag.setFailedDag(true);
this.dagStateStore.writeCheckpoint(dag);

Dag<JobExecutionPlan> updatedDag = this.dagStateStore.getDag(dagId);
for (Dag.DagNode<JobExecutionPlan> node : updatedDag.getNodes()) {
Assert.assertTrue(node.isFailedDag());
}

// Cleanup
dagStateStore.cleanUp(dagId);
Assert.assertNull(this.dagStateStore.getDag(dagId));
}

}
Loading