diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index 723a95bb21813..77c96b47f0576 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup; import static org.apache.hudi.common.util.MapUtils.nonEmpty; import static org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS; @@ -122,10 +123,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) { Map> cleanOps = new HashMap<>(); List partitionsToDelete = new ArrayList<>(); + boolean shouldUseBatchLookup = shouldUseBatchLookup(table.getMetaClient().getTableConfig(), config); for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) { // Handles at most 'cleanerParallelism' number of partitions once at a time to avoid overlarge memory pressure to the timeline server // (remote or local embedded), thus to reduce the risk of an OOM exception. List subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size())); + if (shouldUseBatchLookup) { + LOG.info("Load partitions and files into file system view in advance. Paths: {}", subPartitionsToClean); + table.getHoodieView().loadPartitions(subPartitionsToClean); + } Map>> cleanOpsWithPartitionMeta = context .map(subPartitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism) .stream() diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 21d12333d87fd..48ec8f9baa1e9 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -64,8 +64,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup; - /** * Cleaner is responsible for garbage collecting older files in a given partition path. Such that *

@@ -108,14 +106,9 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable hoodieT .map(entry -> Pair.of(new HoodieFileGroupId(entry.getValue().getPartitionPath(), entry.getValue().getFileId()), entry.getValue())) .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); - // load all partitions in advance if necessary. - if (shouldUseBatchLookup(hoodieTable.getMetaClient().getTableConfig(), config)) { - LOG.info("Load all partitions and files into file system view in advance."); - fileSystemView.loadAllPartitions(); - } - // collect savepointed timestamps to be assist with incremental cleaning. For non-partitioned and metadata table, we may not need this. - this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ? hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList()) - : Collections.EMPTY_LIST); + // collect savepointed timestamps to assist with incremental cleaning. For non-partitioned and metadata table, we may not need this. + this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.emptyList() : (hoodieTable.isPartitioned() ? new ArrayList<>(hoodieTable.getSavepointTimestamps()) + : Collections.emptyList()); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index 76750171fb552..cdac0eeeb200c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -801,11 +801,20 @@ public final Stream getLatestBaseFilesInRange(List commi } @Override - public Void loadAllPartitions() { + public void loadAllPartitions() { try { readLock.lock(); ensureAllPartitionsLoadedCorrectly(); - return null; + } finally { + readLock.unlock(); + } + } + + @Override + public void loadPartitions(List partitionPaths) { + try { + readLock.lock(); + ensurePartitionsLoadedCorrectly(partitionPaths); } finally { readLock.unlock(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index 56d7c7cc25cf2..1e4b1852d1b24 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -168,8 +168,29 @@ public Stream getLatestBaseFilesInRange(List commitsToRe } @Override - public Void loadAllPartitions() { - return execute(preferredView::loadAllPartitions, secondaryView::loadAllPartitions); + public void loadAllPartitions() { + execute( + () -> { + preferredView.loadAllPartitions(); + return null; + }, + () -> { + secondaryView.loadAllPartitions(); + return null; + }); + } + + @Override + public void loadPartitions(List partitionPaths) { + execute( + () -> { + preferredView.loadPartitions(partitionPaths); + return null; + }, + () -> { + secondaryView.loadPartitions(partitionPaths); + return null; + }); } @Override diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 4363a7daf271d..61c90c6eb020d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -127,8 +127,10 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, // POST Requests public static final String REFRESH_TABLE = String.format("%s/%s", BASE_URL, "refresh/"); public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadallpartitions/"); + public static final String LOAD_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadpartitions/"); public static final String PARTITION_PARAM = "partition"; + public static final String PARTITIONS_PARAM = "partitions"; public static final String BASEPATH_PARAM = "basepath"; public static final String INSTANT_PARAM = "instant"; public static final String MAX_INSTANT_PARAM = "maxinstant"; @@ -526,11 +528,21 @@ public boolean refresh() { } @Override - public Void loadAllPartitions() { + public void loadAllPartitions() { Map paramsMap = getParams(); try { executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); - return null; + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + + @Override + public void loadPartitions(List partitionPaths) { + try { + Map paramsMap = getParams(); + paramsMap.put(PARTITIONS_PARAM, OBJECT_MAPPER.writeValueAsString(partitionPaths)); + executeRequest(LOAD_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); } catch (IOException e) { throw new HoodieRemoteException(e); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java index 1bcd1de61bc5d..87b3db142e67b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java @@ -246,5 +246,11 @@ interface SliceView extends SliceViewWithLatestSlice { /** * Load all partition and file slices into view */ - Void loadAllPartitions(); + void loadAllPartitions(); + + /** + * Load all partition and file slices into view for the provided partition paths + * @param partitionPaths List of partition paths to load + */ + void loadPartitions(List partitionPaths); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java index 1ab824724aabd..165a565da222e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java @@ -633,6 +633,61 @@ void testFileSlicingWithMultipleDeltaWriters() throws Exception { assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order check"); } + @Test + void testLoadPartitions_unPartitioned() throws Exception { + String partitionPath = ""; + Paths.get(basePath, partitionPath).toFile().mkdirs(); + String fileId = UUID.randomUUID().toString(); + + String instantTime1 = "1"; + String fileName1 = + FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN); + + Paths.get(basePath, partitionPath, fileName1).toFile().createNewFile(); + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1); + + saveAsComplete(commitTimeline, instant1, Option.empty()); + refreshFsView(); + + // Assert that no base files are returned without the partitions being loaded + assertEquals(0, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count()); + // Assert that load does not fail for un-partitioned tables + fsView.loadPartitions(Collections.singletonList(partitionPath)); + // Assert that base files are returned after the empty-string partition is loaded + assertEquals(1, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count()); + } + + @Test + void testLoadPartitions_partitioned() throws Exception { + String partitionPath1 = "2016/05/01"; + String partitionPath2 = "2016/05/02"; + Paths.get(basePath, partitionPath1).toFile().mkdirs(); + Paths.get(basePath, partitionPath2).toFile().mkdirs(); + String fileId1 = UUID.randomUUID().toString(); + String fileId2 = UUID.randomUUID().toString(); + String instantTime1 = "1"; + String fileName1 = + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN); + String fileName2 = + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN); + + Paths.get(basePath, partitionPath1, fileName1).toFile().createNewFile(); + Paths.get(basePath, partitionPath2, fileName2).toFile().createNewFile(); + HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline(); + HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1); + + saveAsComplete(commitTimeline, instant1, Option.empty()); + refreshFsView(); + + // Assert that no base files are returned without the partitions being loaded + assertEquals(0, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count()); + // Only load a single partition path + fsView.loadPartitions(Collections.singletonList(partitionPath1)); + // Assert that base file is returned for partitionPath1 only + assertEquals(1, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count()); + } + /** * Returns all file-slices including uncommitted ones. * diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java index b297d320c7a6b..1e2b8e0c35e5a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestPriorityBasedFileSystemView.java @@ -53,6 +53,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -698,6 +701,27 @@ public void testGetLatestFileSlice() { }); } + @Test + public void testLoadPartitions() { + String partitionPath = "/table2"; + + fsView.loadPartitions(Collections.singletonList(partitionPath)); + verify(primary, times(1)).loadPartitions(Collections.singletonList(partitionPath)); + verify(secondary, never()).loadPartitions(any()); + + resetMocks(); + doThrow(new RuntimeException()).when(primary).loadPartitions(Collections.singletonList(partitionPath)); + fsView.loadPartitions(Collections.singletonList(partitionPath)); + verify(primary, times(1)).loadPartitions(Collections.singletonList(partitionPath)); + verify(secondary, times(1)).loadPartitions(Collections.singletonList(partitionPath)); + + resetMocks(); + doThrow(new RuntimeException()).when(secondary).loadPartitions(Collections.singletonList(partitionPath)); + assertThrows(RuntimeException.class, () -> { + fsView.loadPartitions(Collections.singletonList(partitionPath)); + }); + } + @Test public void testGetPreferredView() { assertEquals(primary, fsView.getPreferredView()); diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index c72491341fe42..f17c5624084ea 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -37,6 +37,7 @@ import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.timeline.service.handlers.BaseFileHandler; import org.apache.hudi.timeline.service.handlers.FileSliceHandler; import org.apache.hudi.timeline.service.handlers.InstantStateHandler; @@ -44,6 +45,7 @@ import org.apache.hudi.timeline.service.handlers.TimelineHandler; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import io.javalin.Javalin; @@ -72,6 +74,7 @@ public class RequestHandler { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class); + private static final TypeReference> LIST_TYPE_REFERENCE = new TypeReference>() {}; private final TimelineService.Config timelineServiceConfig; private final FileSystemViewManager viewManager; @@ -444,6 +447,19 @@ private void registerFileSlicesAPI() { writeValueAsString(ctx, success); }, false)); + app.post(RemoteHoodieTableFileSystemView.LOAD_PARTITIONS_URL, new ViewHandler(ctx -> { + metricsRegistry.add("LOAD_PARTITIONS", 1); + String basePath = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")); + try { + List partitionPaths = OBJECT_MAPPER.readValue(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITIONS_PARAM, String.class) + .getOrThrow(e -> new HoodieException("Partitions param is invalid")), LIST_TYPE_REFERENCE); + boolean success = sliceHandler.loadPartitions(basePath, partitionPaths); + writeValueAsString(ctx, success); + } catch (IOException e) { + throw new HoodieIOException("Failed to parse request parameter", e); + } + }, false)); + app.post(RemoteHoodieTableFileSystemView.LOAD_ALL_PARTITIONS_URL, new ViewHandler(ctx -> { metricsRegistry.add("LOAD_ALL_PARTITIONS", 1); boolean success = sliceHandler diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java index 4a4226724f8bc..391145c5cf8b5 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/FileSliceHandler.java @@ -163,4 +163,9 @@ public boolean loadAllPartitions(String basePath) { viewManager.getFileSystemView(basePath).loadAllPartitions(); return true; } + + public boolean loadPartitions(String basePath, List partitionPaths) { + viewManager.getFileSystemView(basePath).loadPartitions(partitionPaths); + return true; + } }