Skip to content

Commit

Permalink
handlers and scaffolding for further work
Browse files Browse the repository at this point in the history
  • Loading branch information
mieciu committed Jan 2, 2025
1 parent 1e1f43d commit bb6078d
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 17 deletions.
14 changes: 7 additions & 7 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
})

router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
cnt, err := queryRunner.handleCount(ctx, req.Params["index"])
cnt, err := queryRunner.HandleCount(ctx, req.Params["index"])
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound}, nil
Expand All @@ -191,7 +191,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
}

// TODO we should pass JSON here instead of []byte
responseBody, err := queryRunner.handleSearch(ctx, "*", body)
responseBody, err := queryRunner.HandleSearch(ctx, "*", body)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound}, nil
Expand All @@ -209,7 +209,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return nil, err
}

responseBody, err := queryRunner.handleSearch(ctx, req.Params["index"], body)
responseBody, err := queryRunner.HandleSearch(ctx, req.Params["index"], body)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound}, nil
Expand Down Expand Up @@ -246,7 +246,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
return nil, err
}

responseBody, err := queryRunner.handleAsyncSearch(ctx, req.Params["index"], body, waitForResultsMs, keepOnCompletion)
responseBody, err := queryRunner.HandleAsyncSearch(ctx, req.Params["index"], body, waitForResultsMs, keepOnCompletion)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound}, nil
Expand Down Expand Up @@ -298,7 +298,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
})

router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
responseBody, err := queryRunner.handleAsyncSearchStatus(ctx, req.Params["id"])
responseBody, err := queryRunner.HandleAsyncSearchStatus(ctx, req.Params["id"])
if err != nil {
return nil, err
}
Expand All @@ -311,14 +311,14 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl

case "GET":
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"])
responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"])
responseBody, err := queryRunner.HandlePartialAsyncSearch(ctx, req.Params["id"])
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil

case "DELETE":
responseBody, err := queryRunner.deleteAsyncSearch(req.Params["id"])
responseBody, err := queryRunner.DeleteAsyncSearch(req.Params["id"])
if err != nil {
return nil, err
}
Expand Down
123 changes: 123 additions & 0 deletions quesma/quesma/router_v2_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package quesma

import (
"context"
"errors"
"net/http"
"quesma/clickhouse"
"quesma/elasticsearch"
"quesma/queryparser"
"quesma/quesma/config"
quesma_errors "quesma/quesma/errors"
"quesma/quesma/functionality/field_capabilities"
"quesma/quesma/functionality/resolve"
"quesma/quesma/types"
"quesma/schema"
quesma_api "quesma_v2/core"
"quesma_v2/core/tracing"
)

func HandleDeletingAsyncSearchById(queryRunner QueryRunnerIFace, asyncSearchId string) (*quesma_api.Result, error) {
responseBody, err := queryRunner.DeleteAsyncSearch(asyncSearchId)
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}

func HandleGettingAsyncSearchById(ctx context.Context, asyncSearchId string, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) {
ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, asyncSearchId)
responseBody, err := queryRunner.HandlePartialAsyncSearch(ctx, asyncSearchId)
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}

