Skip to content

Commit

Permalink
Handle /_async_search/status/:id endpoint (#982)
Browse files Browse the repository at this point in the history
Initially this meant to be just addition of an endpoint, but eventually
it exposed quite serious bug in `quesma/quesma/router.go`.


### 1. Adding `/_async_search/status/:id` handling 

Before that change, requests targeting this endpoint were routed to
Elasticsearch which of course had no recollection of our own async IDs
`quesma_async_019329bf-4330-7007-970d-e87734bca7c3`.

This fixes the following behavior, esp. visible when querying larger
data sets:
<img width="1425" alt="image"
src="https://github.com/user-attachments/assets/c2bd4720-3377-44f5-9dee-5b926d20184f">

### 2. Fixing `router.go`

The root cause of this bug was in `findHandler` function of
`quesma/mux/mux.go`. Basically, whenever we registered same route twice
with diffrent matchers (e.g. for two separate HTTP methods),
`findHandler` always ditched the second registered func. See screenshot
below:
<img width="724" alt="image"
src="https://github.com/user-attachments/assets/9b2c022a-eb0f-42f7-a1ed-c7d1547ae325">


Of course I had to add a pretty large test to make sure I don't break
stuff.

Closes: #995
  • Loading branch information
mieciu authored Nov 15, 2024
1 parent ac1152b commit d36b617
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 44 deletions.
2 changes: 1 addition & 1 deletion quesma/model/search_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type AsyncSearchEntireResp struct {
// For example, 200 indicates that the async search was successfully completed.
// 503 indicates that the async search was completed with an error.
CompletionStatus *int `json:"completion_status,omitempty"`
Response SearchResp `json:"response"`
Response SearchResp `json:"response,omitempty"`
}

func (response *AsyncSearchEntireResp) Marshal() ([]byte, error) {
Expand Down
17 changes: 17 additions & 0 deletions quesma/queryparser/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ func EmptyAsyncSearchResponse(id string, isPartial bool, completionStatus int) (
return asyncSearchResp.Marshal() // error should never ever happen here
}

func EmptyAsyncSearchStatusResponse(id string, isPartial, isRunning bool, completionStatus int) ([]byte, error) {
asyncSearchResp := AsyncSearchStatusResponse(id, isPartial, isRunning, completionStatus)
return asyncSearchResp.Marshal()
}

func (cw *ClickhouseQueryTranslator) MakeAsyncSearchResponse(ResultSet []model.QueryResultRow, query *model.Query, asyncId string, isPartial bool) (*model.AsyncSearchEntireResp, error) {
searchResponse := cw.MakeSearchResponse([]*model.Query{query}, [][]model.QueryResultRow{ResultSet})
id := new(string)
Expand Down Expand Up @@ -272,6 +277,18 @@ func SearchToAsyncSearchResponse(searchResponse *model.SearchResp, asyncId strin
return &response
}

func AsyncSearchStatusResponse(asyncId string, isPartial, isRunning bool, completionStatus int) *model.AsyncSearchEntireResp {
response := model.AsyncSearchEntireResp{
ID: &asyncId,
IsPartial: isPartial,
IsRunning: isRunning,
}
if completionStatus == 200 || completionStatus == 503 {
response.CompletionStatus = &completionStatus
}
return &response
}

func (cw *ClickhouseQueryTranslator) BuildCountQuery(whereClause model.Expr, sampleLimit int) *model.Query {
return &model.Query{
SelectCommand: *model.NewSelectCommand(
Expand Down
26 changes: 11 additions & 15 deletions quesma/quesma/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,36 +86,32 @@ func (p *PathRouter) Register(pattern string, predicate RequestMatcher, handler

}

func (p *PathRouter) Matches(req *Request) (Handler, bool, *table_resolver.Decision) {
handler, found, decision := p.findHandler(req)
if found {
func (p *PathRouter) Matches(req *Request) (Handler, *table_resolver.Decision) {
handler, decision := p.findHandler(req)
if handler != nil {
routerStatistics.addMatched(req.Path)
logger.Debug().Msgf("Matched path: %s", req.Path)
return handler, true, decision
return handler, decision
} else {
routerStatistics.addUnmatched(req.Path)
logger.Debug().Msgf("Non-matched path: %s", req.Path)
return handler, false, decision
return handler, decision
}
}

func (p *PathRouter) findHandler(req *Request) (Handler, bool, *table_resolver.Decision) {
func (p *PathRouter) findHandler(req *Request) (handler Handler, decision *table_resolver.Decision) {
path := strings.TrimSuffix(req.Path, "/")
for _, m := range p.mappings {
meta, match := m.compiledPath.Match(path)

if match {
req.Params = meta.Params
if pathData, pathMatches := m.compiledPath.Match(path); pathMatches {
req.Params = pathData.Params
predicateResult := m.predicate.Matches(req)

if predicateResult.Matched {
return m.handler, true, predicateResult.Decision
} else {
return nil, false, predicateResult.Decision
handler = m.handler
decision = predicateResult.Decision
}
}
}
return nil, false, nil
return handler, decision
}

type httpMethodPredicate struct {
Expand Down
8 changes: 4 additions & 4 deletions quesma/quesma/mux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func TestPathRouter_Matches_ShouldIgnoreTrailingSlash(t *testing.T) {
t.Run(tt.httpMethod+" "+tt.path, func(t *testing.T) {

req := toRequest(tt.path, tt.httpMethod, tt.body)
_, found, _ := router.Matches(req)
assert.Equalf(t, tt.want, found, "Matches(%v, %v, %v)", tt.path, tt.httpMethod, tt.body)
handler, _ := router.Matches(req)
assert.Equalf(t, tt.want, handler != nil, "Matches(%v, %v, %v)", tt.path, tt.httpMethod, tt.body)
})
}

Expand All @@ -61,8 +61,8 @@ func TestShouldMatchMultipleHttpMethods(t *testing.T) {

req := toRequest(tt.path, tt.httpMethod, tt.body)

_, found, _ := router.Matches(req)
assert.Equalf(t, tt.want, found, "Matches(%v, %v, %v)", tt.path, tt.httpMethod, tt.body)
handler, _ := router.Matches(req)
assert.Equalf(t, tt.want, handler != nil, "Matches(%v, %v, %v)", tt.path, tt.httpMethod, tt.body)
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, logManager *clickhous
// tests should not be run with optimization enabled by default
queryRunner.EnableQueryOptimization(config)

router := configureRouter(config, schemaRegistry, logManager, ingestProcessor, quesmaManagementConsole, phoneHomeAgent, queryRunner, resolver)
router := ConfigureRouter(config, schemaRegistry, logManager, ingestProcessor, quesmaManagementConsole, phoneHomeAgent, queryRunner, resolver)
return &Quesma{
telemetryAgent: phoneHomeAgent,
processor: newDualWriteProxy(schemaLoader, logManager, indexManager, schemaRegistry, config, router, quesmaManagementConsole, phoneHomeAgent, queryRunner),
Expand Down Expand Up @@ -213,15 +213,15 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R

quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)

handler, found, decision := router.Matches(quesmaRequest)
handler, decision := router.Matches(quesmaRequest)

if decision != nil {
w.Header().Set(quesmaTableResolverHeader, decision.String())
} else {
w.Header().Set(quesmaTableResolverHeader, "n/a")
}

if found {
if handler != nil {
quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) {
return handler(ctx, quesmaRequest)
})
Expand Down
13 changes: 10 additions & 3 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"time"
)

func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, console *ui.QuesmaManagementConsole, phoneHomeAgent telemetry.PhoneHomeAgent, queryRunner *QueryRunner, tableResolver table_resolver.TableResolver) *mux.PathRouter {
func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, console *ui.QuesmaManagementConsole, phoneHomeAgent telemetry.PhoneHomeAgent, queryRunner *QueryRunner, tableResolver table_resolver.TableResolver) *mux.PathRouter {

// some syntactic sugar
method := mux.IsHTTPMethod
Expand Down Expand Up @@ -229,6 +229,14 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return getIndexMappingResult(index, mappings)
})

router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
responseBody, err := queryRunner.handleAsyncSearchStatus(ctx, req.Params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})

router.Register(routes.AsyncSearchIdPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"])
Expand Down Expand Up @@ -337,8 +345,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return getIndexResult(index, mappings)
})

router.Register(routes.QuesmaTableResolverPath, mux.Always(), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {

router.Register(routes.QuesmaTableResolverPath, method("GET"), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
indexPattern := req.Params["index"]

decisions := make(map[string]*table_resolver.Decision)
Expand Down
116 changes: 116 additions & 0 deletions quesma/quesma/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,18 @@
package quesma

import (
"fmt"
"github.com/stretchr/testify/assert"
"quesma/clickhouse"
"quesma/ingest"
"quesma/quesma/config"
"quesma/quesma/mux"
"quesma/quesma/routes"
"quesma/quesma/ui"
"quesma/schema"
"quesma/table_resolver"
"quesma/telemetry"
"strings"
"testing"
)

Expand Down Expand Up @@ -263,3 +270,112 @@ func Test_matchedAgainstBulkBody(t *testing.T) {
})
}
}

const testIndexName = "indexName"

func TestConfigureRouter(t *testing.T) {
cfg := &config.QuesmaConfiguration{
IndexConfig: map[string]config.IndexConfiguration{
testIndexName: {
Name: testIndexName,
},
},
}
tr := TestTableResolver{}
testRouter := ConfigureRouter(cfg, schema.NewSchemaRegistry(fixedTableProvider{}, cfg, clickhouse.SchemaTypeAdapter{}), &clickhouse.LogManager{}, &ingest.IngestProcessor{}, &ui.QuesmaManagementConsole{}, telemetry.NewPhoneHomeAgent(cfg, nil, ""), &QueryRunner{}, tr)

tests := []struct {
path string
method string
shouldReturnHandler bool
}{
// Routes explicitly registered in the router code
{routes.ClusterHealthPath, "GET", true},
// {routes.BulkPath, "POST", true}, // TODO later on, it requires body parsing
{routes.IndexRefreshPath, "POST", true},
{routes.IndexDocPath, "POST", true},
{routes.IndexBulkPath, "POST", true},
{routes.IndexBulkPath, "PUT", true},
{routes.ResolveIndexPath, "GET", true},
{routes.IndexCountPath, "GET", true},
{routes.GlobalSearchPath, "GET", false},
{routes.GlobalSearchPath, "POST", false},
{routes.GlobalSearchPath, "PUT", false},
{routes.IndexSearchPath, "GET", true},
{routes.IndexSearchPath, "POST", true},
{routes.IndexAsyncSearchPath, "POST", true},
{routes.IndexMappingPath, "PUT", true},
{routes.IndexMappingPath, "GET", true},
{routes.AsyncSearchStatusPath, "GET", true},
{routes.AsyncSearchIdPath, "GET", true},
{routes.AsyncSearchIdPath, "DELETE", true},
{routes.FieldCapsPath, "GET", true},
{routes.FieldCapsPath, "POST", true},
{routes.TermsEnumPath, "POST", true},
{routes.EQLSearch, "GET", true},
{routes.EQLSearch, "POST", true},
{routes.IndexPath, "PUT", true},
{routes.IndexPath, "GET", true},
{routes.QuesmaTableResolverPath, "GET", true},
// Few cases where the router should not match
{"/invalid/path", "GET", false},
{routes.ClusterHealthPath, "POST", false},
//{routes.BulkPath, "GET", false}, // TODO later on, it requires body parsing
{routes.IndexRefreshPath, "GET", false},
{routes.IndexDocPath, "GET", false},
{routes.IndexBulkPath, "DELETE", false},
{routes.ResolveIndexPath, "POST", false},
{routes.IndexCountPath, "POST", false},
{routes.IndexSearchPath, "DELETE", false},
{routes.IndexAsyncSearchPath, "GET", false},
{routes.IndexMappingPath, "POST", false},
{routes.AsyncSearchStatusPath, "POST", false},
{routes.AsyncSearchIdPath, "PUT", false},
{routes.FieldCapsPath, "DELETE", false},
{routes.TermsEnumPath, "GET", false},
{routes.EQLSearch, "DELETE", false},
{routes.IndexPath, "POST", false},
{routes.QuesmaTableResolverPath, "POST", false},
{routes.QuesmaTableResolverPath, "PUT", false},
{routes.QuesmaTableResolverPath, "DELETE", false},
}

for _, tt := range tests {
tt.path = strings.Replace(tt.path, ":id", "quesma_async_absurd_test_id", -1)
tt.path = strings.Replace(tt.path, ":index", testIndexName, -1)
t.Run(tt.method+"-at-"+tt.path, func(t *testing.T) {
req := &mux.Request{Path: tt.path, Method: tt.method}
reqHandler, _ := testRouter.Matches(req)
assert.Equal(t, tt.shouldReturnHandler, reqHandler != nil, "Expected route match result for path: %s and method: %s", tt.path, tt.method)
})
}
}

// TestTableResolver should be used only within tests
type TestTableResolver struct{}

func (t TestTableResolver) Start() {}

func (t TestTableResolver) Stop() {}

func (t TestTableResolver) Resolve(_ string, indexPattern string) *table_resolver.Decision {
if indexPattern == testIndexName {
return &table_resolver.Decision{
UseConnectors: []table_resolver.ConnectorDecision{
&table_resolver.ConnectorDecisionClickhouse{},
},
}
} else {
return &table_resolver.Decision{
Err: fmt.Errorf("TestTableResolver err"),
Reason: "TestTableResolver reason",
ResolverName: "TestTableResolver",
}
}
}

func (t TestTableResolver) Pipelines() []string { return []string{} }

func (t TestTableResolver) RecentDecisions() []table_resolver.PatternDecisions {
return []table_resolver.PatternDecisions{}
}
37 changes: 19 additions & 18 deletions quesma/quesma/routes/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,25 @@ import (
)

const (
GlobalSearchPath = "/_search"
IndexSearchPath = "/:index/_search"
IndexAsyncSearchPath = "/:index/_async_search"
IndexCountPath = "/:index/_count"
IndexDocPath = "/:index/_doc"
IndexRefreshPath = "/:index/_refresh"
IndexBulkPath = "/:index/_bulk"
IndexMappingPath = "/:index/_mapping"
FieldCapsPath = "/:index/_field_caps"
TermsEnumPath = "/:index/_terms_enum"
EQLSearch = "/:index/_eql/search"
ResolveIndexPath = "/_resolve/index/:index"
ClusterHealthPath = "/_cluster/health"
BulkPath = "/_bulk"
AsyncSearchIdPrefix = "/_async_search/"
AsyncSearchIdPath = "/_async_search/:id"
KibanaInternalPrefix = "/.kibana_"
IndexPath = "/:index"
GlobalSearchPath = "/_search"
IndexSearchPath = "/:index/_search"
IndexAsyncSearchPath = "/:index/_async_search"
IndexCountPath = "/:index/_count"
IndexDocPath = "/:index/_doc"
IndexRefreshPath = "/:index/_refresh"
IndexBulkPath = "/:index/_bulk"
IndexMappingPath = "/:index/_mapping"
FieldCapsPath = "/:index/_field_caps"
TermsEnumPath = "/:index/_terms_enum"
EQLSearch = "/:index/_eql/search"
ResolveIndexPath = "/_resolve/index/:index"
ClusterHealthPath = "/_cluster/health"
BulkPath = "/_bulk"
AsyncSearchIdPrefix = "/_async_search/"
AsyncSearchIdPath = "/_async_search/:id"
AsyncSearchStatusPath = "/_async_search/status/:id"
KibanaInternalPrefix = "/.kibana_"
IndexPath = "/:index"

// Quesma internal paths

Expand Down
10 changes: 10 additions & 0 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,16 @@ func (q *QueryRunner) asyncQueriesCumulatedBodySize() int {
return size
}

func (q *QueryRunner) handleAsyncSearchStatus(_ context.Context, id string) ([]byte, error) {
if _, ok := q.AsyncRequestStorage.Load(id); ok { // there IS a result in storage, so query is completed/no longer running,
return queryparser.EmptyAsyncSearchStatusResponse(id, false, false, 200)
} else { // there is no result so query is might be(*) running
return queryparser.EmptyAsyncSearchStatusResponse(id, true, true, 0) // 0 is a placeholder for missing completion status
}
// (*) - it is an oversimplification as we're responding with "still running" status even for queries that might not exist.
// However since you're referring to async ID given from Quesma, we naively assume it *does* exist.
}

func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error) {
if !strings.Contains(id, tracing.AsyncIdPrefix) {
logger.ErrorWithCtx(ctx).Msgf("non quesma async id: %v", id)
Expand Down

0 comments on commit d36b617

Please sign in to comment.