Skip to content

Commit

Permalink
Removing redundant log manager parameter (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski authored May 7, 2024
1 parent 24d5a52 commit 8ed5e55
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 33 deletions.
10 changes: 5 additions & 5 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
})

router.RegisterPathMatcher(routes.IndexCountPath, "GET", matchedAgainstPattern(cfg), func(ctx context.Context, _ string, _ string, params map[string]string) (*mux.Result, error) {
cnt, err := queryRunner.handleCount(ctx, params["index"], lm)
cnt, err := queryRunner.handleCount(ctx, params["index"])
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand All @@ -142,7 +142,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
router.RegisterPathMatcher(routes.GlobalSearchPath, "POST", func(_ map[string]string, _ string) bool {
return true // for now, always route to Quesma, in the near future: combine results from both sources
}, func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body), cfg, lm, im, console)
responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body), cfg, im, console)
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand All @@ -154,7 +154,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
})

router.RegisterPathMatcher(routes.IndexSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body), cfg, lm, im, console)
responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body), cfg, im, console)
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand All @@ -179,7 +179,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
keepOnCompletion = true
}
}
responseBody, err := queryRunner.handleAsyncSearch(ctx, cfg, params["index"], []byte(body), lm, im, console, waitForResultsMs, keepOnCompletion)
responseBody, err := queryRunner.handleAsyncSearch(ctx, cfg, params["index"], []byte(body), im, console, waitForResultsMs, keepOnCompletion)
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand Down Expand Up @@ -240,7 +240,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
})

router.RegisterPathMatcher(routes.EQLSearch, "GET", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body), cfg, lm, im, console)
responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body), cfg, im, console)
if err != nil {
if errors.Is(errIndexNotExists, err) {
return &mux.Result{StatusCode: 404}, nil
Expand Down
29 changes: 13 additions & 16 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func NewAsyncQueryContext(ctx context.Context, cancel context.CancelFunc, id str
}

// returns -1 when table name could not be resolved
func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string, lm *clickhouse.LogManager) (int64, error) {
indexes := lm.ResolveIndexes(ctx, indexPattern)
func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int64, error) {
indexes := q.logManager.ResolveIndexes(ctx, indexPattern)
if len(indexes) == 0 {
if elasticsearch.IsIndexPattern(indexPattern) {
return 0, nil
Expand All @@ -72,29 +72,27 @@ func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string, lm *
}

if len(indexes) == 1 {
return lm.Count(ctx, indexes[0])
return q.logManager.Count(ctx, indexes[0])
} else {
return lm.CountMultiple(ctx, indexes...)
return q.logManager.CountMultiple(ctx, indexes...)
}
}

func (q *QueryRunner) handleSearch(ctx context.Context, indexPattern string, body []byte,
cfg config.QuesmaConfiguration,
lm *clickhouse.LogManager,
im elasticsearch.IndexManagement,
quesmaManagementConsole *ui.QuesmaManagementConsole) ([]byte, error) {
return q.handleSearchCommon(ctx, cfg, indexPattern, body, lm, im, quesmaManagementConsole, nil, QueryLanguageDefault)
return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, nil, QueryLanguageDefault)
}

func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string, body []byte,
cfg config.QuesmaConfiguration,
lm *clickhouse.LogManager,
im elasticsearch.IndexManagement,
quesmaManagementConsole *ui.QuesmaManagementConsole) ([]byte, error) {
return q.handleSearchCommon(ctx, cfg, indexPattern, body, lm, im, quesmaManagementConsole, nil, QueryLanguageEQL)
return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, nil, QueryLanguageEQL)
}

func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte, lm *clickhouse.LogManager,
func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte,
im elasticsearch.IndexManagement, quesmaManagementConsole *ui.QuesmaManagementConsole, waitForResultsMs int, keepOnCompletion bool) ([]byte, error) {
async := AsyncQuery{
asyncRequestIdStr: generateAsyncRequestId(),
Expand All @@ -105,7 +103,7 @@ func (q *QueryRunner) handleAsyncSearch(ctx context.Context, cfg config.QuesmaCo
}
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, async.asyncRequestIdStr)
logger.InfoWithCtx(ctx).Msgf("async search request id: %s started", async.asyncRequestIdStr)
return q.handleSearchCommon(ctx, cfg, indexPattern, body, lm, im, quesmaManagementConsole, &async, QueryLanguageDefault)
return q.handleSearchCommon(ctx, cfg, indexPattern, body, im, quesmaManagementConsole, &async, QueryLanguageDefault)
}

type AsyncSearchWithError struct {
Expand All @@ -123,11 +121,10 @@ type AsyncQuery struct {
}

func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaConfiguration, indexPattern string, body []byte,
lm *clickhouse.LogManager,
im elasticsearch.IndexManagement,
qmc *ui.QuesmaManagementConsole, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) {

sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, cfg, im, lm)
sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, cfg, im, q.logManager)

