Skip to content

Commit

Permalink
Get rid of additional QueryRunner (#1156)
Browse files Browse the repository at this point in the history
Changing the QueryRunner so that it relies on
`clickhouse.LogManagerIFace` instead of `*clickhouse.LogManager`.

Having that, we won't need a duplicate of `QueryRunner` in
#1119.

Caveat: we're adding a that dumb dummy table resolver, but that's way
simpler than having a copy of QueryRunner.

Lots of LoCs to save here 😉
  • Loading branch information
mieciu authored Jan 7, 2025
1 parent c54fcc2 commit bdef811
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 17 deletions.
3 changes: 3 additions & 0 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type LogManagerIFace interface {
ResolveIndexPattern(ctx context.Context, schema schema.Registry, pattern string) (results []string, err error)
FindTable(tableName string) (result *Table)
ProcessQuery(ctx context.Context, table *Table, query *model.Query) (rows []model.QueryResultRow, performanceResult PerformanceResult, err error)
CountMultiple(ctx context.Context, tables ...string) (int64, error)
Count(ctx context.Context, table string) (int64, error)
GetTableDefinitions() (TableMap, error)
}

func NewTableMap() *TableMap {
Expand Down
2 changes: 1 addition & 1 deletion quesma/eql/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// It implements quesma.IQueryTranslator for EQL queries.

type ClickhouseEQLQueryTranslator struct {
ClickhouseLM *clickhouse.LogManager
ClickhouseLM clickhouse.LogManagerIFace
Table *clickhouse.Table
Ctx context.Context
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
QueryLanguageEQL = "eql"
)

func NewQueryTranslator(ctx context.Context, language QueryLanguage, schema schema.Schema, table *clickhouse.Table, logManager *clickhouse.LogManager, dateMathRenderer string, indexes []string, configuration *config.QuesmaConfiguration) (queryTranslator IQueryTranslator) {
func NewQueryTranslator(ctx context.Context, language QueryLanguage, schema schema.Schema, table *clickhouse.Table, logManager clickhouse.LogManagerIFace, dateMathRenderer string, indexes []string, configuration *config.QuesmaConfiguration) (queryTranslator IQueryTranslator) {
switch language {
case QueryLanguageEQL:
return &eql.ClickhouseEQLQueryTranslator{ClickhouseLM: logManager, Table: table, Ctx: ctx}
Expand Down
35 changes: 20 additions & 15 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type QueryRunner struct {
cancel context.CancelFunc
AsyncRequestStorage async_search_storage.AsyncRequestResultStorage
AsyncQueriesContexts async_search_storage.AsyncQueryContextStorage
logManager *clickhouse.LogManager
logManager clickhouse.LogManagerIFace
cfg *config.QuesmaConfiguration
im elasticsearch.IndexManagement
debugInfoCollector diag.DebugInfoCollector
Expand Down Expand Up @@ -83,7 +83,7 @@ func (q *QueryRunner) EnableQueryOptimization(cfg *config.QuesmaConfiguration) {
q.transformationPipeline.transformers = append(q.transformationPipeline.transformers, optimize.NewOptimizePipeline(cfg))
}

func NewQueryRunner(lm *clickhouse.LogManager,
func NewQueryRunner(lm clickhouse.LogManagerIFace,
cfg *config.QuesmaConfiguration,
im elasticsearch.IndexManagement,
qmc diag.DebugInfoCollector,
Expand Down Expand Up @@ -360,7 +360,10 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
var responseBody []byte

startTime := time.Now()
id := ctx.Value(tracing.RequestIdCtxKey).(string)
id := "FAKE_ID"
if val := ctx.Value(tracing.RequestIdCtxKey); val != nil {
id = val.(string)
}
path := ""
if value := ctx.Value(tracing.RequestPath); value != nil {
if str, ok := value.(string); ok {
Expand Down Expand Up @@ -480,8 +483,6 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
func (q *QueryRunner) storeAsyncSearch(qmc diag.DebugInfoCollector, id, asyncId string,
startTime time.Time, path string, body types.JSON, result asyncSearchWithError, keep bool, opaqueId string) (responseBody []byte, err error) {

took := time.Since(startTime)
bodyAsBytes, _ := body.Bytes()
if result.err == nil {
okStatus := 200
asyncResponse := queryparser.SearchToAsyncSearchResponse(result.response, asyncId, false, &okStatus)
Expand All @@ -491,16 +492,20 @@ func (q *QueryRunner) storeAsyncSearch(qmc diag.DebugInfoCollector, id, asyncId
err = result.err
}

qmc.PushSecondaryInfo(&diag.QueryDebugSecondarySource{
Id: id,
AsyncId: asyncId,
OpaqueId: opaqueId,
Path: path,
IncomingQueryBody: bodyAsBytes,
QueryBodyTranslated: result.translatedQueryBody,
QueryTranslatedResults: responseBody,
SecondaryTook: took,
})
if qmc != nil {
took := time.Since(startTime)
bodyAsBytes, _ := body.Bytes()
qmc.PushSecondaryInfo(&diag.QueryDebugSecondarySource{
Id: id,
AsyncId: asyncId,
OpaqueId: opaqueId,
Path: path,
IncomingQueryBody: bodyAsBytes,
QueryBodyTranslated: result.translatedQueryBody,
QueryTranslatedResults: responseBody,
SecondaryTook: took,
})
}

if keep {
compressedBody := responseBody
Expand Down
39 changes: 39 additions & 0 deletions quesma/table_resolver/table_resolver_dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package table_resolver

import (
mux "quesma_v2/core"
)

// DummyTableResolver is a dummy implementation of TableResolver to satisfy the QueryRunner and make it be compatible with the v2 api
// thanks to this we can reuse the existing QueryRunner implementation without any changes.
type DummyTableResolver struct{}

func NewDummyTableResolver() *DummyTableResolver {
return &DummyTableResolver{}
}

func (t DummyTableResolver) Start() {}

func (t DummyTableResolver) Stop() {}

func (t DummyTableResolver) Resolve(_ string, indexPattern string) *mux.Decision {
return &mux.Decision{
UseConnectors: []mux.ConnectorDecision{
&mux.ConnectorDecisionClickhouse{
ClickhouseTableName: indexPattern,
ClickhouseIndexes: []string{indexPattern}, // TODO this won't work for 'common table' feature
//IsCommonTable: false,
},
},
}

}

func (t DummyTableResolver) Pipelines() []string { return []string{} }

func (t DummyTableResolver) RecentDecisions() []mux.PatternDecisions {
return []mux.PatternDecisions{}
}

0 comments on commit bdef811

Please sign in to comment.