From 8ed5e55757e773c1d54c8887357eefc48ad4e618 Mon Sep 17 00:00:00 2001 From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com> Date: Tue, 7 May 2024 16:13:32 +0200 Subject: [PATCH] Removing redundant log manager parameter (#52) --- quesma/quesma/router.go | 10 ++++----- quesma/quesma/search.go | 29 +++++++++++-------------- quesma/quesma/search_opensearch_test.go | 4 ++-- quesma/quesma/search_test.go | 20 ++++++++--------- 4 files changed, 30 insertions(+), 33 deletions(-) diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index acca9cf03..9ba39a299 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -123,7 +123,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, }) router.RegisterPathMatcher(routes.IndexCountPath, "GET", matchedAgainstPattern(cfg), func(ctx context.Context, _ string, _ string, params map[string]string) (*mux.Result, error) { - cnt, err := queryRunner.handleCount(ctx, params["index"], lm) + cnt, err := queryRunner.handleCount(ctx, params["index"]) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil @@ -142,7 +142,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, router.RegisterPathMatcher(routes.GlobalSearchPath, "POST", func(_ map[string]string, _ string) bool { return true // for now, always route to Quesma, in the near future: combine results from both sources }, func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { - responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body), cfg, lm, im, console) + responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body), cfg, im, console) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil @@ -154,7 +154,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, }) router.RegisterPathMatcher(routes.IndexSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { - responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body), cfg, lm, im, console) + responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body), cfg, im, console) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil @@ -179,7 +179,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, keepOnCompletion = true } } - responseBody, err := queryRunner.handleAsyncSearch(ctx, cfg, params["index"], []byte(body), lm, im, console, waitForResultsMs, keepOnCompletion) + responseBody, err := queryRunner.handleAsyncSearch(ctx, cfg, params["index"], []byte(body), im, console, waitForResultsMs, keepOnCompletion) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil @@ -240,7 +240,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, }) router.RegisterPathMatcher(routes.EQLSearch, "GET", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { - responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body), cfg, lm, im, console) + responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body), cfg, im, console) if err != nil { if errors.Is(errIndexNotExists, err) { return &mux.Result{StatusCode: 404}, nil diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index 109d8a814..b72df08d6 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -60,8 +60,8 @@ func NewAsyncQueryContext(ctx context.Context, cancel context.CancelFunc, id str } // returns -1 when table name could not be resolved -func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string, lm *clickhouse.LogManager) (int64, error) { - indexes := lm.ResolveIndexes(ctx, indexPattern) +func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int64, error) { + indexes := q.logManager.ResolveIndexes(ctx, indexPattern) if len(indexes) == 0 { if elasticsearch.IsIndexPattern(indexPattern) { return 0, nil @@ -72,29 +72,27 @@ func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string, lm * } if len(indexes) == 1 { - return lm.Count(ctx, indexes[0]) + return q.logManager.Count(ctx, indexes[0]) } else { - return lm.CountMultiple(ctx, indexes...) + return q.logManager.CountMultiple(ctx, indexes...) } } func (q *QueryRunner) handleSearch(ctx context.Context, indexPattern string, body []byte, cfg config.QuesmaConfiguration, - lm *clickhouse.LogManager, im elasticsearch.IndexManagement, quesmaManagementConsole *ui.QuesmaManagementConsole) ([]byte, error) { - return q.handleSearchCommon(ctx, cfg, indexPattern, body, lm, im, quesmaManagementConsole, nil, QueryLanguageDefault) + return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, nil, QueryLanguageDefault) } func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string, body []byte, cfg config.QuesmaConfiguration, - lm *clickhouse.LogManager, im elasticsearch.IndexManagement, quesmaManagementConsole *ui.QuesmaManagementConsole) ([]byte, error) { - return q.handleSearchCommon(ctx, cfg, indexPattern, body, lm, im, quesmaManagementConsole, nil, QueryLanguageEQL) + return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, nil, QueryLanguageEQL) } -func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte, lm *clickhouse.LogManager, +func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte, im elasticsearch.IndexManagement, quesmaManagementConsole *ui.QuesmaManagementConsole, waitForResultsMs int, keepOnCompletion bool) ([]byte, error) { async := AsyncQuery{ asyncRequestIdStr: generateAsyncRequestId(), @@ -105,7 +103,7 @@ func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaCo } ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, async.asyncRequestIdStr) logger.InfoWithCtx(ctx).Msgf("async search request id: %s started", async.asyncRequestIdStr) - return q.handleSearchCommon(ctx, cfg, indexPattern, body, lm, im, quesmaManagementConsole, &async, QueryLanguageDefault) + return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, &async, QueryLanguageDefault) } type AsyncSearchWithError struct { @@ -123,11 +121,10 @@ type AsyncQuery struct { } func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte, - lm *clickhouse.LogManager, im elasticsearch.IndexManagement, qmc *ui.QuesmaManagementConsole, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) { - sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, cfg, im, lm) + sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, cfg, im, q.logManager) switch sources { case sourceBoth: @@ -185,7 +182,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC newAggregationHandlingUsed := false hitsPresent := false - tables := lm.GetTableDefinitions() + tables := q.logManager.GetTableDefinitions() for _, resolvedTableName := range sourcesClickhouse { var queryTranslator IQueryTranslator @@ -199,7 +196,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC var simpleQuery queryparser.SimpleQuery - queryTranslator = NewQueryTranslator(ctx, queryLanguage, table, lm) + queryTranslator = NewQueryTranslator(ctx, queryLanguage, table, q.logManager) simpleQuery, queryInfo, highlighter = queryTranslator.ParseQuery(string(body)) @@ -249,14 +246,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC fieldName = "*" } listQuery := queryTranslator.BuildNRowsQuery(fieldName, simpleQuery, queryInfo.Size) - hitsFallback, err = lm.ProcessSelectQuery(ctx, table, listQuery) + hitsFallback, err = q.logManager.ProcessSelectQuery(ctx, table, listQuery) if err != nil { logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery) pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime) return responseBody, err } countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt) - countResult, err := lm.ProcessSelectQuery(ctx, table, countQuery) + countResult, err := q.logManager.ProcessSelectQuery(ctx, table, countQuery) if err != nil { logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery) pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime) diff --git a/quesma/quesma/search_opensearch_test.go b/quesma/quesma/search_opensearch_test.go index 41ff9a0d1..b01476545 100644 --- a/quesma/quesma/search_opensearch_test.go +++ b/quesma/quesma/search_opensearch_test.go @@ -50,7 +50,7 @@ func TestSearchOpensearch(t *testing.T) { } queryRunner := NewQueryRunner(lm) - _, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, lm, nil, managementConsole) + _, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole) assert.NoError(t, err) if err = mock.ExpectationsWereMet(); err != nil { @@ -179,7 +179,7 @@ func TestHighlighter(t *testing.T) { AddRow("text", "text", "text")) queryRunner := NewQueryRunner(lm) - response, err := queryRunner.handleSearch(ctx, tableName, []byte(query), cfg, lm, nil, managementConsole) + response, err := queryRunner.handleSearch(ctx, tableName, []byte(query), cfg, nil, managementConsole) assert.NoError(t, err) if err = mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) diff --git a/quesma/quesma/search_test.go b/quesma/quesma/search_test.go index d95b69433..8cb411fc4 100644 --- a/quesma/quesma/search_test.go +++ b/quesma/quesma/search_test.go @@ -100,7 +100,7 @@ func TestAsyncSearchHandler(t *testing.T) { mock.ExpectQuery(wantedRegex).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } queryRunner := NewQueryRunner(lm) - _, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), lm, nil, managementConsole, defaultAsyncSearchTimeout, true) + _, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true) assert.NoError(t, err) if err := mock.ExpectationsWereMet(); err != nil { @@ -139,7 +139,7 @@ func TestAsyncSearchHandlerSpecialCharacters(t *testing.T) { } queryRunner := NewQueryRunner(lm) - _, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryRequestJson), lm, nil, managementConsole, defaultAsyncSearchTimeout, true) + _, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryRequestJson), nil, managementConsole, defaultAsyncSearchTimeout, true) assert.NoError(t, err) if err = mock.ExpectationsWereMet(); err != nil { @@ -182,7 +182,7 @@ func TestSearchHandler(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } queryRunner := NewQueryRunner(lm) - _, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, lm, nil, managementConsole) + _, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) @@ -209,7 +209,7 @@ func TestSearchHandlerNoAttrsConfig(t *testing.T) { mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } queryRunner := NewQueryRunner(lm) - _, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, lm, nil, managementConsole) + _, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) @@ -235,7 +235,7 @@ func TestAsyncSearchFilter(t *testing.T) { mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } queryRunner := NewQueryRunner(lm) - _, _ = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), lm, nil, managementConsole, defaultAsyncSearchTimeout, true) + _, _ = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) } @@ -309,7 +309,7 @@ func TestHandlingDateTimeFields(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{"key", "doc_count"})) // .AddRow(1000, uint64(10)).AddRow(1001, uint64(20))) // here rows should be added if uint64 were supported queryRunner := NewQueryRunner(lm) - response, err := queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(query(fieldName)), lm, nil, managementConsole, defaultAsyncSearchTimeout, true) + response, err := queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(query(fieldName)), nil, managementConsole, defaultAsyncSearchTimeout, true) assert.NoError(t, err) var responseMap model.JsonMap @@ -366,9 +366,9 @@ func TestNumericFacetsQueries(t *testing.T) { queryRunner := NewQueryRunner(lm) var response []byte if handlerName == "handleSearch" { - response, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, lm, nil, managementConsole) + response, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole) } else if handlerName == "handleAsyncSearch" { - response, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), lm, nil, managementConsole, defaultAsyncSearchTimeout, true) + response, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true) } assert.NoError(t, err) @@ -422,7 +422,7 @@ func TestAllUnsupportedQueryTypesAreProperlyRecorded(t *testing.T) { queryRunner := NewQueryRunner(lm) newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId()) - _, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson), cfg, lm, nil, managementConsole) + _, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson), cfg, nil, managementConsole) for _, queryType := range model.AggregationQueryTypes { if queryType != tt.AggregationName { @@ -474,7 +474,7 @@ func TestDifferentUnsupportedQueries(t *testing.T) { queryRunner := NewQueryRunner(lm) for _, testNr := range testNrs { newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId()) - _, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson), cfg, lm, nil, managementConsole) + _, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson), cfg, nil, managementConsole) }