diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index 4a2fe282b..c0e7b435f 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -164,7 +164,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl }) router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { - cnt, err := queryRunner.handleCount(ctx, req.Params["index"]) + cnt, err := queryRunner.HandleCount(ctx, req.Params["index"]) if err != nil { if errors.Is(quesma_errors.ErrIndexNotExists(), err) { return &quesma_api.Result{StatusCode: http.StatusNotFound}, nil @@ -191,7 +191,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl } // TODO we should pass JSON here instead of []byte - responseBody, err := queryRunner.handleSearch(ctx, "*", body) + responseBody, err := queryRunner.HandleSearch(ctx, "*", body) if err != nil { if errors.Is(quesma_errors.ErrIndexNotExists(), err) { return &quesma_api.Result{StatusCode: http.StatusNotFound}, nil @@ -209,7 +209,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return nil, err } - responseBody, err := queryRunner.handleSearch(ctx, req.Params["index"], body) + responseBody, err := queryRunner.HandleSearch(ctx, req.Params["index"], body) if err != nil { if errors.Is(quesma_errors.ErrIndexNotExists(), err) { return &quesma_api.Result{StatusCode: http.StatusNotFound}, nil @@ -246,7 +246,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return nil, err } - responseBody, err := queryRunner.handleAsyncSearch(ctx, req.Params["index"], body, waitForResultsMs, keepOnCompletion) + responseBody, err := queryRunner.HandleAsyncSearch(ctx, req.Params["index"], body, waitForResultsMs, keepOnCompletion) if err != nil { if errors.Is(quesma_errors.ErrIndexNotExists(), err) { return &quesma_api.Result{StatusCode: http.StatusNotFound}, nil @@ -298,7 +298,7 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl }) router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { - responseBody, err := queryRunner.handleAsyncSearchStatus(ctx, req.Params["id"]) + responseBody, err := queryRunner.HandleAsyncSearchStatus(ctx, req.Params["id"]) if err != nil { return nil, err } @@ -311,14 +311,14 @@ func ConfigureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl case "GET": ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"]) - responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"]) + responseBody, err := queryRunner.HandlePartialAsyncSearch(ctx, req.Params["id"]) if err != nil { return nil, err } return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil case "DELETE": - responseBody, err := queryRunner.deleteAsyncSearch(req.Params["id"]) + responseBody, err := queryRunner.DeleteAsyncSearch(req.Params["id"]) if err != nil { return nil, err } diff --git a/quesma/quesma/router_v2_handlers.go b/quesma/quesma/router_v2_handlers.go new file mode 100644 index 000000000..5b7c49978 --- /dev/null +++ b/quesma/quesma/router_v2_handlers.go @@ -0,0 +1,123 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package quesma + +import ( + "context" + "errors" + "net/http" + "quesma/clickhouse" + "quesma/elasticsearch" + "quesma/queryparser" + "quesma/quesma/config" + quesma_errors "quesma/quesma/errors" + "quesma/quesma/functionality/field_capabilities" + "quesma/quesma/functionality/resolve" + "quesma/quesma/types" + "quesma/schema" + quesma_api "quesma_v2/core" + "quesma_v2/core/tracing" +) + +func HandleDeletingAsyncSearchById(queryRunner QueryRunnerIFace, asyncSearchId string) (*quesma_api.Result, error) { + responseBody, err := queryRunner.DeleteAsyncSearch(asyncSearchId) + if err != nil { + return nil, err + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil +} + +func HandleGettingAsyncSearchById(ctx context.Context, asyncSearchId string, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) { + ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, asyncSearchId) + responseBody, err := queryRunner.HandlePartialAsyncSearch(ctx, asyncSearchId) + if err != nil { + return nil, err + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil +} + +func HandleAsyncSearchStatus(ctx context.Context, asyncSearchId string, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) { + responseBody, err := queryRunner.HandleAsyncSearchStatus(ctx, asyncSearchId) + if err != nil { + return nil, err + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil +} + +func HandleIndexSearch(ctx context.Context, indexPattern string, query types.JSON, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) { + responseBody, err := queryRunner.HandleSearch(ctx, indexPattern, query) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil + } else if errors.Is(err, quesma_errors.ErrCouldNotParseRequest()) { + return &quesma_api.Result{ + Body: string(queryparser.BadRequestParseError(err)), + StatusCode: http.StatusBadRequest, + GenericResult: queryparser.BadRequestParseError(err), + }, nil + } else { + return nil, err + } + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil +} + +func HandleIndexAsyncSearch(ctx context.Context, indexPattern string, query types.JSON, waitForResultsMs int, keepOnCompletion bool, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) { + responseBody, err := queryRunner.HandleAsyncSearch(ctx, indexPattern, query, waitForResultsMs, keepOnCompletion) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil + } else if errors.Is(err, quesma_errors.ErrCouldNotParseRequest()) { + return &quesma_api.Result{ + Body: string(queryparser.BadRequestParseError(err)), + StatusCode: http.StatusBadRequest, + GenericResult: queryparser.BadRequestParseError(err), + }, nil + } else { + return nil, err + } + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil +} + +func HandleResolveIndex(_ context.Context, indexPattern string, sr schema.Registry, esConfig config.ElasticsearchConfiguration) (*quesma_api.Result, error) { + ir := elasticsearch.NewIndexResolver(esConfig) + sources, err := resolve.HandleResolve(indexPattern, sr, ir) + if err != nil { + return nil, err + } + return resolveIndexResult(sources) +} + +func HandleIndexCount(ctx context.Context, indexPattern string, queryRunner QueryRunnerIFace) (*quesma_api.Result, error) { + cnt, err := queryRunner.HandleCount(ctx, indexPattern) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil + } else { + return nil, err + } + } + + if cnt == -1 { + return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil + } else { + return elasticsearchCountResult(cnt, http.StatusOK) + } +} + +func HandleFieldCaps(ctx context.Context, indexPattern string, allowNoIndices, ignoreUnavailable bool, cfg map[string]config.IndexConfiguration, sr schema.Registry, lm clickhouse.LogManagerIFace) (*quesma_api.Result, error) { + responseBody, err := field_capabilities.HandleFieldCaps(ctx, cfg, sr, indexPattern, lm) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + if allowNoIndices || ignoreUnavailable { // TODO I think this is no longer applicable? :| + return elasticsearchQueryResult(string(field_capabilities.EmptyFieldCapsResponse()), http.StatusOK), nil + } + return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil + } else { + return nil, err + } + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil +} diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index 5bfce7364..09480018f 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -65,6 +65,20 @@ type QueryRunner struct { maxParallelQueries int // if set to 0, we run queries in sequence, it's fine for testing purposes } +// QueryRunnerIFace is a temporary interface to bridge gap between QueryRunner and QueryRunner2 in `router_v2.go`. +// moving forwards as we remove two implementations we might look at making all these methods private again. +type QueryRunnerIFace interface { + HandleSearch(ctx context.Context, indexPattern string, body types.JSON) ([]byte, error) + HandleAsyncSearch(ctx context.Context, indexPattern string, body types.JSON, waitForResultsMs int, keepOnCompletion bool) ([]byte, error) + HandleAsyncSearchStatus(_ context.Context, id string) ([]byte, error) + HandleCount(ctx context.Context, indexPattern string) (int64, error) + // Todo: consider removing this getters for these two below, this was required for temporary Field Caps impl in v2 api + GetSchemaRegistry() schema.Registry + GetLogManager() clickhouse.LogManagerIFace + DeleteAsyncSearch(id string) ([]byte, error) + HandlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error) +} + func (q *QueryRunner) EnableQueryOptimization(cfg *config.QuesmaConfiguration) { q.transformationPipeline.transformers = append(q.transformationPipeline.transformers, optimize.NewOptimizePipeline(cfg)) } @@ -100,6 +114,14 @@ func NewQueryRunner(lm *clickhouse.LogManager, } } +func (q *QueryRunner) GetSchemaRegistry() schema.Registry { + return q.schemaRegistry +} + +func (q *QueryRunner) GetLogManager() clickhouse.LogManagerIFace { + return q.logManager +} + func NewQueryRunnerDefaultForTests(db *sql.DB, cfg *config.QuesmaConfiguration, tableName string, tables *clickhouse.TableMap, staticRegistry *schema.StaticRegistry) *QueryRunner { @@ -126,8 +148,8 @@ func NewQueryRunnerDefaultForTests(db *sql.DB, cfg *config.QuesmaConfiguration, return NewQueryRunner(lm, cfg, nil, managementConsole, staticRegistry, ab_testing.NewEmptySender(), resolver, tableDiscovery) } -// returns -1 when table name could not be resolved -func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int64, error) { +// HandleCount returns -1 when table name could not be resolved +func (q *QueryRunner) HandleCount(ctx context.Context, indexPattern string) (int64, error) { indexes, err := q.logManager.ResolveIndexPattern(ctx, q.schemaRegistry, indexPattern) if err != nil { return 0, err @@ -148,7 +170,7 @@ func (q *QueryRunner) handleCount(ctx context.Context, indexPattern string) (int } } -func (q *QueryRunner) handleSearch(ctx context.Context, indexPattern string, body types.JSON) ([]byte, error) { +func (q *QueryRunner) HandleSearch(ctx context.Context, indexPattern string, body types.JSON) ([]byte, error) { return q.handleSearchCommon(ctx, indexPattern, body, nil, QueryLanguageDefault) } @@ -156,7 +178,7 @@ func (q *QueryRunner) handleEQLSearch(ctx context.Context, indexPattern string, return q.handleSearchCommon(ctx, indexPattern, body, nil, QueryLanguageEQL) } -func (q *QueryRunner) handleAsyncSearch(ctx context.Context, indexPattern string, body types.JSON, +func (q *QueryRunner) HandleAsyncSearch(ctx context.Context, indexPattern string, body types.JSON, waitForResultsMs int, keepOnCompletion bool) ([]byte, error) { async := AsyncQuery{ asyncId: tracing.GetAsyncId(), @@ -273,7 +295,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan responseBody, err = q.storeAsyncSearch(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, body, res, true, opaqueId) sendMainPlanResult(responseBody, err) }() - return q.handlePartialAsyncSearch(ctx, optAsync.asyncId) + return q.HandlePartialAsyncSearch(ctx, optAsync.asyncId) case res := <-doneCh: responseBody, err = q.storeAsyncSearch(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, body, res, optAsync.keepOnCompletion, opaqueId) @@ -504,7 +526,7 @@ func (q *QueryRunner) asyncQueriesCumulatedBodySize() int { return size } -func (q *QueryRunner) handleAsyncSearchStatus(_ context.Context, id string) ([]byte, error) { +func (q *QueryRunner) HandleAsyncSearchStatus(_ context.Context, id string) ([]byte, error) { if _, ok := q.AsyncRequestStorage.Load(id); ok { // there IS a result in storage, so query is completed/no longer running, return queryparser.EmptyAsyncSearchStatusResponse(id, false, false, 200) } else { // there is no result so query is might be(*) running @@ -514,7 +536,7 @@ func (q *QueryRunner) handleAsyncSearchStatus(_ context.Context, id string) ([]b // However since you're referring to async ID given from Quesma, we naively assume it *does* exist. } -func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error) { +func (q *QueryRunner) HandlePartialAsyncSearch(ctx context.Context, id string) ([]byte, error) { if !strings.Contains(id, tracing.AsyncIdPrefix) { logger.ErrorWithCtx(ctx).Msgf("non quesma async id: %v", id) return queryparser.EmptyAsyncSearchResponse(id, false, 503) @@ -549,7 +571,7 @@ func (q *QueryRunner) handlePartialAsyncSearch(ctx context.Context, id string) ( } } -func (q *QueryRunner) deleteAsyncSearch(id string) ([]byte, error) { +func (q *QueryRunner) DeleteAsyncSearch(id string) ([]byte, error) { if !strings.Contains(id, tracing.AsyncIdPrefix) { return nil, errors.New("invalid quesma async search id : " + id) } diff --git a/quesma/quesma/search_ab_testing.go b/quesma/quesma/search_ab_testing.go index e64d7a7ca..c29a03e97 100644 --- a/quesma/quesma/search_ab_testing.go +++ b/quesma/quesma/search_ab_testing.go @@ -227,7 +227,7 @@ func (q *QueryRunner) executePlanElastic(ctx context.Context, plan *model.Execut responseBody, err = q.storeAsyncSearchWithRaw(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, requestBody, res.response, res.err, res.translatedQueryBody, true, opaqueId) sendABResult(responseBody, err) }() - return q.handlePartialAsyncSearch(ctx, optAsync.asyncId) + return q.HandlePartialAsyncSearch(ctx, optAsync.asyncId) case res := <-doneCh: responseBody, err = q.storeAsyncSearchWithRaw(q.debugInfoCollector, id, optAsync.asyncId, optAsync.startTime, path, requestBody, res.response, res.err, res.translatedQueryBody, true, opaqueId) sendABResult(responseBody, err) diff --git a/quesma/v2/core/quesma_apis.go b/quesma/v2/core/quesma_apis.go index 9b59af592..59670ea27 100644 --- a/quesma/v2/core/quesma_apis.go +++ b/quesma/v2/core/quesma_apis.go @@ -102,5 +102,5 @@ type BackendConnector interface { // Exec executes a command that doesn't return rows, typically an INSERT, UPDATE, or DELETE. Exec(ctx context.Context, query string, args ...interface{}) error - Close() error + Close() error // TODO we should revisit returning error here }