Skip to content

Commit

Permalink
Simplifying QueryRunner method signatures, some parameters are fields…
Browse files Browse the repository at this point in the history
… now (#53)
  • Loading branch information
pdelewski authored May 7, 2024
1 parent 8ed5e55 commit c80cb98
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 59 deletions.
2 changes: 1 addition & 1 deletion quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewQuesmaTcpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, config config.Qu

func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, config config.QuesmaConfiguration, logChan <-chan tracing.LogWithLevel) *Quesma {
quesmaManagementConsole := ui.NewQuesmaManagementConsole(config, logManager, indexManager, logChan, phoneHomeAgent)
queryRunner := NewQueryRunner(logManager)
queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole)
router := configureRouter(config, logManager, indexManager, quesmaManagementConsole, phoneHomeAgent, queryRunner)
return &Quesma{
telemetryAgent: phoneHomeAgent,
Expand Down
8 changes: 4 additions & 4 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
router.RegisterPathMatcher(routes.GlobalSearchPath, "POST", func(_ map[string]string, _ string) bool {
return true // for now, always route to Quesma, in the near future: combine results from both sources
}, func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body), cfg, im, console)
responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body))
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand All @@ -154,7 +154,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
})

router.RegisterPathMatcher(routes.IndexSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body), cfg, im, console)
responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body))
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand All @@ -179,7 +179,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
keepOnCompletion = true
}
}
responseBody, err := queryRunner.handleAsyncSearch(ctx, cfg, params["index"], []byte(body), im, console, waitForResultsMs, keepOnCompletion)
responseBody, err := queryRunner.handleAsyncSearch(ctx, params["index"], []byte(body), waitForResultsMs, keepOnCompletion)
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand Down Expand Up @@ -240,7 +240,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
})

router.RegisterPathMatcher(routes.EQLSearch, "GET", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body), cfg, im, console)
responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body))
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand Down
57 changes: 26 additions & 31 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,19 @@ type AsyncQueryContext struct {
}

type QueryRunner struct {
executionCtx context.Context
cancel context.CancelFunc
AsyncRequestStorage *concurrent.Map[string, AsyncRequestResult]
AsyncQueriesContexts *concurrent.Map[string, *AsyncQueryContext]
logManager *clickhouse.LogManager
executionCtx context.Context
cancel context.CancelFunc
AsyncRequestStorage *concurrent.Map[string, AsyncRequestResult]
AsyncQueriesContexts *concurrent.Map[string, *AsyncQueryContext]
logManager *clickhouse.LogManager
cfg config.QuesmaConfiguration
im elasticsearch.IndexManagement
quesmaManagementConsole *ui.QuesmaManagementConsole
}

func NewQueryRunner(lm *clickhouse.LogManager) *QueryRunner {
func NewQueryRunner(lm *clickhouse.LogManager, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement, qmc *ui.QuesmaManagementConsole) *QueryRunner {
ctx, cancel := context.WithCancel(context.Background())
return &QueryRunner{logManager: lm, executionCtx: ctx, cancel: cancel, AsyncRequestStorage: concurrent.NewMap[string, AsyncRequestResult](), AsyncQueriesContexts: concurrent.NewMap[string, *AsyncQueryContext]()}
return &QueryRunner{logManager: lm, cfg: cfg, im: im, quesmaManagementConsole: qmc, executionCtx: ctx, cancel: cancel, AsyncRequestStorage: concurrent.NewMap[string, AsyncRequestResult](), AsyncQueriesContexts: concurrent.NewMap[string, *AsyncQueryContext]()}
}

func NewAsyncQueryContext(ctx context.Context, cancel context.CancelFunc, id string) *AsyncQueryContext {
Expand All @@ -78,22 +81,16 @@ func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int
}
}

func (q *QueryRunner) handleSearch(ctx context.Context, indexPattern string, body []byte,
cfg config.QuesmaConfiguration,
im elasticsearch.IndexManagement,
quesmaManagementConsole *ui.QuesmaManagementConsole) ([]byte, error) {
return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, nil, QueryLanguageDefault)
func (q *QueryRunner) handleSearch(ctx context.Context, indexPattern string, body []byte) ([]byte, error) {
return q.handleSearchCommon(ctx, indexPattern, body, nil, QueryLanguageDefault)
}

