Skip to content

Commit

Permalink
Merge branch 'main' into pr-e2e-1
Browse files Browse the repository at this point in the history
  • Loading branch information
nablaone authored May 14, 2024
2 parents c034623 + 4be20c4 commit 51d9985
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 31 deletions.
11 changes: 7 additions & 4 deletions quesma/quesma/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"github.com/ucarion/urlpath"
"mitmproxy/quesma/logger"
"net/http"
"strings"
)

Expand Down Expand Up @@ -39,12 +40,14 @@ func (p *PathRouter) RegisterPath(pattern, httpMethod string, handler handler) {
p.mappings = append(p.mappings, mapping)
}

func (p *PathRouter) RegisterPathMatcher(pattern, httpMethod string, predicate MatchPredicate, handler handler) {
mapping := mapping{pattern, urlpath.New(pattern), httpMethod, predicate, handler}
p.mappings = append(p.mappings, mapping)
func (p *PathRouter) RegisterPathMatcher(pattern string, httpMethods []string, predicate MatchPredicate, handler handler) {
for _, httpMethod := range httpMethods {
mapping := mapping{pattern, urlpath.New(pattern), httpMethod, predicate, handler}
p.mappings = append(p.mappings, mapping)
}
}

func (p *PathRouter) Execute(ctx context.Context, path, body, httpMethod string) (*Result, error) {
func (p *PathRouter) Execute(ctx context.Context, path, body, httpMethod string, headers http.Header) (*Result, error) {
handler, meta, found := p.findHandler(path, httpMethod, body)
if found {
return handler(ctx, body, path, meta.Params)
Expand Down
13 changes: 12 additions & 1 deletion quesma/quesma/mux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,20 @@ func TestMatches_ShouldIgnoreTrailingSlash(t *testing.T) {

assert.True(t, router.Matches("/indexABC/_bulk", "POST", ""))
assert.True(t, router.Matches("/indexABC/_bulk/", "POST", ""))
}

func TestShouldMatchMultipleHttpMethods(t *testing.T) {
router := NewPathRouter()
router.RegisterPathMatcher("/:index/_bulk", []string{"POST", "GET"}, always, mockHandler)

assert.True(t, router.Matches("/index1/_bulk", "POST", ""))
assert.True(t, router.Matches("/index1/_bulk", "GET", ""))
}

func always(_ map[string]string, _ string) bool {
return true
}

func mockHandler(ctx context.Context, body, uri string, params map[string]string) (*Result, error) {
func mockHandler(_ context.Context, _, _ string, _ map[string]string) (*Result, error) {
return &Result{}, nil
}
2 changes: 1 addition & 1 deletion quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
}

quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) {
return router.Execute(ctx, req.URL.Path, string(reqBody), req.Method)
return router.Execute(ctx, req.URL.Path, string(reqBody), req.Method, req.Header)
})
var elkRawResponse elasticResult
var elkResponse *http.Response
Expand Down
40 changes: 15 additions & 25 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,33 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, httpOk), nil
})

router.RegisterPathMatcher(routes.BulkPath, "POST", matchedAgainstBulkBody(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.BulkPath, []string{"POST"}, matchedAgainstBulkBody(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
results := dualWriteBulk(ctx, nil, body, lm, cfg, phoneHomeAgent)
return bulkInsertResult(results), nil
})

router.RegisterPathMatcher(routes.IndexRefreshPath, "POST", matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexRefreshPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
return elasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, httpOk), nil
})

router.RegisterPathMatcher(routes.IndexDocPath, "POST", matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexDocPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
dualWrite(ctx, params["index"], body, lm, cfg)
return indexDocResult(params["index"], httpOk), nil
})

router.RegisterPathMatcher(routes.IndexBulkPath, "POST", matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexBulkPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
index := params["index"]
results := dualWriteBulk(ctx, &index, body, lm, cfg, phoneHomeAgent)
return bulkInsertResult(results), nil
})

router.RegisterPathMatcher(routes.IndexBulkPath, "PUT", matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexBulkPath, []string{"PUT"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
index := params["index"]
results := dualWriteBulk(ctx, &index, body, lm, cfg, phoneHomeAgent)
return bulkInsertResult(results), nil
})

router.RegisterPathMatcher(routes.ResolveIndexPath, "GET", always(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.ResolveIndexPath, []string{"GET"}, always(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
pattern := params["index"]
if elasticsearch.IsIndexPattern(pattern) {
// todo avoid creating new instances all the time
Expand Down Expand Up @@ -122,7 +122,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
}
})

router.RegisterPathMatcher(routes.IndexCountPath, "GET", matchedAgainstPattern(cfg), func(ctx context.Context, _ string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexCountPath, []string{"GET"}, matchedAgainstPattern(cfg), func(ctx context.Context, _ string, _ string, params map[string]string) (*mux.Result, error) {
cnt, err := queryRunner.handleCount(ctx, params["index"])
if err != nil {
if errors.Is(errIndexNotExists, err) {
Expand All @@ -139,7 +139,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
}
})

router.RegisterPathMatcher(routes.GlobalSearchPath, "POST", func(_ map[string]string, _ string) bool {
router.RegisterPathMatcher(routes.GlobalSearchPath, []string{"GET", "POST"}, func(_ map[string]string, _ string) bool {
return true // for now, always route to Quesma, in the near future: combine results from both sources
}, func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body))
Expand All @@ -153,7 +153,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.IndexSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexSearchPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body))
if err != nil {
if errors.Is(errIndexNotExists, err) {
Expand All @@ -164,7 +164,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})
router.RegisterPathMatcher(routes.IndexAsyncSearchPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexAsyncSearchPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
waitForResultsMs := 1000 // Defaults to 1 second as in docs
if v, ok := params["wait_for_completion_timeout"]; ok {
if w, err := time.ParseDuration(v); err == nil {
Expand All @@ -190,7 +190,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.AsyncSearchIdPath, "GET", matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"POST"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"])
if err != nil {
Expand All @@ -199,24 +199,15 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.AsyncSearchIdPath, "POST", matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.AsyncSearchIdPath, "DELETE", matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"DELETE"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := queryRunner.deleteAsyncSeach(params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.FieldCapsPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.FieldCapsPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
responseBody, err := handleFieldCaps(ctx, params["index"], []byte(body), lm)
if err != nil {
if errors.Is(errIndexNotExists, err) {
Expand All @@ -227,7 +218,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})
router.RegisterPathMatcher(routes.TermsEnumPath, "POST", matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.TermsEnumPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
if strings.Contains(params["index"], ",") {
return nil, errors.New("multi index terms enum is not yet supported")
} else {
Expand All @@ -251,8 +242,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(string(responseBody), httpOk), nil
}

router.RegisterPathMatcher(routes.EQLSearch, "GET", matchedAgainstPattern(cfg), eqlHandler)
router.RegisterPathMatcher(routes.EQLSearch, "POST", matchedAgainstPattern(cfg), eqlHandler)
router.RegisterPathMatcher(routes.EQLSearch, []string{"GET", "POST"}, matchedAgainstPattern(cfg), eqlHandler)

return router
}
Expand Down

0 comments on commit 51d9985

Please sign in to comment.