func HandleAsyncSearchStatus(ctx context.Context, asyncSearchId string, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) {
responseBody, err := queryRunner.HandleAsyncSearchStatus(ctx, asyncSearchId)
if err != nil {
return nil, err
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}

func HandleIndexSearch(ctx context.Context, indexPattern string, query types.JSON, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) {
responseBody, err := queryRunner.HandleSearch(ctx, indexPattern, query)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else if errors.Is(err, quesma_errors.ErrCouldNotParseRequest()) {
return &quesma_api.Result{
Body: string(queryparser.BadRequestParseError(err)),
StatusCode: http.StatusBadRequest,
GenericResult: queryparser.BadRequestParseError(err),
}, nil
} else {
return nil, err
}
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}

func HandleIndexAsyncSearch(ctx context.Context, indexPattern string, query types.JSON, waitForResultsMs int, keepOnCompletion bool, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) {
responseBody, err := queryRunner.HandleAsyncSearch(ctx, indexPattern, query, waitForResultsMs, keepOnCompletion)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else if errors.Is(err, quesma_errors.ErrCouldNotParseRequest()) {
return &quesma_api.Result{
Body: string(queryparser.BadRequestParseError(err)),
StatusCode: http.StatusBadRequest,
GenericResult: queryparser.BadRequestParseError(err),
}, nil
} else {
return nil, err
}
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}

func HandleResolveIndex(_ context.Context, indexPattern string, sr schema.Registry, esConfig config.ElasticsearchConfiguration) (*quesma_api.Result, error) {
ir := elasticsearch.NewIndexResolver(esConfig)
sources, err := resolve.HandleResolve(indexPattern, sr, ir)
if err != nil {
return nil, err
}
return resolveIndexResult(sources)
}

func HandleIndexCount(ctx context.Context, indexPattern string, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) {
cnt, err := queryRunner.HandleCount(ctx, indexPattern)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else {
return nil, err
}
}

if cnt == -1 {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else {
return elasticsearchCountResult(cnt, http.StatusOK)
}
}

func HandleFieldCaps(ctx context.Context, indexPattern string, allowNoIndices, ignoreUnavailable bool, cfg map[string]config.IndexConfiguration, sr schema.Registry, lm clickhouse.LogManagerIFace) (*quesma_api.Result, error) {
responseBody, err := field_capabilities.HandleFieldCaps(ctx, cfg, sr, indexPattern, lm)
if err != nil {
if errors.Is(quesma_errors.ErrIndexNotExists(), err) {
if allowNoIndices || ignoreUnavailable { // TODO I think this is no longer applicable? :|
return elasticsearchQueryResult(string(field_capabilities.EmptyFieldCapsResponse()), http.StatusOK), nil
}
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
} else {
return nil, err
}
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}
38 changes: 30 additions & 8 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ type QueryRunner struct {
maxParallelQueries int // if set to 0, we run queries in sequence, it's fine for testing purposes
}

// QueryRunnerIFace is a temporary interface to bridge gap between QueryRunner and QueryRunner2 in `router_v2.go`.
// moving forwards as we remove two implementations we might look at making all these methods private again.
type QueryRunnerIFace interface {
HandleSearch(ctx context.Context, indexPattern string, body types.JSON) ([]byte, error)
HandleAsyncSearch(ctx context.Context, indexPattern string, body types.JSON, waitForResultsMs int, keepOnCompletion bool) ([]byte, error)
HandleAsyncSearchStatus(_ context.Context, id string) ([]byte, error)
HandleCount(ctx context.Context, indexPattern string) (int64, error)
// Todo: consider removing this getters for these two below, this was required for temporary Field Caps impl in v2 api
GetSchemaRegistry() schema.Registry
GetLogManager() clickhouse.LogManagerIFace
DeleteAsyncSearch(id string) ([]byte, error)
HandlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error)
}

func (q *QueryRunner) EnableQueryOptimization(cfg *config.QuesmaConfiguration) {
q.transformationPipeline.transformers = append(q.transformationPipeline.transformers, optimize.NewOptimizePipeline(cfg))
}
Expand Down Expand Up @@ -100,6 +114,14 @@ func NewQueryRunner(lm *clickhouse.LogManager,
}
}

func (q *QueryRunner) GetSchemaRegistry() schema.Registry {
return q.schemaRegistry
}

func (q *QueryRunner) GetLogManager() clickhouse.LogManagerIFace {
return q.logManager
}

