Skip to content

Commit

Permalink
Fix a bug in the preprocessor rate limiting reloading (#1226)
Browse files Browse the repository at this point in the history
  • Loading branch information
kyle-sammons authored Feb 7, 2025
1 parent 3084918 commit f60e4e5
Showing 1 changed file with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public class DatasetRateLimitingService extends AbstractScheduledService {
private final DatasetMetadataStore datasetMetadataStore;
private final PreprocessorMetadataStore preprocessorMetadataStore;
private final AstraMetadataStoreChangeListener<DatasetMetadata> datasetListener =
(_) -> runOneIteration();
(_) -> updateDatasetMetadataList();
private final AstraMetadataStoreChangeListener<PreprocessorMetadata> preprocessorListener =
(_) -> runOneIteration();
(_) -> updatePreprocessorCount();

private final PreprocessorRateLimiter rateLimiter;
private ScheduledFuture<?> pendingTask;
Expand All @@ -41,6 +41,8 @@ public class DatasetRateLimitingService extends AbstractScheduledService {
private final Timer rateLimitReloadtimer;
private final AstraConfigs.PreprocessorConfig preprocessorConfig;

private int lastKnownPreprocessorCount;

public DatasetRateLimitingService(
DatasetMetadataStore datasetMetadataStore,
PreprocessorMetadataStore preprocessorMetadataStore,
Expand All @@ -58,11 +60,12 @@ public DatasetRateLimitingService(
true);

this.rateLimitReloadtimer = meterRegistry.timer(RATE_LIMIT_RELOAD_TIMER);
this.lastKnownPreprocessorCount = 1;
}

private void updateRateLimiter() {
private void updatePreprocessorCount() {
Timer.Sample sample = Timer.start(meterRegistry);
Integer preprocessorCountValue = 1;
int preprocessorCountValue = 1;
try {
List<PreprocessorMetadata> preprocessorMetadataList =
this.preprocessorMetadataStore.listSync();
Expand All @@ -72,6 +75,13 @@ private void updateRateLimiter() {
return;
}

// Only recreate the rate limiter if we have to
if (preprocessorCountValue == lastKnownPreprocessorCount) {
return;
}

lastKnownPreprocessorCount = preprocessorCountValue;

try {
List<DatasetMetadata> datasetMetadataList = datasetMetadataStore.listSync();

Expand All @@ -83,12 +93,26 @@ private void updateRateLimiter() {
}
}

private void updateDatasetMetadataList() {
Timer.Sample sample = Timer.start(meterRegistry);

try {
List<DatasetMetadata> datasetMetadataList = datasetMetadataStore.listSync();

this.rateLimiterPredicate =
rateLimiter.createBulkIngestRateLimiter(datasetMetadataList, lastKnownPreprocessorCount);
} finally {
// TODO: re-work this so that we can add success/failure tags and capture them
sample.stop(rateLimitReloadtimer);
}
}

@Override
protected synchronized void runOneIteration() {
if (pendingTask == null || pendingTask.getDelay(TimeUnit.SECONDS) <= 0) {
pendingTask =
executor.schedule(
this::updateRateLimiter,
this::updatePreprocessorCount,
this.preprocessorConfig.getDatasetRateLimitAggregationSecs(),
TimeUnit.SECONDS);
}
Expand Down

0 comments on commit f60e4e5

Please sign in to comment.