Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert to using timestamp output format #110

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ type DatasourceInfo struct {
}

type ConfiguredFields struct {
TimeField string
LogMessageField string
LogLevelField string
TimeField string
TimeOutputFormat string
LogMessageField string
LogLevelField string
}

// Client represents a client which can interact with elasticsearch api
Expand Down
28 changes: 16 additions & 12 deletions pkg/quickwit/error_handling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ func TestErrorAvgMissingField(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}

result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields)
Expand Down Expand Up @@ -71,9 +72,10 @@ func TestErrorAvgMissingFieldNoDetailedErrors(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}

result, err := queryDataTestWithResponseCode(query, 400, response, configuredFields)
Expand Down Expand Up @@ -117,9 +119,10 @@ func TestErrorTooManyDateHistogramBuckets(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, err := queryDataTestWithResponseCode(query, 200, response, configuredFields)
require.NoError(t, err)
Expand Down Expand Up @@ -154,9 +157,10 @@ func TestNonElasticError(t *testing.T) {
response := []byte(`Access to the database is forbidden`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}

result, err := queryDataTestWithResponseCode(query, 403, response, configuredFields)
Expand Down
10 changes: 10 additions & 0 deletions pkg/quickwit/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,13 @@ func describeMetric(metricType, field string) string {
}
return text + " " + field
}

const (
Iso8601 string = "iso8601"
Rfc2822 string = "rfc2822"
Rfc3339 string = "rfc3339"
TimestampSecs string = "unix_timestamp_secs"
TimestampMillis string = "unix_timestamp_millis"
TimestampMicros string = "unix_timestamp_micros"
TimestampNanos string = "unix_timestamp_nanos"
)
7 changes: 4 additions & 3 deletions pkg/quickwit/querydata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,10 @@ func queryDataTestWithResponseCode(queriesBytes []byte, responseStatusCode int,

func queryDataTest(queriesBytes []byte, responseBytes []byte) (queryDataTestResult, error) {
configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
return queryDataTestWithResponseCode(queriesBytes, 200, responseBytes, configuredFields)
}
12 changes: 7 additions & 5 deletions pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
}

timeField, toOk := jsonData["timeField"].(string)
timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string)

logLevelField, ok := jsonData["logLevelField"].(string)
if !ok {
Expand Down Expand Up @@ -91,17 +92,18 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
maxConcurrentShardRequests = 256
}

if !toOk {
timeField, err = GetTimestampField(index, settings.URL, httpCli)
if !toOk || !tofOk {
timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli)
if nil != err {
return nil, err
}
}

configuredFields := es.ConfiguredFields{
TimeField: timeField,
LogLevelField: logLevelField,
LogMessageField: logMessageField,
TimeField: timeField,
TimeOutputFormat: timeOutputFormat,
LogLevelField: logLevelField,
LogMessageField: logMessageField,
}

model := es.DatasourceInfo{
Expand Down
41 changes: 34 additions & 7 deletions pkg/quickwit/response_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"golang.org/x/exp/slices"

es "github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/client"
"github.com/quickwit-oss/quickwit-datasource/pkg/quickwit/simplejson"
Expand Down Expand Up @@ -246,7 +247,7 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str
if propName == configuredFields.TimeField {
timeVector := make([]*time.Time, size)
for i, doc := range docs {
timeValue, err := ParseToTime(doc["sort"].([]any)[0])
timeValue, err := ParseToTime(doc[configuredFields.TimeField], configuredFields.TimeOutputFormat)
if err != nil {
continue
}
Expand Down Expand Up @@ -293,13 +294,39 @@ func processDocsToDataFrameFields(docs []map[string]interface{}, propNames []str
// Parses a value into Time given a timeOutputFormat. The conversion
// only works with float64 as this is what we get when parsing a response.
// TODO: understand why we get a float64?
func ParseToTime(value interface{}) (time.Time, error) {
typed_value, ok := value.(float64)
if !ok {
return time.Time{}, errors.New("parse time only accepts float64 with timestamp based format")
func ParseToTime(value interface{}, timeOutputFormat string) (time.Time, error) {

if timeOutputFormat == Iso8601 || timeOutputFormat == Rfc3339 {
value_string := value.(string)
timeValue, err := time.Parse(time.RFC3339, value_string)
if err != nil {
return time.Time{}, err
}
return timeValue, nil
} else if timeOutputFormat == Rfc2822 {
value_string := value.(string)
timeValue, err := time.Parse(time.RFC822Z, value_string)
if err != nil {
return time.Time{}, err
}
return timeValue, nil
} else if slices.Contains([]string{TimestampSecs, TimestampMillis, TimestampMicros, TimestampNanos}, timeOutputFormat) {
typed_value, ok := value.(float64)
if !ok {
return time.Time{}, errors.New("parse time only accepts float64 with timestamp based format")
}
int64_value := int64(typed_value)
if timeOutputFormat == TimestampSecs {
return time.Unix(int64_value, 0), nil
} else if timeOutputFormat == TimestampMillis {
return time.Unix(0, int64_value*1_000_000), nil
} else if timeOutputFormat == TimestampMicros {
return time.Unix(0, int64_value*1_000), nil
} else if timeOutputFormat == TimestampNanos {
return time.Unix(0, int64_value), nil
}
}
int64_value := int64(typed_value)
return time.Unix(0, int64_value), nil
return time.Time{}, fmt.Errorf("timeOutputFormat not supported yet %s", timeOutputFormat)
}

func processBuckets(aggs map[string]interface{}, target *Query,
Expand Down
30 changes: 17 additions & 13 deletions pkg/quickwit/response_parser_qw_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: TimestampNanos,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
frames := result.response.Responses["A"].Frames
Expand Down Expand Up @@ -125,9 +126,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: TimestampMicros,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
frames := result.response.Responses["A"].Frames
Expand Down Expand Up @@ -191,9 +193,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: TimestampMillis,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
frames := result.response.Responses["A"].Frames
Expand Down Expand Up @@ -257,9 +260,10 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
`)

configuredFields := es.ConfiguredFields{
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: TimestampSecs,
TimeField: "testtime",
LogMessageField: "line",
LogLevelField: "lvl",
}
result, _ := queryDataTestWithResponseCode(query, 200, response, configuredFields)
frames := result.response.Responses["A"].Frames
Expand All @@ -278,7 +282,7 @@ func TestProcessLogsResponseWithDifferentTimeOutputFormat(t *testing.T) {
func TestConvertToTime(t *testing.T) {
t.Run("Test parse unix timestamps nanosecs of float type", func(t *testing.T) {
inputValue := interface{}(1234567890000000000.0)
value, _ := ParseToTime(inputValue)
value, _ := ParseToTime(inputValue, "unix_timestamp_nanos")
require.Equal(t, time.Unix(1234567890, 0), value)
})
}
7 changes: 4 additions & 3 deletions pkg/quickwit/response_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3228,9 +3228,10 @@ func parseTestResponse(tsdbQueries map[string]string, responseBody string) (*bac
from := time.Date(2018, 5, 15, 17, 50, 0, 0, time.UTC)
to := time.Date(2018, 5, 15, 17, 55, 0, 0, time.UTC)
configuredFields := es.ConfiguredFields{
TimeField: "@timestamp",
LogMessageField: "line",
LogLevelField: "lvl",
TimeOutputFormat: Rfc3339,
TimeField: "@timestamp",
LogMessageField: "line",
LogLevelField: "lvl",
}
timeRange := backend.TimeRange{
From: from,
Expand Down
Loading
Loading