From 218f0c13902a55650079eb0171b845806d008cdf Mon Sep 17 00:00:00 2001 From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com> Date: Sun, 19 May 2024 19:36:10 +0200 Subject: [PATCH] Search worker unification - removing duplicated ones (#141) This PR combines `searchAggregationWorker` with `searchWorker` and `searchAggregationWorkerCommon` with `searchWorkerCommon` --------- Co-authored-by: Jacek Migdal --- quesma/quesma/search.go | 128 +++++++++++++--------------------------- 1 file changed, 41 insertions(+), 87 deletions(-) diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index 545f8ed48..12ca3dc50 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -232,28 +232,43 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin } oldHandlingUsed = true fullQuery, columns := q.makeBasicQuery(ctx, queryTranslator, table, simpleQuery, queryInfo, highlighter) + var columnsSlice [][]string if optAsync != nil { go func() { defer recovery.LogAndHandlePanic(ctx, func() { optAsync.doneCh <- AsyncSearchWithError{err: errors.New("panic")} }) - q.searchWorker(ctx, *fullQuery, columns, queryTranslator, table, optAsync) + translatedQueryBody, hitsSlice := q.searchWorker(ctx, []model.Query{*fullQuery}, append(columnsSlice, columns), table, false, optAsync) + searchResponse, err := queryTranslator.MakeSearchResponse(hitsSlice[0], fullQuery.QueryInfo.Typ, fullQuery.Highlighter) + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, fullQuery.QueryInfo, hits) + optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} + return + } + optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody} }() } else { - translatedQueryBody, hits = q.searchWorker(ctx, *fullQuery, columns, queryTranslator, table, nil) + var hitsSlice [][]model.QueryResultRow + translatedQueryBody, hitsSlice = q.searchWorker(ctx, []model.Query{*fullQuery}, append(columnsSlice, columns), table, false, nil) + if len(hitsSlice) > 0 { + // there is only one query + hits = hitsSlice[0] + } } } else if aggregations, err = queryTranslator.ParseAggregationJson(string(body)); err == nil { newAggregationHandlingUsed = true - columns := []string{} + columns := make([][]string, len(aggregations)) if optAsync != nil { go func() { defer recovery.LogAndHandlePanic(ctx, func() { optAsync.doneCh <- AsyncSearchWithError{err: errors.New("panic")} }) - q.searchAggregationWorker(ctx, aggregations, columns, queryTranslator, table, optAsync) + translatedQueryBody, aggregationResults = q.searchWorker(ctx, aggregations, columns, table, true, optAsync) + searchResponse := queryTranslator.MakeResponseAggregation(aggregations, aggregationResults) + optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody} }() } else { - translatedQueryBody, aggregationResults = q.searchAggregationWorker(ctx, aggregations, columns, queryTranslator, table, nil) + translatedQueryBody, aggregationResults = q.searchWorker(ctx, aggregations, columns, table, true, nil) } } @@ -492,49 +507,13 @@ func (q *QueryRunner) makeBasicQuery(ctx context.Context, return fullQuery, columns } -func (q *QueryRunner) searchWorkerCommon(ctx context.Context, fullQuery model.Query, columns []string, queryTranslator IQueryTranslator, - table *clickhouse.Table, optAsync *AsyncQuery) (translatedQueryBody []byte, hits []model.QueryResultRow) { - - if optAsync != nil && q.reachedQueriesLimit(ctx, optAsync.asyncRequestIdStr, optAsync.doneCh) { - return - } - - var err error - - var dbQueryCtx context.Context - if optAsync != nil { - var dbCancel context.CancelFunc - dbQueryCtx, dbCancel = context.WithCancel(context.Background()) - q.addAsyncQueryContext(dbQueryCtx, dbCancel, optAsync.asyncRequestIdStr) - } else { - dbQueryCtx = ctx - } - - hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, &fullQuery, columns) - translatedQueryBody = []byte(fullQuery.String()) - if err != nil { - logger.ErrorWithCtx(ctx).Msgf("Rows: %+v, err: %+v", hits, err) - if optAsync != nil { - optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} - return - } - } - if optAsync != nil { - searchResponse, err := queryTranslator.MakeSearchResponse(hits, fullQuery.QueryInfo.Typ, fullQuery.Highlighter) - if err != nil { - logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, fullQuery.QueryInfo, hits) - optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} - return - } - optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: nil} - } - return -} - -func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggregations []model.Query, - columns []string, - queryTranslator IQueryTranslator, table *clickhouse.Table, - optAsync *AsyncQuery) (translatedQueryBody []byte, resultRows [][]model.QueryResultRow) { +func (q *QueryRunner) searchWorkerCommon( + ctx context.Context, + queries []model.Query, + columns [][]string, + table *clickhouse.Table, + doPostProcessing bool, + optAsync *AsyncQuery) (translatedQueryBody []byte, hits [][]model.QueryResultRow) { if optAsync != nil && q.reachedQueriesLimit(ctx, optAsync.asyncRequestIdStr, optAsync.doneCh) { return @@ -549,66 +528,41 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega } else { dbQueryCtx = ctx } - logger.InfoWithCtx(ctx).Msg("we're using new Aggregation handling.") - for _, agg := range aggregations { - logger.InfoWithCtx(ctx).Msgf("aggregation: %+v", agg) - if agg.NoDBQuery { - logger.InfoWithCtx(ctx).Msgf("pipeline query: %+v", agg) + for columnsIndex, query := range queries { + if query.NoDBQuery { + logger.InfoWithCtx(ctx).Msgf("pipeline query: %+v", query) } else { - logger.InfoWithCtx(ctx).Msgf("SQL: %s", agg.String()) - sqls += agg.String() + "\n" + logger.InfoWithCtx(ctx).Msgf("SQL: %s", query.String()) + sqls += query.String() + "\n" } - rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg, nil) + rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &query, columns[columnsIndex]) if err != nil { logger.ErrorWithCtx(ctx).Msg(err.Error()) continue } - postprocessedRows := agg.Type.PostprocessResults(rows) - resultRows = append(resultRows, postprocessedRows) + if doPostProcessing { + rows = query.Type.PostprocessResults(rows) + } + hits = append(hits, rows) } translatedQueryBody = []byte(sqls) - if optAsync != nil { - searchResponse := queryTranslator.MakeResponseAggregation(aggregations, resultRows) - optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: nil} - } return } func (q *QueryRunner) searchWorker(ctx context.Context, - fullQuery model.Query, - columns []string, - queryTranslator IQueryTranslator, - table *clickhouse.Table, - optAsync *AsyncQuery) (translatedQueryBody []byte, hits []model.QueryResultRow) { - if optAsync == nil { - return q.searchWorkerCommon(ctx, fullQuery, columns, queryTranslator, table, nil) - } else { - select { - case <-q.executionCtx.Done(): - return - default: - _, _ = q.searchWorkerCommon(ctx, fullQuery, columns, queryTranslator, table, optAsync) - return - } - } -} - -func (q *QueryRunner) searchAggregationWorker(ctx context.Context, aggregations []model.Query, - columns []string, - queryTranslator IQueryTranslator, + columns [][]string, table *clickhouse.Table, + doPostProcessing bool, optAsync *AsyncQuery) (translatedQueryBody []byte, resultRows [][]model.QueryResultRow) { if optAsync == nil { - return q.searchAggregationWorkerCommon(ctx, aggregations, columns, queryTranslator, table, nil) - + return q.searchWorkerCommon(ctx, aggregations, columns, table, doPostProcessing, nil) } else { select { case <-q.executionCtx.Done(): return default: - _, _ = q.searchAggregationWorkerCommon(ctx, aggregations, columns, queryTranslator, table, optAsync) - return + return q.searchWorkerCommon(ctx, aggregations, columns, table, doPostProcessing, optAsync) } } }