Skip to content

Commit

Permalink
router adjustments
Browse files Browse the repository at this point in the history
  • Loading branch information
mieciu committed Jan 2, 2025
1 parent bb6078d commit 754a103
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 95 deletions.
4 changes: 2 additions & 2 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
})

router.Register(routes.ResolveIndexPath, method("GET"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
sources, err := resolve.HandleResolve(req.Params["index"], sr, cfg)
sources, err := resolve.HandleResolve(req.Params["index"], sr, queryRunner.im)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -330,7 +330,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl

router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {

responseBody, err := field_capabilities.HandleFieldCaps(ctx, cfg, sr, req.Params["index"], lm)
responseBody, err := field_capabilities.HandleFieldCaps(ctx, cfg.IndexConfig, sr, req.Params["index"], lm)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
if req.QueryParams.Get("allow_no_indices") == "true" || req.QueryParams.Get("ignore_unavailable") == "true" {
Expand Down
110 changes: 17 additions & 93 deletions quesma/quesma/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@ import (
"quesma/quesma/errors"
"quesma/quesma/functionality/bulk"
"quesma/quesma/functionality/doc"
"quesma/quesma/functionality/field_capabilities"
"quesma/quesma/functionality/resolve"
"quesma/quesma/functionality/terms_enum"
"quesma/quesma/types"
"quesma/schema"
"quesma/table_resolver"
quesma_api "quesma_v2/core"
"quesma_v2/core/routes"
tracing "quesma_v2/core/tracing"

"strings"
"time"
)
Expand Down Expand Up @@ -166,28 +162,11 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
})

router.Register(routes.ResolveIndexPath, method("GET"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
sources, err := resolve.HandleResolve(req.Params["index"], sr, cfg)
if err != nil {
return nil, err
}
return resolveIndexResult(sources)
return HandleResolveIndex(ctx, req.Params["index"], sr, cfg.Elasticsearch)
})

router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
cnt, err := queryRunner.handleCount(ctx, req.Params["index"])
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else {
return nil, err
}
}

if cnt == -1 {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else {
return elasticsearchCountResult(cnt, http.StatusOK)
}
return HandleIndexCount(ctx, req.Params["index"], queryRunner)
})

// TODO: This endpoint is currently disabled (mux.Never()) as it's pretty much used only by internal Kibana requests,
Expand All @@ -201,7 +180,7 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
}

// TODO we should pass JSON here instead of []byte
responseBody, err := queryRunner.handleSearch(ctx, "*", body)
responseBody, err := queryRunner.HandleSearch(ctx, "*", body)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
Expand All @@ -213,29 +192,18 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
})

router.Register(routes.IndexSearchPath, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {

body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
return nil, err
}
return HandleIndexSearch(ctx, req.Params["index"], body, queryRunner)
})

responseBody, err := queryRunner.handleSearch(ctx, req.Params["index"], body)
router.Register(routes.IndexAsyncSearchPath, and(method("POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
query, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else if errors.Is(err, quesma_errors.ErrCouldNotParseRequest()) {
return &quesma_api.Result{
Body: string(queryparser.BadRequestParseError(err)),
StatusCode: http.StatusBadRequest,
GenericResult: queryparser.BadRequestParseError(err),
}, nil
} else {
return nil, err
}
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
})
router.Register(routes.IndexAsyncSearchPath, and(method("POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
waitForResultsMs := 1000 // Defaults to 1 second as in docs
if v, ok := req.Params["wait_for_completion_timeout"]; ok {
if w, err := time.ParseDuration(v); err == nil {
Expand All @@ -251,26 +219,7 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
}
}

body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
return nil, err
}

responseBody, err := queryRunner.handleAsyncSearch(ctx, req.Params["index"], body, waitForResultsMs, keepOnCompletion)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else if errors.Is(err, quesma_errors.ErrCouldNotParseRequest()) {
return &quesma_api.Result{
Body: string(queryparser.BadRequestParseError(err)),
StatusCode: http.StatusBadRequest,
GenericResult: queryparser.BadRequestParseError(err),
}, nil
} else {
return nil, err
}
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
return HandleIndexAsyncSearch(ctx, req.Params["index"], query, waitForResultsMs, keepOnCompletion, queryRunner)
})

router.Register(routes.IndexMappingPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
Expand Down Expand Up @@ -308,52 +257,27 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
})

router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
responseBody, err := queryRunner.handleAsyncSearchStatus(ctx, req.Params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
return HandleAsyncSearchStatus(ctx, req.Params["id"], queryRunner)
})

router.Register(routes.AsyncSearchIdPath, and(method("GET", "DELETE"), matchedAgainstAsyncId()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {

switch req.Method {

case "GET":
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil

return HandleGettingAsyncSearchById(ctx, req.Params["id"], queryRunner)
case "DELETE":
responseBody, err := queryRunner.deleteAsyncSearch(req.Params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
return HandleDeletingAsyncSearchById(queryRunner, req.Params["id"])
}

return nil, errors.New("unsupported method")
})

router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {

responseBody, err := field_capabilities.HandleFieldCaps(ctx, cfg, sr, req.Params["index"], lm)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
if req.QueryParams.Get("allow_no_indices") == "true" || req.QueryParams.Get("ignore_unavailable") == "true" {
return elasticsearchQueryResult(string(field_capabilities.EmptyFieldCapsResponse()), http.StatusOK), nil
}
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else {
return nil, err
}
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
return HandleFieldCaps(ctx, req.Params["index"],
req.QueryParams.Get("allow_no_indices") == "true",
req.QueryParams.Get("ignore_unavailable") == "true",
cfg.IndexConfig, sr, lm)
})
router.Register(routes.TermsEnumPath, and(method("POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {

if strings.Contains(req.Params["index"], ",") {
return nil, errors.New("multi index terms enum is not yet supported")
} else {
Expand Down

0 comments on commit 754a103

Please sign in to comment.