Skip to content

Commit

Permalink
Propagate query params inside router handlers (#126)
Browse files Browse the repository at this point in the history
This exposes query params as parameters for Quesma `handlers`
  • Loading branch information
pivovarit authored May 16, 2024
1 parent 4b66626 commit 7f80784
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 21 deletions.
7 changes: 4 additions & 3 deletions quesma/quesma/mux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/ucarion/urlpath"
"mitmproxy/quesma/logger"
"net/http"
"net/url"
"strings"
)

Expand All @@ -24,7 +25,7 @@ type (
Meta map[string]string
StatusCode int
}
handler func(ctx context.Context, body, uri string, params map[string]string) (*Result, error)
handler func(ctx context.Context, body, uri string, params map[string]string, headers http.Header, queryParams url.Values) (*Result, error)
MatchPredicate func(params map[string]string, body string) bool
)

Expand All @@ -47,10 +48,10 @@ func (p *PathRouter) RegisterPathMatcher(pattern string, httpMethods []string, p
}
}

func (p *PathRouter) Execute(ctx context.Context, path, body, httpMethod string, headers http.Header) (*Result, error) {
func (p *PathRouter) Execute(ctx context.Context, path, body, httpMethod string, headers http.Header, queryParams url.Values) (*Result, error) {
handler, meta, found := p.findHandler(path, httpMethod, body)
if found {
return handler(ctx, body, path, meta.Params)
return handler(ctx, body, path, meta.Params, headers, queryParams)
}
return nil, nil
}
Expand Down
4 changes: 3 additions & 1 deletion quesma/quesma/mux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package mux
import (
"context"
"github.com/stretchr/testify/assert"
"net/http"
"net/url"
"testing"
)

Expand Down Expand Up @@ -34,6 +36,6 @@ func always(_ map[string]string, _ string) bool {
return true
}

func mockHandler(_ context.Context, _, _ string, _ map[string]string) (*Result, error) {
func mockHandler(_ context.Context, _, _ string, _ map[string]string, _ http.Header, _ url.Values) (*Result, error) {
return &Result{}, nil
}
3 changes: 2 additions & 1 deletion quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
elkResponseChan = r.sendHttpRequestToElastic(ctx, req, reqBody, false)
}

req.URL.Query()
quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) {
return router.Execute(ctx, req.URL.Path, string(reqBody), req.Method, req.Header)
return router.Execute(ctx, req.URL.Path, string(reqBody), req.Method, req.Header, req.URL.Query())
})
var elkRawResponse elasticResult
var elkResponse *http.Response
Expand Down
34 changes: 18 additions & 16 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"mitmproxy/quesma/quesma/ui"
"mitmproxy/quesma/telemetry"
"mitmproxy/quesma/tracing"
"net/http"
"net/url"
"regexp"
"slices"
"strings"
Expand All @@ -28,37 +30,37 @@ const (

func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, console *ui.QuesmaManagementConsole, phoneHomeAgent telemetry.PhoneHomeAgent, queryRunner *QueryRunner) *mux.PathRouter {
router := mux.NewPathRouter()
router.RegisterPath(routes.ClusterHealthPath, "GET", func(_ context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPath(routes.ClusterHealthPath, "GET", func(_ context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, httpOk), nil
})

router.RegisterPathMatcher(routes.BulkPath, []string{"POST"}, matchedAgainstBulkBody(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.BulkPath, []string{"POST"}, matchedAgainstBulkBody(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
results := dualWriteBulk(ctx, nil, body, lm, cfg, phoneHomeAgent)
return bulkInsertResult(results), nil
})

router.RegisterPathMatcher(routes.IndexRefreshPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexRefreshPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
return elasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, httpOk), nil
})

router.RegisterPathMatcher(routes.IndexDocPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexDocPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
dualWrite(ctx, params["index"], body, lm, cfg)
return indexDocResult(params["index"], httpOk), nil
})

