diff --git a/pkg/quickwit/client/client.go b/pkg/quickwit/client/client.go index 3ab42cb..790155c 100644 --- a/pkg/quickwit/client/client.go +++ b/pkg/quickwit/client/client.go @@ -16,6 +16,11 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend/log" ) +type ReadyStatus struct { + IsReady bool + Err error +} + type DatasourceInfo struct { ID int64 HTTPClient *http.Client @@ -23,7 +28,8 @@ type DatasourceInfo struct { Database string ConfiguredFields ConfiguredFields MaxConcurrentShardRequests int64 - IsReady bool + ReadyStatus chan ReadyStatus + ShouldInit bool } type ConfiguredFields struct { diff --git a/pkg/quickwit/quickwit.go b/pkg/quickwit/quickwit.go index eb89a1a..09249ae 100644 --- a/pkg/quickwit/quickwit.go +++ b/pkg/quickwit/quickwit.go @@ -104,46 +104,62 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc Database: index, MaxConcurrentShardRequests: int64(maxConcurrentShardRequests), ConfiguredFields: configuredFields, - IsReady: false, - } - return &QuickwitDatasource{dsInfo: model}, nil -} - -// Network dependent datasource initialization. -// This is not done in the "constructor" function to allow saving the ds -// even if the server is not responsive. -func (ds *QuickwitDatasource) initDatasource(force bool) error { - if ds.dsInfo.IsReady && !force { - return nil - } - - indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient) - if err != nil { - return fmt.Errorf("failed to get index metadata : %w", err) - } - - if len(indexMetadataList) == 0 { - return fmt.Errorf("no index found for %s", ds.dsInfo.Database) - } - - timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList) - if nil != err { - return err + ReadyStatus: make(chan es.ReadyStatus, 1), + ShouldInit: true, } - ds.dsInfo.ConfiguredFields.TimeField = timeField - ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat + ds := &QuickwitDatasource{dsInfo: model} - ds.dsInfo.IsReady = true - return nil + // Create an initialization goroutine + go func(ds *QuickwitDatasource, readyStatus chan<- es.ReadyStatus) { + var status es.ReadyStatus = es.ReadyStatus{ + IsReady: false, + Err: nil, + } + for { + // Will retry init everytime the channel is consumed until ready + if !status.IsReady || ds.dsInfo.ShouldInit { + qwlog.Debug("Initializing Datasource") + status.IsReady = true + status.Err = nil + + indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient) + if err != nil { + status.IsReady = false + status.Err = fmt.Errorf("failed to get index metadata : %w", err) + } else if len(indexMetadataList) == 0 { + status.IsReady = false + status.Err = fmt.Errorf("no index found for %s", ds.dsInfo.Database) + } else { + timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList) + if nil != err { + status.IsReady = false + status.Err = err + } else if "" == timeField { + status.IsReady = false + status.Err = fmt.Errorf("timefield is empty for %s", ds.dsInfo.Database) + } else if "" == timeOutputFormat { + status.Err = fmt.Errorf("timefield's output_format is empty, logs timestamps will not be parsed correctly for %s", ds.dsInfo.Database) + } + + ds.dsInfo.ConfiguredFields.TimeField = timeField + ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat + ds.dsInfo.ShouldInit = false + } + } + readyStatus <- status + } + }(ds, model.ReadyStatus) + return ds, nil } // Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance // created. As soon as datasource settings change detected by SDK old datasource instance will // be disposed and a new one will be created using NewSampleDatasource factory function. func (ds *QuickwitDatasource) Dispose() { - // Clean up datasource instance resources. - // TODO + // FIXME: The ReadyStatus channel should probably be closed here, but doing it + // causes odd calls to healthcheck to fail. Needs investigation + // close(ds.dsInfo.ReadyStatus) } // CheckHealth handles health checks sent from Grafana to the plugin. @@ -152,28 +168,37 @@ func (ds *QuickwitDatasource) Dispose() { // a datasource is working as expected. func (ds *QuickwitDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { res := &backend.CheckHealthResult{} + res.Status = backend.HealthStatusOk + res.Message = "plugin is running" - if err := ds.initDatasource(true); err != nil { - res.Status = backend.HealthStatusError - res.Message = fmt.Errorf("Failed to initialize datasource: %w", err).Error() - return res, nil - } + ds.dsInfo.ShouldInit = true + status := <-ds.dsInfo.ReadyStatus - if ds.dsInfo.ConfiguredFields.TimeField == "" || ds.dsInfo.ConfiguredFields.TimeOutputFormat == "" { + if nil != status.Err { + res.Status = backend.HealthStatusError + res.Message = fmt.Errorf("Failed to initialize datasource: %w", status.Err).Error() + } else if "" == ds.dsInfo.ConfiguredFields.TimeField { res.Status = backend.HealthStatusError res.Message = fmt.Sprintf("timefield is missing from index config \"%s\"", ds.dsInfo.Database) - return res, nil + } else if "" == ds.dsInfo.ConfiguredFields.TimeOutputFormat { + res.Status = backend.HealthStatusError + res.Message = fmt.Sprintf("timefield's output_format is missing from index config \"%s\"", ds.dsInfo.Database) } + qwlog.Debug(res.Message) - res.Status = backend.HealthStatusOk - res.Message = "plugin is running" return res, nil } func (ds *QuickwitDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { // Ensure ds is initialized, we need timestamp infos - if err := ds.initDatasource(false); err != nil { - return &backend.QueryDataResponse{}, fmt.Errorf("Failed to initialize datasource") + status := <-ds.dsInfo.ReadyStatus + if !status.IsReady { + qwlog.Debug(fmt.Errorf("Datasource initialization failed: %w", status.Err).Error()) + response := &backend.QueryDataResponse{ + Responses: backend.Responses{}, + } + response.Responses["__qwQueryDataError"] = backend.ErrDataResponse(backend.StatusInternal, "Datasource initialization failed") + return response, nil } return queryData(ctx, req.Queries, &ds.dsInfo)