switch sources {
case sourceBoth:
Expand Down Expand Up @@ -185,7 +182,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
newAggregationHandlingUsed := false
hitsPresent := false

tables := lm.GetTableDefinitions()
tables := q.logManager.GetTableDefinitions()

for _, resolvedTableName := range sourcesClickhouse {
var queryTranslator IQueryTranslator
Expand All @@ -199,7 +196,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC

var simpleQuery queryparser.SimpleQuery

queryTranslator = NewQueryTranslator(ctx, queryLanguage, table, lm)
queryTranslator = NewQueryTranslator(ctx, queryLanguage, table, q.logManager)

simpleQuery, queryInfo, highlighter = queryTranslator.ParseQuery(string(body))

Expand Down Expand Up @@ -249,14 +246,14 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, cfg config.QuesmaC
fieldName = "*"
}
listQuery := queryTranslator.BuildNRowsQuery(fieldName, simpleQuery, queryInfo.Size)
hitsFallback, err = lm.ProcessSelectQuery(ctx, table, listQuery)
hitsFallback, err = q.logManager.ProcessSelectQuery(ctx, table, listQuery)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing fallback query. Err: %v, query: %+v", err, listQuery)
pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime)
return responseBody, err
}
countQuery := queryTranslator.BuildSimpleCountQuery(simpleQuery.Sql.Stmt)
countResult, err := lm.ProcessSelectQuery(ctx, table, countQuery)
countResult, err := q.logManager.ProcessSelectQuery(ctx, table, countQuery)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("error processing count query. Err: %v, query: %+v", err, countQuery)
pushSecondaryInfo(qmc, id, path, body, translatedQueryBody, responseBody, startTime)
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/search_opensearch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestSearchOpensearch(t *testing.T) {
}

queryRunner := NewQueryRunner(lm)
_, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, lm, nil, managementConsole)
_, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole)
assert.NoError(t, err)

if err = mock.ExpectationsWereMet(); err != nil {
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestHighlighter(t *testing.T) {
AddRow("text", "text", "text"))

queryRunner := NewQueryRunner(lm)
response, err := queryRunner.handleSearch(ctx, tableName, []byte(query), cfg, lm, nil, managementConsole)
response, err := queryRunner.handleSearch(ctx, tableName, []byte(query), cfg, nil, managementConsole)
assert.NoError(t, err)
if err = mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down
20 changes: 10 additions & 10 deletions quesma/quesma/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestAsyncSearchHandler(t *testing.T) {
mock.ExpectQuery(wantedRegex).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}
queryRunner := NewQueryRunner(lm)
_, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), lm, nil, managementConsole, defaultAsyncSearchTimeout, true)
_, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true)
assert.NoError(t, err)

if err := mock.ExpectationsWereMet(); err != nil {
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestAsyncSearchHandlerSpecialCharacters(t *testing.T) {
}

queryRunner := NewQueryRunner(lm)
_, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryRequestJson), lm, nil, managementConsole, defaultAsyncSearchTimeout, true)
_, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryRequestJson), nil, managementConsole, defaultAsyncSearchTimeout, true)
assert.NoError(t, err)

if err = mock.ExpectationsWereMet(); err != nil {
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestSearchHandler(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}
queryRunner := NewQueryRunner(lm)
_, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, lm, nil, managementConsole)
_, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole)

if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand All @@ -209,7 +209,7 @@ func TestSearchHandlerNoAttrsConfig(t *testing.T) {
mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}
queryRunner := NewQueryRunner(lm)
_, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, lm, nil, managementConsole)
_, _ = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole)

if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand All @@ -235,7 +235,7 @@ func TestAsyncSearchFilter(t *testing.T) {
mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"}))
}
queryRunner := NewQueryRunner(lm)
_, _ = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), lm, nil, managementConsole, defaultAsyncSearchTimeout, true)
_, _ = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
}
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestHandlingDateTimeFields(t *testing.T) {
WillReturnRows(sqlmock.NewRows([]string{"key", "doc_count"}))
// .AddRow(1000, uint64(10)).AddRow(1001, uint64(20))) // here rows should be added if uint64 were supported
queryRunner := NewQueryRunner(lm)
response, err := queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(query(fieldName)), lm, nil, managementConsole, defaultAsyncSearchTimeout, true)
response, err := queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(query(fieldName)), nil, managementConsole, defaultAsyncSearchTimeout, true)
assert.NoError(t, err)

var responseMap model.JsonMap
Expand Down Expand Up @@ -366,9 +366,9 @@ func TestNumericFacetsQueries(t *testing.T) {
queryRunner := NewQueryRunner(lm)
var response []byte
if handlerName == "handleSearch" {
response, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, lm, nil, managementConsole)
response, err = queryRunner.handleSearch(ctx, tableName, []byte(tt.QueryJson), cfg, nil, managementConsole)
} else if handlerName == "handleAsyncSearch" {
response, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), lm, nil, managementConsole, defaultAsyncSearchTimeout, true)
response, err = queryRunner.handleAsyncSearch(ctx, cfg, tableName, []byte(tt.QueryJson), nil, managementConsole, defaultAsyncSearchTimeout, true)
}
assert.NoError(t, err)

Expand Down Expand Up @@ -422,7 +422,7 @@ func TestAllUnsupportedQueryTypesAreProperlyRecorded(t *testing.T) {

queryRunner := NewQueryRunner(lm)
newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId())
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson), cfg, lm, nil, managementConsole)
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson), cfg, nil, managementConsole)

for _, queryType := range model.AggregationQueryTypes {
if queryType != tt.AggregationName {
Expand Down Expand Up @@ -474,7 +474,7 @@ func TestDifferentUnsupportedQueries(t *testing.T) {
queryRunner := NewQueryRunner(lm)
for _, testNr := range testNrs {
newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId())
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson), cfg, lm, nil, managementConsole)
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson), cfg, nil, managementConsole)

}

Expand Down

0 comments on commit 8ed5e55

Please sign in to comment.