func NewQueryRunnerDefaultForTests(db *sql.DB, cfg *config.QuesmaConfiguration,
tableName string, tables *clickhouse.TableMap, staticRegistry *schema.StaticRegistry) *QueryRunner {

Expand All @@ -126,8 +148,8 @@ func NewQueryRunnerDefaultForTests(db *sql.DB, cfg *config.QuesmaConfiguration,
return NewQueryRunner(lm, cfg, nil, managementConsole, staticRegistry, ab_testing.NewEmptySender(), resolver, tableDiscovery)
}

// returns -1 when table name could not be resolved
func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int64, error) {
// HandleCount returns -1 when table name could not be resolved
func (q *QueryRunner) HandleCount(ctx context.Context, indexPattern string) (int64, error) {
indexes, err := q.logManager.ResolveIndexPattern(ctx, q.schemaRegistry, indexPattern)
if err != nil {
return 0, err
Expand All @@ -148,15 +170,15 @@ func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int
}
}

func (q *QueryRunner) handleSearch(ctx context.Context, indexPattern string, body types.JSON) ([]byte, error) {
func (q *QueryRunner) HandleSearch(ctx context.Context, indexPattern string, body types.JSON) ([]byte, error) {
return q.handleSearchCommon(ctx, indexPattern, body, nil, QueryLanguageDefault)
}

func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string, body types.JSON) ([]byte, error) {
return q.handleSearchCommon(ctx, indexPattern, body, nil, QueryLanguageEQL)
}

func (q *QueryRunner) handleAsyncSearch(ctx context.Context, indexPattern string, body types.JSON,
func (q *QueryRunner) HandleAsyncSearch(ctx context.Context, indexPattern string, body types.JSON,
waitForResultsMs int, keepOnCompletion bool) ([]byte, error) {
async := AsyncQuery{
asyncId: tracing.GetAsyncId(),
Expand Down Expand Up @@ -273,7 +295,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan
responseBody, err = q.storeAsyncSearch(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, body, res, true, opaqueId)
sendMainPlanResult(responseBody, err)
}()
return q.handlePartialAsyncSearch(ctx, optAsync.asyncId)
return q.HandlePartialAsyncSearch(ctx, optAsync.asyncId)
case res := <-doneCh:
responseBody, err = q.storeAsyncSearch(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, body, res,
optAsync.keepOnCompletion, opaqueId)
Expand Down Expand Up @@ -504,7 +526,7 @@ func (q *QueryRunner) asyncQueriesCumulatedBodySize() int {
return size
}

func (q *QueryRunner) handleAsyncSearchStatus(_ context.Context, id string) ([]byte, error) {
func (q *QueryRunner) HandleAsyncSearchStatus(_ context.Context, id string) ([]byte, error) {
if _, ok := q.AsyncRequestStorage.Load(id); ok { // there IS a result in storage, so query is completed/no longer running,
return queryparser.EmptyAsyncSearchStatusResponse(id, false, false, 200)
} else { // there is no result so query is might be(*) running
Expand All @@ -514,7 +536,7 @@ func (q *QueryRunner) handleAsyncSearchStatus(_ context.Context, id string) ([]b
// However since you're referring to async ID given from Quesma, we naively assume it *does* exist.
}

func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error) {
func (q *QueryRunner) HandlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error) {
if !strings.Contains(id, tracing.AsyncIdPrefix) {
logger.ErrorWithCtx(ctx).Msgf("non quesma async id: %v", id)
return queryparser.EmptyAsyncSearchResponse(id, false, 503)
Expand Down Expand Up @@ -549,7 +571,7 @@ func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) (
}
}

func (q *QueryRunner) deleteAsyncSearch(id string) ([]byte, error) {
func (q *QueryRunner) DeleteAsyncSearch(id string) ([]byte, error) {
if !strings.Contains(id, tracing.AsyncIdPrefix) {
return nil, errors.New("invalid quesma async search id : " + id)
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/search_ab_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (q *QueryRunner) executePlanElastic(ctx context.Context, plan *model.Execut
responseBody, err = q.storeAsyncSearchWithRaw(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, requestBody, res.response, res.err, res.translatedQueryBody, true, opaqueId)
sendABResult(responseBody, err)
}()
return q.handlePartialAsyncSearch(ctx, optAsync.asyncId)
return q.HandlePartialAsyncSearch(ctx, optAsync.asyncId)
case res := <-doneCh:
responseBody, err = q.storeAsyncSearchWithRaw(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, requestBody, res.response, res.err, res.translatedQueryBody, true, opaqueId)
sendABResult(responseBody, err)
Expand Down
2 changes: 1 addition & 1 deletion quesma/v2/core/quesma_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,5 @@ type BackendConnector interface {

// Exec executes a command that doesn't return rows, typically an INSERT, UPDATE, or DELETE.
Exec(ctx context.Context, query string, args ...interface{}) error
Close() error
Close() error // TODO we should revisit returning error here
}

0 comments on commit bb6078d

Please sign in to comment.