Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fsck orphan data #91

Merged
merged 5 commits into from
May 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class CassandraAppStorage extends AbstractAppStorage {

public static final String ORPHAN_NODE = "ORPHAN_NODE";

public static final String ORPHAN_DATA = "ORPHAN_DATA";

private final String fileSystemName;

private final Supplier<CassandraContext> contextSupplier;
Expand Down Expand Up @@ -833,9 +835,7 @@ private UUID deleteNode(UUID nodeUuid) {
statements.add(delete().from(DEPENDENCIES).where(eq(FROM_ID, nodeUuid)));
statements.add(delete().from(BACKWARD_DEPENDENCIES).where(in(TO_ID, new ArrayList<>(dependencies.values()))));

for (Statement statement : statements) {
getSession().execute(statement);
}
executeStatements(statements);

backwardDependencies.entrySet().stream().flatMap(dep -> dep.getValue().stream().map(depUuid -> Pair.of(dep.getKey(), depUuid))).forEach(dep -> {
pushEvent(new DependencyRemoved(dep.getValue().toString(), dep.getKey()), APPSTORAGE_DEPENDENCY_TOPIC);
Expand Down Expand Up @@ -1119,9 +1119,7 @@ public boolean removeData(String nodeId, String name) {

List<Statement> statements = new ArrayList<>();
removeData(nodeUuid, name, statements);
for (Statement statement : statements) {
getSession().execute(statement);
}
executeStatements(statements);

return true;
}
Expand Down Expand Up @@ -1351,9 +1349,7 @@ public void clearTimeSeries(String nodeId) {

List<Statement> statements = new ArrayList<>();
clearTimeSeries(nodeUuid, statements);
for (Statement statement : statements) {
getSession().execute(statement);
}
executeStatements(statements);
pushEvent(new TimeSeriesCleared(nodeUuid.toString()), APPSTORAGE_TIMESERIES_TOPIC);
}

Expand Down Expand Up @@ -1491,7 +1487,8 @@ public void close() {

@Override
public List<String> getSupportedFileSystemChecks() {
return ImmutableList.of(FileSystemCheckOptions.EXPIRED_INCONSISTENT_NODES, REF_NOT_FOUND, ORPHAN_NODE);
return ImmutableList.of(FileSystemCheckOptions.EXPIRED_INCONSISTENT_NODES,
REF_NOT_FOUND, ORPHAN_NODE, ORPHAN_DATA);
}

@Override
Expand All @@ -1510,6 +1507,9 @@ public List<FileSystemCheckIssue> checkFileSystem(FileSystemCheckOptions options
case ORPHAN_NODE:
checkOrphanNode(results, options);
break;
case ORPHAN_DATA:
checkOrphanData(results, options);
break;
default:
LOGGER.warn("Check {} not supported in {}", type, getClass());
}
Expand All @@ -1518,6 +1518,39 @@ public List<FileSystemCheckIssue> checkFileSystem(FileSystemCheckOptions options
return results;
}

private void checkOrphanData(List<FileSystemCheckIssue> results, FileSystemCheckOptions options) {
Set<UUID> existingNodeIds = new HashSet<>();
Set<UUID> orphanDataIds = new HashSet<>();
ResultSet existingNodes = getSession().execute(select(ID).distinct().from(CHILDREN_BY_NAME_AND_CLASS));
for (Row row : existingNodes) {
existingNodeIds.add(row.getUUID(ID));
}
ResultSet nodeDatas = getSession().execute(select(ID, NAME).distinct().from(NODE_DATA));
for (Row row : nodeDatas) {
UUID uuid = row.getUUID(ID);
if (!existingNodeIds.contains(uuid)) {
orphanDataIds.add(uuid);
FileSystemCheckIssue issue = new FileSystemCheckIssue().setNodeName("N/A")
.setNodeId(uuid.toString())
.setType(ORPHAN_DATA)
.setDescription("Orphan data(" + row.getString(NAME) + ") is binding to non-existing node(" + uuid + ")")
.setRepaired(options.isRepair());
if (options.isRepair()) {
issue.setRepaired(true)
.setResolutionDescription("Delete orphan data(" + row.getString(NAME) + ").");
}
results.add(issue);
}
}
if (options.isRepair()) {
List<Statement> statements = new ArrayList<>();
for (UUID id : orphanDataIds) {
removeAllData(id, statements);
}
executeStatements(statements);
}
}

private void checkOrphanNode(List<FileSystemCheckIssue> results, FileSystemCheckOptions options) {
// get all child id which parent name is null
ResultSet resultSet = getSession().execute(select(ID, CHILD_ID, NAME, CHILD_NAME).from(CHILDREN_BY_NAME_AND_CLASS));
Expand Down Expand Up @@ -1577,9 +1610,13 @@ private void checkReferenceNotFound(List<FileSystemCheckIssue> results, FileSyst
}
}
if (options.isRepair()) {
for (Statement statement : statements) {
getSession().execute(statement);
}
executeStatements(statements);
}
}

private void executeStatements(List<Statement> statements) {
for (Statement statement : statements) {
getSession().execute(statement);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,39 @@ protected void nextDependentTests() {
testSupportedChecks();
testInconsistendNodeRepair();
testAbsentChildRepair();
testGetParentWithInconsistentChild();
testOrphanNodeRepair();
testOrphanDataRepair();
testGetParentWithInconsistentChild();
}

private void testOrphanDataRepair() {
NodeInfo rootFolderInfo = storage.createRootNodeIfNotExists(storage.getFileSystemName(), FOLDER_PSEUDO_CLASS);
try (OutputStream os = storage.writeBinaryData(rootFolderInfo.getId(), "should_exist")) {
os.write("word2".getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
fail();
}
String orphanDataId = UUIDs.timeBased().toString();
try (OutputStream os = storage.writeBinaryData(orphanDataId, "blob")) {
os.write("word2".getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
fail();
}
assertThat(storage.getDataNames(orphanDataId)).containsOnly("blob");
assertAfsNodeNotFound(orphanDataId);

FileSystemCheckOptions repairOption = new FileSystemCheckOptionsBuilder()
.addCheckTypes(CassandraAppStorage.ORPHAN_DATA)
.repair().build();
List<FileSystemCheckIssue> issues = storage.checkFileSystem(repairOption);
assertThat(issues).hasOnlyOneElementSatisfying(i -> {
assertEquals(orphanDataId, i.getNodeId());
assertEquals(CassandraAppStorage.ORPHAN_DATA, i.getType());
assertEquals("N/A", i.getNodeName());
});

assertTrue(storage.dataExists(rootFolderInfo.getId(), "should_exist"));
assertFalse(storage.dataExists(orphanDataId, "blob"));
}

private void testOrphanNodeRepair() {
Expand All @@ -73,16 +104,16 @@ private void testOrphanNodeRepair() {
.repair().build();
List<FileSystemCheckIssue> issues = storage.checkFileSystem(repairOption);
assertThat(issues).hasOnlyOneElementSatisfying(i -> assertEquals(orphanNode.getId(), i.getNodeId()));
assertThatThrownBy(() -> storage.getNodeInfo(orphanNode.getId()))
.isInstanceOf(CassandraAfsException.class)
.hasMessageContaining("not found");
assertThatThrownBy(() -> storage.getParentNode(orphanNode.getId()))
.isInstanceOf(CassandraAfsException.class)
.hasMessageContaining("not found");
assertThatThrownBy(() -> storage.getNodeInfo(orphanChild.getId()))
assertAfsNodeNotFound(orphanNode.getId());
assertAfsNodeNotFound(orphanNode.getId());
assertAfsNodeNotFound(orphanChild.getId());
assertThat(storage.getDataNames(orphanNode.getId())).isEmpty();
}

private void assertAfsNodeNotFound(String id) {
assertThatThrownBy(() -> storage.getNodeInfo(id))
.isInstanceOf(CassandraAfsException.class)
.hasMessageContaining("not found");
assertThat(storage.getDataNames(orphanNode.getId())).isEmpty();
}

void testInconsistendNodeRepair() {
Expand Down Expand Up @@ -182,7 +213,11 @@ void testAbsentChildRepair() {

void testSupportedChecks() {
assertThat(storage.getSupportedFileSystemChecks())
.containsExactlyInAnyOrder(CassandraAppStorage.REF_NOT_FOUND, FileSystemCheckOptions.EXPIRED_INCONSISTENT_NODES, CassandraAppStorage.ORPHAN_NODE);
.containsExactlyInAnyOrder(CassandraAppStorage.REF_NOT_FOUND,
FileSystemCheckOptions.EXPIRED_INCONSISTENT_NODES,
CassandraAppStorage.ORPHAN_NODE,
CassandraAppStorage.ORPHAN_DATA
);
}

private NodeInfo createFolder(NodeInfo parent, String name) {
Expand Down