From d89c3fae636fd8cba267f67818b5841a7a6ee4b9 Mon Sep 17 00:00:00 2001 From: Damien de Lemeny Date: Wed, 1 May 2024 18:47:16 -0500 Subject: [PATCH] Improve timestamp infos and DS init --- pkg/quickwit/client/client.go | 1 + pkg/quickwit/quickwit.go | 62 +++++-- pkg/quickwit/timestamp_infos.go | 126 +++++--------- pkg/quickwit/timestamp_infos_test.go | 236 +++++++++++++++------------ 4 files changed, 223 insertions(+), 202 deletions(-) diff --git a/pkg/quickwit/client/client.go b/pkg/quickwit/client/client.go index c11fb19..3ab42cb 100644 --- a/pkg/quickwit/client/client.go +++ b/pkg/quickwit/client/client.go @@ -23,6 +23,7 @@ type DatasourceInfo struct { Database string ConfiguredFields ConfiguredFields MaxConcurrentShardRequests int64 + IsReady bool } type ConfiguredFields struct { diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index b13d3b0..eb89a1a 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -57,9 +57,6 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc return nil, err } - timeField, toOk := jsonData["timeField"].(string) - timeOutputFormat, tofOk := jsonData["timeOutputFormat"].(string) - logLevelField, ok := jsonData["logLevelField"].(string) if !ok { logLevelField = "" @@ -74,6 +71,7 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc if !ok { index = "" } + // XXX : Legacy check, should not happen ? if index == "" { index = settings.Database } @@ -92,18 +90,11 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc maxConcurrentShardRequests = 256 } - if !toOk || !tofOk { - timeField, timeOutputFormat, err = GetTimestampFieldInfos(index, settings.URL, httpCli) - if nil != err { - return nil, err - } - } - configuredFields := es.ConfiguredFields{ - TimeField: timeField, - TimeOutputFormat: timeOutputFormat, LogLevelField: logLevelField, LogMessageField: logMessageField, + TimeField: "", + TimeOutputFormat: "", } model := es.DatasourceInfo{ @@ -113,10 +104,40 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc Database: index, MaxConcurrentShardRequests: int64(maxConcurrentShardRequests), ConfiguredFields: configuredFields, + IsReady: false, } return &QuickwitDatasource{dsInfo: model}, nil } +// Network dependent datasource initialization. +// This is not done in the "constructor" function to allow saving the ds +// even if the server is not responsive. +func (ds *QuickwitDatasource) initDatasource(force bool) error { + if ds.dsInfo.IsReady && !force { + return nil + } + + indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient) + if err != nil { + return fmt.Errorf("failed to get index metadata : %w", err) + } + + if len(indexMetadataList) == 0 { + return fmt.Errorf("no index found for %s", ds.dsInfo.Database) + } + + timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList) + if nil != err { + return err + } + + ds.dsInfo.ConfiguredFields.TimeField = timeField + ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat + + ds.dsInfo.IsReady = true + return nil +} + // Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance // created. As soon as datasource settings change detected by SDK old datasource instance will // be disposed and a new one will be created using NewSampleDatasource factory function. @@ -132,12 +153,29 @@ func (ds *QuickwitDatasource) Dispose() { func (ds *QuickwitDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { res := &backend.CheckHealthResult{} + if err := ds.initDatasource(true); err != nil { + res.Status = backend.HealthStatusError + res.Message = fmt.Errorf("Failed to initialize datasource: %w", err).Error() + return res, nil + } + + if ds.dsInfo.ConfiguredFields.TimeField == "" || ds.dsInfo.ConfiguredFields.TimeOutputFormat == "" { + res.Status = backend.HealthStatusError + res.Message = fmt.Sprintf("timefield is missing from index config \"%s\"", ds.dsInfo.Database) + return res, nil + } + res.Status = backend.HealthStatusOk res.Message = "plugin is running" return res, nil } func (ds *QuickwitDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + // Ensure ds is initialized, we need timestamp infos + if err := ds.initDatasource(false); err != nil { + return &backend.QueryDataResponse{}, fmt.Errorf("Failed to initialize datasource") + } + return queryData(ctx, req.Queries, &ds.dsInfo) } diff --git a/pkg/quickwit/timestamp_infos.go b/pkg/quickwit/timestamp_infos.go index 4237076..f30d0c9 100644 --- a/pkg/quickwit/timestamp_infos.go +++ b/pkg/quickwit/timestamp_infos.go @@ -6,11 +6,11 @@ import ( "fmt" "io" "net/http" - "strings" ) type QuickwitIndexMetadata struct { IndexConfig struct { + IndexID string `json:"index_id"` DocMapping struct { TimestampField string `json:"timestamp_field"` FieldMappings []FieldMappings `json:"field_mappings"` @@ -23,6 +23,7 @@ type QuickwitCreationErrorPayload struct { StatusCode int `json:"status"` } +// TODO: Revamp error handling func NewErrorCreationPayload(statusCode int, message string) error { var payload QuickwitCreationErrorPayload payload.Message = message @@ -35,124 +36,73 @@ func NewErrorCreationPayload(statusCode int, message string) error { return errors.New(string(json)) } -// TODO: refactor either by using a timestamp alias suppprted by quickwit -// or by only using the `GetTimestampFieldFromIndexPattern` once the endpoint -// /indexes?index_id_pattern= is supported, which is after the next quickwit release > 0.7.1 -func GetTimestampFieldInfos(index string, qwickwitUrl string, cli *http.Client) (string, string, error) { - if strings.Contains(index, "*") || strings.Contains(index, ",") { - return GetTimestampFieldFromIndexPattern(index, qwickwitUrl, cli) +func FilterErrorResponses(r *http.Response) (*http.Response, error) { + if r.StatusCode < 200 || r.StatusCode >= 400 { + body, err := io.ReadAll(r.Body) + if err != nil { + return nil, NewErrorCreationPayload(r.StatusCode, fmt.Errorf("failed to read error body: err = %w", err).Error()) + } + return nil, NewErrorCreationPayload(r.StatusCode, fmt.Sprintf("error = %s", (body))) } - return GetTimestampFieldFromIndex(index, qwickwitUrl, cli) + return r, nil } -func GetTimestampFieldFromIndex(index string, qwickwitUrl string, cli *http.Client) (string, string, error) { - mappingEndpointUrl := qwickwitUrl + "/indexes/" + index - qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) - r, err := cli.Get(mappingEndpointUrl) - if err != nil { - errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) - qwlog.Error(errMsg) - return "", "", err +func GetTimestampFieldInfos(indexMetadataList []QuickwitIndexMetadata) (string, string, error) { + if len(indexMetadataList) == 0 { + return "", "", fmt.Errorf("index metadata list is empty") } - defer r.Body.Close() - - statusCode := r.StatusCode - if statusCode < 200 || statusCode >= 400 { - errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) - qwlog.Error(errMsg) - return "", "", NewErrorCreationPayload(statusCode, errMsg) + refTimestampFieldName, refTimestampOutputFormat := FindTimestampFieldInfos(indexMetadataList[0]) + if refTimestampFieldName == "" || refTimestampOutputFormat == "" { + return "", "", fmt.Errorf("Invalid timestamp field infos for %s: %s, %s", indexMetadataList[0].IndexConfig.IndexID, refTimestampFieldName, refTimestampOutputFormat) } - body, err := io.ReadAll(r.Body) - if err != nil { - errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) - qwlog.Error(errMsg) - return "", "", NewErrorCreationPayload(statusCode, errMsg) + for _, indexMetadata := range indexMetadataList[1:] { + timestampFieldName, timestampOutputFormat := FindTimestampFieldInfos(indexMetadata) + + if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat { + return "", "", fmt.Errorf("Indexes matching pattern have incompatible timestamp fields, found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat) + } } - return DecodeTimestampFieldFromIndexConfig(body) + return refTimestampFieldName, refTimestampOutputFormat, nil } -func GetTimestampFieldFromIndexPattern(indexPattern string, qwickwitUrl string, cli *http.Client) (string, string, error) { +func GetIndexesMetadata(indexPattern string, qwickwitUrl string, cli *http.Client) ([]QuickwitIndexMetadata, error) { mappingEndpointUrl := qwickwitUrl + "/indexes?index_id_patterns=" + indexPattern qwlog.Debug("Calling quickwit endpoint: " + mappingEndpointUrl) r, err := cli.Get(mappingEndpointUrl) if err != nil { - errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) - qwlog.Error(errMsg) - return "", "", err + return nil, fmt.Errorf("Error when calling url = %s: %w", mappingEndpointUrl, err) } defer r.Body.Close() - statusCode := r.StatusCode - - if statusCode < 200 || statusCode >= 400 { - errMsg := fmt.Sprintf("Error when calling url = %s", mappingEndpointUrl) - qwlog.Error(errMsg) - return "", "", NewErrorCreationPayload(statusCode, errMsg) + r, err = FilterErrorResponses(r) + if err != nil { + return nil, fmt.Errorf("API returned invalid response: %w", err) } body, err := io.ReadAll(r.Body) if err != nil { - errMsg := fmt.Sprintf("Error when calling url = %s: err = %s", mappingEndpointUrl, err.Error()) - qwlog.Error(errMsg) - return "", "", NewErrorCreationPayload(statusCode, errMsg) + return nil, fmt.Errorf("failed to read response body: %w", err) } - return DecodeTimestampFieldFromIndexConfigs(body) -} - -func DecodeTimestampFieldFromIndexConfigs(body []byte) (string, string, error) { var payload []QuickwitIndexMetadata - err := json.Unmarshal(body, &payload) + err = json.Unmarshal(body, &payload) if err != nil { - errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) - qwlog.Error(errMsg) - return "", "", NewErrorCreationPayload(500, errMsg) - } - - var refTimestampFieldName string = "" - var refTimestampOutputFormat string = "" - var timestampFieldName string = "" - var timestampOutputFormat string = "" - - for _, indexMetadata := range payload { - timestampFieldName = indexMetadata.IndexConfig.DocMapping.TimestampField - timestampOutputFormat, _ = FindTimeStampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings) - - if refTimestampFieldName == "" { - refTimestampFieldName = timestampFieldName - refTimestampOutputFormat = timestampOutputFormat - continue - } - - if timestampFieldName != refTimestampFieldName || timestampOutputFormat != refTimestampOutputFormat { - errMsg := fmt.Sprintf("Index matching the pattern should have the same timestamp fields, two found: %s (%s) and %s (%s)", refTimestampFieldName, refTimestampOutputFormat, timestampFieldName, timestampOutputFormat) - qwlog.Error(errMsg) - return "", "", NewErrorCreationPayload(400, errMsg) - } + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) } - qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s, timestamptOutputFormat = %s", timestampFieldName, timestampOutputFormat)) - return timestampFieldName, timestampOutputFormat, nil + return payload, nil } -func DecodeTimestampFieldFromIndexConfig(body []byte) (string, string, error) { - var payload QuickwitIndexMetadata - err := json.Unmarshal(body, &payload) - if err != nil { - errMsg := fmt.Sprintf("Unmarshalling body error: err = %s, body = %s", err.Error(), (body)) - qwlog.Error(errMsg) - return "", "", NewErrorCreationPayload(500, errMsg) - } - timestampFieldName := payload.IndexConfig.DocMapping.TimestampField - timestampFieldFormat, _ := FindTimeStampFormat(timestampFieldName, nil, payload.IndexConfig.DocMapping.FieldMappings) - qwlog.Debug(fmt.Sprintf("Found timestampFieldName = %s", timestampFieldName)) - return timestampFieldName, timestampFieldFormat, nil +func FindTimestampFieldInfos(indexMetadata QuickwitIndexMetadata) (string, string) { + timestampFieldName := indexMetadata.IndexConfig.DocMapping.TimestampField + timestampOutputFormat, _ := FindTimestampFormat(timestampFieldName, nil, indexMetadata.IndexConfig.DocMapping.FieldMappings) + return timestampFieldName, timestampOutputFormat } -func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) { +func FindTimestampFormat(timestampFieldName string, parentName *string, fieldMappings []FieldMappings) (string, bool) { if nil == fieldMappings { return "", false } @@ -166,7 +116,7 @@ func FindTimeStampFormat(timestampFieldName string, parentName *string, fieldMap if field.Type == "datetime" && fieldName == timestampFieldName && nil != field.OutputFormat { return *field.OutputFormat, true } else if field.Type == "object" && nil != field.FieldMappings { - return FindTimeStampFormat(timestampFieldName, &field.Name, field.FieldMappings) + return FindTimestampFormat(timestampFieldName, &field.Name, field.FieldMappings) } } diff --git a/pkg/quickwit/timestamp_infos_test.go b/pkg/quickwit/timestamp_infos_test.go index 379b95b..b2db076 100644 --- a/pkg/quickwit/timestamp_infos_test.go +++ b/pkg/quickwit/timestamp_infos_test.go @@ -1,127 +1,159 @@ package quickwit import ( + "encoding/json" + "fmt" "testing" "github.com/stretchr/testify/require" ) func TestDecodeTimestampFieldInfos(t *testing.T) { - t.Run("Test decode timestamp field infos", func(t *testing.T) { - t.Run("Test decode simple fields", func(t *testing.T) { - // Given - query := []byte(` - { - "version": "0.6", - "index_config": { - "version": "0.6", - "doc_mapping": { - "timestamp_field": "timestamp", - "mode": "dynamic", - "tokenizers": [], - "field_mappings": [ - { - "name": "foo", - "type": "text", - "fast": false, - "fieldnorms": false, - "indexed": true, - "record": "basic", - "stored": true, - "tokenizer": "default" - }, - { - "name": "timestamp", - "type": "datetime", - "fast": true, - "fast_precision": "seconds", - "indexed": true, - "input_formats": [ - "rfc3339", - "unix_timestamp" - ], - "output_format": "rfc3339", - "stored": true - } - ] - }, - "retention": null - }, - "sources": [] - } - `) + t.Run("Test decode simple fields", func(t *testing.T) { + // Given + query := []byte(` + [{ + "version": "0.6", + "index_config": { + "version": "0.6", + "doc_mapping": { + "timestamp_field": "timestamp", + "mode": "dynamic", + "tokenizers": [], + "field_mappings": [ + { + "name": "foo", + "type": "text", + "fast": false, + "fieldnorms": false, + "indexed": true, + "record": "basic", + "stored": true, + "tokenizer": "default" + }, + { + "name": "timestamp", + "type": "datetime", + "fast": true, + "fast_precision": "seconds", + "indexed": true, + "input_formats": [ + "rfc3339", + "unix_timestamp" + ], + "output_format": "rfc3339", + "stored": true + } + ] + }, + "retention": null + }, + "sources": [] + }] + `) - // When - timestampFieldName, timestampOutputFormat, err := DecodeTimestampFieldFromIndexConfig(query) + // When + var payload []QuickwitIndexMetadata + err := json.Unmarshal(query, &payload) + timestampFieldName, timestampOutputFormat, err := GetTimestampFieldInfos(payload) - // Then - require.NoError(t, err) - require.Equal(t, timestampFieldName, "timestamp") - require.Equal(t, timestampOutputFormat, "rfc3339") - }) + // Then + require.NoError(t, err) + require.Equal(t, timestampFieldName, "timestamp") + require.Equal(t, timestampOutputFormat, "rfc3339") + }) - t.Run("Test decode from list of index config", func(t *testing.T) { - // Given - query := []byte(` - [ - { - "version": "0.6", - "index_config": { - "doc_mapping": { - "timestamp_field": "sub.timestamp" - }, - "indexing_settings": {}, - "retention": null - }, - "sources": [] - } - ] - `) + t.Run("Test decode from list of index config", func(t *testing.T) { + // Given + query := []byte(` + [ + { + "version": "0.6", + "index_config": { + "doc_mapping": { + "timestamp_field": "timestamp", + "field_mappings": [ + { + "name": "timestamp", + "type": "datetime", + "output_format": "rfc3339" + } + ] + }, + "indexing_settings": {}, + "retention": null + }, + "sources": [] + } + ] + `) - // When - timestampFieldName, _, err := DecodeTimestampFieldFromIndexConfigs(query) + // When + var payload []QuickwitIndexMetadata + err := json.Unmarshal(query, &payload) + require.NoError(t, err) + qwlog.Debug(fmt.Sprint(payload)) + timestampFieldName, _, err := GetTimestampFieldInfos(payload) + // timestampFieldName, _, err := DecodeTimestampFieldFromIndexConfigs(query) - // Then - require.NoError(t, err) - require.Equal(t, timestampFieldName, "sub.timestamp") - }) + // Then + require.NoError(t, err) + require.Equal(t, timestampFieldName, "timestamp") + }) - t.Run("Test decode from list of index config with different timestamp fields return an error", func(t *testing.T) { - // Given - query := []byte(` - [ - { - "version": "0.6", - "index_config": { + t.Run("Test decode from list of index config with different timestamp fields return an error", func(t *testing.T) { + // Given + query := []byte(` + [ + { + "version": "0.6", + "index_config": { + "doc_mapping": { + "timestamp_field": "timestamp", + "field_mappings": [ + { + "name": "timestamp", + "type": "datetime", + "output_format": "rfc3339" + } + ] + }, + "indexing_settings": {}, + "retention": null + }, + "sources": [] + }, + { + "version": "0.6", + "index_config": { "doc_mapping": { - "timestamp_field": "sub.timestamp" + "timestamp_field": "timestamp2", + "field_mappings": [ + { + "name": "timestamp2", + "type": "datetime", + "output_format": "rfc3339" + } + ] }, "indexing_settings": {}, "retention": null - }, - "sources": [] }, - { - "version": "0.6", - "index_config": { - "doc_mapping": { - "timestamp_field": "sub.timestamp2" - }, - "indexing_settings": {}, - "retention": null - }, - "sources": [] - } - ] - `) + "sources": [] + } + ] + `) - // When - _, _, err := DecodeTimestampFieldFromIndexConfigs(query) + // When + // _, _, err := DecodeTimestampFieldFromIndexConfigs(query) + var payload []QuickwitIndexMetadata + err := json.Unmarshal(query, &payload) + require.NoError(t, err) + _, _, err = GetTimestampFieldInfos(payload) - // Then - require.Error(t, err) - require.ErrorContains(t, err, "Index matching the pattern should have the same timestamp fields") - }) + // Then + require.Error(t, err) + require.ErrorContains(t, err, "Indexes matching pattern have incompatible timestamp fields") }) }