Skip to content

Commit

Permalink
Enable rate limits to automatically update based on the amount of ava…
Browse files Browse the repository at this point in the history
…ilable preprocessors (#1215)

* Enable rate limits to automatically scale based on the amount of available preprocessors.

This should effectively unlock the ability to enable autoscaling on preprocessors as a
whole

* Remove this bit

* Readd this other bit
  • Loading branch information
kyle-sammons authored Jan 28, 2025
1 parent dd6e2ac commit 35bdc0a
Show file tree
Hide file tree
Showing 12 changed files with 307 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,41 +1,55 @@
package com.slack.astra.bulkIngestApi;

import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.slack.astra.metadata.core.AstraMetadataStoreChangeListener;
import com.slack.astra.metadata.dataset.DatasetMetadata;
import com.slack.astra.metadata.dataset.DatasetMetadataStore;
import com.slack.astra.metadata.preprocessor.PreprocessorMetadata;
import com.slack.astra.metadata.preprocessor.PreprocessorMetadataStore;
import com.slack.astra.preprocessor.PreprocessorRateLimiter;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.service.murron.trace.Trace;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;

/**
* Guava service that maintains an rate limiting object consistent with the value stored in the
* Guava service that maintains a rate limiting object consistent with the value stored in the
* dataset metadata store.
*/
public class DatasetRateLimitingService extends AbstractIdleService {
public class DatasetRateLimitingService extends AbstractScheduledService {
private final DatasetMetadataStore datasetMetadataStore;
private final PreprocessorMetadataStore preprocessorMetadataStore;
private final AstraMetadataStoreChangeListener<DatasetMetadata> datasetListener =
(_) -> updateRateLimiter();
(_) -> runOneIteration();
private final AstraMetadataStoreChangeListener<PreprocessorMetadata> preprocessorListener =
(_) -> runOneIteration();

private final PreprocessorRateLimiter rateLimiter;
private ScheduledFuture<?> pendingTask;
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private BiPredicate<String, List<Trace.Span>> rateLimiterPredicate;

private final MeterRegistry meterRegistry;
public static final String RATE_LIMIT_RELOAD_TIMER =
"preprocessor_dataset_rate_limit_reload_timer";
private final Timer rateLimitReloadtimer;
private final AstraConfigs.PreprocessorConfig preprocessorConfig;

public DatasetRateLimitingService(
DatasetMetadataStore datasetMetadataStore,
PreprocessorMetadataStore preprocessorMetadataStore,
AstraConfigs.PreprocessorConfig preprocessorConfig,
MeterRegistry meterRegistry) {
this.datasetMetadataStore = datasetMetadataStore;
this.meterRegistry = meterRegistry;

this.preprocessorMetadataStore = preprocessorMetadataStore;
this.preprocessorConfig = preprocessorConfig;
this.rateLimiter =
new PreprocessorRateLimiter(
meterRegistry,
Expand All @@ -48,24 +62,61 @@ public DatasetRateLimitingService(

/**
 * Rebuilds {@code rateLimiterPredicate} from the current dataset metadata and the number of
 * entries currently listed in the preprocessor metadata store.
 *
 * <p>The reload is timed with {@code rateLimitReloadtimer} on every exit path. If the
 * preprocessor list cannot be read, the existing predicate is left untouched and the method
 * returns without rebuilding.
 */
private void updateRateLimiter() {
  Timer.Sample sample = Timer.start(meterRegistry);
  try {
    int preprocessorCountValue;
    try {
      // Never hand a count below 1 to the rate limiter: the initial load in startUp() runs
      // before this node has created its own entry, so the store can legitimately be empty.
      // NOTE(review): assumes the limiter splits throughput across this count - confirm.
      preprocessorCountValue = Math.max(1, this.preprocessorMetadataStore.listSync().size());
    } catch (Exception e) {
      // Best-effort: keep serving with the previous predicate rather than failing the reload.
      // NOTE(review): the failure is dropped silently; consider logging it once this class
      // has a logger.
      return;
    }

    List<DatasetMetadata> datasetMetadataList = datasetMetadataStore.listSync();
    // Build the predicate once, with the fresh preprocessor count.
    this.rateLimiterPredicate =
        rateLimiter.createBulkIngestRateLimiter(datasetMetadataList, preprocessorCountValue);
  } finally {
    // TODO: re-work this so that we can add success/failure tags and capture them
    sample.stop(rateLimitReloadtimer);
  }
}

/**
 * Debounced trigger for a rate limiter reload.
 *
 * <p>Invoked by the dataset and preprocessor metadata listeners and by the service's periodic
 * schedule. If a reload is already queued and has not yet come due, this call is a no-op;
 * otherwise a reload is scheduled to run after the configured aggregation delay.
 */
@Override
protected synchronized void runOneIteration() {
  // Compare at nanosecond resolution. Asking for the delay in SECONDS truncates toward zero,
  // so a task with less than one second remaining would read as "due" and a duplicate reload
  // would be queued behind it.
  boolean reloadStillQueued =
      pendingTask != null && pendingTask.getDelay(TimeUnit.NANOSECONDS) > 0;
  if (reloadStillQueued) {
    return;
  }

  pendingTask =
      executor.schedule(
          this::updateRateLimiter,
          this.preprocessorConfig.getDatasetRateLimitAggregationSecs(),
          TimeUnit.SECONDS);
}

/**
 * Performs the initial rate limiter load, subscribes to dataset and preprocessor metadata
 * changes, and then adds a new {@link PreprocessorMetadata} entry to the preprocessor store.
 *
 * <p>The order of the last two calls matters; see the inline comment.
 */
@Override
protected void startUp() throws Exception {
// Initial load, so a predicate exists before any listener or scheduled refresh fires.
updateRateLimiter();
datasetMetadataStore.addListener(datasetListener);
this.preprocessorMetadataStore.addListener(preprocessorListener);

// We need to wait for the cache to be initialized _before_ we try
// adding the metadata to the store. If we don't, then we end up
// clobbering the ZK init event, which prevents a listSync from
// ever being called
this.preprocessorMetadataStore.awaitCacheInitialized();
// The no-arg constructor gives the entry a random UUID name.
// NOTE(review): presumably this entry represents the current node - confirm.
this.preprocessorMetadataStore.createSync(new PreprocessorMetadata());
}

/**
 * Unsubscribes from both metadata stores and stops the debounce executor.
 *
 * <p>Listeners are removed first so that no further reloads can be requested while the
 * executor is being torn down.
 */
@Override
protected void shutDown() throws Exception {
  datasetMetadataStore.removeListener(datasetListener);
  this.preprocessorMetadataStore.removeListener(preprocessorListener);

  // The executor is owned by this service. Without this its worker thread outlives the
  // service, and an already-queued reload could still fire after shutdown. shutdownNow()
  // also discards any reload that is still waiting out its delay.
  executor.shutdownNow();
}

/**
 * Defines the periodic refresh cadence: the first run happens one second after start, and
 * subsequent runs follow at the configured dataset rate limit period, in seconds.
 */
@Override
protected Scheduler scheduler() {
  final long initialDelaySecs = 1;
  final long refreshPeriodSecs = this.preprocessorConfig.getDatasetRateLimitPeriodSecs();
  return Scheduler.newFixedRateSchedule(initialDelaySecs, refreshPeriodSecs, TimeUnit.SECONDS);
}

public boolean tryAcquire(String index, List<Trace.Span> value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public CompletionStage<T> getAsync(String path) {
if (cachedModeledFramework != null) {
return cachedModeledFramework.withPath(zPath.resolved(path)).readThrough();
}

return modeledClient.withPath(zPath.resolved(path)).read();
}

Expand Down Expand Up @@ -223,7 +224,7 @@ public void removeListener(AstraMetadataStoreChangeListener<T> watcher) {
cachedModeledFramework.listenable().removeListener(listenerMap.remove(watcher));
}

private void awaitCacheInitialized() {
public void awaitCacheInitialized() {
try {
if (!cacheInitialized.await(zkConfig.getZkCacheInitTimeoutMs(), TimeUnit.MILLISECONDS)) {
// in the event we deadlock, go ahead and time this out at 30s and restart the pod
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.slack.astra.metadata.preprocessor;

import com.slack.astra.metadata.core.AstraMetadata;
import java.util.UUID;

/**
 * A container for all the metadata needed by preprocessors.
 *
 * <p>An instance carries no state beyond the {@code name} it passes to {@link AstraMetadata}.
 */
public class PreprocessorMetadata extends AstraMetadata {

  /** Creates an entry named with a freshly generated random UUID. */
  public PreprocessorMetadata() {
    super(UUID.randomUUID().toString());
  }

  /**
   * Creates an entry with the given name.
   *
   * @param name the name passed through to {@link AstraMetadata}
   */
  public PreprocessorMetadata(String name) {
    super(name);
  }

  /**
   * Two preprocessor entries are equal when the other object is also a {@code
   * PreprocessorMetadata} and the superclass considers them equal.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof PreprocessorMetadata)) return false;
    // Delegate to the superclass comparison as-is. Negating it made distinct-but-equal
    // entries compare unequal (and unequal ones compare equal), which is inconsistent with
    // hashCode().
    return super.equals(o);
  }

  @Override
  public int hashCode() {
    return super.hashCode();
  }

  @Override
  public String toString() {
    return "PreprocessorMetadata{" + "name='" + name + '\'' + '}';
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.slack.astra.metadata.preprocessor;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.slack.astra.metadata.core.MetadataSerializer;
import com.slack.astra.proto.metadata.Metadata;

/**
 * Converts {@link PreprocessorMetadata} to and from its JSON form by way of the
 * {@code Metadata.PreprocessorMetadata} protobuf message. Only the name is carried across.
 */
public class PreprocessorMetadataSerializer implements MetadataSerializer<PreprocessorMetadata> {

  /**
   * Serializes the given metadata to JSON.
   *
   * @param metadata the entry to serialize; must not be null
   * @return the JSON produced by printing the equivalent protobuf message
   * @throws IllegalArgumentException if {@code metadata} is null
   * @throws InvalidProtocolBufferException if the message cannot be printed
   */
  @Override
  public String toJsonStr(PreprocessorMetadata metadata) throws InvalidProtocolBufferException {
    if (metadata == null) {
      throw new IllegalArgumentException("metadata object can't be null");
    }

    Metadata.PreprocessorMetadata asProto =
        Metadata.PreprocessorMetadata.newBuilder().setName(metadata.name).build();
    return printer.print(asProto);
  }

  /**
   * Parses JSON into a metadata entry, ignoring any fields the protobuf message does not know.
   *
   * @param data the JSON text to parse
   * @return a new entry named after the parsed message's name
   * @throws InvalidProtocolBufferException if {@code data} cannot be parsed
   */
  @Override
  public PreprocessorMetadata fromJsonStr(String data) throws InvalidProtocolBufferException {
    Metadata.PreprocessorMetadata.Builder protoBuilder =
        Metadata.PreprocessorMetadata.newBuilder();
    JsonFormat.parser().ignoringUnknownFields().merge(data, protoBuilder);

    Metadata.PreprocessorMetadata parsed = protoBuilder.build();
    return new PreprocessorMetadata(parsed.getName());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.slack.astra.metadata.preprocessor;

import com.slack.astra.metadata.core.AstraMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.zookeeper.CreateMode;

/** Metadata store for {@link PreprocessorMetadata} entries, kept under {@link #PREPROCESSOR_ZK_PATH}. */
public class PreprocessorMetadataStore extends AstraMetadataStore<PreprocessorMetadata> {
public static final String PREPROCESSOR_ZK_PATH = "/preprocessors";

/**
* Initializes a preprocessor metadata store at the PREPROCESSOR_ZK_PATH. Entries are created
* with {@code CreateMode.EPHEMERAL} and serialized with {@link PreprocessorMetadataSerializer}.
*
* @param curatorFramework the Curator client handed to the base store
* @param zkConfig the ZooKeeper configuration handed to the base store
* @param shouldCache passed through to the base store; presumably enables its local cache
*/
public PreprocessorMetadataStore(
AsyncCuratorFramework curatorFramework,
AstraConfigs.ZookeeperConfig zkConfig,
boolean shouldCache) {
super(
curatorFramework,
zkConfig,
CreateMode.EPHEMERAL,
shouldCache,
new PreprocessorMetadataSerializer().toModelSerializer(),
PREPROCESSOR_ZK_PATH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
public class PreprocessorRateLimiter {
private static final Logger LOG = LoggerFactory.getLogger(PreprocessorRateLimiter.class);

private final int preprocessorCount;
private int preprocessorCount;

private final int maxBurstSeconds;

Expand Down Expand Up @@ -136,6 +136,12 @@ public static int getSpanBytes(List<Trace.Span> spans) {
return spans.stream().mapToInt(Trace.Span::getSerializedSize).sum();
}

/**
 * Records a new preprocessor count on this limiter and then builds the bulk-ingest predicate
 * through the single-argument overload.
 *
 * @param datasetMetadataList the datasets to build the predicate from
 * @param preprocessorCount the count to store on this instance; unboxed on assignment, so a
 *     null value fails with a {@link NullPointerException}
 * @return the predicate returned by the single-argument overload
 */
public BiPredicate<String, List<Trace.Span>> createBulkIngestRateLimiter(
    List<DatasetMetadata> datasetMetadataList, Integer preprocessorCount) {
  // The count must be stored before delegating so the rebuilt predicate sees it.
  this.preprocessorCount = preprocessorCount;
  BiPredicate<String, List<Trace.Span>> rebuiltLimiter =
      createBulkIngestRateLimiter(datasetMetadataList);
  return rebuiltLimiter;
}

public BiPredicate<String, List<Trace.Span>> createBulkIngestRateLimiter(
List<DatasetMetadata> datasetMetadataList) {

Expand Down
12 changes: 10 additions & 2 deletions astra/src/main/java/com/slack/astra/server/Astra.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.slack.astra.metadata.dataset.DatasetMetadataStore;
import com.slack.astra.metadata.fieldredaction.FieldRedactionMetadataStore;
import com.slack.astra.metadata.hpa.HpaMetricMetadataStore;
import com.slack.astra.metadata.preprocessor.PreprocessorMetadataStore;
import com.slack.astra.metadata.recovery.RecoveryNodeMetadataStore;
import com.slack.astra.metadata.recovery.RecoveryTaskMetadataStore;
import com.slack.astra.metadata.replica.ReplicaMetadataStore;
Expand Down Expand Up @@ -435,6 +436,10 @@ private static Set<Service> getServices(
new DatasetMetadataStore(
curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true);

PreprocessorMetadataStore preprocessorMetadataStore =
new PreprocessorMetadataStore(
curatorFramework, astraConfig.getMetadataStoreConfig().getZookeeperConfig(), true);

final AstraConfigs.PreprocessorConfig preprocessorConfig =
astraConfig.getPreprocessorConfig();
final int serverPort = preprocessorConfig.getServerConfig().getServerPort();
Expand All @@ -449,13 +454,16 @@ private static Set<Service> getServices(

services.add(
new CloseableLifecycleManager(
AstraConfigs.NodeRole.PREPROCESSOR, List.of(datasetMetadataStore)));
AstraConfigs.NodeRole.PREPROCESSOR,
List.of(datasetMetadataStore, preprocessorMetadataStore)));

BulkIngestKafkaProducer bulkIngestKafkaProducer =
new BulkIngestKafkaProducer(datasetMetadataStore, preprocessorConfig, meterRegistry);
services.add(bulkIngestKafkaProducer);

DatasetRateLimitingService datasetRateLimitingService =
new DatasetRateLimitingService(datasetMetadataStore, preprocessorConfig, meterRegistry);
new DatasetRateLimitingService(
datasetMetadataStore, preprocessorMetadataStore, preprocessorConfig, meterRegistry);
services.add(datasetRateLimitingService);

Schema.IngestSchema schema = Schema.IngestSchema.getDefaultInstance();
Expand Down
9 changes: 9 additions & 0 deletions astra/src/main/proto/astra_configs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,13 @@ message PreprocessorConfig {
// We intend to use ZK for the schema file in the future
// Path to the schema file in the local file system helps us iterate faster
string schema_file = 12;

// Dataset rate limit aggregation secs is a de-bounce setting. It's the time
// a service waits to take an action after a ZK notification.
int32 dataset_rate_limit_aggregation_secs = 13;

// Dataset rate limit period is the amount of time between each
// refresh of the dataset rate limits, whether or not a ZK event
// has triggered it
int32 dataset_rate_limit_period_secs = 14;
}
12 changes: 12 additions & 0 deletions astra/src/main/proto/metadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ package slack.proto.astra;

option java_package = "com.slack.astra.proto.metadata";

// Metadata for a single preprocessor entry. Only the entry's name is stored.
message PreprocessorMetadata {
// Name of the entry
string name = 1;
}

message CacheSlotMetadata {
enum CacheSlotState {
FREE = 0;
Expand Down Expand Up @@ -170,6 +174,14 @@ message HpaMetricMetadata {
double value = 3;
}

// For cluster wide metadata, such as the number of operating preprocessors
// NOTE(review): no Java code visible alongside this message reads or writes it - confirm it is used.
message AstraClusterMetadata {
// Name of the metadata entry
string name = 1;

// The number of available/working preprocessors
int32 preprocessor_count = 2;
}

// Describes a set of partitions along with their effective start and end times
message DatasetPartitionMetadata {
// Start time this partition received traffic
Expand Down
Loading

0 comments on commit 35bdc0a

Please sign in to comment.