router.RegisterPathMatcher(routes.IndexBulkPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexBulkPath, []string{"POST"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
index := params["index"]
results := dualWriteBulk(ctx, &index, body, lm, cfg, phoneHomeAgent)
return bulkInsertResult(results), nil
})

router.RegisterPathMatcher(routes.IndexBulkPath, []string{"PUT"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexBulkPath, []string{"PUT"}, matchedExact(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
index := params["index"]
results := dualWriteBulk(ctx, &index, body, lm, cfg, phoneHomeAgent)
return bulkInsertResult(results), nil
})

router.RegisterPathMatcher(routes.ResolveIndexPath, []string{"GET"}, always(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.ResolveIndexPath, []string{"GET"}, always(), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
pattern := elasticsearch.NormalizePattern(params["index"])
if elasticsearch.IsIndexPattern(pattern) {
// todo avoid creating new instances all the time
Expand Down Expand Up @@ -123,7 +125,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
}
})

router.RegisterPathMatcher(routes.IndexCountPath, []string{"GET"}, matchedAgainstPattern(cfg), func(ctx context.Context, _ string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexCountPath, []string{"GET"}, matchedAgainstPattern(cfg), func(ctx context.Context, _ string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
cnt, err := queryRunner.handleCount(ctx, params["index"])
if err != nil {
if errors.Is(errIndexNotExists, err) {
Expand All @@ -142,7 +144,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,

router.RegisterPathMatcher(routes.GlobalSearchPath, []string{"GET", "POST"}, func(_ map[string]string, _ string) bool {
return true // for now, always route to Quesma, in the near future: combine results from both sources
}, func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
}, func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
responseBody, err := queryRunner.handleSearch(ctx, "*", []byte(body))
if err != nil {
if errors.Is(errIndexNotExists, err) {
Expand All @@ -154,7 +156,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.IndexSearchPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexSearchPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
responseBody, err := queryRunner.handleSearch(ctx, params["index"], []byte(body))
if err != nil {
if errors.Is(errIndexNotExists, err) {
Expand All @@ -170,7 +172,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})
router.RegisterPathMatcher(routes.IndexAsyncSearchPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.IndexAsyncSearchPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
waitForResultsMs := 1000 // Defaults to 1 second as in docs
if v, ok := params["wait_for_completion_timeout"]; ok {
if w, err := time.ParseDuration(v); err == nil {
Expand Down Expand Up @@ -201,7 +203,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"GET"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"GET"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, params["id"])
if err != nil {
Expand All @@ -210,15 +212,15 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"DELETE"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.AsyncSearchIdPath, []string{"DELETE"}, matchedAgainstAsyncId(), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
responseBody, err := queryRunner.deleteAsyncSeach(params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})

router.RegisterPathMatcher(routes.FieldCapsPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.FieldCapsPath, []string{"GET", "POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
responseBody, err := handleFieldCaps(ctx, params["index"], []byte(body), lm)
if err != nil {
if errors.Is(errIndexNotExists, err) {
Expand All @@ -229,7 +231,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
}
return elasticsearchQueryResult(string(responseBody), httpOk), nil
})
router.RegisterPathMatcher(routes.TermsEnumPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
router.RegisterPathMatcher(routes.TermsEnumPath, []string{"POST"}, matchedAgainstPattern(cfg), func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, queryParams url.Values) (*mux.Result, error) {
if strings.Contains(params["index"], ",") {
return nil, errors.New("multi index terms enum is not yet supported")
} else {
Expand All @@ -241,7 +243,7 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager,
}
})

eqlHandler := func(ctx context.Context, body string, _ string, params map[string]string) (*mux.Result, error) {
eqlHandler := func(ctx context.Context, body string, _ string, params map[string]string, _ http.Header, _ url.Values) (*mux.Result, error) {
responseBody, err := queryRunner.handleEQLSearch(ctx, params["index"], []byte(body))
if err != nil {
if errors.Is(errIndexNotExists, err) {
Expand Down

0 comments on commit 7f80784

Please sign in to comment.