Skip to content

Commit

Permalink
Search worker unification — removing duplicated worker functions (#141)
Browse files Browse the repository at this point in the history
This PR combines `searchAggregationWorker` with `searchWorker`, and
`searchAggregationWorkerCommon` with `searchWorkerCommon`.

---------

Co-authored-by: Jacek Migdal <[email protected]>
  • Loading branch information
pdelewski and jakozaur authored May 19, 2024
1 parent 440bfe0 commit 218f0c1
Showing 1 changed file with 41 additions and 87 deletions.
128 changes: 41 additions & 87 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,28 +232,43 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
}
oldHandlingUsed = true
fullQuery, columns := q.makeBasicQuery(ctx, queryTranslator, table, simpleQuery, queryInfo, highlighter)
var columnsSlice [][]string
if optAsync != nil {
go func() {
defer recovery.LogAndHandlePanic(ctx, func() {
optAsync.doneCh <- AsyncSearchWithError{err: errors.New("panic")}
})
q.searchWorker(ctx, *fullQuery, columns, queryTranslator, table, optAsync)
translatedQueryBody, hitsSlice := q.searchWorker(ctx, []model.Query{*fullQuery}, append(columnsSlice, columns), table, false, optAsync)
searchResponse, err := queryTranslator.MakeSearchResponse(hitsSlice[0], fullQuery.QueryInfo.Typ, fullQuery.Highlighter)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, fullQuery.QueryInfo, hits)
optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err}
return
}
optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody}
}()
} else {
translatedQueryBody, hits = q.searchWorker(ctx, *fullQuery, columns, queryTranslator, table, nil)
var hitsSlice [][]model.QueryResultRow
translatedQueryBody, hitsSlice = q.searchWorker(ctx, []model.Query{*fullQuery}, append(columnsSlice, columns), table, false, nil)
if len(hitsSlice) > 0 {
// there is only one query
hits = hitsSlice[0]
}
}
} else if aggregations, err = queryTranslator.ParseAggregationJson(string(body)); err == nil {
newAggregationHandlingUsed = true
columns := []string{}
columns := make([][]string, len(aggregations))
if optAsync != nil {
go func() {
defer recovery.LogAndHandlePanic(ctx, func() {
optAsync.doneCh <- AsyncSearchWithError{err: errors.New("panic")}
})
q.searchAggregationWorker(ctx, aggregations, columns, queryTranslator, table, optAsync)
translatedQueryBody, aggregationResults = q.searchWorker(ctx, aggregations, columns, table, true, optAsync)
searchResponse := queryTranslator.MakeResponseAggregation(aggregations, aggregationResults)
optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody}
}()
} else {
translatedQueryBody, aggregationResults = q.searchAggregationWorker(ctx, aggregations, columns, queryTranslator, table, nil)
translatedQueryBody, aggregationResults = q.searchWorker(ctx, aggregations, columns, table, true, nil)
}
}

Expand Down Expand Up @@ -492,49 +507,13 @@ func (q *QueryRunner) makeBasicQuery(ctx context.Context,
return fullQuery, columns
}

func (q *QueryRunner) searchWorkerCommon(ctx context.Context, fullQuery model.Query, columns []string, queryTranslator IQueryTranslator,
table *clickhouse.Table, optAsync *AsyncQuery) (translatedQueryBody []byte, hits []model.QueryResultRow) {

if optAsync != nil && q.reachedQueriesLimit(ctx, optAsync.asyncRequestIdStr, optAsync.doneCh) {
return
}

var err error

var dbQueryCtx context.Context
if optAsync != nil {
var dbCancel context.CancelFunc
dbQueryCtx, dbCancel = context.WithCancel(context.Background())
q.addAsyncQueryContext(dbQueryCtx, dbCancel, optAsync.asyncRequestIdStr)
} else {
dbQueryCtx = ctx
}

hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, &fullQuery, columns)
translatedQueryBody = []byte(fullQuery.String())
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("Rows: %+v, err: %+v", hits, err)
if optAsync != nil {
optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err}
return
}
}
if optAsync != nil {
searchResponse, err := queryTranslator.MakeSearchResponse(hits, fullQuery.QueryInfo.Typ, fullQuery.Highlighter)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, fullQuery.QueryInfo, hits)
optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err}
return
}
optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: nil}
}
return
}

