Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use goroutine/chan to init, avoid hard failure #128

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/quickwit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
)

type ReadyStatus struct {
IsReady bool
Err error
}

type DatasourceInfo struct {
ID int64
HTTPClient *http.Client
URL string
Database string
ConfiguredFields ConfiguredFields
MaxConcurrentShardRequests int64
IsReady bool
ReadyStatus chan ReadyStatus
ShouldInit bool
}

type ConfiguredFields struct {
Expand Down
109 changes: 67 additions & 42 deletions pkg/quickwit/quickwit.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,46 +104,62 @@ func NewQuickwitDatasource(settings backend.DataSourceInstanceSettings) (instanc
Database: index,
MaxConcurrentShardRequests: int64(maxConcurrentShardRequests),
ConfiguredFields: configuredFields,
IsReady: false,
}
return &QuickwitDatasource{dsInfo: model}, nil
}

// Network dependent datasource initialization.
// This is not done in the "constructor" function to allow saving the ds
// even if the server is not responsive.
func (ds *QuickwitDatasource) initDatasource(force bool) error {
if ds.dsInfo.IsReady && !force {
return nil
}

indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient)
if err != nil {
return fmt.Errorf("failed to get index metadata : %w", err)
}

if len(indexMetadataList) == 0 {
return fmt.Errorf("no index found for %s", ds.dsInfo.Database)
}

timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList)
if nil != err {
return err
ReadyStatus: make(chan es.ReadyStatus, 1),
ShouldInit: true,
}

ds.dsInfo.ConfiguredFields.TimeField = timeField
ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat
ds := &QuickwitDatasource{dsInfo: model}

ds.dsInfo.IsReady = true
return nil
// Create an initialization goroutine
go func(ds *QuickwitDatasource, readyStatus chan<- es.ReadyStatus) {
var status es.ReadyStatus = es.ReadyStatus{
IsReady: false,
Err: nil,
}
for {
// Will retry init everytime the channel is consumed until ready
if !status.IsReady || ds.dsInfo.ShouldInit {
qwlog.Debug("Initializing Datasource")
status.IsReady = true
status.Err = nil

indexMetadataList, err := GetIndexesMetadata(ds.dsInfo.Database, ds.dsInfo.URL, ds.dsInfo.HTTPClient)
if err != nil {
status.IsReady = false
status.Err = fmt.Errorf("failed to get index metadata : %w", err)
} else if len(indexMetadataList) == 0 {
status.IsReady = false
status.Err = fmt.Errorf("no index found for %s", ds.dsInfo.Database)
} else {
timeField, timeOutputFormat, err := GetTimestampFieldInfos(indexMetadataList)
if nil != err {
status.IsReady = false
status.Err = err
} else if "" == timeField {
status.IsReady = false
status.Err = fmt.Errorf("timefield is empty for %s", ds.dsInfo.Database)
} else if "" == timeOutputFormat {
status.Err = fmt.Errorf("timefield's output_format is empty, logs timestamps will not be parsed correctly for %s", ds.dsInfo.Database)
}

ds.dsInfo.ConfiguredFields.TimeField = timeField
ds.dsInfo.ConfiguredFields.TimeOutputFormat = timeOutputFormat
ds.dsInfo.ShouldInit = false
}
}
readyStatus <- status
}
}(ds, model.ReadyStatus)
return ds, nil
}

// Dispose here tells plugin SDK that plugin wants to clean up resources when a new instance
// created. As soon as datasource settings change detected by SDK old datasource instance will
// be disposed and a new one will be created using NewSampleDatasource factory function.
func (ds *QuickwitDatasource) Dispose() {
// Clean up datasource instance resources.
// TODO
// FIXME: The ReadyStatus channel should probably be closed here, but doing it
// causes odd calls to healthcheck to fail. Needs investigation
// close(ds.dsInfo.ReadyStatus)
}

// CheckHealth handles health checks sent from Grafana to the plugin.
Expand All @@ -152,28 +168,37 @@ func (ds *QuickwitDatasource) Dispose() {
// a datasource is working as expected.
func (ds *QuickwitDatasource) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) {
res := &backend.CheckHealthResult{}
res.Status = backend.HealthStatusOk
res.Message = "plugin is running"

if err := ds.initDatasource(true); err != nil {
res.Status = backend.HealthStatusError
res.Message = fmt.Errorf("Failed to initialize datasource: %w", err).Error()
return res, nil
}
ds.dsInfo.ShouldInit = true
status := <-ds.dsInfo.ReadyStatus

if ds.dsInfo.ConfiguredFields.TimeField == "" || ds.dsInfo.ConfiguredFields.TimeOutputFormat == "" {
if nil != status.Err {
res.Status = backend.HealthStatusError
res.Message = fmt.Errorf("Failed to initialize datasource: %w", status.Err).Error()
} else if "" == ds.dsInfo.ConfiguredFields.TimeField {
res.Status = backend.HealthStatusError
res.Message = fmt.Sprintf("timefield is missing from index config \"%s\"", ds.dsInfo.Database)
return res, nil
} else if "" == ds.dsInfo.ConfiguredFields.TimeOutputFormat {
res.Status = backend.HealthStatusError
res.Message = fmt.Sprintf("timefield's output_format is missing from index config \"%s\"", ds.dsInfo.Database)
}
qwlog.Debug(res.Message)

res.Status = backend.HealthStatusOk
res.Message = "plugin is running"
return res, nil
}

func (ds *QuickwitDatasource) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
// Ensure ds is initialized, we need timestamp infos
if err := ds.initDatasource(false); err != nil {
return &backend.QueryDataResponse{}, fmt.Errorf("Failed to initialize datasource")
status := <-ds.dsInfo.ReadyStatus
if !status.IsReady {
qwlog.Debug(fmt.Errorf("Datasource initialization failed: %w", status.Err).Error())
response := &backend.QueryDataResponse{
Responses: backend.Responses{},
}
response.Responses["__qwQueryDataError"] = backend.ErrDataResponse(backend.StatusInternal, "Datasource initialization failed")
return response, nil
}

return queryData(ctx, req.Queries, &ds.dsInfo)
Expand Down
Loading