[HUDI-7551] Avoid loading all partitions in CleanPlanner when MDT is enabled (apache#10928)
the-other-tim-brown authored Mar 28, 2024
1 parent 136d075 commit 28f67ff
Showing 10 changed files with 164 additions and 17 deletions.
@@ -48,6 +48,7 @@
import java.util.Map;
import java.util.stream.Collectors;

+import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
import static org.apache.hudi.common.util.MapUtils.nonEmpty;
import static org.apache.hudi.table.action.clean.CleanPlanner.SAVEPOINTED_TIMESTAMPS;

@@ -122,10 +123,15 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {

Map<String, List<HoodieCleanFileInfo>> cleanOps = new HashMap<>();
List<String> partitionsToDelete = new ArrayList<>();
+boolean shouldUseBatchLookup = shouldUseBatchLookup(table.getMetaClient().getTableConfig(), config);
for (int i = 0; i < partitionsToClean.size(); i += cleanerParallelism) {
// Handles at most 'cleanerParallelism' number of partitions once at a time to avoid overlarge memory pressure to the timeline server
// (remote or local embedded), thus to reduce the risk of an OOM exception.
List<String> subPartitionsToClean = partitionsToClean.subList(i, Math.min(i + cleanerParallelism, partitionsToClean.size()));
+if (shouldUseBatchLookup) {
+LOG.info("Load partitions and files into file system view in advance. Paths: {}", subPartitionsToClean);
+table.getHoodieView().loadPartitions(subPartitionsToClean);
+}
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(subPartitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean, earliestInstant)), cleanerParallelism)
.stream()
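The loop above is the heart of the change: instead of one up-front loadAllPartitions() call, partitions are now pre-loaded in windows of at most cleanerParallelism at a time. A standalone sketch of the same subList windowing, with illustrative names not taken from the PR:

```java
import java.util.Arrays;
import java.util.List;

public class PartitionBatcher {
  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("2024/01/01", "2024/01/02", "2024/01/03", "2024/01/04", "2024/01/05");
    int parallelism = 2;
    // Walk the partition list in windows of 'parallelism' so that at most
    // that many partitions are materialized in the view at once.
    for (int i = 0; i < partitions.size(); i += parallelism) {
      List<String> batch = partitions.subList(i, Math.min(i + parallelism, partitions.size()));
      System.out.println("Would pre-load batch: " + batch);
    }
  }
}
```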
@@ -64,8 +64,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

-import static org.apache.hudi.client.utils.MetadataTableUtils.shouldUseBatchLookup;
-
/**
* Cleaner is responsible for garbage collecting older files in a given partition path. Such that
* <p>
@@ -108,14 +106,9 @@ public CleanPlanner(HoodieEngineContext context, HoodieTable<T, I, K, O> hoodieT
.map(entry -> Pair.of(new HoodieFileGroupId(entry.getValue().getPartitionPath(), entry.getValue().getFileId()), entry.getValue()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));

-// load all partitions in advance if necessary.
-if (shouldUseBatchLookup(hoodieTable.getMetaClient().getTableConfig(), config)) {
-LOG.info("Load all partitions and files into file system view in advance.");
-fileSystemView.loadAllPartitions();
-}
-// collect savepointed timestamps to be assist with incremental cleaning. For non-partitioned and metadata table, we may not need this.
-this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.EMPTY_LIST : (hoodieTable.isPartitioned() ? hoodieTable.getSavepointTimestamps().stream().collect(Collectors.toList())
-: Collections.EMPTY_LIST);
+// collect savepointed timestamps to assist with incremental cleaning. For non-partitioned and metadata table, we may not need this.
+this.savepointedTimestamps = hoodieTable.isMetadataTable() ? Collections.emptyList() : (hoodieTable.isPartitioned() ? new ArrayList<>(hoodieTable.getSavepointTimestamps())
+: Collections.emptyList());
}

/**
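The constructor cleanup also swaps Collections.EMPTY_LIST for Collections.emptyList(). The difference matters here because the field is a typed List<String>: the raw EMPTY_LIST constant only compiles with an unchecked conversion, while emptyList() infers the element type. A minimal standalone illustration:

```java
import java.util.Collections;
import java.util.List;

public class EmptyListDemo {

  @SuppressWarnings("unchecked")
  static List<String> raw() {
    // Raw constant: assigning it to List<String> is an unchecked conversion.
    return Collections.EMPTY_LIST;
  }

  static List<String> typed() {
    // Generic factory: T is inferred as String, no warning.
    return Collections.emptyList();
  }
}
```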
@@ -801,11 +801,20 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
}

@Override
-public Void loadAllPartitions() {
+public void loadAllPartitions() {
try {
readLock.lock();
ensureAllPartitionsLoadedCorrectly();
-return null;
} finally {
readLock.unlock();
}
+}
+
+@Override
+public void loadPartitions(List<String> partitionPaths) {
+try {
+readLock.lock();
+ensurePartitionsLoadedCorrectly(partitionPaths);
+} finally {
+readLock.unlock();
+}
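Both methods above follow the view's lock-then-load idiom: take the read lock, make sure the requested partitions are materialized, release in a finally block. A self-contained sketch of that idiom, assuming a ReentrantReadWriteLock; ensurePartitionsLoadedCorrectly below is a stub standing in for the view's internal loader:

```java
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockedPartitionLoader {
  private final ReadWriteLock globalLock = new ReentrantReadWriteLock();

  public void loadPartitions(List<String> partitionPaths) {
    // Read lock: concurrent loads may proceed together, while a writer
    // (e.g. a view reset) excludes them.
    globalLock.readLock().lock();
    try {
      ensurePartitionsLoadedCorrectly(partitionPaths);
    } finally {
      globalLock.readLock().unlock();
    }
  }

  // Illustrative stub; the real view scans and indexes the partitions.
  private void ensurePartitionsLoadedCorrectly(List<String> partitionPaths) {
    System.out.println("Loading: " + partitionPaths);
  }
}
```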
@@ -168,8 +168,29 @@ public Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToRe
}

@Override
-public Void loadAllPartitions() {
-return execute(preferredView::loadAllPartitions, secondaryView::loadAllPartitions);
+public void loadAllPartitions() {
+execute(
+() -> {
+preferredView.loadAllPartitions();
+return null;
+},
+() -> {
+secondaryView.loadAllPartitions();
+return null;
+});
}

+@Override
+public void loadPartitions(List<String> partitionPaths) {
+execute(
+() -> {
+preferredView.loadPartitions(partitionPaths);
+return null;
+},
+() -> {
+secondaryView.loadPartitions(partitionPaths);
+return null;
+});
+}

@Override
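The `return null` in each lambda suggests that `execute` accepts value-returning functions, so the now-void methods wrap their calls and discard the result. A minimal sketch of such a failover helper, consistent with the test expectations further down (the real signature in Hudi may differ):

```java
import java.util.function.Supplier;

public class FailoverExecutor {

  // Try the preferred view first; on failure, fall back to the secondary.
  // If the secondary also fails, the exception propagates to the caller.
  static <T> T execute(Supplier<T> preferred, Supplier<T> secondary) {
    try {
      return preferred.get();
    } catch (RuntimeException e) {
      return secondary.get();
    }
  }

  public static void main(String[] args) {
    String result = execute(
        () -> { throw new RuntimeException("preferred view unavailable"); },
        () -> "served by secondary");
    System.out.println(result); // prints "served by secondary"
  }
}
```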
@@ -127,8 +127,10 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
// POST Requests
public static final String REFRESH_TABLE = String.format("%s/%s", BASE_URL, "refresh/");
public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadallpartitions/");
+public static final String LOAD_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadpartitions/");

public static final String PARTITION_PARAM = "partition";
+public static final String PARTITIONS_PARAM = "partitions";
public static final String BASEPATH_PARAM = "basepath";
public static final String INSTANT_PARAM = "instant";
public static final String MAX_INSTANT_PARAM = "maxinstant";
@@ -526,11 +528,21 @@ public boolean refresh() {
}

@Override
-public Void loadAllPartitions() {
+public void loadAllPartitions() {
Map<String, String> paramsMap = getParams();
try {
executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
-return null;
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
+}
+
+@Override
+public void loadPartitions(List<String> partitionPaths) {
+try {
+Map<String, String> paramsMap = getParams();
+paramsMap.put(PARTITIONS_PARAM, OBJECT_MAPPER.writeValueAsString(partitionPaths));
+executeRequest(LOAD_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
+} catch (IOException e) {
+throw new HoodieRemoteException(e);
+}
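On the wire, the partition list travels as a JSON string inside the new `partitions` query parameter. A self-contained round-trip of that encoding, mirroring how OBJECT_MAPPER is used on the client above and in the request handler below:

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Arrays;
import java.util.List;

public class PartitionsParamRoundTrip {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // Client side: encode the partition paths into a single parameter value.
    String encoded = mapper.writeValueAsString(Arrays.asList("2016/05/01", "2016/05/02"));
    System.out.println(encoded); // ["2016/05/01","2016/05/02"]

    // Server side: decode the parameter back into a List<String>.
    List<String> decoded = mapper.readValue(encoded, new TypeReference<List<String>>() {});
    System.out.println(decoded); // [2016/05/01, 2016/05/02]
  }
}
```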
@@ -246,5 +246,11 @@ interface SliceView extends SliceViewWithLatestSlice {
/**
* Load all partition and file slices into view
*/
-Void loadAllPartitions();
+void loadAllPartitions();
+
+/**
+* Load all partition and file slices into view for the provided partition paths
+* @param partitionPaths List of partition paths to load
+*/
+void loadPartitions(List<String> partitionPaths);
}
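With the interface change, callers can scope the warm-up to the partitions they will actually touch. A hypothetical usage sketch against a stand-in for the SliceView contract (the helper and its names are illustrative, not part of the commit):

```java
import java.util.Arrays;
import java.util.List;

public class ScopedWarmup {

  // Stand-in for the SliceView contract after this commit (illustrative).
  interface PartitionLoader {
    void loadAllPartitions();
    void loadPartitions(List<String> partitionPaths);
  }

  // Warm only the partitions a job will touch instead of scanning everything.
  static void warm(PartitionLoader view) {
    view.loadPartitions(Arrays.asList("2016/05/01", "2016/05/02"));
    // Previously the only option was the full scan:
    // view.loadAllPartitions();
  }
}
```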
@@ -633,6 +633,61 @@ void testFileSlicingWithMultipleDeltaWriters() throws Exception {
assertEquals(deltaFile3, logFiles.get(0).getFileName(), "Log File Order check");
}

+@Test
+void testLoadPartitions_unPartitioned() throws Exception {
+String partitionPath = "";
+Paths.get(basePath, partitionPath).toFile().mkdirs();
+String fileId = UUID.randomUUID().toString();
+
+String instantTime1 = "1";
+String fileName1 =
+FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN);
+
+Paths.get(basePath, partitionPath, fileName1).toFile().createNewFile();
+HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
+
+saveAsComplete(commitTimeline, instant1, Option.empty());
+refreshFsView();
+
+// Assert that no base files are returned without the partitions being loaded
+assertEquals(0, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+// Assert that load does not fail for un-partitioned tables
+fsView.loadPartitions(Collections.singletonList(partitionPath));
+// Assert that base files are returned after the empty-string partition is loaded
+assertEquals(1, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+}
+
+@Test
+void testLoadPartitions_partitioned() throws Exception {
+String partitionPath1 = "2016/05/01";
+String partitionPath2 = "2016/05/02";
+Paths.get(basePath, partitionPath1).toFile().mkdirs();
+Paths.get(basePath, partitionPath2).toFile().mkdirs();
+String fileId1 = UUID.randomUUID().toString();
+String fileId2 = UUID.randomUUID().toString();
+String instantTime1 = "1";
+String fileName1 =
+FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN);
+String fileName2 =
+FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0, TEST_WRITE_TOKEN);
+
+Paths.get(basePath, partitionPath1, fileName1).toFile().createNewFile();
+Paths.get(basePath, partitionPath2, fileName2).toFile().createNewFile();
+HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
+HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, instantTime1);
+
+saveAsComplete(commitTimeline, instant1, Option.empty());
+refreshFsView();
+
+// Assert that no base files are returned without the partitions being loaded
+assertEquals(0, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+// Only load a single partition path
+fsView.loadPartitions(Collections.singletonList(partitionPath1));
+// Assert that base file is returned for partitionPath1 only
+assertEquals(1, fsView.getLatestFileSliceInRange(Collections.singletonList("1")).count());
+}

/**
* Returns all file-slices including uncommitted ones.
*
@@ -53,6 +53,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -698,6 +701,27 @@ public void testGetLatestFileSlice() {
});
}

+@Test
+public void testLoadPartitions() {
+String partitionPath = "/table2";
+
+fsView.loadPartitions(Collections.singletonList(partitionPath));
+verify(primary, times(1)).loadPartitions(Collections.singletonList(partitionPath));
+verify(secondary, never()).loadPartitions(any());
+
+resetMocks();
+doThrow(new RuntimeException()).when(primary).loadPartitions(Collections.singletonList(partitionPath));
+fsView.loadPartitions(Collections.singletonList(partitionPath));
+verify(primary, times(1)).loadPartitions(Collections.singletonList(partitionPath));
+verify(secondary, times(1)).loadPartitions(Collections.singletonList(partitionPath));
+
+resetMocks();
+doThrow(new RuntimeException()).when(secondary).loadPartitions(Collections.singletonList(partitionPath));
+assertThrows(RuntimeException.class, () -> {
+fsView.loadPartitions(Collections.singletonList(partitionPath));
+});
+}

@Test
public void testGetPreferredView() {
assertEquals(primary, fsView.getPreferredView());
@@ -37,13 +37,15 @@
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
import org.apache.hudi.timeline.service.handlers.InstantStateHandler;
import org.apache.hudi.timeline.service.handlers.MarkerHandler;
import org.apache.hudi.timeline.service.handlers.TimelineHandler;

import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
import io.javalin.Javalin;
@@ -72,6 +74,7 @@ public class RequestHandler {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class);
+private static final TypeReference<List<String>> LIST_TYPE_REFERENCE = new TypeReference<List<String>>() {};

private final TimelineService.Config timelineServiceConfig;
private final FileSystemViewManager viewManager;
@@ -444,6 +447,19 @@ private void registerFileSlicesAPI() {
writeValueAsString(ctx, success);
}, false));

+app.post(RemoteHoodieTableFileSystemView.LOAD_PARTITIONS_URL, new ViewHandler(ctx -> {
+metricsRegistry.add("LOAD_PARTITIONS", 1);
+String basePath = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"));
+try {
+List<String> partitionPaths = OBJECT_MAPPER.readValue(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITIONS_PARAM, String.class)
+.getOrThrow(e -> new HoodieException("Partitions param is invalid")), LIST_TYPE_REFERENCE);
+boolean success = sliceHandler.loadPartitions(basePath, partitionPaths);
+writeValueAsString(ctx, success);
+} catch (IOException e) {
+throw new HoodieIOException("Failed to parse request parameter", e);
+}
+}, false));

app.post(RemoteHoodieTableFileSystemView.LOAD_ALL_PARTITIONS_URL, new ViewHandler(ctx -> {
metricsRegistry.add("LOAD_ALL_PARTITIONS", 1);
boolean success = sliceHandler
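End to end, the new route is a plain POST with everything in query parameters. A hedged sketch of what such a request could look like from java.net.http, assuming BASE_URL resolves to /v1/hoodie/view and a timeline server on localhost:26754; both values, and the omission of the view's other bookkeeping parameters, are illustrative:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class LoadPartitionsCall {
  public static void main(String[] args) throws Exception {
    // Illustrative values: base path, host, and port depend on the deployment.
    String basePath = URLEncoder.encode("/tmp/hudi_table", StandardCharsets.UTF_8);
    String partitions = URLEncoder.encode("[\"2016/05/01\"]", StandardCharsets.UTF_8);
    URI uri = URI.create("http://localhost:26754/v1/hoodie/view/loadpartitions/?basepath="
        + basePath + "&partitions=" + partitions);

    HttpRequest request = HttpRequest.newBuilder(uri)
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body()); // the handler writes a JSON boolean on success
  }
}
```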
@@ -163,4 +163,9 @@ public boolean loadAllPartitions(String basePath) {
viewManager.getFileSystemView(basePath).loadAllPartitions();
return true;
}

+public boolean loadPartitions(String basePath, List<String> partitionPaths) {
+viewManager.getFileSystemView(basePath).loadPartitions(partitionPaths);
+return true;
+}
}
