Commit
[HUDI-8092] Replace FileSystem and related classes to dehadoop hudi-client-common (apache#11805)

Co-authored-by: Shawn Chang <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
3 people authored Sep 9, 2024
1 parent cedc58c commit 9bb8dcd
Showing 40 changed files with 394 additions and 445 deletions.
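
Every file in this change applies the same substitution, so the type and method mapping is worth seeing once up front. The sketch below is not part of the commit — the file name and payload are illustrative — but each call it makes is one that this diff introduces in place of a Hadoop FileSystem call.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;

// Mapping applied throughout the commit:
//   org.apache.hadoop.fs.Path        -> StoragePath
//   org.apache.hadoop.fs.FileStatus  -> StoragePathInfo
//   fs.exists(path)                  -> storage.exists(path)
//   fs.delete(path, true)            -> storage.deleteFile(path)
//   fs.create(path, false)           -> storage.create(path, false)    (plain OutputStream)
//   fs.open(path)                    -> storage.open(path)
//   fs.getFileStatus(path)           -> storage.getPathInfo(path)
//   fs.listStatus(dir)               -> storage.listDirectEntries(dir) (List<StoragePathInfo>)
class StorageApiMappingSketch {
  static void roundTrip(HoodieStorage storage, StoragePath dir) throws IOException {
    StoragePath file = new StoragePath(dir, "example");                      // hypothetical file name
    try (OutputStream os = storage.create(file, false)) {
      os.write(new byte[] {1, 2, 3});                                        // illustrative payload
    }
    long modificationTime = storage.getPathInfo(file).getModificationTime(); // file metadata
    List<StoragePathInfo> entries = storage.listDirectEntries(dir);          // directory listing
    try (InputStream is = storage.open(file)) {
      is.read();
    }
    if (storage.exists(file)) {
      storage.deleteFile(file);
    }
  }
}
```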
@@ -27,8 +27,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.storage.HoodieStorage;

import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;

@@ -41,8 +40,8 @@
public class DirectMarkerTransactionManager extends TransactionManager {
private final String filePath;

public DirectMarkerTransactionManager(HoodieWriteConfig config, FileSystem fs, String partitionPath, String fileId) {
super(new LockManager(config, fs, createUpdatedLockProps(config, partitionPath, fileId)), config.isLockRequired());
public DirectMarkerTransactionManager(HoodieWriteConfig config, HoodieStorage storage, String partitionPath, String fileId) {
super(new LockManager(config, storage, createUpdatedLockProps(config, partitionPath, fileId)), config.isLockRequired());
this.filePath = partitionPath + "/" + fileId;
}
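
For callers of DirectMarkerTransactionManager the switch is mechanical: hand over the HoodieStorage held by the meta client instead of a Hadoop FileSystem. A hypothetical call-site sketch follows (the transaction-manager package is assumed, since the diff does not show it):

```java
import org.apache.hudi.client.transaction.DirectMarkerTransactionManager; // package assumed, not shown in this diff
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;

class DirectMarkerCallSiteSketch {
  static DirectMarkerTransactionManager create(HoodieWriteConfig writeConfig,
                                               HoodieTableMetaClient metaClient,
                                               String partitionPath,
                                               String fileId) {
    // Before this commit the second argument was an org.apache.hadoop.fs.FileSystem;
    // now the HoodieStorage handle is passed straight through.
    return new DirectMarkerTransactionManager(writeConfig, metaClient.getStorage(), partitionPath, fileId);
  }
}
```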

@@ -24,7 +24,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;

import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -43,7 +42,7 @@ public class TransactionManager implements Serializable {
private Option<HoodieInstant> lastCompletedTxnOwnerInstant = Option.empty();

public TransactionManager(HoodieWriteConfig config, HoodieStorage storage) {
this(new LockManager(config, (FileSystem) storage.getFileSystem()), config.isLockRequired());
this(new LockManager(config, storage), config.isLockRequired());
}

protected TransactionManager(LockManager lockManager, boolean isLockRequired) {
@@ -32,19 +32,18 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StorageSchemes;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -65,8 +64,8 @@ public class FileSystemBasedLockProvider implements LockProvider<String>, Serial
private static final Logger LOG = LoggerFactory.getLogger(FileSystemBasedLockProvider.class);
private static final String LOCK_FILE_NAME = "lock";
private final int lockTimeoutMinutes;
private final transient FileSystem fs;
private final transient Path lockFile;
private final transient HoodieStorage storage;
private final transient StoragePath lockFile;
protected LockConfiguration lockConfiguration;
private SimpleDateFormat sdf;
private LockInfo lockInfo;
@@ -81,21 +80,21 @@ public FileSystemBasedLockProvider(final LockConfiguration lockConfiguration, fi
+ StoragePath.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME;
}
this.lockTimeoutMinutes = lockConfiguration.getConfig().getInteger(FILESYSTEM_LOCK_EXPIRE_PROP_KEY);
this.lockFile = new Path(lockDirectory + StoragePath.SEPARATOR + LOCK_FILE_NAME);
this.lockFile = new StoragePath(lockDirectory + StoragePath.SEPARATOR + LOCK_FILE_NAME);
this.lockInfo = new LockInfo();
this.sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
this.fs = HadoopFSUtils.getFs(this.lockFile.toString(), configuration);
this.storage = HoodieStorageUtils.getStorage(this.lockFile.toString(), configuration);
List<String> customSupportedFSs = lockConfiguration.getConfig().getStringList(HoodieCommonConfig.HOODIE_FS_ATOMIC_CREATION_SUPPORT.key(), ",", new ArrayList<>());
if (!customSupportedFSs.contains(this.fs.getScheme()) && !StorageSchemes.isAtomicCreationSupported(this.fs.getScheme())) {
throw new HoodieLockException("Unsupported scheme :" + this.fs.getScheme() + ", since this fs can not support atomic creation");
if (!customSupportedFSs.contains(this.storage.getScheme()) && !StorageSchemes.isAtomicCreationSupported(this.storage.getScheme())) {
throw new HoodieLockException("Unsupported scheme :" + this.storage.getScheme() + ", since this fs can not support atomic creation");
}
}

@Override
public void close() {
synchronized (LOCK_FILE_NAME) {
try {
fs.delete(this.lockFile, true);
storage.deleteFile(this.lockFile);
} catch (IOException e) {
throw new HoodieLockException(generateLogStatement(LockState.FAILED_TO_RELEASE), e);
}
@@ -107,17 +106,17 @@ public boolean tryLock(long time, TimeUnit unit) {
try {
synchronized (LOCK_FILE_NAME) {
// Check whether lock is already expired, if so try to delete lock file
if (fs.exists(this.lockFile)) {
if (storage.exists(this.lockFile)) {
if (checkIfExpired()) {
fs.delete(this.lockFile, true);
storage.deleteFile(this.lockFile);
LOG.warn("Delete expired lock file: " + this.lockFile);
} else {
reloadCurrentOwnerLockInfo();
return false;
}
}
acquireLock();
return fs.exists(this.lockFile);
return storage.exists(this.lockFile);
}
} catch (IOException | HoodieIOException e) {
LOG.info(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
@@ -129,8 +128,8 @@ public boolean tryLock(long time, TimeUnit unit) {
public void unlock() {
synchronized (LOCK_FILE_NAME) {
try {
if (fs.exists(this.lockFile)) {
fs.delete(this.lockFile, true);
if (storage.exists(this.lockFile)) {
storage.deleteFile(this.lockFile);
}
} catch (IOException io) {
throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_RELEASE), io);
@@ -153,7 +152,7 @@ private boolean checkIfExpired() {
return false;
}
try {
long modificationTime = fs.getFileStatus(this.lockFile).getModificationTime();
long modificationTime = storage.getPathInfo(this.lockFile).getModificationTime();
if (System.currentTimeMillis() - modificationTime > lockTimeoutMinutes * 60 * 1000L) {
return true;
}
@@ -164,10 +163,10 @@ private boolean checkIfExpired() {
}

private void acquireLock() {
try (FSDataOutputStream fos = fs.create(this.lockFile, false)) {
if (!fs.exists(this.lockFile)) {
try (OutputStream os = storage.create(this.lockFile, false)) {
if (!storage.exists(this.lockFile)) {
initLockInfo();
fos.writeBytes(lockInfo.toString());
os.write(StringUtils.getUTF8Bytes(lockInfo.toString()));
}
} catch (IOException e) {
throw new HoodieIOException(generateLogStatement(LockState.FAILED_TO_ACQUIRE), e);
@@ -181,8 +180,8 @@ public void initLockInfo() {
}

public void reloadCurrentOwnerLockInfo() {
try (InputStream is = fs.open(this.lockFile)) {
if (fs.exists(this.lockFile)) {
try (InputStream is = storage.open(this.lockFile)) {
if (storage.exists(this.lockFile)) {
this.currentOwnerLockInfo = FileIOUtils.readAsUTFString(is);
} else {
this.currentOwnerLockInfo = "";
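
With the Hadoop types gone, the provider's whole lock-file lifecycle runs against HoodieStorage. Below is a condensed sketch of that flow, not the committed class: owner metadata, metrics, and retry wiring are omitted, and the payload string is illustrative.

```java
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

class LockFileLifecycleSketch {
  private final HoodieStorage storage;
  private final StoragePath lockFile;
  private final long lockTimeoutMs;

  LockFileLifecycleSketch(HoodieStorage storage, StoragePath lockFile, long lockTimeoutMs) {
    this.storage = storage;
    this.lockFile = lockFile;
    this.lockTimeoutMs = lockTimeoutMs;
  }

  boolean tryLock() throws IOException {
    if (storage.exists(lockFile)) {
      long modificationTime = storage.getPathInfo(lockFile).getModificationTime();
      if (System.currentTimeMillis() - modificationTime > lockTimeoutMs) {
        storage.deleteFile(lockFile);                           // expired lock: remove and retry creation
      } else {
        return false;                                           // lock currently held by another writer
      }
    }
    try (OutputStream os = storage.create(lockFile, false)) {   // create-if-absent; relies on atomic creation
      os.write(StringUtils.getUTF8Bytes("owner-info"));         // illustrative payload
    }
    return storage.exists(lockFile);
  }

  void unlock() throws IOException {
    if (storage.exists(lockFile)) {
      storage.deleteFile(lockFile);
    }
  }
}
```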
@@ -28,11 +28,9 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;

import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -58,19 +56,19 @@ public class LockManager implements Serializable, AutoCloseable {
private transient HoodieLockMetrics metrics;
private volatile LockProvider lockProvider;

public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
this(writeConfig, fs, writeConfig.getProps());
public LockManager(HoodieWriteConfig writeConfig, HoodieStorage storage) {
this(writeConfig, storage, writeConfig.getProps());
}

public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, TypedProperties lockProps) {
public LockManager(HoodieWriteConfig writeConfig, HoodieStorage storage, TypedProperties lockProps) {
this.writeConfig = writeConfig;
this.storageConf = HadoopFSUtils.getStorageConfWithCopy(fs.getConf());
this.storageConf = storage.getConf().newInstance();
this.lockConfiguration = new LockConfiguration(lockProps);
maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
metrics = new HoodieLockMetrics(writeConfig, new HoodieHadoopStorage(fs));
metrics = new HoodieLockMetrics(writeConfig, storage);
lockRetryHelper = new RetryHelper<>(maxWaitTimeInMs, maxRetries, maxWaitTimeInMs,
Arrays.asList(HoodieLockException.class, InterruptedException.class), "acquire lock");
}
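
LockManager now takes a HoodieStorage as well and copies its configuration with storage.getConf().newInstance() rather than going through HadoopFSUtils. A hypothetical call-site sketch (the LockManager package is assumed, since the diff does not show it):

```java
import org.apache.hudi.client.transaction.lock.LockManager; // package assumed, not shown in this diff
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.storage.HoodieStorage;

class LockManagerCallSiteSketch {
  static LockManager create(HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) {
    HoodieStorage storage = metaClient.getStorage();
    // The manager derives its StorageConfiguration from the storage handle internally,
    // so no Hadoop FileSystem or Configuration needs to be threaded through anymore.
    return new LockManager(writeConfig, storage);
  }
}
```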
@@ -29,14 +29,11 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -45,7 +42,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -58,7 +54,6 @@
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.HASHING_METADATA_FILE_SUFFIX;
import static org.apache.hudi.common.model.HoodieConsistentHashingMetadata.getTimestampFromFile;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;

/**
* Utilities class for consistent bucket index metadata management.
@@ -110,30 +105,31 @@ public static HoodieConsistentHashingMetadata loadOrCreateMetadata(HoodieTable t
*/
public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable table, String partition) {
HoodieTableMetaClient metaClient = table.getMetaClient();
Path metadataPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getHashingMetadataPath(), partition);
Path partitionPath = HadoopFSUtils.constructAbsolutePathInHadoopPath(metaClient.getBasePath().toString(), partition);
StoragePath metadataPath = FSUtils.constructAbsolutePath(metaClient.getHashingMetadataPath(), partition);
StoragePath partitionPath = FSUtils.constructAbsolutePath(metaClient.getBasePath(), partition);
try {
Predicate<FileStatus> hashingMetaCommitFilePredicate = fileStatus -> {
String filename = fileStatus.getPath().getName();
Predicate<StoragePathInfo> hashingMetaCommitFilePredicate = pathInfo -> {
String filename = pathInfo.getPath().getName();
return filename.contains(HoodieConsistentHashingMetadata.HASHING_METADATA_COMMIT_FILE_SUFFIX);
};
Predicate<FileStatus> hashingMetadataFilePredicate = fileStatus -> {
String filename = fileStatus.getPath().getName();
Predicate<StoragePathInfo> hashingMetadataFilePredicate = pathInfo -> {
String filename = pathInfo.getPath().getName();
return filename.contains(HASHING_METADATA_FILE_SUFFIX);
};
final FileStatus[] metaFiles =
((FileSystem) metaClient.getStorage().getFileSystem()).listStatus(metadataPath);
final TreeSet<String> commitMetaTss = Arrays.stream(metaFiles).filter(hashingMetaCommitFilePredicate)
final List<StoragePathInfo> metaFiles = metaClient.getStorage().listDirectEntries(metadataPath);
final TreeSet<String> commitMetaTss = metaFiles.stream().filter(hashingMetaCommitFilePredicate)
.map(commitFile -> HoodieConsistentHashingMetadata.getTimestampFromFile(commitFile.getPath().getName()))
.sorted()
.collect(Collectors.toCollection(TreeSet::new));
final FileStatus[] hashingMetaFiles = Arrays.stream(metaFiles).filter(hashingMetadataFilePredicate)
final List<StoragePathInfo> hashingMetaFiles = metaFiles.stream().filter(hashingMetadataFilePredicate)
.sorted(Comparator.comparing(f -> f.getPath().getName()))
.toArray(FileStatus[]::new);
.collect(Collectors.toList());
// max committed metadata file
final String maxCommitMetaFileTs = commitMetaTss.isEmpty() ? null : commitMetaTss.last();
// max updated metadata file
FileStatus maxMetadataFile = hashingMetaFiles.length > 0 ? hashingMetaFiles[hashingMetaFiles.length - 1] : null;
StoragePathInfo maxMetadataFile = hashingMetaFiles.isEmpty()
? null
: hashingMetaFiles.get(hashingMetaFiles.size() - 1);
// If single file present in metadata and if its default file return it
if (maxMetadataFile != null && HoodieConsistentHashingMetadata.getTimestampFromFile(maxMetadataFile.getPath().getName()).equals(HoodieTimeline.INIT_INSTANT_TS)) {
return loadMetadataFromGivenFile(table, maxMetadataFile);
@@ -146,9 +142,9 @@ public static Option<HoodieConsistentHashingMetadata> loadMetadata(HoodieTable t
HoodieTimeline completedCommits = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().filterCompletedInstants();

// fix the in-consistency between un-committed and committed hashing metadata files.
List<FileStatus> fixed = new ArrayList<>();
Arrays.stream(hashingMetaFiles).forEach(hashingMetaFile -> {
Path path = hashingMetaFile.getPath();
List<StoragePathInfo> fixed = new ArrayList<>();
hashingMetaFiles.forEach(hashingMetaFile -> {
StoragePath path = hashingMetaFile.getPath();
String timestamp = HoodieConsistentHashingMetadata.getTimestampFromFile(path.getName());
if (maxCommitMetaFileTs != null && timestamp.compareTo(maxCommitMetaFileTs) <= 0) {
// only fix the metadata with greater timestamp than max committed timestamp
@@ -206,14 +202,14 @@ public static boolean saveMetadata(HoodieTable table, HoodieConsistentHashingMet
* Creates commit marker corresponding to hashing metadata file after post commit clustering operation.
*
* @param table Hoodie table
* @param fileStatus File for which commit marker should be created
* @param path File for which commit marker should be created
* @param partitionPath Partition path the file belongs to
* @throws IOException
*/
private static void createCommitMarker(HoodieTable table, Path fileStatus, Path partitionPath) throws IOException {
private static void createCommitMarker(HoodieTable table, StoragePath path, StoragePath partitionPath) throws IOException {
HoodieStorage storage = table.getStorage();
StoragePath fullPath = new StoragePath(convertToStoragePath(partitionPath),
getTimestampFromFile(fileStatus.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
StoragePath fullPath = new StoragePath(partitionPath,
getTimestampFromFile(path.getName()) + HASHING_METADATA_COMMIT_FILE_SUFFIX);
if (storage.exists(fullPath)) {
return;
}
@@ -236,11 +232,11 @@ private static void createCommitMarker(HoodieTable table, Path fileStatus, Path
* @param metaFile Hashing metadata file
* @return HoodieConsistentHashingMetadata object
*/
private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile(HoodieTable table, FileStatus metaFile) {
private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile(HoodieTable table, StoragePathInfo metaFile) {
if (metaFile == null) {
return Option.empty();
}
try (InputStream is = table.getStorage().open(convertToStoragePath(metaFile.getPath()))) {
try (InputStream is = table.getStorage().open(metaFile.getPath())) {
byte[] content = FileIOUtils.readAsByteArray(is);
return Option.of(HoodieConsistentHashingMetadata.fromBytes(content));
} catch (FileNotFoundException e) {
@@ -267,8 +263,8 @@ private static Option<HoodieConsistentHashingMetadata> loadMetadataFromGivenFile
* @param partition Partition metadata file belongs to
* @return true if hashing metadata file is latest else false
*/
private static boolean recommitMetadataFile(HoodieTable table, FileStatus metaFile, String partition) {
Path partitionPath = new Path(FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition).toUri());
private static boolean recommitMetadataFile(HoodieTable table, StoragePathInfo metaFile, String partition) {
StoragePath partitionPath = FSUtils.constructAbsolutePath(table.getMetaClient().getBasePath(), partition);
String timestamp = getTimestampFromFile(metaFile.getPath().getName());
if (table.getPendingCommitsTimeline().containsInstant(timestamp)) {
return false;
@@ -279,7 +275,10 @@ private static boolean recommitMetadataFile(HoodieTable table, FileStatus metaFi
}
HoodieConsistentHashingMetadata hoodieConsistentHashingMetadata = hoodieConsistentHashingMetadataOption.get();

Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile -> hoodieConsistentHashingMetadata.getNodes().stream().anyMatch(node -> node.getFileIdPrefix().equals(hoodieBaseFile));
Predicate<String> hoodieFileGroupIdPredicate = hoodieBaseFile ->
hoodieConsistentHashingMetadata.getNodes()
.stream()
.anyMatch(node -> node.getFileIdPrefix().equals(hoodieBaseFile));
if (table.getBaseFileOnlyView().getLatestBaseFiles(partition)
.map(fileIdPrefix -> FSUtils.getFileIdPfxFromFileId(fileIdPrefix.getFileId())).anyMatch(hoodieFileGroupIdPredicate)) {
try {
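
The listing code in the consistent-hashing utilities changes shape the most: FileSystem.listStatus with FileStatus[] arrays becomes HoodieStorage.listDirectEntries with a List<StoragePathInfo>, so the Arrays.stream conversions and array copies drop out. A reduced sketch of the new listing pattern (the suffix value below is illustrative, not the real constant):

```java
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;

class HashingMetadataListingSketch {
  static final String HASHING_METADATA_FILE_SUFFIX = ".hashing_meta"; // illustrative value only

  // Lists the hashing metadata files under metadataPath, sorted by file name,
  // using the storage abstraction introduced by this commit.
  static List<StoragePathInfo> sortedHashingMetaFiles(HoodieStorage storage, StoragePath metadataPath)
      throws IOException {
    return storage.listDirectEntries(metadataPath).stream()
        .filter(pathInfo -> pathInfo.getPath().getName().contains(HASHING_METADATA_FILE_SUFFIX))
        .sorted(Comparator.comparing(pathInfo -> pathInfo.getPath().getName()))
        .collect(Collectors.toList());
  }
}
```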