Skip to content

Commit

Permalink
[hotfix] hotfix branch manager gets schema, tag and snapshot paths me…
Browse files Browse the repository at this point in the history
…thod to reduce constant string usage. (apache#4136)
  • Loading branch information
zhuangchong authored Sep 19, 2024
1 parent 105e50b commit f1da943
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -633,6 +634,13 @@ public Path toSchemaPath(long schemaId) {
return new Path(branchPath() + "/schema/" + SCHEMA_PREFIX + schemaId);
}

public List<Path> schemaPaths(Predicate<Long> predicate) throws IOException {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
.filter(predicate)
.map(this::toSchemaPath)
.collect(Collectors.toList());
}

/**
* Delete schema with specific id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand All @@ -29,12 +28,12 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.paimon.utils.FileUtils.listVersionedDirectories;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Manager for {@code Branch}. */
Expand Down Expand Up @@ -87,20 +86,7 @@ public Path branchPath(String branchName) {

/** Create empty branch. */
public void createBranch(String branchName) {
checkArgument(
!isMainBranch(branchName),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(branchName),
"Branch name '%s' is blank.",
branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
validateBranch(branchName);

try {
TableSchema latestSchema = schemaManager.latest().get();
Expand All @@ -118,21 +104,8 @@ branchName, branchPath(tablePath, branchName)),
}

public void createBranch(String branchName, String tagName) {
checkArgument(
!isMainBranch(branchName),
String.format(
"Branch name '%s' is the default branch and cannot be created.",
DEFAULT_MAIN_BRANCH));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(branchName),
"Branch name '%s' is blank.",
branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
validateBranch(branchName);
checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);

Snapshot snapshot = tagManager.taggedSnapshot(tagName);

Expand Down Expand Up @@ -176,10 +149,7 @@ public void deleteBranch(String branchName) {
/** Check if path exists. */
public boolean fileExists(Path path) {
try {
if (fileIO.exists(path)) {
return true;
}
return false;
return fileIO.exists(path);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to determine if path '%s' exists.", path), e);
Expand All @@ -206,37 +176,15 @@ public void fastForward(String branchName) {
// Delete snapshot, schema, and tag from the main branch which occurs after
// earliestSnapshotId
List<Path> deleteSnapshotPaths =
listVersionedFileStatus(
fileIO, snapshotManager.snapshotDirectory(), "snapshot-")
.map(FileStatus::getPath)
.filter(
path ->
Snapshot.fromPath(fileIO, path).id()
>= earliestSnapshotId)
.collect(Collectors.toList());
List<Path> deleteSchemaPaths =
listVersionedFileStatus(fileIO, schemaManager.schemaDirectory(), "schema-")
.map(FileStatus::getPath)
.filter(
path ->
TableSchema.fromPath(fileIO, path).id()
>= earliestSchemaId)
.collect(Collectors.toList());
snapshotManager.snapshotPaths(id -> id >= earliestSnapshotId);
List<Path> deleteSchemaPaths = schemaManager.schemaPaths(id -> id >= earliestSchemaId);
List<Path> deleteTagPaths =
listVersionedFileStatus(fileIO, tagManager.tagDirectory(), "tag-")
.map(FileStatus::getPath)
.filter(
path ->
Snapshot.fromPath(fileIO, path).id()
>= earliestSnapshotId)
.collect(Collectors.toList());
tagManager.tagPaths(
path -> Snapshot.fromPath(fileIO, path).id() >= earliestSnapshotId);

List<Path> deletePaths =
Stream.concat(
Stream.concat(
deleteSnapshotPaths.stream(),
deleteSchemaPaths.stream()),
deleteTagPaths.stream())
Stream.of(deleteSnapshotPaths, deleteSchemaPaths, deleteTagPaths)
.flatMap(Collection::stream)
.collect(Collectors.toList());

// Delete latest snapshot hint
Expand Down Expand Up @@ -280,4 +228,21 @@ public List<String> branches() {
throw new RuntimeException(e);
}
}

private void validateBranch(String branchName) {
checkArgument(
!isMainBranch(branchName),
String.format(
"Branch name '%s' is the default branch and cannot be used.",
DEFAULT_MAIN_BRANCH));
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(branchName),
"Branch name '%s' is blank.",
branchName);
checkArgument(!branchExists(branchName), "Branch name '%s' already exists.", branchName);
checkArgument(
!branchName.chars().allMatch(Character::isDigit),
"Branch name cannot be pure numeric string but is '%s'.",
branchName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -391,11 +391,18 @@ public long snapshotCount() throws IOException {

public Iterator<Snapshot> snapshots() throws IOException {
return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
.map(id -> snapshot(id))
.map(this::snapshot)
.sorted(Comparator.comparingLong(Snapshot::id))
.iterator();
}

public List<Path> snapshotPaths(Predicate<Long> predicate) throws IOException {
return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX)
.filter(predicate)
.map(this::snapshotPath)
.collect(Collectors.toList());
}

public Iterator<Snapshot> snapshotsWithinRange(
Optional<Long> optionalMaxSnapshotId, Optional<Long> optionalMinSnapshotId)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ public Path tagPath(String tagName) {
return new Path(branchPath(tablePath, branch) + "/tag/" + TAG_PREFIX + tagName);
}

public List<Path> tagPaths(Predicate<Path> predicate) throws IOException {
return listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
.map(FileStatus::getPath)
.filter(predicate)
.collect(Collectors.toList());
}

/** Create a tag from given snapshot and save it in the storage. */
public void createTag(
Snapshot snapshot,
Expand Down Expand Up @@ -307,10 +314,7 @@ public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
TreeMap<Snapshot, List<String>> tags =
new TreeMap<>(Comparator.comparingLong(Snapshot::id));
try {
List<Path> paths =
listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
.map(FileStatus::getPath)
.collect(Collectors.toList());
List<Path> paths = tagPaths(path -> true);

for (Path path : paths) {
String tagName = path.getName().substring(TAG_PREFIX.length());
Expand All @@ -335,10 +339,7 @@ public SortedMap<Snapshot, List<String>> tags(Predicate<String> filter) {
/** Get all {@link Tag}s. */
public List<Pair<Tag, String>> tagObjects() {
try {
List<Path> paths =
listVersionedFileStatus(fileIO, tagDirectory(), TAG_PREFIX)
.map(FileStatus::getPath)
.collect(Collectors.toList());
List<Path> paths = tagPaths(path -> true);
List<Pair<Tag, String>> tags = new ArrayList<>();
for (Path path : paths) {
String tagName = path.getName().substring(TAG_PREFIX.length());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ public void testUnsupportedBranchName() throws Exception {
.satisfies(
anyCauseMatches(
IllegalArgumentException.class,
"Branch name 'main' is the default branch and cannot be created."));
"Branch name 'main' is the default branch and cannot be used."));

assertThatThrownBy(() -> table.createBranch("branch-1", "tag1"))
.satisfies(
Expand Down

0 comments on commit f1da943

Please sign in to comment.