From f60e4e575b2b59f034904931ef806fa4b73c8e86 Mon Sep 17 00:00:00 2001 From: Kyle <1023070+kyle-sammons@users.noreply.github.com> Date: Fri, 7 Feb 2025 15:23:43 -0800 Subject: [PATCH] Fix a bug in the preprocessor rate limiting reloading (#1226) --- .../DatasetRateLimitingService.java | 34 ++++++++++++++++--- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/astra/src/main/java/com/slack/astra/bulkIngestApi/DatasetRateLimitingService.java b/astra/src/main/java/com/slack/astra/bulkIngestApi/DatasetRateLimitingService.java index c9f8346fce..5aff7e9590 100644 --- a/astra/src/main/java/com/slack/astra/bulkIngestApi/DatasetRateLimitingService.java +++ b/astra/src/main/java/com/slack/astra/bulkIngestApi/DatasetRateLimitingService.java @@ -26,9 +26,9 @@ public class DatasetRateLimitingService extends AbstractScheduledService { private final DatasetMetadataStore datasetMetadataStore; private final PreprocessorMetadataStore preprocessorMetadataStore; private final AstraMetadataStoreChangeListener datasetListener = - (_) -> runOneIteration(); + (_) -> updateDatasetMetadataList(); private final AstraMetadataStoreChangeListener preprocessorListener = - (_) -> runOneIteration(); + (_) -> updatePreprocessorCount(); private final PreprocessorRateLimiter rateLimiter; private ScheduledFuture pendingTask; @@ -41,6 +41,8 @@ public class DatasetRateLimitingService extends AbstractScheduledService { private final Timer rateLimitReloadtimer; private final AstraConfigs.PreprocessorConfig preprocessorConfig; + private int lastKnownPreprocessorCount; + public DatasetRateLimitingService( DatasetMetadataStore datasetMetadataStore, PreprocessorMetadataStore preprocessorMetadataStore, @@ -58,11 +60,12 @@ public DatasetRateLimitingService( true); this.rateLimitReloadtimer = meterRegistry.timer(RATE_LIMIT_RELOAD_TIMER); + this.lastKnownPreprocessorCount = 1; } - private void updateRateLimiter() { + private void updatePreprocessorCount() { Timer.Sample sample = Timer.start(meterRegistry); - Integer preprocessorCountValue = 1; + int preprocessorCountValue = 1; try { List preprocessorMetadataList = this.preprocessorMetadataStore.listSync(); @@ -72,6 +75,13 @@ private void updateRateLimiter() { return; } + // Only recreate the rate limiter if we have to + if (preprocessorCountValue == lastKnownPreprocessorCount) { + return; + } + + lastKnownPreprocessorCount = preprocessorCountValue; + try { List datasetMetadataList = datasetMetadataStore.listSync(); @@ -83,12 +93,26 @@ private void updateRateLimiter() { } } + private void updateDatasetMetadataList() { + Timer.Sample sample = Timer.start(meterRegistry); + + try { + List datasetMetadataList = datasetMetadataStore.listSync(); + + this.rateLimiterPredicate = + rateLimiter.createBulkIngestRateLimiter(datasetMetadataList, lastKnownPreprocessorCount); + } finally { + // TODO: re-work this so that we can add success/failure tags and capture them + sample.stop(rateLimitReloadtimer); + } + } + @Override protected synchronized void runOneIteration() { if (pendingTask == null || pendingTask.getDelay(TimeUnit.SECONDS) <= 0) { pendingTask = executor.schedule( - this::updateRateLimiter, + this::updatePreprocessorCount, this.preprocessorConfig.getDatasetRateLimitAggregationSecs(), TimeUnit.SECONDS); }