func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string, body []byte,
cfg config.QuesmaConfiguration,
im elasticsearch.IndexManagement,
quesmaManagementConsole *ui.QuesmaManagementConsole) ([]byte, error) {
return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, nil, QueryLanguageEQL)
func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string, body []byte) ([]byte, error) {
return q.handleSearchCommon(ctx, indexPattern, body, nil, QueryLanguageEQL)
}

func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte,
im elasticsearch.IndexManagement, quesmaManagementConsole *ui.QuesmaManagementConsole, waitForResultsMs int, keepOnCompletion bool) ([]byte, error) {
func (q *QueryRunner) handleAsyncSearch(ctx context.Context, indexPattern string, body []byte,
waitForResultsMs int, keepOnCompletion bool) ([]byte, error) {
async := AsyncQuery{
asyncRequestIdStr: generateAsyncRequestId(),
doneCh: make(chan AsyncSearchWithError, 1),
Expand All @@ -103,7 +100,7 @@ func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaCo
}
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, async.asyncRequestIdStr)
logger.InfoWithCtx(ctx).Msgf("async search request id: %s started", async.asyncRequestIdStr)
return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, &async, QueryLanguageDefault)
return q.handleSearchCommon(ctx, indexPattern, body, &async, QueryLanguageDefault)
}

type AsyncSearchWithError struct {
Expand All @@ -120,11 +117,9 @@ type AsyncQuery struct {
startTime time.Time
}

func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte,
im elasticsearch.IndexManagement,
qmc *ui.QuesmaManagementConsole, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) {
func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern string, body []byte, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) {

sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, cfg, im, q.logManager)
sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im, q.logManager)

switch sources {
case sourceBoth:
Expand Down Expand Up @@ -249,14 +244,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
hitsFallback, err = q.logManager.ProcessSelectQuery(ctx, table, listQuery)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery)
pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
}
countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
countResult, err := q.logManager.ProcessSelectQuery(ctx, table, countQuery)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery)
pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
}
if len(countResult) > 0 {
Expand All @@ -274,7 +269,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
} else {
responseBody = []byte("Invalid Query, err: " + simpleQuery.Sql.Stmt)
logger.ErrorWithCtxAndReason(ctx, "Quesma generated invalid SQL query").Msg(string(responseBody))
pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, errors.New(string(responseBody))
}

Expand All @@ -288,7 +283,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
}
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, queryInfo, hits)
pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
}

Expand All @@ -306,19 +301,19 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
}
responseBody, err = response.Marshal()

pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime)
pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
} else {
select {
case <-time.After(time.Duration(optAsync.waitForResultsMs) * time.Millisecond):
go func() { // Async search takes longer. Return partial results and wait for
recovery.LogPanicWithCtx(ctx)
res := <-optAsync.doneCh
q.storeAsyncSearch(qmc, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res, true)
q.storeAsyncSearch(q.quesmaManagementConsole, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res, true)
}()
return q.handlePartialAsyncSearch(ctx, optAsync.asyncRequestIdStr)
case res := <-optAsync.doneCh:
responseBody, err = q.storeAsyncSearch(qmc, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res,
responseBody, err = q.storeAsyncSearch(q.quesmaManagementConsole, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res,
optAsync.keepOnCompletion)

return responseBody, err
Expand Down
8 changes: 4 additions & 4 deletions quesma/quesma/search_opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func TestSearchOpensearch(t *testing.T) {
mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}

queryRunner := NewQueryRunner(lm)
_, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
_, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson))
assert.NoError(t, err)

if err = mock.ExpectationsWereMet(); err != nil {
Expand Down Expand Up @@ -178,8 +178,8 @@ func TestHighlighter(t *testing.T) {
AddRow("text-to-highlight", "text-to-highlight", "text-to-highlight").
AddRow("text", "text", "text"))

queryRunner := NewQueryRunner(lm)
response, err := queryRunner.handleSearch(ctx, tableName, []byte(query), cfg, nil, managementConsole)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
response, err := queryRunner.handleSearch(ctx, tableName, []byte(query))
assert.NoError(t, err)
if err = mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down
38 changes: 19 additions & 19 deletions quesma/quesma/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func TestAsyncSearchHandler(t *testing.T) {
}
mock.ExpectQuery(wantedRegex).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}
queryRunner := NewQueryRunner(lm)
_, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
_, err = queryRunner.handleAsyncSearch(ctx, tableName, []byte(tt.QueryJson), defaultAsyncSearchTimeout, true)
assert.NoError(t, err)

if err := mock.ExpectationsWereMet(); err != nil {
Expand Down Expand Up @@ -138,8 +138,8 @@ func TestAsyncSearchHandlerSpecialCharacters(t *testing.T) {
mock.ExpectQuery(testdata.EscapeBrackets(expectedSql)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}

queryRunner := NewQueryRunner(lm)
_, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryRequestJson), nil, managementConsole, defaultAsyncSearchTimeout, true)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
_, err = queryRunner.handleAsyncSearch(ctx, tableName, []byte(tt.QueryRequestJson), defaultAsyncSearchTimeout, true)
assert.NoError(t, err)

if err = mock.ExpectationsWereMet(); err != nil {
Expand Down Expand Up @@ -181,8 +181,8 @@ func TestSearchHandler(t *testing.T) {
mock.ExpectQuery(testdata.EscapeWildcard(testdata.EscapeBrackets(wantedRegex))).
WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}
queryRunner := NewQueryRunner(lm)
_, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
_, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson))

