From 348a08810e55167f8beb1de448c24dab6c308c3f Mon Sep 17 00:00:00 2001
From: Varun Thacker
Date: Thu, 2 May 2024 11:09:50 -0700
Subject: [PATCH] Add true date field support and also index "@timestamp" as a
 date (#909)

* index an additional @timestamp field

* cleaner

* more changes
---
 .../opensearch/BulkApiRequestParser.java      | 38 ++++++---------
 .../astra/logstore/schema/ReservedFields.java | 17 +++++++
 .../SchemaAwareLogDocumentBuilderImpl.java    |  6 ++-
 .../astra/metadata/schema/FieldType.java      | 17 ++++++-
 .../java/com/slack/astra/server/Astra.java    |  2 +
 .../com/slack/astra/writer/SpanFormatter.java | 25 ++++++++++
 astra/src/main/proto/trace.proto              |  3 ++
 .../opensearch/BulkApiRequestParserTest.java  | 21 ++++++---
 .../schema/SpanFormatterWithSchemaTest.java   | 29 ++++++++++--
 .../writer/LogMessageWriterImplTest.java      | 47 +++++++++++++++++++
 10 files changed, 168 insertions(+), 37 deletions(-)
 create mode 100644 astra/src/main/java/com/slack/astra/logstore/schema/ReservedFields.java
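
Overview (commentary placed in the notes area, not part of the commit): the patch adds a reserved `@timestamp` field to every ingest schema (ReservedFields), carries DATE values through Trace.KeyValue as a google.protobuf.Timestamp (the new v_date field), and indexes DATE fields in Lucene from a java.time.Instant. A minimal sketch of how the reserved field is wired into a schema, using only APIs that appear in this patch; the class name, main method, and println are illustrative, not code from the repository:

import com.slack.astra.logstore.schema.ReservedFields;
import com.slack.astra.proto.schema.Schema;

public class ReservedFieldsSketch {
  public static void main(String[] args) {
    // Start from an empty ingest schema, e.g. when no schema file is configured.
    Schema.IngestSchema schema = Schema.IngestSchema.newBuilder().build();
    // Overlay the predefined fields; this registers "@timestamp" as a DATE field.
    schema = ReservedFields.addPredefinedFields(schema);
    // Prints DATE.
    System.out.println(schema.getFieldsMap().get(ReservedFields.TIMESTAMP).getType());
  }
}

This mirrors what Astra.java now does right before constructing the BulkIngestApi, so `@timestamp` is treated as a date even when no schema file is provided.
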
diff --git a/astra/src/main/java/com/slack/astra/bulkIngestApi/opensearch/BulkApiRequestParser.java b/astra/src/main/java/com/slack/astra/bulkIngestApi/opensearch/BulkApiRequestParser.java
index 52e516f364..0e91f375ca 100644
--- a/astra/src/main/java/com/slack/astra/bulkIngestApi/opensearch/BulkApiRequestParser.java
+++ b/astra/src/main/java/com/slack/astra/bulkIngestApi/opensearch/BulkApiRequestParser.java
@@ -3,15 +3,12 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import com.slack.astra.logstore.LogMessage;
+import com.slack.astra.logstore.schema.ReservedFields;
 import com.slack.astra.proto.schema.Schema;
 import com.slack.astra.writer.SpanFormatter;
 import com.slack.service.murron.trace.Trace;
 import java.io.IOException;
 import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,47 +46,40 @@ public static Map<String, List<Trace.Span>> parseRequest(
    * in millis 2. Check if a field called `@timestamp` exists and parse that as a date (since
    * logstash sets that) 3. Use the current time from the ingestMetadata
    */
-  public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) {
+  public static long getTimestampFromIngestDocument(Map<String, Object> sourceAndMetadata) {
     try {
-      if (ingestDocument.hasField("@timestamp")) {
-        String dateString = ingestDocument.getFieldValue("@timestamp", String.class);
-        LocalDateTime localDateTime =
-            LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME);
-        Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
+      if (sourceAndMetadata.containsKey(ReservedFields.TIMESTAMP)) {
+        String dateString = (String) sourceAndMetadata.get(ReservedFields.TIMESTAMP);
+        Instant instant = Instant.parse(dateString);
         return instant.toEpochMilli();
       }
 
       // assumption that the provided timestamp is in millis
-      // at some point both th unit and field need to be configurable
-      // when we do that, remember to change the called to appropriately remove the field
-      if (ingestDocument.hasField("timestamp")) {
-        return ingestDocument.getFieldValue("timestamp", Long.class);
+      // at some point both the unit and field need to be configurable
+      if (sourceAndMetadata.containsKey("timestamp")) {
+        return (long) sourceAndMetadata.get("timestamp");
       }
 
-      if (ingestDocument.hasField("_timestamp")) {
-        return ingestDocument.getFieldValue("_timestamp", Long.class);
+      if (sourceAndMetadata.containsKey("_timestamp")) {
+        return (long) sourceAndMetadata.get("_timestamp");
       }
     } catch (Exception e) {
       LOG.warn(
          "Unable to parse timestamp from ingest document. Using current time as timestamp", e);
     }
-    return ((ZonedDateTime)
-            ingestDocument
-                .getIngestMetadata()
-                .getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC)))
-        .toInstant()
-        .toEpochMilli();
+    // We tried parsing 3 timestamp fields and failed. Use the current time
+    return Instant.now().toEpochMilli();
   }
 
   @VisibleForTesting
   public static Trace.Span fromIngestDocument(
       IngestDocument ingestDocument, Schema.IngestSchema schema) {
-    long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);
-
     Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();
+
+    long timestampInMillis = getTimestampFromIngestDocument(sourceAndMetadata);
 
     String id = (String) sourceAndMetadata.get(IngestDocument.Metadata.ID.getFieldName());
     // See https://blog.mikemccandless.com/2014/05/choosing-fast-unique-identifier-uuid.html on how
     // to improve this
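
To make the precedence in getTimestampFromIngestDocument concrete, here is a small sketch (not part of the diff; the class name and sample values are arbitrary) that exercises the three branches described in the javadoc:

import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
import java.util.Map;

class TimestampPrecedenceSketch {
  public static void main(String[] args) {
    // 1. "@timestamp" wins and is parsed as an ISO-8601 instant.
    long fromIso =
        BulkApiRequestParser.getTimestampFromIngestDocument(
            Map.<String, Object>of("@timestamp", "2024-05-02T10:00:00Z"));
    // Same value as Instant.parse("2024-05-02T10:00:00Z").toEpochMilli().

    // 2. Otherwise a numeric "timestamp" (assumed to be epoch millis) is returned as-is.
    long fromMillis =
        BulkApiRequestParser.getTimestampFromIngestDocument(
            Map.<String, Object>of("timestamp", 1714644000000L));

    // 3. If nothing parses, the current wall-clock time is used.
    long fallback = BulkApiRequestParser.getTimestampFromIngestDocument(Map.of());
    System.out.println(fromIso + " " + fromMillis + " " + fallback);
  }
}

The `_timestamp` field behaves like `timestamp`; the test variants in BulkApiRequestParserTest below go through the same code path via IngestDocument.getSourceAndMetadata().
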
diff --git a/astra/src/main/java/com/slack/astra/logstore/schema/ReservedFields.java b/astra/src/main/java/com/slack/astra/logstore/schema/ReservedFields.java
new file mode 100644
index 0000000000..442da6e809
--- /dev/null
+++ b/astra/src/main/java/com/slack/astra/logstore/schema/ReservedFields.java
@@ -0,0 +1,17 @@
+package com.slack.astra.logstore.schema;
+
+import com.slack.astra.proto.schema.Schema;
+
+public class ReservedFields {
+
+  public static final String TIMESTAMP = "@timestamp";
+
+  public static Schema.IngestSchema addPredefinedFields(Schema.IngestSchema currentSchema) {
+    Schema.SchemaField timestampField =
+        Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.DATE).build();
+    return Schema.IngestSchema.newBuilder()
+        .putAllFields(currentSchema.getFieldsMap())
+        .putFields(TIMESTAMP, timestampField)
+        .build();
+  }
+}
diff --git a/astra/src/main/java/com/slack/astra/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java b/astra/src/main/java/com/slack/astra/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java
index ce0fcc7a39..7ae8653f5a 100644
--- a/astra/src/main/java/com/slack/astra/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java
+++ b/astra/src/main/java/com/slack/astra/logstore/schema/SchemaAwareLogDocumentBuilderImpl.java
@@ -486,8 +486,10 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
       addField(doc, keyValue.getKey(), keyValue.getVStr(), Schema.SchemaFieldType.IP, "", 0);
       jsonMap.put(keyValue.getKey(), keyValue.getVStr());
     } else if (schemaFieldType == Schema.SchemaFieldType.DATE) {
-      addField(doc, keyValue.getKey(), keyValue.getVInt64(), Schema.SchemaFieldType.DATE, "", 0);
-      jsonMap.put(keyValue.getKey(), keyValue.getVInt64());
+      Instant instant =
+          Instant.ofEpochSecond(keyValue.getVDate().getSeconds(), keyValue.getVDate().getNanos());
+      addField(doc, keyValue.getKey(), instant, Schema.SchemaFieldType.DATE, "", 0);
+      jsonMap.put(keyValue.getKey(), instant.toString());
     } else if (schemaFieldType == Schema.SchemaFieldType.BOOLEAN) {
       addField(
           doc, keyValue.getKey(), keyValue.getVBool(), Schema.SchemaFieldType.BOOLEAN, "", 0);
diff --git a/astra/src/main/java/com/slack/astra/metadata/schema/FieldType.java b/astra/src/main/java/com/slack/astra/metadata/schema/FieldType.java
index 5e60c87f55..1e73cbe47b 100644
--- a/astra/src/main/java/com/slack/astra/metadata/schema/FieldType.java
+++ b/astra/src/main/java/com/slack/astra/metadata/schema/FieldType.java
@@ -6,6 +6,7 @@
 import com.google.protobuf.ByteString;
 import com.slack.astra.proto.schema.Schema;
 import java.net.InetAddress;
+import java.time.Instant;
 import java.util.List;
 import java.util.Set;
 import org.apache.lucene.document.Document;
@@ -133,7 +134,21 @@ public Schema.SchemaFieldType toSchemaFieldType() {
   DATE("date") {
     @Override
     public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) {
-      LONG.addField(doc, name, value, fieldDef);
+      if (value instanceof Instant instant) {
+        long timeSinceEpoch = instant.toEpochMilli();
+        if (fieldDef.isIndexed) {
+          doc.add(new LongPoint(name, timeSinceEpoch));
+        }
+        if (fieldDef.isStored) {
+          doc.add(new StoredField(name, instant.toString()));
+        }
+        if (fieldDef.storeDocValue) {
+          doc.add(new NumericDocValuesField(name, timeSinceEpoch));
+        }
+      } else {
+        // back-compat
+        LONG.addField(doc, name, value, fieldDef);
+      }
     }
 
     @Override
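
For context on the FieldType.DATE change above: an indexed, stored, doc-values date now writes three Lucene fields per value. A standalone sketch using plain Lucene classes (the field name and value are illustrative; this is not Astra code):

import java.time.Instant;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StoredField;

class DateFieldSketch {
  public static void main(String[] args) {
    Instant instant = Instant.parse("2014-09-01T12:00:10Z");
    long epochMillis = instant.toEpochMilli();

    Document doc = new Document();
    // Point value: enables range queries such as @timestamp:[a TO b].
    doc.add(new LongPoint("@timestamp", epochMillis));
    // Stored value: the human-readable ISO-8601 string returned with hits.
    doc.add(new StoredField("@timestamp", instant.toString()));
    // Doc values: used for sorting and aggregations.
    doc.add(new NumericDocValuesField("@timestamp", epochMillis));

    System.out.println(doc.getFields().size()); // 3
  }
}

Non-Instant values still fall through to LONG.addField, which keeps previously written millisecond longs usable (the back-compat branch).
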
diff --git a/astra/src/main/java/com/slack/astra/server/Astra.java b/astra/src/main/java/com/slack/astra/server/Astra.java
index 7d974ce4e9..87a4b092a1 100644
--- a/astra/src/main/java/com/slack/astra/server/Astra.java
+++ b/astra/src/main/java/com/slack/astra/server/Astra.java
@@ -21,6 +21,7 @@
 import com.slack.astra.clusterManager.SnapshotDeletionService;
 import com.slack.astra.elasticsearchApi.ElasticsearchApiService;
 import com.slack.astra.logstore.LogMessage;
+import com.slack.astra.logstore.schema.ReservedFields;
 import com.slack.astra.logstore.search.AstraDistributedQueryService;
 import com.slack.astra.logstore.search.AstraLocalQueryService;
 import com.slack.astra.metadata.cache.CacheSlotMetadataStore;
@@ -408,6 +409,7 @@ private static Set<Service> getServices(
       } else {
         LOG.info("No schema file provided, using default schema");
       }
+      schema = ReservedFields.addPredefinedFields(schema);
       BulkIngestApi openSearchBulkApiService =
           new BulkIngestApi(
               bulkIngestKafkaProducer,
diff --git a/astra/src/main/java/com/slack/astra/writer/SpanFormatter.java b/astra/src/main/java/com/slack/astra/writer/SpanFormatter.java
index 43d5400d5b..419d0c9fac 100644
--- a/astra/src/main/java/com/slack/astra/writer/SpanFormatter.java
+++ b/astra/src/main/java/com/slack/astra/writer/SpanFormatter.java
@@ -2,6 +2,7 @@
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Timestamp;
 import com.slack.astra.logstore.LogMessage;
 import com.slack.astra.proto.schema.Schema;
 import com.slack.service.murron.Murron;
@@ -12,10 +13,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** A utility class that converts a Span into a LogMessage, Json map to Span */
 public class SpanFormatter {
 
+  private static final Logger LOG = LoggerFactory.getLogger(SpanFormatter.class);
+
   public static final String DEFAULT_LOG_MESSAGE_TYPE = "INFO";
   public static final String DEFAULT_INDEX_NAME = "unknown";
 
@@ -87,6 +92,23 @@ public static Trace.Span toSpan(
     return spanBuilder.build();
   }
 
+  public static Timestamp parseDate(String dateStr, Schema.SchemaFieldType type) {
+    Instant instant;
+    try {
+      // type will expose parsing params in the future
+      // for now we'll just use Instant.parse
+      instant = Instant.parse(dateStr);
+    } catch (Exception e) {
+      // easier to debug rather than to skip or put current value
+      LOG.warn("Failed to parse date: {}", dateStr, e);
+      instant = Instant.EPOCH;
+    }
+    return Timestamp.newBuilder()
+        .setSeconds(instant.getEpochSecond())
+        .setNanos(instant.getNano())
+        .build();
+  }
+
   public static Trace.KeyValue makeTraceKV(String key, Object value, Schema.SchemaFieldType type) {
     Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder();
     tagBuilder.setKey(key);
@@ -106,6 +128,9 @@ public static Trace.KeyValue makeTraceKV(String key, Object value, Schema.Schema
       }
       case DATE -> {
         tagBuilder.setFieldType(Schema.SchemaFieldType.DATE);
+        tagBuilder.setVDate(parseDate(value.toString(), type));
+        // setting both for backward compatibility while deploying preprocessor and indexer
+        // I however commented it while testing to make sure all tests use the new field
         tagBuilder.setVInt64(Instant.parse(value.toString()).toEpochMilli());
       }
       case BOOLEAN -> {
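
A short usage sketch for SpanFormatter.parseDate above (illustrative only; the class name and values are made up): well-formed ISO-8601 strings become a protobuf Timestamp, and anything unparseable is logged and mapped to the epoch rather than dropped.

import com.google.protobuf.Timestamp;
import com.slack.astra.proto.schema.Schema;
import com.slack.astra.writer.SpanFormatter;

class ParseDateSketch {
  public static void main(String[] args) {
    Timestamp ok = SpanFormatter.parseDate("2021-01-01T00:00:00Z", Schema.SchemaFieldType.DATE);
    System.out.println(ok.getSeconds()); // 1609459200, nanos 0

    // Unparseable input falls back to Instant.EPOCH, so seconds == 0.
    Timestamp bad = SpanFormatter.parseDate("not-a-date", Schema.SchemaFieldType.DATE);
    System.out.println(bad.getSeconds()); // 0
  }
}

makeTraceKV then stores this Timestamp in the new v_date field while still setting v_int64 for compatibility during the preprocessor/indexer rollout.
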
diff --git a/astra/src/main/proto/trace.proto b/astra/src/main/proto/trace.proto
index 6cdba6ad89..f70c50b43e 100644
--- a/astra/src/main/proto/trace.proto
+++ b/astra/src/main/proto/trace.proto
@@ -8,6 +8,8 @@ option java_package = "com.slack.service.murron.trace";
 option go_package = "com.slack/astra/gen/proto/tracepb";
 
 import "google/protobuf/empty.proto";
+import "google/protobuf/timestamp.proto";
+
 import "schema.proto";
 
 message KeyValue {
@@ -19,6 +21,7 @@ message KeyValue {
     bytes v_binary = 7;
     int32 v_int32 = 8;
     float v_float32 = 9;
+    google.protobuf.Timestamp v_date = 11;
     slack.proto.astra.schema.SchemaFieldType fieldType = 10;
   }
 
diff --git a/astra/src/test/java/com/slack/astra/bulkIngestApi/opensearch/BulkApiRequestParserTest.java b/astra/src/test/java/com/slack/astra/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
index c6502c3851..73569a9489 100644
--- a/astra/src/test/java/com/slack/astra/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
+++ b/astra/src/test/java/com/slack/astra/bulkIngestApi/opensearch/BulkApiRequestParserTest.java
@@ -272,7 +272,8 @@ public void testTraceSpanGeneratedTimestamp() throws IOException {
   public void testTimestampParsingFromIngestDocument() {
     IngestDocument ingestDocument =
         new IngestDocument("index", "1", "routing", 1L, VersionType.INTERNAL, Map.of());
-    long timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    long timeInMillis =
+        BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
     Instant ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
 
     // this tests that the parser inserted a timestamp close to the current time
@@ -287,7 +288,8 @@ public void testTimestampParsingFromIngestDocument() {
     ingestDocument =
         new IngestDocument(
             "index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("@timestamp", ts));
-    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    timeInMillis =
+        BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
     assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
 
     // we put a long in the @timestamp field, which today we don't parse
@@ -300,7 +302,8 @@ public void testTimestampParsingFromIngestDocument() {
             1L,
             VersionType.INTERNAL,
             Map.of("@timestamp", providedTimeStamp.toEpochMilli()));
-    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    timeInMillis =
+        BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
     ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
     assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
     assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
@@ -310,7 +313,8 @@ public void testTimestampParsingFromIngestDocument() {
     ingestDocument =
         new IngestDocument(
             "index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("_timestamp", ts));
-    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    timeInMillis =
+        BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
     ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
     assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
     assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
@@ -320,7 +324,8 @@ public void testTimestampParsingFromIngestDocument() {
     ingestDocument =
         new IngestDocument(
             "index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("timestamp", ts));
-    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    timeInMillis =
+        BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
     ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
     assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
     assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
@@ -333,7 +338,8 @@ public void testTimestampParsingFromIngestDocument() {
             1L,
             VersionType.INTERNAL,
             Map.of("timestamp", providedTimeStamp.toEpochMilli()));
-    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    timeInMillis =
+        BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
     assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
 
     ingestDocument =
@@ -344,7 +350,8 @@ public void testTimestampParsingFromIngestDocument() {
             "index",
             "1",
             "routing",
             1L,
             VersionType.INTERNAL,
             Map.of("_timestamp", providedTimeStamp.toEpochMilli()));
-    timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
+    timeInMillis =
+        BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
     assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
   }
 }
diff --git a/astra/src/test/java/com/slack/astra/schema/SpanFormatterWithSchemaTest.java b/astra/src/test/java/com/slack/astra/schema/SpanFormatterWithSchemaTest.java
index f410c3744a..3d45721d22 100644
--- a/astra/src/test/java/com/slack/astra/schema/SpanFormatterWithSchemaTest.java
+++ b/astra/src/test/java/com/slack/astra/schema/SpanFormatterWithSchemaTest.java
@@ -11,6 +11,7 @@
 import com.google.common.io.Files;
 import com.google.protobuf.ByteString;
+import com.google.protobuf.Timestamp;
 import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
 import com.slack.astra.logstore.LogStore;
 import com.slack.astra.logstore.LuceneIndexStoreConfig;
@@ -228,8 +229,25 @@ public void testSimpleSchema() {
     assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.IP);
 
     String myTimestamp = "2021-01-01T00:00:00Z";
+    Instant myDateInstant = Instant.parse(myTimestamp);
+    Timestamp myDateTimestamp =
+        Timestamp.newBuilder()
+            .setSeconds(myDateInstant.getEpochSecond())
+            .setNanos(myDateInstant.getNano())
+            .build();
+    kv = SpanFormatter.convertKVtoProto("myTimestamp", myTimestamp, schema).get(0);
+    assertThat(kv.getVDate()).isEqualTo(myDateTimestamp);
+
+    myTimestamp = "2021-01-01T00:00:00Z";
+    myDateInstant = Instant.parse(myTimestamp);
+    myDateTimestamp =
+        Timestamp.newBuilder()
+            .setSeconds(myDateInstant.getEpochSecond())
+            .setNanos(myDateInstant.getNano())
+            .build();
     kv = SpanFormatter.convertKVtoProto("myTimestamp", myTimestamp, schema).get(0);
-    assertThat(kv.getVInt64()).isEqualTo(Instant.parse(myTimestamp).toEpochMilli());
+    assertThat(kv.getVDate()).isEqualTo(myDateTimestamp);
+    assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DATE);
 
     kv = SpanFormatter.convertKVtoProto("success", "true", schema).get(0);
@@ -451,8 +469,13 @@ public void testTraceProtoToLuceneDocumentTest() throws Exception {
     assertThat(tags.get("ip").getVStr()).isEqualTo("192.168.1.1");
     assertThat(tags.get("count").getVInt64()).isEqualTo(3);
     assertThat(tags.get("count_short").getVInt32()).isEqualTo(10);
-    assertThat(tags.get("my_date").getVInt64())
-        .isEqualTo(Instant.parse("2014-09-01T12:00:00Z").toEpochMilli());
+    Instant myDateInstant = Instant.parse("2014-09-01T12:00:00Z");
+    Timestamp myDateTimestamp =
+        Timestamp.newBuilder()
+            .setSeconds(myDateInstant.getEpochSecond())
+            .setNanos(myDateInstant.getNano())
+            .build();
+    assertThat(tags.get("my_date").getVDate()).isEqualTo(myDateTimestamp);
     assertThat(tags.get("bucket").getVInt32()).isEqualTo(20);
     assertThat(tags.get("success").getVBool()).isEqualTo(true);
     assertThat(tags.get("count_scaled_long").getVInt64()).isEqualTo(80);
diff --git a/astra/src/test/java/com/slack/astra/writer/LogMessageWriterImplTest.java b/astra/src/test/java/com/slack/astra/writer/LogMessageWriterImplTest.java
index ffd2dee307..a7b312a69a 100644
--- a/astra/src/test/java/com/slack/astra/writer/LogMessageWriterImplTest.java
+++ b/astra/src/test/java/com/slack/astra/writer/LogMessageWriterImplTest.java
@@ -17,6 +17,7 @@
 import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
 import com.slack.astra.chunkManager.IndexingChunkManager;
 import com.slack.astra.logstore.LogMessage;
+import com.slack.astra.logstore.schema.ReservedFields;
 import com.slack.astra.logstore.search.SearchQuery;
 import com.slack.astra.logstore.search.SearchResult;
 import com.slack.astra.logstore.search.aggregations.DateHistogramAggBuilder;
@@ -405,6 +406,52 @@ public void indexAndSearchAllFieldTypes() throws IOException {
     assertThat(results.hits.size()).isEqualTo(1);
   }
 
+  @Test
+  public void testReservedFields() throws IOException {
+    Schema.IngestSchema schema = Schema.IngestSchema.newBuilder().build();
+    schema = ReservedFields.addPredefinedFields(schema);
+
+    String request =
+        """
+        { "index" : { "_index" : "test", "_id" : "1" } }
+        { "@timestamp" : "2014-09-01T12:00:10Z"}
+        { "index" : { "_index" : "test", "_id" : "2" } }
+        { "@timestamp" : "2014-09-01T12:10:10Z"}
+        """;
+    List<IndexRequest> indexRequests =
+        BulkApiRequestParser.parseBulkRequest(request.getBytes(StandardCharsets.UTF_8));
+    assertThat(indexRequests.size()).isEqualTo(2);
+
+    for (IndexRequest indexRequest : indexRequests) {
+      IngestDocument ingestDocument = convertRequestToDocument(indexRequest);
+      Trace.Span span = BulkApiRequestParser.fromIngestDocument(ingestDocument, schema);
+      ConsumerRecord<String, byte[]> spanRecord = consumerRecordWithValue(span.toByteArray());
+      LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager);
+      assertThat(messageWriter.insertRecord(spanRecord)).isTrue();
+    }
+
+    assertThat(getCount(MESSAGES_RECEIVED_COUNTER, metricsRegistry)).isEqualTo(2);
+    assertThat(getCount(MESSAGES_FAILED_COUNTER, metricsRegistry)).isEqualTo(0);
+    chunkManagerUtil.chunkManager.getActiveChunk().commit();
+
+    SearchResult<LogMessage> results = searchChunkManager("test", "_id:1");
+    assertThat(results.hits.size()).isEqualTo(1);
+
+    results = searchChunkManager("test", "_id:2");
+    assertThat(results.hits.size()).isEqualTo(1);
+
+    results = searchChunkManager("test", "@timestamp:\"2014-09-01T12:00:10Z\"");
+    assertThat(results.hits.size()).isEqualTo(1);
+
+    results = searchChunkManager("test", "@timestamp:\"2014-09-01T12:10:10Z\"");
+    assertThat(results.hits.size()).isEqualTo(1);
+
+    results =
+        searchChunkManager(
+            "test", "@timestamp:[\"2014-09-01T12:00:10Z\" TO \"2014-09-01T12:10:10Z\"]");
+    assertThat(results.hits.size()).isEqualTo(2);
+  }
+
   @Test
   public void testNullTraceSpan() throws IOException {
     LogMessageWriterImpl messageWriter = new LogMessageWriterImpl(chunkManagerUtil.chunkManager);
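
Because DATE fields are indexed as millisecond LongPoint values (see FieldType.DATE above), the @timestamp range query in the last test is, at the Lucene level, just a numeric range. A standalone sketch of the equivalent query (illustrative; Astra builds this through its own query parsing, not this code):

import java.time.Instant;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.Query;

class TimestampRangeQuerySketch {
  public static void main(String[] args) {
    long from = Instant.parse("2014-09-01T12:00:10Z").toEpochMilli();
    long to = Instant.parse("2014-09-01T12:10:10Z").toEpochMilli();
    // Inclusive range over the millisecond values written for @timestamp.
    Query query = LongPoint.newRangeQuery("@timestamp", from, to);
    System.out.println(query); // @timestamp:[1409572810000 TO 1409573410000]
  }
}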