diff --git a/xyz-hub-service/src/main/java/com/here/xyz/hub/connectors/statistics/StorageStatisticsProvider.java b/xyz-hub-service/src/main/java/com/here/xyz/hub/connectors/statistics/StorageStatisticsProvider.java index 537a9f4838..d75fd6d0c9 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/hub/connectors/statistics/StorageStatisticsProvider.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/hub/connectors/statistics/StorageStatisticsProvider.java @@ -38,6 +38,7 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Future; import io.vertx.core.Promise; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -48,10 +49,8 @@ public class StorageStatisticsProvider { - private static final int MIN_SPACE_BATCH_SIZE = 100; - private static final int MAX_SPACE_BATCH_SIZE = 10_000; - private static final int MIN_BATCH_COUNT = 5; - private static final int MAX_BATCH_COUNT = 20; + private static final int MAX_CONCURRENT_REQUEST_COUNT = 100; + private static final int MAX_SPACE_BATCH_SIZE = 50; public static Future provideStorageStatistics(Marker marker, long includeChangesSince) { SpaceSelectionCondition ssc = new SpaceSelectionCondition(); @@ -118,38 +117,41 @@ private static Future fetchFromStorage(Marker marker, String } private static Future fetchFromStorage(Marker marker, Connector storage, List spaceIds) { - if (spaceIds.size() > MAX_BATCH_COUNT * MAX_SPACE_BATCH_SIZE) - return Future.failedFuture(new IllegalArgumentException("Too many spaces in storage " + storage.id - + " to gather storage statistics.")); - int batchCount = 1; - if (spaceIds.size() > MAX_SPACE_BATCH_SIZE) - batchCount = MAX_BATCH_COUNT; - else if (spaceIds.size() > MIN_SPACE_BATCH_SIZE) - batchCount = MIN_BATCH_COUNT; - if (batchCount > 1) { - //Call this method recursively with the batches - int targetSize = (int) Math.ceil((float) spaceIds.size() / (float) batchCount); - return CompositeFuture.all(Lists.partition(spaceIds, targetSize) - .stream() - .map(batchSpaceIds -> fetchFromStorage(marker, storage, batchSpaceIds)) - .collect(Collectors.toList())) - .compose(results -> Future.succeededFuture(mergeStats(results.list()))); - } - Promise p = Promise.promise(); - - GetStorageStatisticsEvent event = new GetStorageStatisticsEvent() - .withStreamId(marker.getName()) - .withSpaceIds(spaceIds); - RpcClient.getInstanceFor(storage).execute(marker, event, true, ar -> { - if (ar.failed()) p.fail(ar.cause()); - else { - if (!(ar.result() instanceof StorageStatistics)) p.fail("Wrong response returned by storage " + storage.id); - else p.complete((StorageStatistics) ar.result()); - } - }); + int targetSize = spaceIds.size() > MAX_CONCURRENT_REQUEST_COUNT * MAX_SPACE_BATCH_SIZE + ? (int) Math.ceil((double) spaceIds.size() / MAX_CONCURRENT_REQUEST_COUNT) : MAX_SPACE_BATCH_SIZE; + + return CompositeFuture.all(Lists.partition(spaceIds, targetSize) + .stream() + .map(batchSpaceIds -> fetchFromStorageInBatches(marker, storage, batchSpaceIds)) + .collect(Collectors.toList())) + .compose(results -> Future.succeededFuture(mergeStats(results.list()))); + } + + private static Future fetchFromStorageInBatches(Marker marker, Connector storage, List spaceIds) { + Future> f = Future.succeededFuture(new ArrayList<>()); + for(List batchSpaceIds : Lists.partition(spaceIds, MAX_SPACE_BATCH_SIZE)) { + f = f.compose(statsLists -> { + Promise> p = Promise.promise(); + + GetStorageStatisticsEvent event = new GetStorageStatisticsEvent() + .withStreamId(marker.getName()) + .withSpaceIds(batchSpaceIds); + RpcClient.getInstanceFor(storage).execute(marker, event, true, ar -> { + if (ar.failed()) p.fail(ar.cause()); + else { + if (!(ar.result() instanceof StorageStatistics)) p.fail("Wrong response returned by storage " + storage.id); + else { + statsLists.add((StorageStatistics) ar.result()); + p.complete(statsLists); + } + } + }); + return p.future(); + }); + } - return p.future(); + return f.compose(statsList -> Future.succeededFuture(mergeStats(statsList))); } }