Skip to content

Commit

Permalink
remove special type handling (#910)
Browse files Browse the repository at this point in the history
  • Loading branch information
vthacker authored May 3, 2024
1 parent 9cb1cc1 commit 5d6d81b
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 175 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,6 @@ public Query buildQuery(
* and if not attempt to register it with the mapper service
*/
public void reloadSchema() {
// todo - see SchemaAwareLogDocumentBuilderImpl.getDefaultLuceneFieldDefinitions
// this needs to be adapted to include other field types once we have support
// TreeMap here ensures the schema is sorted by natural order - to ensure multifields are
// registered by their parent first, and then fields added second
for (Map.Entry<String, LuceneFieldDef> entry : new TreeMap<>(chunkSchema).entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.slack.astra.logstore.schema;

import com.slack.astra.logstore.LogMessage;
import com.slack.astra.proto.schema.Schema;

public class ReservedFields {
Expand All @@ -9,9 +10,25 @@ public class ReservedFields {
public static Schema.IngestSchema addPredefinedFields(Schema.IngestSchema currentSchema) {
Schema.SchemaField timestampField =
Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.DATE).build();
Schema.SchemaField messageField =
Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.TEXT).build();
Schema.SchemaField allField =
Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.TEXT).build();
Schema.SchemaField timeSinceEpoch =
Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.LONG).build();
Schema.SchemaField idField =
Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.ID).build();

return Schema.IngestSchema.newBuilder()
.putAllFields(currentSchema.getFieldsMap())
.putFields(TIMESTAMP, timestampField)
.putFields(LogMessage.ReservedField.MESSAGE.fieldName, messageField)
.putFields(LogMessage.SystemField.ALL.fieldName, allField)
.putFields(LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName, timeSinceEpoch)
.putFields(LogMessage.SystemField.ID.fieldName, idField)
.build();
}

public static Schema.IngestSchema START_SCHEMA =
addPredefinedFields(Schema.IngestSchema.getDefaultInstance());
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import static com.slack.astra.writer.SpanFormatter.isValidTimestamp;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableMap;
import com.slack.astra.logstore.DocumentBuilder;
import com.slack.astra.logstore.FieldDefMismatchException;
import com.slack.astra.logstore.LogMessage;
Expand Down Expand Up @@ -49,66 +48,6 @@ public class SchemaAwareLogDocumentBuilderImpl implements DocumentBuilder {
// TODO: In future, make this value configurable.
private static final int MAX_NESTING_DEPTH = 3;

private static void addTextField(
ImmutableMap.Builder<String, LuceneFieldDef> fieldDefBuilder,
String fieldName,
boolean isStored,
boolean isIndexed) {
fieldDefBuilder.put(
fieldName, new LuceneFieldDef(fieldName, FieldType.TEXT.name, isStored, isIndexed, false));
}

// TODO: Move this definition to the config file.
public static ImmutableMap<String, LuceneFieldDef> getDefaultLuceneFieldDefinitions(
boolean enableFullTextSearch) {
ImmutableMap.Builder<String, LuceneFieldDef> fieldDefBuilder = ImmutableMap.builder();

addTextField(fieldDefBuilder, LogMessage.SystemField.SOURCE.fieldName, true, false);
addTextField(fieldDefBuilder, LogMessage.ReservedField.MESSAGE.fieldName, false, true);
if (enableFullTextSearch) {
addTextField(fieldDefBuilder, LogMessage.SystemField.ALL.fieldName, false, true);
}

String[] fieldsAsString = {
LogMessage.SystemField.INDEX.fieldName,
LogMessage.ReservedField.TYPE.fieldName,
LogMessage.ReservedField.HOSTNAME.fieldName,
LogMessage.ReservedField.PACKAGE.fieldName,
LogMessage.ReservedField.TAG.fieldName,
LogMessage.ReservedField.USERNAME.fieldName,
LogMessage.ReservedField.PAYLOAD.fieldName,
LogMessage.ReservedField.NAME.fieldName,
LogMessage.ReservedField.SERVICE_NAME.fieldName,
LogMessage.ReservedField.TRACE_ID.fieldName,
LogMessage.ReservedField.PARENT_ID.fieldName
};

for (String fieldName : fieldsAsString) {
fieldDefBuilder.put(
fieldName, new LuceneFieldDef(fieldName, FieldType.STRING.name, false, true, true));
}

String[] fieldsAsIds = {
LogMessage.SystemField.ID.fieldName,
};
for (String fieldName : fieldsAsIds) {
fieldDefBuilder.put(
fieldName, new LuceneFieldDef(fieldName, FieldType.ID.name, false, true, true));
}

String[] fieldsAsLong = {
LogMessage.ReservedField.DURATION_MS.fieldName,
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
};

for (String fieldName : fieldsAsLong) {
fieldDefBuilder.put(
fieldName, new LuceneFieldDef(fieldName, FieldType.LONG.name, false, true, true));
}

return fieldDefBuilder.build();
}

/**
* This enum tracks the field conflict policy for a chunk.
*
Expand Down Expand Up @@ -228,12 +167,12 @@ private boolean isStored(String fieldName) {

private boolean isDocValueField(Schema.SchemaFieldType schemaFieldType, String fieldName) {
return !fieldName.equals(LogMessage.SystemField.SOURCE.fieldName)
|| !schemaFieldType.equals(Schema.SchemaFieldType.TEXT);
&& !schemaFieldType.equals(Schema.SchemaFieldType.TEXT);
}

private boolean isIndexed(Schema.SchemaFieldType schemaFieldType, String fieldName) {
return !fieldName.equals(LogMessage.SystemField.SOURCE.fieldName)
|| !schemaFieldType.equals(Schema.SchemaFieldType.BINARY);
&& !schemaFieldType.equals(Schema.SchemaFieldType.BINARY);
}

// In the future, we need this to take SchemaField instead of FieldType
Expand Down Expand Up @@ -289,10 +228,7 @@ public static SchemaAwareLogDocumentBuilderImpl build(
MeterRegistry meterRegistry) {
// Add basic fields by default
return new SchemaAwareLogDocumentBuilderImpl(
fieldConflictPolicy,
getDefaultLuceneFieldDefinitions(enableFullTextSearch),
enableFullTextSearch,
meterRegistry);
fieldConflictPolicy, enableFullTextSearch, meterRegistry);
}

static final String DROP_FIELDS_COUNTER = "dropped_fields";
Expand All @@ -312,7 +248,6 @@ public static SchemaAwareLogDocumentBuilderImpl build(

SchemaAwareLogDocumentBuilderImpl(
FieldConflictPolicy indexFieldConflictPolicy,
final Map<String, LuceneFieldDef> initialFields,
boolean enableFullTextSearch,
MeterRegistry meterRegistry) {
this.indexFieldConflictPolicy = indexFieldConflictPolicy;
Expand All @@ -323,9 +258,6 @@ public static SchemaAwareLogDocumentBuilderImpl build(
convertAndDuplicateFieldCounter = meterRegistry.counter(CONVERT_AND_DUPLICATE_FIELD_COUNTER);
convertErrorCounter = meterRegistry.counter(CONVERT_ERROR_COUNTER);
totalFieldsCounter = meterRegistry.counter(TOTAL_FIELDS_COUNTER);

totalFieldsCounter.increment(initialFields.size());
fieldDefMap.putAll(initialFields);
}

@Override
Expand Down Expand Up @@ -381,7 +313,7 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
doc,
LogMessage.SystemField.ID.fieldName,
message.getId().toStringUtf8(),
Schema.SchemaFieldType.KEYWORD,
Schema.SchemaFieldType.ID,
"",
0);
} else {
Expand Down Expand Up @@ -433,18 +365,6 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
// if we don't do this LogMessage#isValid will be unhappy when we recreate the message
// we need to fix this!!!
indexName = computedIndexName(indexName);
String msgType =
tags.containsKey(LogMessage.ReservedField.TYPE.fieldName)
? tags.get(LogMessage.ReservedField.TYPE.fieldName).getVStr()
: DEFAULT_LOG_MESSAGE_TYPE;

addField(
doc,
LogMessage.ReservedField.TYPE.fieldName,
msgType,
Schema.SchemaFieldType.KEYWORD,
"",
0);

jsonMap.put(LogMessage.ReservedField.SERVICE_NAME.fieldName, indexName);
addField(
Expand All @@ -463,7 +383,6 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
0);

tags.remove(LogMessage.ReservedField.SERVICE_NAME.fieldName);
tags.remove(LogMessage.ReservedField.TYPE.fieldName);

// if any top level fields are in the tags, we should remove them
tags.remove(LogMessage.ReservedField.PARENT_ID.fieldName);
Expand Down Expand Up @@ -539,6 +458,10 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
}
}

String msgType =
tags.containsKey(LogMessage.ReservedField.TYPE.fieldName)
? tags.get(LogMessage.ReservedField.TYPE.fieldName).getVStr()
: DEFAULT_LOG_MESSAGE_TYPE;
LogWireMessage logWireMessage =
new LogWireMessage(indexName, msgType, message.getId().toStringUtf8(), timestamp, jsonMap);
final String msgString = JsonUtil.writeAsString(logWireMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import brave.Tracing;
import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.google.protobuf.ByteString;
import com.slack.astra.blobfs.s3.S3CrtBlobFs;
import com.slack.astra.blobfs.s3.S3TestUtils;
import com.slack.astra.logstore.LogMessage;
Expand All @@ -33,7 +32,6 @@
import com.slack.astra.metadata.snapshot.SnapshotMetadata;
import com.slack.astra.metadata.snapshot.SnapshotMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.astra.proto.schema.Schema;
import com.slack.astra.testlib.MessageUtil;
import com.slack.astra.testlib.SpanUtil;
import com.slack.service.murron.trace.Trace;
Expand Down Expand Up @@ -499,16 +497,7 @@ public void tearDown() throws IOException, TimeoutException {

@Test
public void testAddInvalidMessagesToChunk() {
Trace.Span invalidSpan =
Trace.Span.newBuilder()
.setId(ByteString.copyFromUtf8("1"))
.addTags(
Trace.KeyValue.newBuilder()
.setVInt32(123)
.setKey(LogMessage.ReservedField.MESSAGE.fieldName)
.setFieldType(Schema.SchemaFieldType.INTEGER)
.build())
.build();
Trace.Span invalidSpan = Trace.Span.newBuilder().build();

// An Invalid message is dropped but failure counter is incremented.
chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import brave.Tracing;
import com.adobe.testing.s3mock.junit5.S3MockExtension;
import com.google.protobuf.ByteString;
import com.slack.astra.blobfs.s3.S3CrtBlobFs;
import com.slack.astra.blobfs.s3.S3TestUtils;
import com.slack.astra.logstore.LogMessage;
Expand All @@ -30,7 +29,6 @@
import com.slack.astra.metadata.snapshot.SnapshotMetadata;
import com.slack.astra.metadata.snapshot.SnapshotMetadataStore;
import com.slack.astra.proto.config.AstraConfigs;
import com.slack.astra.proto.schema.Schema;
import com.slack.astra.testlib.MessageUtil;
import com.slack.astra.testlib.SpanUtil;
import com.slack.service.murron.trace.Trace;
Expand Down Expand Up @@ -493,16 +491,7 @@ public void tearDown() throws IOException, TimeoutException {
public void testAddInvalidMessagesToChunk() {

// An Invalid message is dropped but failure counter is incremented.
Trace.Span invalidSpan =
Trace.Span.newBuilder()
.setId(ByteString.copyFromUtf8("1"))
.addTags(
Trace.KeyValue.newBuilder()
.setVInt32(123)
.setKey(LogMessage.ReservedField.MESSAGE.fieldName)
.setFieldType(Schema.SchemaFieldType.INTEGER)
.build())
.build();
Trace.Span invalidSpan = Trace.Span.newBuilder().build();
chunk.addMessage(invalidSpan, TEST_KAFKA_PARTITION_ID, 1);
chunk.commit();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testSchemaFields() throws Exception {
objectMapper.convertValue(
jsonNode.get("test").get("mappings").get("properties"), Map.class);
assertThat(map).isNotNull();
assertThat(map.size()).isEqualTo(31);
assertThat(map.size()).isEqualTo(25);
}

// todo - test mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,8 @@ public TestsWithRaiseErrorFieldConflictPolicy() throws IOException {}

@Test
public void failIndexingDocsWithMismatchedTypeErrors() {
Trace.KeyValue wrongField =
Trace.KeyValue.newBuilder()
.setKey(ReservedField.HOSTNAME.fieldName)
.setFieldType(Schema.SchemaFieldType.INTEGER)
.setVInt32(20000)
.build();

logStore.logStore.addMessage(
SpanUtil.makeSpan(100, "test", Instant.now(), List.of(wrongField)));
logStore.logStore.addMessage(Trace.Span.newBuilder().build());
addMessages(logStore.logStore, 1, 99, true);
Collection<LogMessage> results =
findAllMessages(logStore.logSearcher, MessageUtil.TEST_DATASET_NAME, "identifier", 1000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;

import com.google.common.collect.ImmutableMap;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.logstore.schema.SchemaAwareLogDocumentBuilderImpl;
import com.slack.astra.logstore.search.aggregations.AggBuilder;
import com.slack.astra.logstore.search.aggregations.AggBuilderBase;
import com.slack.astra.logstore.search.aggregations.AutoDateHistogramAggBuilder;
Expand All @@ -20,6 +20,8 @@
import com.slack.astra.logstore.search.aggregations.MovingFunctionAggBuilder;
import com.slack.astra.logstore.search.aggregations.SumAggBuilder;
import com.slack.astra.logstore.search.aggregations.UniqueCountAggBuilder;
import com.slack.astra.metadata.schema.FieldType;
import com.slack.astra.metadata.schema.LuceneFieldDef;
import com.slack.astra.testlib.TemporaryLogStoreAndSearcherExtension;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -59,11 +61,24 @@ public class OpenSearchAdapterTest {
public TemporaryLogStoreAndSearcherExtension logStoreAndSearcherRule =
new TemporaryLogStoreAndSearcherExtension(false);

private final OpenSearchAdapter openSearchAdapter =
new OpenSearchAdapter(
SchemaAwareLogDocumentBuilderImpl.getDefaultLuceneFieldDefinitions(false));
private final OpenSearchAdapter openSearchAdapter;

public OpenSearchAdapterTest() throws IOException {
ImmutableMap.Builder<String, LuceneFieldDef> fieldDefBuilder = ImmutableMap.builder();
fieldDefBuilder.put(
LogMessage.SystemField.ID.fieldName,
new LuceneFieldDef(
LogMessage.SystemField.ID.fieldName, FieldType.ID.name, false, true, true));
fieldDefBuilder.put(
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
new LuceneFieldDef(
LogMessage.SystemField.TIME_SINCE_EPOCH.fieldName,
FieldType.LONG.name,
false,
true,
true));
openSearchAdapter = new OpenSearchAdapter(fieldDefBuilder.build());

// We need to reload the schema so that query optimizations take into account the schema
openSearchAdapter.reloadSchema();
}
Expand Down
Loading

0 comments on commit 5d6d81b

Please sign in to comment.