Indexer local Bulk ingest API
zarna1parekh committed Nov 18, 2024
1 parent 16d8f56 commit 3563980
Showing 17 changed files with 259 additions and 94 deletions.
@@ -0,0 +1,141 @@
package com.slack.astra.bulkIngestApi;

import static com.linecorp.armeria.common.HttpStatus.CREATED;
import static com.linecorp.armeria.common.HttpStatus.INTERNAL_SERVER_ERROR;

import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.server.annotation.Post;
import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.astra.chunkManager.ChunkManager;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.proto.schema.Schema;
import com.slack.service.murron.trace.Trace;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This class defines the HTTP endpoint behavior for local bulk ingest. It parses the bulk
 * request and writes the resulting spans directly into the local chunk manager instead of
 * submitting them to Kafka.
 */
public class BulkLocalIngestApi {
private static final Logger LOG = LoggerFactory.getLogger(BulkLocalIngestApi.class);

// private final BulkIngestKafkaProducer bulkIngestKafkaProducer;
// private final DatasetRateLimitingService datasetRateLimitingService;
// private final MeterRegistry meterRegistry;
// private final Counter incomingByteTotal;
// private final Counter incomingDocsTotal;
// private final Timer bulkIngestTimer;
// private final String BULK_INGEST_INCOMING_BYTE_TOTAL = "astra_preprocessor_incoming_byte";
// private final String BULK_INGEST_INCOMING_BYTE_DOCS = "astra_preprocessor_incoming_docs";
// private final String BULK_INGEST_ERROR = "astra_preprocessor_error";
// private final String BULK_INGEST_TIMER = "astra_preprocessor_bulk_ingest";
// private final int rateLimitExceededErrorCode;
private final ChunkManager<LogMessage> chunkManager;
private final Schema.IngestSchema schema;

// private final Counter bulkIngestErrorCounter;

public BulkLocalIngestApi(
// MeterRegistry meterRegistry,
ChunkManager<LogMessage> chunkManager, Schema.IngestSchema schema) {

// this.bulkIngestKafkaProducer = bulkIngestKafkaProducer;
// this.datasetRateLimitingService = datasetRateLimitingService;
// this.meterRegistry = meterRegistry;
// this.incomingByteTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_TOTAL);
// this.incomingDocsTotal = meterRegistry.counter(BULK_INGEST_INCOMING_BYTE_DOCS);
// this.bulkIngestTimer = meterRegistry.timer(BULK_INGEST_TIMER);
// if (rateLimitExceededErrorCode <= 0 || rateLimitExceededErrorCode > 599) {
// this.rateLimitExceededErrorCode = 400;
// } else {
// this.rateLimitExceededErrorCode = rateLimitExceededErrorCode;
// }
this.schema = schema;
this.chunkManager = chunkManager;
// this.bulkIngestErrorCounter = meterRegistry.counter(BULK_INGEST_ERROR);
}

@Post("/_local_bulk")
public HttpResponse addDocument(String bulkRequest) {
// 1. Astra does not support the concept of "updates". It's always an add.
// 2. The "index" is used as the span name
CompletableFuture<HttpResponse> future = new CompletableFuture<>();
// Timer.Sample sample = Timer.start(meterRegistry);
// future.thenRun(() -> sample.stop(bulkIngestTimer));

int count = 0;

try {
byte[] bulkRequestBytes = bulkRequest.getBytes(StandardCharsets.UTF_8);
// incomingByteTotal.increment(bulkRequestBytes.length);
Map<String, List<Trace.Span>> docs = Map.of();
try {
docs = BulkApiRequestParser.parseRequest(bulkRequestBytes, schema);
} catch (Exception e) {
LOG.error("Request failed ", e);
// bulkIngestErrorCounter.increment();
BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
        future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
        return HttpResponse.of(future);
      }
LOG.info("Parsed docs message: {}", docs);

// todo - our rate limiter doesn't have a way to acquire permits across multiple
// datasets
// so today as a limitation we reject any request that has documents against
// multiple indexes
// We think most indexing requests will be against 1 index
if (docs.keySet().size() > 1) {
BulkIngestResponse response =
new BulkIngestResponse(0, 0, "request must contain only 1 unique index");
future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
// bulkIngestErrorCounter.increment();
return HttpResponse.of(future);
}

// for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
// incomingDocsTotal.increment(indexDocs.getValue().size());
// final String index = indexDocs.getKey();
// if (!datasetRateLimitingService.tryAcquire(index, indexDocs.getValue())) {
// BulkIngestResponse response = new BulkIngestResponse(0, 0, "rate limit exceeded");
// future.complete(
// HttpResponse.ofJson(HttpStatus.valueOf(rateLimitExceededErrorCode),
// response));
// return HttpResponse.of(future);
// }
// }

// todo - explore the possibility of using the blocking task executor backed by virtual
// threads to fulfill this

for (Map.Entry<String, List<Trace.Span>> indexDocs : docs.entrySet()) {
for (Trace.Span span : indexDocs.getValue()) {
try {
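            // NOTE: local ingest bypasses Kafka and writes straight to the chunk manager, so the
            // partition id ("0") and offset (12345) passed below appear to be placeholder values
            // rather than real Kafka coordinates.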
chunkManager.addMessage(span, span.getSerializedSize(), String.valueOf(0), 12345, true);
count += 1;
// return HttpResponse.of(future);
} catch (Exception e) {
LOG.error("Request failed ", e);
// bulkIngestErrorCounter.increment();
future.complete(
HttpResponse.ofJson(
INTERNAL_SERVER_ERROR, new BulkIngestResponse(0, 0, e.getMessage())));
}
}
}
} catch (Exception e) {
LOG.error("Request failed ", e);
// bulkIngestErrorCounter.increment();
BulkIngestResponse response = new BulkIngestResponse(0, 0, e.getMessage());
future.complete(HttpResponse.ofJson(INTERNAL_SERVER_ERROR, response));
}

future.complete(HttpResponse.ofJson(CREATED, new BulkIngestResponse(count, 0, "")));
return HttpResponse.of(future);
}
}
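For reference, a minimal sketch of how the new endpoint could be exercised once an indexer node is up. The /_local_bulk path and the 201/500 JSON responses come from the handler above; the host, port, index name, and document fields are assumptions for illustration, and the body uses the Elasticsearch-style NDJSON bulk format that BulkApiRequestParser expects.

import com.linecorp.armeria.client.WebClient;
import com.linecorp.armeria.common.AggregatedHttpResponse;

public class LocalBulkIngestClientSketch {
  public static void main(String[] args) {
    // One action line plus one document line, newline-terminated (NDJSON bulk format).
    String bulkBody =
        "{\"index\": {\"_index\": \"test-index\", \"_id\": \"1\"}}\n"
            + "{\"service_name\": \"test\", \"message\": \"hello world\"}\n";

    // Assumes the indexer's Armeria server is listening locally on port 8080.
    WebClient client = WebClient.of("http://127.0.0.1:8080");
    AggregatedHttpResponse response = client.post("/_local_bulk", bulkBody).aggregate().join();

    // A successful request returns 201 CREATED with a BulkIngestResponse JSON body;
    // parse failures and multi-index requests come back as 500 INTERNAL_SERVER_ERROR.
    System.out.println(response.status() + " " + response.contentUtf8());
  }
}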
6 changes: 4 additions & 2 deletions astra/src/main/java/com/slack/astra/chunk/ReadWriteChunk.java
@@ -139,7 +139,7 @@ public static SearchMetadata toSearchMetadata(String snapshotName, SearchContext
}

/** Index the message in the logstore and update the chunk data time range. */
-  public void addMessage(Trace.Span message, String kafkaPartitionId, long offset) {
+  public void addMessage(Trace.Span message, String kafkaPartitionId, long offset, boolean local_update) {
if (!this.kafkaPartitionId.equals(kafkaPartitionId)) {
throw new IllegalArgumentException(
"All messages for this chunk should belong to partition: "
@@ -158,7 +158,7 @@ public void addMessage(Trace.Span message, String kafkaPartitionId, long offset)
}
chunkInfo.updateDataTimeRange(timestamp.toEpochMilli());

-      chunkInfo.updateMaxOffset(offset);
+      if (local_update) {
+        chunkInfo.updateMaxOffset(offset);
+      }
} else {
throw new IllegalStateException(String.format("Chunk %s is read only", chunkInfo));
}
@@ -175,7 +175,7 @@ public static CachingChunkManager<LogMessage> fromConfig(
}

@Override
-  public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+  public void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_insert)
throws IOException {
throw new UnsupportedOperationException(
"Adding messages is not supported on a caching chunk manager");
@@ -9,7 +9,7 @@
import java.util.Map;

public interface ChunkManager<T> {
-  void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+  void addMessage(Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_update)
throws IOException;

SearchResult<T> query(SearchQuery query, Duration queryTimeout);
@@ -165,7 +165,7 @@ public IndexingChunkManager(
*/
@Override
public void addMessage(
-      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_insert)
throws IOException {
if (stopIngestion) {
// Currently, this flag is set on only a chunkRollOverException.
@@ -175,7 +175,7 @@ public void addMessage(

// find the active chunk and add a message to it
ReadWriteChunk<T> currentChunk = getOrCreateActiveChunk(kafkaPartitionId, indexerConfig);
-    currentChunk.addMessage(message, kafkaPartitionId, offset);
+    currentChunk.addMessage(message, kafkaPartitionId, offset, local_insert);
long currentIndexedMessages = liveMessagesIndexedGauge.incrementAndGet();
long currentIndexedBytes = liveBytesIndexedGauge.addAndGet(msgSize);

@@ -80,7 +80,7 @@ public RecoveryChunkManager(

@Override
public void addMessage(
-      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset)
+      final Trace.Span message, long msgSize, String kafkaPartitionId, long offset, boolean local_insert)
throws IOException {
if (readOnly) {
LOG.warn("Ingestion is stopped since the chunk is in read only mode.");
@@ -89,7 +89,7 @@ public void addMessage(

// find the active chunk and add a message to it
ReadWriteChunk<T> currentChunk = getOrCreateActiveChunk(kafkaPartitionId);
-    currentChunk.addMessage(message, kafkaPartitionId, offset);
+    currentChunk.addMessage(message, kafkaPartitionId, offset, local_insert);
liveMessagesIndexedGauge.incrementAndGet();
liveBytesIndexedGauge.addAndGet(msgSize);
}
@@ -100,6 +100,11 @@ public Builder withRequestTimeout(Duration requestTimeout) {
return this;
}

+  public Builder maxContentLength(long maxRequestLength) {
+    serverBuilder.maxRequestLength(maxRequestLength);
+    return this;
+  }

public Builder withTracing(AstraConfigs.TracingConfig tracingConfig) {
// span handlers is an ordered list, so we need to be careful with ordering
if (tracingConfig.getCommonTagsCount() > 0) {
18 changes: 14 additions & 4 deletions astra/src/main/java/com/slack/astra/server/Astra.java
@@ -7,6 +7,7 @@
import com.slack.astra.blobfs.S3AsyncUtil;
import com.slack.astra.bulkIngestApi.BulkIngestApi;
import com.slack.astra.bulkIngestApi.BulkIngestKafkaProducer;
+import com.slack.astra.bulkIngestApi.BulkLocalIngestApi;
import com.slack.astra.bulkIngestApi.DatasetRateLimitingService;
import com.slack.astra.chunkManager.CachingChunkManager;
import com.slack.astra.chunkManager.IndexingChunkManager;
@@ -185,13 +186,22 @@ private static Set<Service> getServices(
final int serverPort = astraConfig.getIndexerConfig().getServerConfig().getServerPort();
Duration requestTimeout =
Duration.ofMillis(astraConfig.getIndexerConfig().getServerConfig().getRequestTimeoutMs());
-      ArmeriaService armeriaService =
+      ArmeriaService.Builder armeriaServiceBuilder =
new ArmeriaService.Builder(serverPort, "astraIndex", meterRegistry)
.withRequestTimeout(requestTimeout)
+              .maxContentLength(2000000000)
.withTracing(astraConfig.getTracingConfig())
-              .withGrpcService(searcher)
-              .build();
-      services.add(armeriaService);
+              .withGrpcService(searcher);
+      Schema.IngestSchema schema = SchemaUtil.parseSchema(Path.of(""));
+      LOG.info(
+          "Loaded schema with fields count: {}, defaults count: {}",
+          schema.getFieldsCount(),
+          schema.getDefaultsCount());
+      schema = ReservedFields.addPredefinedFields(schema);
+      BulkLocalIngestApi localOpenSearchBulkApiService =
+          new BulkLocalIngestApi(chunkManager, schema);
+      armeriaServiceBuilder.withAnnotatedService(localOpenSearchBulkApiService);
+      services.add(armeriaServiceBuilder.build());
}

if (roles.contains(AstraConfigs.NodeRole.QUERY)) {
@@ -63,7 +63,8 @@ public boolean insertRecord(ConsumerRecord<String, byte[]> record) throws IOExce
Trace.Span.parseFrom(record.value()),
record.serializedValueSize(),
String.valueOf(record.partition()),
-        record.offset());
+        record.offset(),
+        false);
return true;
}
}
@@ -154,7 +154,7 @@ public void testAddAndSearchChunk() throws IOException {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -208,7 +208,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException {
TimeUnit.MILLISECONDS.convert(messages.get(0).getTimestamp(), TimeUnit.MICROSECONDS);
int offset = 1;
for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -256,7 +256,7 @@ public void testAddAndSearchChunkInTimeRange() throws IOException {
final long newMessageEndTimeEpochMs =
TimeUnit.MILLISECONDS.convert(newMessages.get(99).getTimestamp(), TimeUnit.MICROSECONDS);
for (Trace.Span m : newMessages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -325,7 +325,7 @@ public void testSearchInReadOnlyChunk() throws IOException {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -358,7 +358,7 @@ public void testAddMessageToReadOnlyChunk() {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -370,15 +370,15 @@ public void testAddMessageToReadOnlyChunk() {
int finalOffset = offset;
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(
-            () -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset));
+            () -> chunk.addMessage(SpanUtil.makeSpan(101), TEST_KAFKA_PARTITION_ID, finalOffset, false));
}

@Test
public void testMessageFromDifferentPartitionFails() {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
chunk.commit();
@@ -391,15 +391,15 @@ public void testMessageFromDifferentPartitionFails() {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(
() ->
-                chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset));
+                chunk.addMessage(SpanUtil.makeSpan(101), "differentKafkaPartition", finalOffset, false));
}

@Test
public void testCommitBeforeSnapshot() throws IOException {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
assertThat(chunk.isReadOnly()).isFalse();
@@ -503,7 +503,7 @@ public void testAddInvalidMessagesToChunk() {
Trace.Span invalidSpan = Trace.Span.newBuilder().build();

// An Invalid message is dropped but failure counter is incremented.
-    chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1);
+    chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1, false);
chunk.commit();

assertThat(getCount(MESSAGES_RECEIVED_COUNTER, registry)).isEqualTo(1);
@@ -595,7 +595,7 @@ public void testSnapshotToNonExistentS3BucketFails()
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}

@@ -653,7 +653,7 @@ public void testSnapshotToS3UsingChunkApi() throws Exception {
List<Trace.Span> messages = SpanUtil.makeSpansWithTimeDifference(1, 100, 1, Instant.now());
int offset = 1;
for (Trace.Span m : messages) {
-      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset);
+      chunk.addMessage(m, TEST_KAFKA_PARTITION_ID, offset, false);
offset++;
}
