-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Search worker unification - removing duplicated ones #141
Changes from 17 commits
da13c9e
2d459fd
2bd9f35
e872cee
9ceecaa
489bc10
7fa303e
d4c9b1f
de6b97b
ac13ba1
121ce3d
f0e99e8
bf2c2c6
c68db46
dee0fe1
1dae47e
2da8250
aebab60
c7fbaaa
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -232,28 +232,43 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin | |||||
} | ||||||
oldHandlingUsed = true | ||||||
fullQuery, columns := q.makeBasicQuery(ctx, queryTranslator, table, simpleQuery, queryInfo, highlighter) | ||||||
var columnsSlice [][]string | ||||||
if optAsync != nil { | ||||||
go func() { | ||||||
defer recovery.LogAndHandlePanic(ctx, func() { | ||||||
optAsync.doneCh <- AsyncSearchWithError{err: errors.New("panic")} | ||||||
}) | ||||||
q.searchWorker(ctx, *fullQuery, columns, queryTranslator, table, optAsync) | ||||||
translatedQueryBody, hitsSlice := q.searchWorker(ctx, []model.Query{*fullQuery}, append(columnsSlice, columns), table, false, optAsync) | ||||||
searchResponse, err := queryTranslator.MakeSearchResponse(hitsSlice[0], fullQuery.QueryInfo.Typ, fullQuery.Highlighter) | ||||||
if err != nil { | ||||||
logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, fullQuery.QueryInfo, hits) | ||||||
optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} | ||||||
return | ||||||
} | ||||||
optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: nil} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would omit empty nil struct fields as we do above.
Suggested change
|
||||||
}() | ||||||
} else { | ||||||
translatedQueryBody, hits = q.searchWorker(ctx, *fullQuery, columns, queryTranslator, table, nil) | ||||||
var hitsSlice [][]model.QueryResultRow | ||||||
translatedQueryBody, hitsSlice = q.searchWorker(ctx, []model.Query{*fullQuery}, append(columnsSlice, columns), table, false, nil) | ||||||
if len(hitsSlice) > 0 { | ||||||
// there is only one query | ||||||
hits = hitsSlice[0] | ||||||
} | ||||||
} | ||||||
} else if aggregations, err = queryTranslator.ParseAggregationJson(string(body)); err == nil { | ||||||
newAggregationHandlingUsed = true | ||||||
columns := []string{} | ||||||
columns := make([][]string, len(aggregations)) | ||||||
if optAsync != nil { | ||||||
go func() { | ||||||
defer recovery.LogAndHandlePanic(ctx, func() { | ||||||
optAsync.doneCh <- AsyncSearchWithError{err: errors.New("panic")} | ||||||
}) | ||||||
q.searchAggregationWorker(ctx, aggregations, columns, queryTranslator, table, optAsync) | ||||||
translatedQueryBody, aggregationResults = q.searchWorker(ctx, aggregations, columns, table, true, optAsync) | ||||||
searchResponse := queryTranslator.MakeResponseAggregation(aggregations, aggregationResults) | ||||||
optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: nil} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As above:
Suggested change
|
||||||
}() | ||||||
} else { | ||||||
translatedQueryBody, aggregationResults = q.searchAggregationWorker(ctx, aggregations, columns, queryTranslator, table, nil) | ||||||
translatedQueryBody, aggregationResults = q.searchWorker(ctx, aggregations, columns, table, true, nil) | ||||||
} | ||||||
} | ||||||
|
||||||
|
@@ -492,49 +507,13 @@ func (q *QueryRunner) makeBasicQuery(ctx context.Context, | |||||
return fullQuery, columns | ||||||
} | ||||||
|
||||||
func (q *QueryRunner) searchWorkerCommon(ctx context.Context, fullQuery model.Query, columns []string, queryTranslator IQueryTranslator, | ||||||
table *clickhouse.Table, optAsync *AsyncQuery) (translatedQueryBody []byte, hits []model.QueryResultRow) { | ||||||
|
||||||
if optAsync != nil && q.reachedQueriesLimit(ctx, optAsync.asyncRequestIdStr, optAsync.doneCh) { | ||||||
return | ||||||
} | ||||||
|
||||||
var err error | ||||||
|
||||||
var dbQueryCtx context.Context | ||||||
if optAsync != nil { | ||||||
var dbCancel context.CancelFunc | ||||||
dbQueryCtx, dbCancel = context.WithCancel(context.Background()) | ||||||
q.addAsyncQueryContext(dbQueryCtx, dbCancel, optAsync.asyncRequestIdStr) | ||||||
} else { | ||||||
dbQueryCtx = ctx | ||||||
} | ||||||
|
||||||
hits, err = q.logManager.ProcessQuery(dbQueryCtx, table, &fullQuery, columns) | ||||||
translatedQueryBody = []byte(fullQuery.String()) | ||||||
if err != nil { | ||||||
logger.ErrorWithCtx(ctx).Msgf("Rows: %+v, err: %+v", hits, err) | ||||||
if optAsync != nil { | ||||||
optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} | ||||||
return | ||||||
} | ||||||
} | ||||||
if optAsync != nil { | ||||||
searchResponse, err := queryTranslator.MakeSearchResponse(hits, fullQuery.QueryInfo.Typ, fullQuery.Highlighter) | ||||||
if err != nil { | ||||||
logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, fullQuery.QueryInfo, hits) | ||||||
optAsync.doneCh <- AsyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err} | ||||||
return | ||||||
} | ||||||
optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: nil} | ||||||
} | ||||||
return | ||||||
} | ||||||
|
||||||
func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggregations []model.Query, | ||||||
columns []string, | ||||||
queryTranslator IQueryTranslator, table *clickhouse.Table, | ||||||
optAsync *AsyncQuery) (translatedQueryBody []byte, resultRows [][]model.QueryResultRow) { | ||||||
func (q *QueryRunner) searchWorkerCommon( | ||||||
ctx context.Context, | ||||||
queries []model.Query, | ||||||
columns [][]string, | ||||||
table *clickhouse.Table, | ||||||
doPostProcessing bool, | ||||||
optAsync *AsyncQuery) (translatedQueryBody []byte, hits [][]model.QueryResultRow) { | ||||||
|
||||||
if optAsync != nil && q.reachedQueriesLimit(ctx, optAsync.asyncRequestIdStr, optAsync.doneCh) { | ||||||
return | ||||||
|
@@ -549,66 +528,46 @@ func (q *QueryRunner) searchAggregationWorkerCommon(ctx context.Context, aggrega | |||||
} else { | ||||||
dbQueryCtx = ctx | ||||||
} | ||||||
logger.InfoWithCtx(ctx).Msg("we're using new Aggregation handling.") | ||||||
for _, agg := range aggregations { | ||||||
logger.InfoWithCtx(ctx).Msgf("aggregation: %+v", agg) | ||||||
if agg.NoDBQuery { | ||||||
logger.InfoWithCtx(ctx).Msgf("pipeline query: %+v", agg) | ||||||
columnsIndex := 0 | ||||||
for _, query := range queries { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this fixes a bug and also you can simplify code:
Suggested change
|
||||||
if query.NoDBQuery { | ||||||
logger.InfoWithCtx(ctx).Msgf("pipeline query: %+v", query) | ||||||
} else { | ||||||
logger.InfoWithCtx(ctx).Msgf("SQL: %s", agg.String()) | ||||||
sqls += agg.String() + "\n" | ||||||
logger.InfoWithCtx(ctx).Msgf("SQL: %s", query.String()) | ||||||
sqls += query.String() + "\n" | ||||||
} | ||||||
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &agg, nil) | ||||||
rows, err := q.logManager.ProcessQuery(dbQueryCtx, table, &query, columns[columnsIndex]) | ||||||
if err != nil { | ||||||
logger.ErrorWithCtx(ctx).Msg(err.Error()) | ||||||
continue | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure why here we are not incrementing |
||||||
} | ||||||
postprocessedRows := agg.Type.PostprocessResults(rows) | ||||||
resultRows = append(resultRows, postprocessedRows) | ||||||
if doPostProcessing { | ||||||
postprocessedRows := query.Type.PostprocessResults(rows) | ||||||
hits = append(hits, postprocessedRows) | ||||||
} else { | ||||||
hits = append(hits, rows) | ||||||
} | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Woulnd't it be simpler if we:
|
||||||
columnsIndex++ | ||||||
} | ||||||
translatedQueryBody = []byte(sqls) | ||||||
if optAsync != nil { | ||||||
searchResponse := queryTranslator.MakeResponseAggregation(aggregations, resultRows) | ||||||
optAsync.doneCh <- AsyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: nil} | ||||||
} | ||||||
return | ||||||
} | ||||||
|
||||||
func (q *QueryRunner) searchWorker(ctx context.Context, | ||||||
fullQuery model.Query, | ||||||
columns []string, | ||||||
queryTranslator IQueryTranslator, | ||||||
table *clickhouse.Table, | ||||||
optAsync *AsyncQuery) (translatedQueryBody []byte, hits []model.QueryResultRow) { | ||||||
if optAsync == nil { | ||||||
return q.searchWorkerCommon(ctx, fullQuery, columns, queryTranslator, table, nil) | ||||||
} else { | ||||||
select { | ||||||
case <-q.executionCtx.Done(): | ||||||
return | ||||||
default: | ||||||
_, _ = q.searchWorkerCommon(ctx, fullQuery, columns, queryTranslator, table, optAsync) | ||||||
return | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
func (q *QueryRunner) searchAggregationWorker(ctx context.Context, | ||||||
aggregations []model.Query, | ||||||
columns []string, | ||||||
queryTranslator IQueryTranslator, | ||||||
columns [][]string, | ||||||
table *clickhouse.Table, | ||||||
doPostProcessing bool, | ||||||
optAsync *AsyncQuery) (translatedQueryBody []byte, resultRows [][]model.QueryResultRow) { | ||||||
if optAsync == nil { | ||||||
return q.searchAggregationWorkerCommon(ctx, aggregations, columns, queryTranslator, table, nil) | ||||||
return q.searchWorkerCommon(ctx, aggregations, columns, table, doPostProcessing, nil) | ||||||
|
||||||
} else { | ||||||
select { | ||||||
case <-q.executionCtx.Done(): | ||||||
return | ||||||
default: | ||||||
_, _ = q.searchAggregationWorkerCommon(ctx, aggregations, columns, queryTranslator, table, optAsync) | ||||||
return | ||||||
return q.searchWorkerCommon(ctx, aggregations, columns, table, doPostProcessing, optAsync) | ||||||
} | ||||||
} | ||||||
} | ||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This branch and the one that starts in line
258
will be combined next time