From d2bc540a4ed522d21f38a06f59be769a07b63f53 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Tue, 14 May 2024 15:33:44 +0200 Subject: [PATCH 1/4] Allow multiple HTTP methods in route definitions (#106) This change adds the capability of adding routing for multiple HTTP methods and is meant to be transparent. Additional mappings will be added in another change! --- quesma/quesma/mux/mux.go | 8 +++++--- quesma/quesma/mux/mux_test.go | 13 ++++++++++++- quesma/quesma/router.go | 34 +++++++++++++++++----------------- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/quesma/quesma/mux/mux.go b/quesma/quesma/mux/mux.go index b78264b72..d585ac5da 100644 --- a/quesma/quesma/mux/mux.go +++ b/quesma/quesma/mux/mux.go @@ -39,9 +39,11 @@ func (p *PathRouter) RegisterPath(pattern, httpMethod string, handler handler) { p.mappings = append(p.mappings, mapping) } -func (p *PathRouter) RegisterPathMatcher(pattern, httpMethod string, predicate MatchPredicate, handler handler) { - mapping := mapping{pattern, urlpath.New(pattern), httpMethod, predicate, handler} - p.mappings = append(p.mappings, mapping) +func (p *PathRouter) RegisterPathMatcher(pattern string, httpMethods []string, predicate MatchPredicate, handler handler) { + for _, httpMethod := range httpMethods { + mapping := mapping{pattern, urlpath.New(pattern), httpMethod, predicate, handler} + p.mappings = append(p.mappings, mapping) + } } func (p *PathRouter) Execute(ctx context.Context, path, body, httpMethod string) (*Result, error) { diff --git a/quesma/quesma/mux/mux_test.go b/quesma/quesma/mux/mux_test.go index ea657e149..3f556c715 100644 --- a/quesma/quesma/mux/mux_test.go +++ b/quesma/quesma/mux/mux_test.go @@ -20,9 +20,20 @@ func TestMatches_ShouldIgnoreTrailingSlash(t *testing.T) { assert.True(t, router.Matches("/indexABC/_bulk", "POST", "")) assert.True(t, router.Matches("/indexABC/_bulk/", "POST", "")) +} + +func TestShouldMatchMultipleHttpMethods(t *testing.T) { + router := NewPathRouter() + router.RegisterPathMatcher("/:index/_bulk", []string{"POST", "GET"}, always, mockHandler) + + assert.True(t, router.Matches("/index1/_bulk", "POST", "")) + assert.True(t, router.Matches("/index1/_bulk", "GET", "")) +} +func always(_ map[string]string, _ string) bool { + return true } -func mockHandler(ctx context.Context, body, uri string, params map[string]string) (*Result, error) { +func mockHandler(_ context.Context, _, _ string, _ map[string]string) (*Result, error) { return &Result{}, nil } diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index e52ac0a23..1203ea108 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -31,33 +31,33 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, httpOk), nil }) - router.RegisterPathMatcher(routes.BulkPath, "POST", matchedAgainstBulkBody(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.BulkPath, []string{"POST"}, matchedAgainstBulkBody(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { results := dualWriteBulk(ctx, nil, body, lm, cfg, phoneHomeAgent) return bulkInsertResult(results), nil }) - router.RegisterPathMatcher(routes.IndexRefreshPath, "POST", matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexRefreshPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { return elasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, httpOk), nil }) - router.RegisterPathMatcher(routes.IndexDocPath, "POST", matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexDocPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { dualWrite(ctx, params["index"], body, lm, cfg) return indexDocResult(params["index"], httpOk), nil }) - router.RegisterPathMatcher(routes.IndexBulkPath, "POST", matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexBulkPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { index := params["index"] results := dualWriteBulk(ctx, &index, body, lm, cfg, phoneHomeAgent) return bulkInsertResult(results), nil }) - router.RegisterPathMatcher(routes.IndexBulkPath, "PUT", matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexBulkPath, []string{"PUT"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { index := params["index"] results := dualWriteBulk(ctx, &index, body, lm, cfg, phoneHomeAgent) return bulkInsertResult(results), nil }) - router.RegisterPathMatcher(routes.ResolveIndexPath, "GET", always(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.ResolveIndexPath, []string{"GET"}, always(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { pattern := params["index"] if elasticsearch.IsIndexPattern(pattern) { // todo avoid creating new instances all the time @@ -122,7 +122,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, } }) - router.RegisterPathMatcher(routes.IndexCountPath, "GET", matchedAgainstPattern(cfg), func(ctx context.Context, _ string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexCountPath, []string{"GET"}, matchedAgainstPattern(cfg), func(ctx context.Context, _ string, _ string, params map[string]string) (*mux.Result, error) { cnt, err := queryRunner.handleCount(ctx, params["index"]) if err != nil { if errors.Is(errIndexNotExists, err) { @@ -139,7 +139,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, } }) - router.RegisterPathMatcher(routes.GlobalSearchPath, "POST", func(_ map[string]string, _ string) bool { + router.RegisterPathMatcher(routes.GlobalSearchPath, []string{"POST"}, func(_ map[string]string, _ string) bool { return true // for now, always route to Quesma, in the near future: combine results from both sources }, func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body)) @@ -153,7 +153,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.IndexSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexSearchPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body)) if err != nil { if errors.Is(errIndexNotExists, err) { @@ -164,7 +164,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, } return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.IndexAsyncSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexAsyncSearchPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { waitForResultsMs := 1000 // Defaults to 1 second as in docs if v, ok := params["wait_for_completion_timeout"]; ok { if w, err := time.ParseDuration(v); err == nil { @@ -190,7 +190,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.AsyncSearchIdPath, "GET", matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"GET"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"]) responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"]) if err != nil { @@ -199,7 +199,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.AsyncSearchIdPath, "POST", matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"POST"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"]) responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"]) if err != nil { @@ -208,7 +208,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.AsyncSearchIdPath, "DELETE", matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"DELETE"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { responseBody, err := queryRunner.deleteAsyncSeach(params["id"]) if err != nil { return nil, err @@ -216,7 +216,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.FieldCapsPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.FieldCapsPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { responseBody, err := handleFieldCaps(ctx, params["index"], []byte(body), lm) if err != nil { if errors.Is(errIndexNotExists, err) { @@ -227,7 +227,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, } return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.TermsEnumPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.TermsEnumPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { if strings.Contains(params["index"], ",") { return nil, errors.New("multi index terms enum is not yet supported") } else { @@ -251,8 +251,8 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil } - router.RegisterPathMatcher(routes.EQLSearch, "GET", matchedAgainstPattern(cfg), eqlHandler) - router.RegisterPathMatcher(routes.EQLSearch, "POST", matchedAgainstPattern(cfg), eqlHandler) + router.RegisterPathMatcher(routes.EQLSearch, []string{"GET"}, matchedAgainstPattern(cfg), eqlHandler) + router.RegisterPathMatcher(routes.EQLSearch, []string{"POST"}, matchedAgainstPattern(cfg), eqlHandler) return router } From 4886afc3fd0ebaedb0c367ecd2f5d9064aac2085 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Tue, 14 May 2024 15:43:43 +0200 Subject: [PATCH 2/4] Support GET for search-related queries (#107) --- quesma/quesma/router.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index 1203ea108..c0359f151 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -139,7 +139,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, } }) - router.RegisterPathMatcher(routes.GlobalSearchPath, []string{"POST"}, func(_ map[string]string, _ string) bool { + router.RegisterPathMatcher(routes.GlobalSearchPath, []string{"GET", "POST"}, func(_ map[string]string, _ string) bool { return true // for now, always route to Quesma, in the near future: combine results from both sources }, func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body)) @@ -153,7 +153,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.IndexSearchPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexSearchPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body)) if err != nil { if errors.Is(errIndexNotExists, err) { @@ -164,7 +164,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, } return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.IndexAsyncSearchPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexAsyncSearchPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { waitForResultsMs := 1000 // Defaults to 1 second as in docs if v, ok := params["wait_for_completion_timeout"]; ok { if w, err := time.ParseDuration(v); err == nil { @@ -190,16 +190,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"GET"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { - ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"]) - responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"]) - if err != nil { - return nil, err - } - return elasticsearchQueryResult(string(responseBody), httpOk), nil - }) - - router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"POST"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"GET", "POST"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"]) responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"]) if err != nil { @@ -216,7 +207,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.FieldCapsPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.FieldCapsPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { responseBody, err := handleFieldCaps(ctx, params["index"], []byte(body), lm) if err != nil { if errors.Is(errIndexNotExists, err) { @@ -251,8 +242,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil } - router.RegisterPathMatcher(routes.EQLSearch, []string{"GET"}, matchedAgainstPattern(cfg), eqlHandler) - router.RegisterPathMatcher(routes.EQLSearch, []string{"POST"}, matchedAgainstPattern(cfg), eqlHandler) + router.RegisterPathMatcher(routes.EQLSearch, []string{"GET", "POST"}, matchedAgainstPattern(cfg), eqlHandler) return router } From 412eeb494b273cf10cd1046f899dc54417a3c9b2 Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Tue, 14 May 2024 15:57:13 +0200 Subject: [PATCH 3/4] Our router is missing http header parameters (#104) Pass HTTP headers to the internal router --- quesma/quesma/mux/mux.go | 3 ++- quesma/quesma/quesma.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/quesma/quesma/mux/mux.go b/quesma/quesma/mux/mux.go index d585ac5da..29dc9f0cc 100644 --- a/quesma/quesma/mux/mux.go +++ b/quesma/quesma/mux/mux.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ucarion/urlpath" "mitmproxy/quesma/logger" + "net/http" "strings" ) @@ -46,7 +47,7 @@ func (p *PathRouter) RegisterPathMatcher(pattern string, httpMethods []string, p } } -func (p *PathRouter) Execute(ctx context.Context, path, body, httpMethod string) (*Result, error) { +func (p *PathRouter) Execute(ctx context.Context, path, body, httpMethod string, headers http.Header) (*Result, error) { handler, meta, found := p.findHandler(path, httpMethod, body) if found { return handler(ctx, body, path, meta.Params) diff --git a/quesma/quesma/quesma.go b/quesma/quesma/quesma.go index ec217a710..73a5c7214 100644 --- a/quesma/quesma/quesma.go +++ b/quesma/quesma/quesma.go @@ -128,7 +128,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R } quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) { - return router.Execute(ctx, req.URL.Path, string(reqBody), req.Method) + return router.Execute(ctx, req.URL.Path, string(reqBody), req.Method, req.Header) }) var elkRawResponse elasticResult var elkResponse *http.Response From 4be20c440cce9babfe096f830ba25f87ceeedf0e Mon Sep 17 00:00:00 2001 From: Grzegorz Piwowarek Date: Tue, 14 May 2024 16:04:48 +0200 Subject: [PATCH 4/4] GET support for /_search only, excluding /_async_search (#108) --- quesma/quesma/router.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index c0359f151..af319ba2a 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -164,7 +164,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, } return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.IndexAsyncSearchPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.IndexAsyncSearchPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { waitForResultsMs := 1000 // Defaults to 1 second as in docs if v, ok := params["wait_for_completion_timeout"]; ok { if w, err := time.ParseDuration(v); err == nil { @@ -190,7 +190,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return elasticsearchQueryResult(string(responseBody), httpOk), nil }) - router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"GET", "POST"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { + router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"POST"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) { ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"]) responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"]) if err != nil {