diff --git a/quesma/quesma/async_search_storage/in_memory.go b/quesma/quesma/async_search_storage/in_memory.go index b8e6c3c03..e0a0f29f1 100644 --- a/quesma/quesma/async_search_storage/in_memory.go +++ b/quesma/quesma/async_search_storage/in_memory.go @@ -75,16 +75,16 @@ func elapsedTime(t time.Time) time.Duration { return time.Since(t) } -type AsyncQueryIdWithTime struct { +type asyncQueryIdWithTime struct { id string time time.Time } func (e *AsyncQueriesEvictor) tryEvictAsyncRequests(timeFun func(time.Time) time.Duration) { - var ids []AsyncQueryIdWithTime + var ids []asyncQueryIdWithTime e.AsyncRequestStorage.Range(func(key string, value *AsyncRequestResult) bool { if timeFun(value.added) > EvictionInterval { - ids = append(ids, AsyncQueryIdWithTime{id: key, time: value.added}) + ids = append(ids, asyncQueryIdWithTime{id: key, time: value.added}) } return true }) diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index c610fb5da..d23c7cb5b 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -135,7 +135,7 @@ func (q *QueryRunner) handleAsyncSearch(ctx context.Context, indexPattern string return q.handleSearchCommon(ctx, indexPattern, body, &async, QueryLanguageDefault) } -type AsyncSearchWithError struct { +type asyncSearchWithError struct { response *model.SearchResp translatedQueryBody []types.TranslatedSQLQuery err error @@ -173,33 +173,33 @@ func (q *QueryRunner) checkProperties(ctx context.Context, plan *model.Execution return nil, nil } -func (q *QueryRunner) runExecutePlanAsync(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, doneCh chan AsyncSearchWithError, optAsync *AsyncQuery) { +func (q *QueryRunner) runExecutePlanAsync(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, doneCh chan asyncSearchWithError, optAsync *AsyncQuery) { go func() { defer recovery.LogAndHandlePanic(ctx, func(err error) { - doneCh <- AsyncSearchWithError{err: err} + doneCh <- asyncSearchWithError{err: err} }) translatedQueryBody, results, err := q.searchWorker(ctx, plan, table, doneCh, optAsync) if err != nil { - doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} + doneCh <- asyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} return } if len(plan.Queries) > 0 && len(results) == 0 { // if there are no queries, empty results are fine logger.ErrorWithCtx(ctx).Msgf("no hits, sqls: %v", translatedQueryBody) - doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: errors.New("no hits")} + doneCh <- asyncSearchWithError{translatedQueryBody: translatedQueryBody, err: errors.New("no hits")} return } results, err = q.postProcessResults(plan, results) if err != nil { - doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} + doneCh <- asyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} } searchResponse := queryTranslator.MakeSearchResponse(plan.Queries, results) - doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: err} + doneCh <- asyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: err} }() } @@ -209,7 +209,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan path := contextValues.RequestPath opaqueId := contextValues.OpaqueId - doneCh := make(chan AsyncSearchWithError, 1) + doneCh := make(chan asyncSearchWithError, 1) sendMainPlanResult := func(responseBody []byte, err error) { if optComparePlansCh != nil { @@ -460,32 +460,19 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin } func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyncId string, - startTime time.Time, path string, body types.JSON, result AsyncSearchWithError, keep bool, opaqueId string) (responseBody []byte, err error) { + startTime time.Time, path string, body types.JSON, result asyncSearchWithError, keep bool, opaqueId string) (responseBody []byte, err error) { + took := time.Since(startTime) - if result.err != nil { - if keep { - q.AsyncRequestStorage.Store(asyncId, // maybe responseBody line below should be an empty array? - async_search_storage.NewAsyncRequestResult(nil, result.err, time.Now(), false)) - } + bodyAsBytes, _ := body.Bytes() + if result.err == nil { + okStatus := 200 + asyncResponse := queryparser.SearchToAsyncSearchResponse(result.response, asyncId, false, &okStatus) + responseBody, err = asyncResponse.Marshal() + } else { responseBody, _ = queryparser.EmptyAsyncSearchResponse(asyncId, false, 503) err = result.err - bodyAsBytes, _ := body.Bytes() - qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{ - Id: id, - AsyncId: asyncId, - OpaqueId: opaqueId, - Path: path, - IncomingQueryBody: bodyAsBytes, - QueryBodyTranslated: result.translatedQueryBody, - QueryTranslatedResults: responseBody, - SecondaryTook: took, - }) - return } - okStatus := 200 - asyncResponse := queryparser.SearchToAsyncSearchResponse(result.response, asyncId, false, &okStatus) - responseBody, err = asyncResponse.Marshal() - bodyAsBytes, _ := body.Bytes() + qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{ Id: id, AsyncId: asyncId, @@ -496,6 +483,7 @@ func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyn QueryTranslatedResults: responseBody, SecondaryTook: took, }) + if keep { compressedBody := responseBody isCompressed := false @@ -505,9 +493,9 @@ func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyn isCompressed = true } } - q.AsyncRequestStorage.Store(asyncId, - async_search_storage.NewAsyncRequestResult(compressedBody, err, time.Now(), isCompressed)) + q.AsyncRequestStorage.Store(asyncId, async_search_storage.NewAsyncRequestResult(compressedBody, err, time.Now(), isCompressed)) } + return } @@ -563,13 +551,13 @@ func (q *QueryRunner) deleteAsyncSeach(id string) ([]byte, error) { return []byte(`{"acknowledged":true}`), nil } -func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncId string, doneCh chan<- AsyncSearchWithError) bool { +func (q *QueryRunner) reachedQueriesLimit(ctx context.Context, asyncId string, doneCh chan<- asyncSearchWithError) bool { if q.AsyncRequestStorage.Size() < asyncQueriesLimit && q.asyncQueriesCumulatedBodySize() < asyncQueriesLimitBytes { return false } err := errors.New("too many async queries") logger.ErrorWithCtx(ctx).Msgf("cannot handle %s, too many async queries", asyncId) - doneCh <- AsyncSearchWithError{err: err} + doneCh <- asyncSearchWithError{err: err} return true } @@ -775,7 +763,7 @@ func (q *QueryRunner) searchWorkerCommon( func (q *QueryRunner) searchWorker(ctx context.Context, plan *model.ExecutionPlan, table *clickhouse.Table, - doneCh chan<- AsyncSearchWithError, + doneCh chan<- asyncSearchWithError, optAsync *AsyncQuery) (translatedQueryBody []types.TranslatedSQLQuery, resultRows [][]model.QueryResultRow, err error) { if optAsync != nil { if q.reachedQueriesLimit(ctx, optAsync.asyncId, doneCh) {