func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggregations []model.Query,
columns []string,
queryTranslator IQueryTranslator, table *clickhouse.Table,
optAsync *AsyncQuery) (translatedQueryBody []byte, resultRows [][]model.QueryResultRow) {
func (q *QueryRunner) searchWorkerCommon(
ctx context.Context,
queries []model.Query,
columns [][]string,
table *clickhouse.Table,
doPostProcessing bool,
optAsync *AsyncQuery) (translatedQueryBody []byte, hits [][]model.QueryResultRow) {

if optAsync != nil && q.reachedQueriesLimit(ctx, optAsync.asyncRequestIdStr, optAsync.doneCh) {
return
Expand All @@ -549,66 +528,41 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega
} else {
dbQueryCtx = ctx
}
logger.InfoWithCtx(ctx).Msg("we're using new Aggregation handling.")
for _, agg := range aggregations {
logger.InfoWithCtx(ctx).Msgf("aggregation: %+v", agg)
if agg.NoDBQuery {
logger.InfoWithCtx(ctx).Msgf("pipeline query: %+v", agg)
for columnsIndex, query := range queries {
if query.NoDBQuery {
logger.InfoWithCtx(ctx).Msgf("pipeline query: %+v", query)
} else {
logger.InfoWithCtx(ctx).Msgf("SQL: %s", agg.String())
sqls += agg.String() + "\n"
logger.InfoWithCtx(ctx).Msgf("SQL: %s", query.String())
sqls += query.String() + "\n"
}
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg, nil)
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &query, columns[columnsIndex])
if err != nil {
logger.ErrorWithCtx(ctx).Msg(err.Error())
continue
}
postprocessedRows := agg.Type.PostprocessResults(rows)
resultRows = append(resultRows, postprocessedRows)
if doPostProcessing {
rows = query.Type.PostprocessResults(rows)
}
hits = append(hits, rows)
}
translatedQueryBody = []byte(sqls)
if optAsync != nil {
searchResponse := queryTranslator.MakeResponseAggregation(aggregations, resultRows)
optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: nil}
}
return
}

func (q *QueryRunner) searchWorker(ctx context.Context,
fullQuery model.Query,
columns []string,
queryTranslator IQueryTranslator,
table *clickhouse.Table,
optAsync *AsyncQuery) (translatedQueryBody []byte, hits []model.QueryResultRow) {
if optAsync == nil {
return q.searchWorkerCommon(ctx, fullQuery, columns, queryTranslator, table, nil)
} else {
select {
case <-q.executionCtx.Done():
return
default:
_, _ = q.searchWorkerCommon(ctx, fullQuery, columns, queryTranslator, table, optAsync)
return
}
}
}

func (q *QueryRunner) searchAggregationWorker(ctx context.Context,
aggregations []model.Query,
columns []string,
queryTranslator IQueryTranslator,
columns [][]string,
table *clickhouse.Table,
doPostProcessing bool,
optAsync *AsyncQuery) (translatedQueryBody []byte, resultRows [][]model.QueryResultRow) {
if optAsync == nil {
return q.searchAggregationWorkerCommon(ctx, aggregations, columns, queryTranslator, table, nil)

return q.searchWorkerCommon(ctx, aggregations, columns, table, doPostProcessing, nil)
} else {
select {
case <-q.executionCtx.Done():
return
default:
_, _ = q.searchAggregationWorkerCommon(ctx, aggregations, columns, queryTranslator, table, optAsync)
return
return q.searchWorkerCommon(ctx, aggregations, columns, table, doPostProcessing, optAsync)
}
}
}
Expand Down

0 comments on commit 218f0c1

Please sign in to comment.