From 379d77644d8222811bb63979e59f842f54980ac9 Mon Sep 17 00:00:00 2001 From: Jianjian Date: Thu, 19 Sep 2024 11:46:45 -0700 Subject: [PATCH] update the codes according to the comments --- .../filesystem/alluxio/AlluxioFileSystem.java | 92 +++++++++---------- .../alluxio/AlluxioFileSystemFactory.java | 14 +-- .../alluxio/AlluxioFileSystemInput.java | 1 - .../alluxio/AlluxioFileSystemInputFile.java | 31 +++---- .../alluxio/AlluxioFileSystemOutputFile.java | 19 ++-- .../alluxio/AlluxioTrinoInputStream.java | 1 + .../alluxio/AlluxioTrinoOutputStream.java | 1 - .../filesystem/alluxio/AlluxioUtils.java | 6 +- .../AbstractTestAlluxioFileSystem.java | 3 +- 9 files changed, 73 insertions(+), 95 deletions(-) diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java index 4efe5e60e40c5e..d922b66aa4bc8d 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystem.java @@ -38,20 +38,20 @@ import java.util.stream.Collectors; import static io.trino.filesystem.alluxio.AlluxioUtils.convertToAlluxioURI; +import static java.util.Objects.requireNonNull; import static java.util.UUID.randomUUID; public class AlluxioFileSystem implements TrinoFileSystem { - private final FileSystem fileSystem; + private final FileSystem alluxioClient; + private final String mountRoot; private Location rootLocation; - private final String mountRoot; - - public AlluxioFileSystem(FileSystem fileSystem) + public AlluxioFileSystem(FileSystem alluxioClient) { - this.fileSystem = requireNonNull(fileSystem, "filesystem is null"); + this.alluxioClient = requireNonNull(alluxioClient, "filesystem is null"); mountRoot = "/"; // default alluxio mount root } @@ -65,17 +65,12 @@ public void setRootLocation(Location rootLocation) this.rootLocation = rootLocation; } - public Location getRootLocation() - { - return rootLocation; - } - @Override public TrinoInputFile newInputFile(Location location) { ensureNotRootLocation(location); ensureNotEndWithSlash(location); - return new AlluxioFileSystemInputFile(location, null, fileSystem, mountRoot, Optional.empty()); + return new AlluxioFileSystemInputFile(location, null, alluxioClient, mountRoot, Optional.empty()); } @Override @@ -83,7 +78,7 @@ public TrinoInputFile newInputFile(Location location, long length) { ensureNotRootLocation(location); ensureNotEndWithSlash(location); - return new AlluxioFileSystemInputFile(location, length, fileSystem, mountRoot, Optional.empty()); + return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.empty()); } @Override @@ -91,7 +86,7 @@ public TrinoInputFile newInputFile(Location location, long length, Instant lastM { ensureNotRootLocation(location); ensureNotEndWithSlash(location); - return new AlluxioFileSystemInputFile(location, length, fileSystem, mountRoot, Optional.of(lastModified)); + return new AlluxioFileSystemInputFile(location, length, alluxioClient, mountRoot, Optional.of(lastModified)); } @Override @@ -99,7 +94,7 @@ public TrinoOutputFile newOutputFile(Location location) { ensureNotRootLocation(location); ensureNotEndWithSlash(location); - return new AlluxioFileSystemOutputFile(rootLocation, location, fileSystem, mountRoot); + return new AlluxioFileSystemOutputFile(rootLocation, location, alluxioClient, mountRoot); } @Override @@ -109,9 +104,9 @@ public void deleteFile(Location location) ensureNotRootLocation(location); ensureNotEndWithSlash(location); try { - fileSystem.delete(convertToAlluxioURI(location, mountRoot)); + alluxioClient.delete(convertToAlluxioURI(location, mountRoot)); } - catch (FileDoesNotExistException e) { + catch (FileDoesNotExistException _) { } catch (AlluxioException e) { throw new IOException("Error deleteFile %s".formatted(location), e); @@ -124,7 +119,7 @@ public void deleteDirectory(Location location) { try { AlluxioURI uri = convertToAlluxioURI(location, mountRoot); - URIStatus status = fileSystem.getStatus(uri); + URIStatus status = alluxioClient.getStatus(uri); if (status == null) { return; } @@ -134,12 +129,12 @@ public void deleteDirectory(Location location) DeletePOptions deletePOptions = DeletePOptions.newBuilder().setRecursive(true).build(); // recursive delete on the root directory must be handled manually if (location.path().isEmpty() || location.path().equals(mountRoot)) { - for (URIStatus uriStatus : fileSystem.listStatus(uri)) { - fileSystem.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions); + for (URIStatus uriStatus : alluxioClient.listStatus(uri)) { + alluxioClient.delete(new AlluxioURI(uriStatus.getPath()), deletePOptions); } } else { - fileSystem.delete(uri, deletePOptions); + alluxioClient.delete(uri, deletePOptions); } } catch (FileDoesNotExistException | NotFoundRuntimeException e) { @@ -160,27 +155,27 @@ public void renameFile(Location source, Location target) ensureNotEndWithSlash(target); } catch (IllegalStateException e) { - throw new IllegalStateException("Cannot rename file from %s to %s as one of them is root location" - .formatted(source, target), e); + throw new IllegalStateException( + "Cannot rename file from %s to %s as one of them is root location".formatted(source, target), e); } AlluxioURI sourceUri = convertToAlluxioURI(source, mountRoot); AlluxioURI targetUri = convertToAlluxioURI(target, mountRoot); try { - if (!fileSystem.exists(sourceUri)) { - throw new IOException("Cannot rename file %s to %s as file %s doesn't exist" - .formatted(source, target, source)); + if (!alluxioClient.exists(sourceUri)) { + throw new IOException( + "Cannot rename file %s to %s as file %s doesn't exist".formatted(source, target, source)); } - if (fileSystem.exists(targetUri)) { - throw new IOException("Cannot rename file %s to %s as file %s already exists" - .formatted(source, target, target)); + if (alluxioClient.exists(targetUri)) { + throw new IOException( + "Cannot rename file %s to %s as file %s already exists".formatted(source, target, target)); } - URIStatus status = fileSystem.getStatus(sourceUri); + URIStatus status = alluxioClient.getStatus(sourceUri); if (status.isFolder()) { - throw new IOException("Cannot rename file %s to %s as %s is a directory" - .formatted(source, target, source)); + throw new IOException( + "Cannot rename file %s to %s as %s is a directory".formatted(source, target, source)); } - fileSystem.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); + alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); } catch (AlluxioException e) { throw new IOException("Error renameFile from %s to %s".formatted(source, target), e); @@ -192,7 +187,7 @@ public FileIterator listFiles(Location location) throws IOException { try { - URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); if (status == null) { new AlluxioFileIterator(Collections.emptyList(), mountRoot); } @@ -205,7 +200,7 @@ public FileIterator listFiles(Location location) } try { - List filesStatus = fileSystem.listStatus(convertToAlluxioURI(location, mountRoot), + List filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot), ListStatusPOptions.newBuilder().setRecursive(true).build()); return new AlluxioFileIterator(filesStatus.stream().filter(status -> !status.isFolder() & status.isCompleted()).toList(), mountRoot); } @@ -222,7 +217,7 @@ public Optional directoryExists(Location location) return Optional.of(true); } try { - URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); if (status != null && status.isFolder()) { return Optional.of(true); } @@ -242,14 +237,14 @@ public void createDirectory(Location location) { try { AlluxioURI locationUri = convertToAlluxioURI(location, mountRoot); - if (fileSystem.exists(locationUri)) { - URIStatus status = fileSystem.getStatus(locationUri); + if (alluxioClient.exists(locationUri)) { + URIStatus status = alluxioClient.getStatus(locationUri); if (!status.isFolder()) { - throw new IOException("Cannot create a directory for an existing file location %s" - .formatted(location)); + throw new IOException( + "Cannot create a directory for an existing file location %s".formatted(location)); } } - fileSystem.createDirectory( + alluxioClient.createDirectory( locationUri, CreateDirectoryPOptions.newBuilder() .setAllowExists(true) @@ -270,16 +265,15 @@ public void renameDirectory(Location source, Location target) ensureNotRootLocation(target); } catch (IllegalStateException e) { - throw new IOException("Cannot rename directory from %s to %s as one of them is root location" - .formatted(source, target), e); + throw new IOException( + "Cannot rename directory from %s to %s as one of them is root location".formatted(source, target), e); } - try { - if (fileSystem.exists(convertToAlluxioURI(target, mountRoot))) { - throw new IOException("Cannot rename %s to %s as file %s already exists" - .formatted(source, target, target)); + if (alluxioClient.exists(convertToAlluxioURI(target, mountRoot))) { + throw new IOException( + "Cannot rename %s to %s as file %s already exists".formatted(source, target, target)); } - fileSystem.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); + alluxioClient.rename(convertToAlluxioURI(source, mountRoot), convertToAlluxioURI(target, mountRoot)); } catch (AlluxioException e) { throw new IOException("Error renameDirectory from %s to %s".formatted(source, target), e); @@ -294,7 +288,7 @@ public Set listDirectories(Location location) if (isFile(location)) { throw new IOException("Cannot list directories for a file %s".formatted(location)); } - List filesStatus = fileSystem.listStatus(convertToAlluxioURI(location, mountRoot)); + List filesStatus = alluxioClient.listStatus(convertToAlluxioURI(location, mountRoot)); return filesStatus.stream() .filter(URIStatus::isFolder) .map((URIStatus fileStatus) -> AlluxioUtils.convertToLocation(fileStatus, mountRoot)) @@ -367,7 +361,7 @@ private void ensureNotEndWithSlash(Location location) private boolean isFile(Location location) { try { - URIStatus status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); + URIStatus status = alluxioClient.getStatus(convertToAlluxioURI(location, mountRoot)); if (status == null) { return false; } diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java index e14b04fd1d6cae..cc7b6303e061c2 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemFactory.java @@ -25,28 +25,24 @@ public class AlluxioFileSystemFactory implements TrinoFileSystemFactory { - private AlluxioConfiguration conf = Configuration.global(); + private final AlluxioConfiguration conf; @Inject public AlluxioFileSystemFactory() { + this(Configuration.global()); } - public void setConf(AlluxioConfiguration conf) + public AlluxioFileSystemFactory(AlluxioConfiguration conf) { this.conf = conf; } - public AlluxioConfiguration getConf() - { - return conf; - } - @Override public TrinoFileSystem create(ConnectorIdentity identity) { FileSystemContext fsContext = FileSystemContext.create(conf); - FileSystem fileSystem = FileSystem.Factory.create(fsContext); - return new AlluxioFileSystem(fileSystem); + FileSystem alluxioClient = FileSystem.Factory.create(fsContext); + return new AlluxioFileSystem(alluxioClient); } } diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java index 1aec9275e366f1..44843c5ad58750 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInput.java @@ -27,7 +27,6 @@ public class AlluxioFileSystemInput implements TrinoInput { private final FileInStream stream; - private final TrinoInputFile inputFile; private volatile boolean closed; diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java index 2faea40d4fa35a..3c8c1ab32c4412 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemInputFile.java @@ -37,14 +37,11 @@ public class AlluxioFileSystemInputFile implements TrinoInputFile { private final Location location; - private final FileSystem fileSystem; - private final String mountRoot; - private Optional lastModified; + private Optional lastModified; private Long length; - private URIStatus status; public AlluxioFileSystemInputFile(Location location, Long length, FileSystem fileSystem, String mountRoot, Optional lastModified) @@ -73,7 +70,7 @@ public TrinoInputStream newStream() throws IOException { try { - return new AlluxioTrinoInputStream(location, openFile(), getStatus()); + return new AlluxioTrinoInputStream(location, openFile(), getURIStatus()); } catch (AlluxioException e) { throw new IOException("Error newStream() file: %s".formatted(location), e); @@ -86,22 +83,20 @@ private FileInStream openFile() if (!exists()) { throw new FileNotFoundException("File does not exist: " + location); } - return fileSystem.openFile(getStatus(), OpenFilePOptions.getDefaultInstance()); + return fileSystem.openFile(getURIStatus(), OpenFilePOptions.getDefaultInstance()); } - private URIStatus getStatus(boolean lazy) + private void loadFileStatus() throws IOException { - if (lazy) { - if (status == null) { - getStatus(); - } - return status; + if (status == null) { + URIStatus fileStatus = getURIStatus(); + length = fileStatus.getLength(); + lastModified = Optional.of(Instant.ofEpochMilli(fileStatus.getLastModificationTimeMs())); } - return getStatus(); } - private URIStatus getStatus() + private URIStatus getURIStatus() throws IOException { try { @@ -109,7 +104,7 @@ private URIStatus getStatus() status = fileSystem.getStatus(convertToAlluxioURI(location, mountRoot)); } catch (FileDoesNotExistException | NotFoundRuntimeException e) { - return null; + throw new FileNotFoundException("File does not exist: %s".formatted(location)); } catch (AlluxioException | IOException e) { throw new IOException("Get status for file %s failed: %s".formatted(location, e.getMessage()), e); @@ -122,7 +117,7 @@ public long length() throws IOException { if (length == null) { - URIStatus status = getStatus(true); + loadFileStatus(); if (status == null) { throw new FileNotFoundException("File does not exist: %s".formatted(location)); } @@ -136,7 +131,7 @@ public Instant lastModified() throws IOException { if (lastModified.isEmpty()) { - URIStatus status = getStatus(true); + loadFileStatus(); if (status == null) { throw new FileNotFoundException("File does not exist: %s".formatted(location)); } @@ -149,7 +144,7 @@ public Instant lastModified() public boolean exists() throws IOException { - URIStatus status = getStatus(); + URIStatus status = getURIStatus(); if (status == null || !status.isCompleted()) { return false; } diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java index 19d880fb4c8b60..7b456cef34d926 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioFileSystemOutputFile.java @@ -32,16 +32,13 @@ public class AlluxioFileSystemOutputFile implements TrinoOutputFile { private final Location rootLocation; - private final Location location; - private final FileSystem fileSystem; - private final String mountRoot; public AlluxioFileSystemOutputFile(Location rootLocation, Location location, FileSystem fileSystem, String mountRoot) { - this.rootLocation = rootLocation; + this.rootLocation = requireNonNull(rootLocation, "root location is null"); this.location = requireNonNull(location, "location is null"); this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); this.mountRoot = requireNonNull(mountRoot, "mountRoot is null"); @@ -52,8 +49,9 @@ public void createExclusive(byte[] data) throws IOException { throwIfAlreadyExists(); - try (FileOutStream outStream = - fileSystem.createFile(convertToAlluxioURI(location, mountRoot), CreateFilePOptions.newBuilder().setRecursive(true).setOverwrite(true).build())) { + try (FileOutStream outStream = fileSystem.createFile( + convertToAlluxioURI(location, mountRoot), + CreateFilePOptions.newBuilder().setRecursive(true).setOverwrite(true).build())) { outStream.write(data); } catch (AlluxioException e) { @@ -66,12 +64,9 @@ public void createOrOverwrite(byte[] data) throws IOException { ensureOutputFileNotOutsideOfRoot(location); - try (FileOutStream outStream = - fileSystem.createFile(convertToAlluxioURI(location, mountRoot), - CreateFilePOptions.newBuilder() - .setOverwrite(true) - .setRecursive(true) - .build())) { + try (FileOutStream outStream = fileSystem.createFile( + convertToAlluxioURI(location, mountRoot), + CreateFilePOptions.newBuilder().setOverwrite(true).setRecursive(true).build())) { outStream.write(data); } catch (AlluxioException e) { diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java index 8c7274df6b1460..e23d61c5cf6134 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoInputStream.java @@ -29,6 +29,7 @@ public final class AlluxioTrinoInputStream private final Location location; private final FileInStream stream; private final URIStatus fileStatus; + private boolean closed; public AlluxioTrinoInputStream(Location location, FileInStream stream, URIStatus fileStatus) diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java index 3cd1e2ad0b1bdf..516268d6eee1cb 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioTrinoOutputStream.java @@ -22,7 +22,6 @@ public final class AlluxioTrinoOutputStream extends OutputStream { private final Location location; - private final OutputStream delegate; private volatile boolean closed; diff --git a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java index 74bfcf1a8ebc5f..f7b50e5778a694 100644 --- a/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java +++ b/lib/trino-filesystem-alluxio/src/main/java/io/trino/filesystem/alluxio/AlluxioUtils.java @@ -104,9 +104,9 @@ public static AlluxioURI convertToAlluxioURIBak(Location location) public static AlluxioURI convertToAlluxioURI(Location location, String mountRoot) { - Optional schema = location.scheme(); - if (schema.isPresent()) { - if (!schema.get().equals("alluxio")) { + Optional scheme = location.scheme(); + if (scheme.isPresent()) { + if (!scheme.get().equals("alluxio")) { return new AlluxioURI(location.toString()); } } diff --git a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java index 935b9f01b20024..888f13277f1214 100644 --- a/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java +++ b/lib/trino-filesystem-alluxio/src/test/java/io/trino/filesystem/alluxio/AbstractTestAlluxioFileSystem.java @@ -49,8 +49,7 @@ protected void initialize() InstancedConfiguration conf = Configuration.copyGlobal(); FileSystemContext fsContext = FileSystemContext.create(conf); this.alluxioFs = FileSystem.Factory.create(fsContext); - this.alluxioFileSystemFactory = new AlluxioFileSystemFactory(); - this.alluxioFileSystemFactory.setConf(conf); + this.alluxioFileSystemFactory = new AlluxioFileSystemFactory(conf); this.fileSystem = alluxioFileSystemFactory.create(ConnectorIdentity.ofUser("alluxio")); ((AlluxioFileSystem) fileSystem).setRootLocation(rootLocation); }