Skip to content

Commit

Permalink
Add true date field support and also index "@timestamp" as a date (#909)
Browse files Browse the repository at this point in the history
* index an additioanl @timestamp field

* cleaner

* more changes
  • Loading branch information
vthacker authored May 2, 2024
1 parent 608d45e commit 348a088
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.logstore.schema.ReservedFields;
import com.slack.astra.proto.schema.Schema;
import com.slack.astra.writer.SpanFormatter;
import com.slack.service.murron.trace.Trace;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -49,47 +46,40 @@ public static Map<String, List<Trace.Span>> parseRequest(
* in millis 2. Check if a field called `@timestamp` exists and parse that as a date (since
* logstash sets that) 3. Use the current time from the ingestMetadata
*/
public static long getTimestampFromIngestDocument(IngestDocument ingestDocument) {
public static long getTimestampFromIngestDocument(Map<String, Object> sourceAndMetadata) {

try {
if (ingestDocument.hasField("@timestamp")) {
String dateString = ingestDocument.getFieldValue("@timestamp", String.class);
LocalDateTime localDateTime =
LocalDateTime.parse(dateString, DateTimeFormatter.ISO_DATE_TIME);
Instant instant = localDateTime.toInstant(ZoneOffset.UTC);
if (sourceAndMetadata.containsKey(ReservedFields.TIMESTAMP)) {
String dateString = (String) sourceAndMetadata.get(ReservedFields.TIMESTAMP);
Instant instant = Instant.parse(dateString);
return instant.toEpochMilli();
}

// assumption that the provided timestamp is in millis
// at some point both th unit and field need to be configurable
// when we do that, remember to change the called to appropriately remove the field
if (ingestDocument.hasField("timestamp")) {
return ingestDocument.getFieldValue("timestamp", Long.class);
// at some point both the unit and field need to be configurable
if (sourceAndMetadata.containsKey("timestamp")) {
return (long) sourceAndMetadata.get("timestamp");
}

if (ingestDocument.hasField("_timestamp")) {
return ingestDocument.getFieldValue("_timestamp", Long.class);
if (sourceAndMetadata.containsKey("_timestamp")) {
return (long) sourceAndMetadata.get("_timestamp");
}
} catch (Exception e) {
LOG.warn(
"Unable to parse timestamp from ingest document. Using current time as timestamp", e);
}

return ((ZonedDateTime)
ingestDocument
.getIngestMetadata()
.getOrDefault("timestamp", ZonedDateTime.now(ZoneOffset.UTC)))
.toInstant()
.toEpochMilli();
// We tried parsing 3 timestamp fields and failed. Use the current time
return Instant.now().toEpochMilli();
}

@VisibleForTesting
public static Trace.Span fromIngestDocument(
IngestDocument ingestDocument, Schema.IngestSchema schema) {

long timestampInMillis = getTimestampFromIngestDocument(ingestDocument);

Map<String, Object> sourceAndMetadata = ingestDocument.getSourceAndMetadata();

long timestampInMillis = getTimestampFromIngestDocument(sourceAndMetadata);
String id = (String) sourceAndMetadata.get(IngestDocument.Metadata.ID.getFieldName());
// See https://blog.mikemccandless.com/2014/05/choosing-fast-unique-identifier-uuid.html on how
// to improve this
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.slack.astra.logstore.schema;

import com.slack.astra.proto.schema.Schema;

public class ReservedFields {

public static final String TIMESTAMP = "@timestamp";

public static Schema.IngestSchema addPredefinedFields(Schema.IngestSchema currentSchema) {
Schema.SchemaField timestampField =
Schema.SchemaField.newBuilder().setType(Schema.SchemaFieldType.DATE).build();
return Schema.IngestSchema.newBuilder()
.putAllFields(currentSchema.getFieldsMap())
.putFields(TIMESTAMP, timestampField)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -486,8 +486,10 @@ public Document fromMessage(Trace.Span message) throws JsonProcessingException {
addField(doc, keyValue.getKey(), keyValue.getVStr(), Schema.SchemaFieldType.IP, "", 0);
jsonMap.put(keyValue.getKey(), keyValue.getVStr());
} else if (schemaFieldType == Schema.SchemaFieldType.DATE) {
addField(doc, keyValue.getKey(), keyValue.getVInt64(), Schema.SchemaFieldType.DATE, "", 0);
jsonMap.put(keyValue.getKey(), keyValue.getVInt64());
Instant instant =
Instant.ofEpochSecond(keyValue.getVDate().getSeconds(), keyValue.getVDate().getNanos());
addField(doc, keyValue.getKey(), instant, Schema.SchemaFieldType.DATE, "", 0);
jsonMap.put(keyValue.getKey(), instant.toString());
} else if (schemaFieldType == Schema.SchemaFieldType.BOOLEAN) {
addField(
doc, keyValue.getKey(), keyValue.getVBool(), Schema.SchemaFieldType.BOOLEAN, "", 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.google.protobuf.ByteString;
import com.slack.astra.proto.schema.Schema;
import java.net.InetAddress;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import org.apache.lucene.document.Document;
Expand Down Expand Up @@ -133,7 +134,21 @@ public Schema.SchemaFieldType toSchemaFieldType() {
DATE("date") {
@Override
public void addField(Document doc, String name, Object value, LuceneFieldDef fieldDef) {
LONG.addField(doc, name, value, fieldDef);
if (value instanceof Instant instant) {
long timeSinceEpoch = instant.toEpochMilli();
if (fieldDef.isIndexed) {
doc.add(new LongPoint(name, timeSinceEpoch));
}
if (fieldDef.isStored) {
doc.add(new StoredField(name, instant.toString()));
}
if (fieldDef.storeDocValue) {
doc.add(new NumericDocValuesField(name, timeSinceEpoch));
}
} else {
// back-compat
LONG.addField(doc, name, value, fieldDef);
}
}

@Override
Expand Down
2 changes: 2 additions & 0 deletions astra/src/main/java/com/slack/astra/server/Astra.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.slack.astra.clusterManager.SnapshotDeletionService;
import com.slack.astra.elasticsearchApi.ElasticsearchApiService;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.logstore.schema.ReservedFields;
import com.slack.astra.logstore.search.AstraDistributedQueryService;
import com.slack.astra.logstore.search.AstraLocalQueryService;
import com.slack.astra.metadata.cache.CacheSlotMetadataStore;
Expand Down Expand Up @@ -408,6 +409,7 @@ private static Set<Service> getServices(
} else {
LOG.info("No schema file provided, using default schema");
}
schema = ReservedFields.addPredefinedFields(schema);
BulkIngestApi openSearchBulkApiService =
new BulkIngestApi(
bulkIngestKafkaProducer,
Expand Down
25 changes: 25 additions & 0 deletions astra/src/main/java/com/slack/astra/writer/SpanFormatter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import com.slack.astra.logstore.LogMessage;
import com.slack.astra.proto.schema.Schema;
import com.slack.service.murron.Murron;
Expand All @@ -12,10 +13,14 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A utility class that converts a Span into a LogMessage, Json map to Span */
public class SpanFormatter {

private static final Logger LOG = LoggerFactory.getLogger(SpanFormatter.class);

public static final String DEFAULT_LOG_MESSAGE_TYPE = "INFO";
public static final String DEFAULT_INDEX_NAME = "unknown";

Expand Down Expand Up @@ -87,6 +92,23 @@ public static Trace.Span toSpan(
return spanBuilder.build();
}

public static Timestamp parseDate(String dateStr, Schema.SchemaFieldType type) {
Instant instant;
try {
// type will expose parsing params in the future
// for now we'll just use Instant.parse
instant = Instant.parse(dateStr);
} catch (Exception e) {
// easier to debug rather than to skip or put current value
LOG.warn("Failed to parse date: {}", dateStr, e);
instant = Instant.EPOCH;
}
return Timestamp.newBuilder()
.setSeconds(instant.getEpochSecond())
.setNanos(instant.getNano())
.build();
}

public static Trace.KeyValue makeTraceKV(String key, Object value, Schema.SchemaFieldType type) {
Trace.KeyValue.Builder tagBuilder = Trace.KeyValue.newBuilder();
tagBuilder.setKey(key);
Expand All @@ -106,6 +128,9 @@ public static Trace.KeyValue makeTraceKV(String key, Object value, Schema.Schema
}
case DATE -> {
tagBuilder.setFieldType(Schema.SchemaFieldType.DATE);
tagBuilder.setVDate(parseDate(value.toString(), type));
// setting both for backward compatibility while deploying preprocessor and indexer
// I however commented it while testing to make sure all tests use the new field
tagBuilder.setVInt64(Instant.parse(value.toString()).toEpochMilli());
}
case BOOLEAN -> {
Expand Down
3 changes: 3 additions & 0 deletions astra/src/main/proto/trace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ option java_package = "com.slack.service.murron.trace";
option go_package = "com.slack/astra/gen/proto/tracepb";

import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

import "schema.proto";

message KeyValue {
Expand All @@ -19,6 +21,7 @@ message KeyValue {
bytes v_binary = 7;
int32 v_int32 = 8;
float v_float32 = 9;
google.protobuf.Timestamp v_date = 11;
slack.proto.astra.schema.SchemaFieldType fieldType = 10;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ public void testTraceSpanGeneratedTimestamp() throws IOException {
public void testTimestampParsingFromIngestDocument() {
IngestDocument ingestDocument =
new IngestDocument("index", "1", "routing", 1L, VersionType.INTERNAL, Map.of());
long timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
long timeInMillis =
BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
Instant ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);

// this tests that the parser inserted a timestamp close to the current time
Expand All @@ -287,7 +288,8 @@ public void testTimestampParsingFromIngestDocument() {
ingestDocument =
new IngestDocument(
"index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("@timestamp", ts));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
timeInMillis =
BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());

// we put a long in the @timestamp field, which today we don't parse
Expand All @@ -300,7 +302,8 @@ public void testTimestampParsingFromIngestDocument() {
1L,
VersionType.INTERNAL,
Map.of("@timestamp", providedTimeStamp.toEpochMilli()));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
timeInMillis =
BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
Expand All @@ -310,7 +313,8 @@ public void testTimestampParsingFromIngestDocument() {
ingestDocument =
new IngestDocument(
"index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("_timestamp", ts));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
timeInMillis =
BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
Expand All @@ -320,7 +324,8 @@ public void testTimestampParsingFromIngestDocument() {
ingestDocument =
new IngestDocument(
"index", "1", "routing", 1L, VersionType.INTERNAL, Map.of("timestamp", ts));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
timeInMillis =
BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
ingestDocumentTime = Instant.ofEpochMilli(timeInMillis);
assertThat(oneMinuteBefore.isBefore(ingestDocumentTime)).isTrue();
assertThat(ingestDocumentTime.isBefore(oneMinuteAfter)).isTrue();
Expand All @@ -333,7 +338,8 @@ public void testTimestampParsingFromIngestDocument() {
1L,
VersionType.INTERNAL,
Map.of("timestamp", providedTimeStamp.toEpochMilli()));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
timeInMillis =
BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());

ingestDocument =
Expand All @@ -344,7 +350,8 @@ public void testTimestampParsingFromIngestDocument() {
1L,
VersionType.INTERNAL,
Map.of("_timestamp", providedTimeStamp.toEpochMilli()));
timeInMillis = BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument);
timeInMillis =
BulkApiRequestParser.getTimestampFromIngestDocument(ingestDocument.getSourceAndMetadata());
assertThat(timeInMillis).isEqualTo(providedTimeStamp.toEpochMilli());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import com.google.common.io.Files;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.slack.astra.bulkIngestApi.opensearch.BulkApiRequestParser;
import com.slack.astra.logstore.LogStore;
import com.slack.astra.logstore.LuceneIndexStoreConfig;
Expand Down Expand Up @@ -228,8 +229,25 @@ public void testSimpleSchema() {
assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.IP);

String myTimestamp = "2021-01-01T00:00:00Z";
Instant myDateInstant = Instant.parse(myTimestamp);
Timestamp myDateTimestamp =
Timestamp.newBuilder()
.setSeconds(myDateInstant.getEpochSecond())
.setNanos(myDateInstant.getNano())
.build();
kv = SpanFormatter.convertKVtoProto("myTimestamp", myTimestamp, schema).get(0);
assertThat(kv.getVDate()).isEqualTo(myDateTimestamp);

myTimestamp = "2021-01-01T00:00:00Z";
myDateInstant = Instant.parse(myTimestamp);
myDateTimestamp =
Timestamp.newBuilder()
.setSeconds(myDateInstant.getEpochSecond())
.setNanos(myDateInstant.getNano())
.build();
kv = SpanFormatter.convertKVtoProto("myTimestamp", myTimestamp, schema).get(0);
assertThat(kv.getVInt64()).isEqualTo(Instant.parse(myTimestamp).toEpochMilli());
assertThat(kv.getVDate()).isEqualTo(myDateTimestamp);

assertThat(kv.getFieldType()).isEqualTo(Schema.SchemaFieldType.DATE);

kv = SpanFormatter.convertKVtoProto("success", "true", schema).get(0);
Expand Down Expand Up @@ -451,8 +469,13 @@ public void testTraceProtoToLuceneDocumentTest() throws Exception {
assertThat(tags.get("ip").getVStr()).isEqualTo("192.168.1.1");
assertThat(tags.get("count").getVInt64()).isEqualTo(3);
assertThat(tags.get("count_short").getVInt32()).isEqualTo(10);
assertThat(tags.get("my_date").getVInt64())
.isEqualTo(Instant.parse("2014-09-01T12:00:00Z").toEpochMilli());
Instant myDateInstant = Instant.parse("2014-09-01T12:00:00Z");
Timestamp myDateTimestamp =
Timestamp.newBuilder()
.setSeconds(myDateInstant.getEpochSecond())
.setNanos(myDateInstant.getNano())
.build();
assertThat(tags.get("my_date").getVDate()).isEqualTo(myDateTimestamp);
assertThat(tags.get("bucket").getVInt32()).isEqualTo(20);
assertThat(tags.get("success").getVBool()).isEqualTo(true);
assertThat(tags.get("count_scaled_long").getVInt64()).isEqualTo(80);
Expand Down
Loading

0 comments on commit 348a088

Please sign in to comment.