diff --git a/quesma/quesma/quesma.go b/quesma/quesma/quesma.go index 954da9b83..c3e449480 100644 --- a/quesma/quesma/quesma.go +++ b/quesma/quesma/quesma.go @@ -100,7 +100,7 @@ func NewQuesmaTcpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, config config.Qu func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, config config.QuesmaConfiguration, logChan <-chan tracing.LogWithLevel) *Quesma { quesmaManagementConsole := ui.NewQuesmaManagementConsole(config, logManager, indexManager, logChan, phoneHomeAgent) - queryRunner := NewQueryRunner(logManager) + queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole) router := configureRouter(config, logManager, indexManager, quesmaManagementConsole, phoneHomeAgent, queryRunner) return &Quesma{ telemetryAgent: phoneHomeAgent, diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index 9ba39a299..ddabc6e44 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -142,7 +142,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, router.RegisterPathMatcher(routes.GlobalSearchPath, "POST", func(_ map[string]string, _ string) bool { return true // for now, always route to Quesma, in the near future: combine results from both sources }, func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { - responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body), cfg, im, console) + responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body)) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil @@ -154,7 +154,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, }) router.RegisterPathMatcher(routes.IndexSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { - responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body), cfg, im, console) + responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body)) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil @@ -179,7 +179,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, keepOnCompletion = true } } - responseBody, err := queryRunner.handleAsyncSearch(ctx, cfg, params["index"], []byte(body), im, console, waitForResultsMs, keepOnCompletion) + responseBody, err := queryRunner.handleAsyncSearch(ctx, params["index"], []byte(body), waitForResultsMs, keepOnCompletion) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil @@ -240,7 +240,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, }) router.RegisterPathMatcher(routes.EQLSearch, "GET", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { - responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body), cfg, im, console) + responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body)) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index b72df08d6..aec96ebec 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -43,16 +43,19 @@ type AsyncQueryContext struct { } type QueryRunner struct { - executionCtx context.Context - cancel context.CancelFunc - AsyncRequestStorage *concurrent.Map[string, AsyncRequestResult] - AsyncQueriesContexts *concurrent.Map[string, *AsyncQueryContext] - logManager *clickhouse.LogManager + executionCtx context.Context + cancel context.CancelFunc + AsyncRequestStorage *concurrent.Map[string, AsyncRequestResult] + AsyncQueriesContexts *concurrent.Map[string, *AsyncQueryContext] + logManager *clickhouse.LogManager + cfg config.QuesmaConfiguration + im elasticsearch.IndexManagement + quesmaManagementConsole *ui.QuesmaManagementConsole } -func NewQueryRunner(lm *clickhouse.LogManager) *QueryRunner { +func NewQueryRunner(lm *clickhouse.LogManager, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement, qmc *ui.QuesmaManagementConsole) *QueryRunner { ctx, cancel := context.WithCancel(context.Background()) - return &QueryRunner{logManager: lm, executionCtx: ctx, cancel: cancel, AsyncRequestStorage: concurrent.NewMap[string, AsyncRequestResult](), AsyncQueriesContexts: concurrent.NewMap[string, *AsyncQueryContext]()} + return &QueryRunner{logManager: lm, cfg: cfg, im: im, quesmaManagementConsole: qmc, executionCtx: ctx, cancel: cancel, AsyncRequestStorage: concurrent.NewMap[string, AsyncRequestResult](), AsyncQueriesContexts: concurrent.NewMap[string, *AsyncQueryContext]()} } func NewAsyncQueryContext(ctx context.Context, cancel context.CancelFunc, id string) *AsyncQueryContext { @@ -78,22 +81,16 @@ func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int } } -func (q *QueryRunner) handleSearch(ctx context.Context, indexPattern string, body []byte, - cfg config.QuesmaConfiguration, - im elasticsearch.IndexManagement, - quesmaManagementConsole *ui.QuesmaManagementConsole) ([]byte, error) { - return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, nil, QueryLanguageDefault) +func (q *QueryRunner) handleSearch(ctx context.Context, indexPattern string, body []byte) ([]byte, error) { + return q.handleSearchCommon(ctx, indexPattern, body, nil, QueryLanguageDefault) } -func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string, body []byte, - cfg config.QuesmaConfiguration, - im elasticsearch.IndexManagement, - quesmaManagementConsole *ui.QuesmaManagementConsole) ([]byte, error) { - return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, nil, QueryLanguageEQL) +func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string, body []byte) ([]byte, error) { + return q.handleSearchCommon(ctx, indexPattern, body, nil, QueryLanguageEQL) } -func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte, - im elasticsearch.IndexManagement, quesmaManagementConsole *ui.QuesmaManagementConsole, waitForResultsMs int, keepOnCompletion bool) ([]byte, error) { +func (q *QueryRunner) handleAsyncSearch(ctx context.Context, indexPattern string, body []byte, + waitForResultsMs int, keepOnCompletion bool) ([]byte, error) { async := AsyncQuery{ asyncRequestIdStr: generateAsyncRequestId(), doneCh: make(chan AsyncSearchWithError, 1), @@ -103,7 +100,7 @@ func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaCo } ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, async.asyncRequestIdStr) logger.InfoWithCtx(ctx).Msgf("async search request id: %s started", async.asyncRequestIdStr) - return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, &async, QueryLanguageDefault) + return q.handleSearchCommon(ctx, indexPattern, body, &async, QueryLanguageDefault) } type AsyncSearchWithError struct { @@ -120,11 +117,9 @@ type AsyncQuery struct { startTime time.Time } -func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte, - im elasticsearch.IndexManagement, - qmc *ui.QuesmaManagementConsole, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) { +func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern string, body []byte, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) { - sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, cfg, im, q.logManager) + sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im, q.logManager) switch sources { case sourceBoth: @@ -249,14 +244,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC hitsFallback, err = q.logManager.ProcessSelectQuery(ctx, table, listQuery) if err != nil { logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery) - pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime) + pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) return responseBody, err } countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt) countResult, err := q.logManager.ProcessSelectQuery(ctx, table, countQuery) if err != nil { logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery) - pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime) + pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) return responseBody, err } if len(countResult) > 0 { @@ -274,7 +269,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC } else { responseBody = []byte("Invalid Query, err: " + simpleQuery.Sql.Stmt) logger.ErrorWithCtxAndReason(ctx, "Quesma generated invalid SQL query").Msg(string(responseBody)) - pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime) + pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) return responseBody, errors.New(string(responseBody)) } @@ -288,7 +283,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC } if err != nil { logger.ErrorWithCtx(ctx).Msgf("error making response: %v, queryInfo: %+v, rows: %v", err, queryInfo, hits) - pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime) + pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) return responseBody, err } @@ -306,7 +301,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC } responseBody, err = response.Marshal() - pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime) + pushSecondaryInfo(q.quesmaManagementConsole, id, path, body, translatedQueryBody, responseBody, startTime) return responseBody, err } else { select { @@ -314,11 +309,11 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC go func() { // Async search takes longer. Return partial results and wait for recovery.LogPanicWithCtx(ctx) res := <-optAsync.doneCh - q.storeAsyncSearch(qmc, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res, true) + q.storeAsyncSearch(q.quesmaManagementConsole, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res, true) }() return q.handlePartialAsyncSearch(ctx, optAsync.asyncRequestIdStr) case res := <-optAsync.doneCh: - responseBody, err = q.storeAsyncSearch(qmc, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res, + responseBody, err = q.storeAsyncSearch(q.quesmaManagementConsole, id, optAsync.asyncRequestIdStr, optAsync.startTime, path, body, res, optAsync.keepOnCompletion) return responseBody, err diff --git a/quesma/quesma/search_opensearch_test.go b/quesma/quesma/search_opensearch_test.go index b01476545..2bf26bc98 100644 --- a/quesma/quesma/search_opensearch_test.go +++ b/quesma/quesma/search_opensearch_test.go @@ -49,8 +49,8 @@ func TestSearchOpensearch(t *testing.T) { mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm) - _, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) + _, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson)) assert.NoError(t, err) if err = mock.ExpectationsWereMet(); err != nil { @@ -178,8 +178,8 @@ func TestHighlighter(t *testing.T) { AddRow("text-to-highlight", "text-to-highlight", "text-to-highlight"). AddRow("text", "text", "text")) - queryRunner := NewQueryRunner(lm) - response, err := queryRunner.handleSearch(ctx, tableName, []byte(query), cfg, nil, managementConsole) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) + response, err := queryRunner.handleSearch(ctx, tableName, []byte(query)) assert.NoError(t, err) if err = mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) diff --git a/quesma/quesma/search_test.go b/quesma/quesma/search_test.go index 8cb411fc4..9ca9a0a66 100644 --- a/quesma/quesma/search_test.go +++ b/quesma/quesma/search_test.go @@ -99,8 +99,8 @@ func TestAsyncSearchHandler(t *testing.T) { } mock.ExpectQuery(wantedRegex).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm) - _, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) + _, err = queryRunner.handleAsyncSearch(ctx, tableName, []byte(tt.QueryJson), defaultAsyncSearchTimeout, true) assert.NoError(t, err) if err := mock.ExpectationsWereMet(); err != nil { @@ -138,8 +138,8 @@ func TestAsyncSearchHandlerSpecialCharacters(t *testing.T) { mock.ExpectQuery(testdata.EscapeBrackets(expectedSql)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm) - _, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryRequestJson), nil, managementConsole, defaultAsyncSearchTimeout, true) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) + _, err = queryRunner.handleAsyncSearch(ctx, tableName, []byte(tt.QueryRequestJson), defaultAsyncSearchTimeout, true) assert.NoError(t, err) if err = mock.ExpectationsWereMet(); err != nil { @@ -181,8 +181,8 @@ func TestSearchHandler(t *testing.T) { mock.ExpectQuery(testdata.EscapeWildcard(testdata.EscapeBrackets(wantedRegex))). WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm) - _, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) + _, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson)) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) @@ -208,8 +208,8 @@ func TestSearchHandlerNoAttrsConfig(t *testing.T) { for _, wantedRegex := range tt.WantedRegexes { mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm) - _, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) + _, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson)) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) @@ -234,8 +234,8 @@ func TestAsyncSearchFilter(t *testing.T) { for _, wantedRegex := range tt.WantedRegexes { mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm) - _, _ = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) + _, _ = queryRunner.handleAsyncSearch(ctx, tableName, []byte(tt.QueryJson), defaultAsyncSearchTimeout, true) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) } @@ -308,8 +308,8 @@ func TestHandlingDateTimeFields(t *testing.T) { mock.ExpectQuery(testdata.EscapeBrackets(expectedSelectStatementRegex[fieldName])). WillReturnRows(sqlmock.NewRows([]string{"key", "doc_count"})) // .AddRow(1000, uint64(10)).AddRow(1001, uint64(20))) // here rows should be added if uint64 were supported - queryRunner := NewQueryRunner(lm) - response, err := queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(query(fieldName)), nil, managementConsole, defaultAsyncSearchTimeout, true) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) + response, err := queryRunner.handleAsyncSearch(ctx, tableName, []byte(query(fieldName)), defaultAsyncSearchTimeout, true) assert.NoError(t, err) var responseMap model.JsonMap @@ -363,12 +363,12 @@ func TestNumericFacetsQueries(t *testing.T) { // Don't care about the query's SQL in this test, it's thoroughly tested in different tests, thus "" mock.ExpectQuery("").WillReturnRows(returnedBuckets) - queryRunner := NewQueryRunner(lm) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) var response []byte if handlerName == "handleSearch" { - response, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole) + response, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson)) } else if handlerName == "handleAsyncSearch" { - response, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true) + response, err = queryRunner.handleAsyncSearch(ctx, tableName, []byte(tt.QueryJson), defaultAsyncSearchTimeout, true) } assert.NoError(t, err) @@ -420,9 +420,9 @@ func TestAllUnsupportedQueryTypesAreProperlyRecorded(t *testing.T) { managementConsole := ui.NewQuesmaManagementConsole(cfg, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent()) go managementConsole.RunOnlyChannelProcessor() - queryRunner := NewQueryRunner(lm) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId()) - _, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson), cfg, nil, managementConsole) + _, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson)) for _, queryType := range model.AggregationQueryTypes { if queryType != tt.AggregationName { @@ -471,10 +471,10 @@ func TestDifferentUnsupportedQueries(t *testing.T) { managementConsole := ui.NewQuesmaManagementConsole(cfg, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent()) go managementConsole.RunOnlyChannelProcessor() - queryRunner := NewQueryRunner(lm) + queryRunner := NewQueryRunner(lm, cfg, nil, managementConsole) for _, testNr := range testNrs { newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId()) - _, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson), cfg, nil, managementConsole) + _, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson)) }