Skip to content

Commit

Permalink
Optimize parallelism in RPC calls for storage statistics (#1410)
Browse files Browse the repository at this point in the history
* Optimize parallelism in RPC calls for storage statistics

Signed-off-by: Ansari, Mujammil <[email protected]>

* Enhance batch size calculation

Signed-off-by: Ansari, Mujammil <[email protected]>

---------

Signed-off-by: Ansari, Mujammil <[email protected]>
  • Loading branch information
mujammil10 authored Dec 10, 2024
1 parent 4a830de commit eda8220
Showing 1 changed file with 36 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -48,10 +49,8 @@

public class StorageStatisticsProvider {

private static final int MIN_SPACE_BATCH_SIZE = 100;
private static final int MAX_SPACE_BATCH_SIZE = 10_000;
private static final int MIN_BATCH_COUNT = 5;
private static final int MAX_BATCH_COUNT = 20;
private static final int MAX_CONCURRENT_REQUEST_COUNT = 100;
private static final int MAX_SPACE_BATCH_SIZE = 50;

public static Future<StorageStatistics> provideStorageStatistics(Marker marker, long includeChangesSince) {
SpaceSelectionCondition ssc = new SpaceSelectionCondition();
Expand Down Expand Up @@ -118,38 +117,41 @@ private static Future<StorageStatistics> fetchFromStorage(Marker marker, String
}

private static Future<StorageStatistics> fetchFromStorage(Marker marker, Connector storage, List<String> spaceIds) {
if (spaceIds.size() > MAX_BATCH_COUNT * MAX_SPACE_BATCH_SIZE)
return Future.failedFuture(new IllegalArgumentException("Too many spaces in storage " + storage.id
+ " to gather storage statistics."));
int batchCount = 1;
if (spaceIds.size() > MAX_SPACE_BATCH_SIZE)
batchCount = MAX_BATCH_COUNT;
else if (spaceIds.size() > MIN_SPACE_BATCH_SIZE)
batchCount = MIN_BATCH_COUNT;
if (batchCount > 1) {
//Call this method recursively with the batches
int targetSize = (int) Math.ceil((float) spaceIds.size() / (float) batchCount);
return CompositeFuture.all(Lists.partition(spaceIds, targetSize)
.stream()
.map(batchSpaceIds -> fetchFromStorage(marker, storage, batchSpaceIds))
.collect(Collectors.toList()))
.compose(results -> Future.succeededFuture(mergeStats(results.list())));
}

Promise<StorageStatistics> p = Promise.promise();

GetStorageStatisticsEvent event = new GetStorageStatisticsEvent()
.withStreamId(marker.getName())
.withSpaceIds(spaceIds);
RpcClient.getInstanceFor(storage).execute(marker, event, true, ar -> {
if (ar.failed()) p.fail(ar.cause());
else {
if (!(ar.result() instanceof StorageStatistics)) p.fail("Wrong response returned by storage " + storage.id);
else p.complete((StorageStatistics) ar.result());
}
});
int targetSize = spaceIds.size() > MAX_CONCURRENT_REQUEST_COUNT * MAX_SPACE_BATCH_SIZE
? (int) Math.ceil((double) spaceIds.size() / MAX_CONCURRENT_REQUEST_COUNT) : MAX_SPACE_BATCH_SIZE;

return CompositeFuture.all(Lists.partition(spaceIds, targetSize)
.stream()
.map(batchSpaceIds -> fetchFromStorageInBatches(marker, storage, batchSpaceIds))
.collect(Collectors.toList()))
.compose(results -> Future.succeededFuture(mergeStats(results.list())));
}

private static Future<StorageStatistics> fetchFromStorageInBatches(Marker marker, Connector storage, List<String> spaceIds) {
Future<List<StorageStatistics>> f = Future.succeededFuture(new ArrayList<>());
for(List<String> batchSpaceIds : Lists.partition(spaceIds, MAX_SPACE_BATCH_SIZE)) {
f = f.compose(statsLists -> {
Promise<List<StorageStatistics>> p = Promise.promise();

GetStorageStatisticsEvent event = new GetStorageStatisticsEvent()
.withStreamId(marker.getName())
.withSpaceIds(batchSpaceIds);
RpcClient.getInstanceFor(storage).execute(marker, event, true, ar -> {
if (ar.failed()) p.fail(ar.cause());
else {
if (!(ar.result() instanceof StorageStatistics)) p.fail("Wrong response returned by storage " + storage.id);
else {
statsLists.add((StorageStatistics) ar.result());
p.complete(statsLists);
}
}
});
return p.future();
});
}

return p.future();
return f.compose(statsList -> Future.succeededFuture(mergeStats(statsList)));
}

}

0 comments on commit eda8220

Please sign in to comment.