if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand All @@ -208,8 +208,8 @@ func TestSearchHandlerNoAttrsConfig(t *testing.T) {
for _, wantedRegex := range tt.WantedRegexes {
mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}
queryRunner := NewQueryRunner(lm)
_, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
_, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson))

if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand All @@ -234,8 +234,8 @@ func TestAsyncSearchFilter(t *testing.T) {
for _, wantedRegex := range tt.WantedRegexes {
mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}
queryRunner := NewQueryRunner(lm)
_, _ = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
_, _ = queryRunner.handleAsyncSearch(ctx, tableName, []byte(tt.QueryJson), defaultAsyncSearchTimeout, true)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
}
Expand Down Expand Up @@ -308,8 +308,8 @@ func TestHandlingDateTimeFields(t *testing.T) {
mock.ExpectQuery(testdata.EscapeBrackets(expectedSelectStatementRegex[fieldName])).
WillReturnRows(sqlmock.NewRows([]string{"key", "doc_count"}))
// .AddRow(1000, uint64(10)).AddRow(1001, uint64(20))) // here rows should be added if uint64 were supported
queryRunner := NewQueryRunner(lm)
response, err := queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(query(fieldName)), nil, managementConsole, defaultAsyncSearchTimeout, true)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
response, err := queryRunner.handleAsyncSearch(ctx, tableName, []byte(query(fieldName)), defaultAsyncSearchTimeout, true)
assert.NoError(t, err)

var responseMap model.JsonMap
Expand Down Expand Up @@ -363,12 +363,12 @@ func TestNumericFacetsQueries(t *testing.T) {
// Don't care about the query's SQL in this test, it's thoroughly tested in different tests, thus ""
mock.ExpectQuery("").WillReturnRows(returnedBuckets)

queryRunner := NewQueryRunner(lm)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
var response []byte
if handlerName == "handleSearch" {
response, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole)
response, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson))
} else if handlerName == "handleAsyncSearch" {
response, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true)
response, err = queryRunner.handleAsyncSearch(ctx, tableName, []byte(tt.QueryJson), defaultAsyncSearchTimeout, true)
}
assert.NoError(t, err)

Expand Down Expand Up @@ -420,9 +420,9 @@ func TestAllUnsupportedQueryTypesAreProperlyRecorded(t *testing.T) {
managementConsole := ui.NewQuesmaManagementConsole(cfg, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent())
go managementConsole.RunOnlyChannelProcessor()

queryRunner := NewQueryRunner(lm)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId())
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson), cfg, nil, managementConsole)
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson))

for _, queryType := range model.AggregationQueryTypes {
if queryType != tt.AggregationName {
Expand Down Expand Up @@ -471,10 +471,10 @@ func TestDifferentUnsupportedQueries(t *testing.T) {
managementConsole := ui.NewQuesmaManagementConsole(cfg, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent())
go managementConsole.RunOnlyChannelProcessor()

queryRunner := NewQueryRunner(lm)
queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole)
for _, testNr := range testNrs {
newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId())
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson), cfg, nil, managementConsole)
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson))

}

Expand Down

0 comments on commit c80cb98

Please sign in to comment.