diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index 38920e11c..fad56fcb0 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -8,6 +8,7 @@ import ( "fmt" "quesma/end_user_errors" "quesma/logger" + "quesma/model" "quesma/persistence" "quesma/quesma/config" "quesma/quesma/recovery" @@ -69,6 +70,8 @@ type ( type LogManagerIFace interface { ResolveIndexPattern(ctx context.Context, schema schema.Registry, pattern string) (results []string, err error) + FindTable(tableName string) (result *Table) + ProcessQuery(ctx context.Context, table *Table, query *model.Query) (rows []model.QueryResultRow, performanceResult PerformanceResult, err error) } func NewTableMap() *TableMap { diff --git a/quesma/quesma/functionality/terms_enum/terms_enum.go b/quesma/quesma/functionality/terms_enum/terms_enum.go index 785ef0d67..d334441ee 100644 --- a/quesma/quesma/functionality/terms_enum/terms_enum.go +++ b/quesma/quesma/functionality/terms_enum/terms_enum.go @@ -20,7 +20,7 @@ import ( "time" ) -func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm *clickhouse.LogManager, +func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm clickhouse.LogManagerIFace, schemaRegistry schema.Registry, qmc diag.DebugInfoCollector) ([]byte, error) { if indices, err := lm.ResolveIndexPattern(ctx, schemaRegistry, index); err != nil || len(indices) != 1 { // multi index terms enum is not yet supported errorMsg := fmt.Sprintf("terms enum failed - could not resolve table name for index: %s", index) @@ -37,7 +37,7 @@ func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm *cli } } -func handleTermsEnumRequest(ctx context.Context, body types.JSON, lm *clickhouse.LogManager, qt *queryparser.ClickhouseQueryTranslator, +func handleTermsEnumRequest(ctx context.Context, body types.JSON, lm clickhouse.LogManagerIFace, qt *queryparser.ClickhouseQueryTranslator, qmc diag.DebugInfoCollector) (result []byte, err error) { startTime := time.Now() diff --git a/quesma/quesma/route_handlers.go b/quesma/quesma/route_handlers.go index 5b7c49978..93f27bf2a 100644 --- a/quesma/quesma/route_handlers.go +++ b/quesma/quesma/route_handlers.go @@ -9,11 +9,13 @@ import ( "net/http" "quesma/clickhouse" "quesma/elasticsearch" + "quesma/logger" "quesma/queryparser" "quesma/quesma/config" quesma_errors "quesma/quesma/errors" "quesma/quesma/functionality/field_capabilities" "quesma/quesma/functionality/resolve" + "quesma/quesma/functionality/terms_enum" "quesma/quesma/types" "quesma/schema" quesma_api "quesma_v2/core" @@ -121,3 +123,61 @@ func HandleFieldCaps(ctx context.Context, indexPattern string, allowNoIndices, i } return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil } + +func HandlePutIndex(index string, reqBody types.JSON, sr schema.Registry) (*quesma_api.Result, error) { + if len(reqBody) == 0 { + logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index) + return putIndexResult(index) + } + + mappings, ok := reqBody["mappings"] + if !ok { + logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, reqBody) + return putIndexResult(index) + } + columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{})) + + sr.UpdateDynamicConfiguration(schema.IndexName(index), schema.Table{Columns: columns}) + + return putIndexResult(index) +} + +func HandleGetIndex(sr schema.Registry, index string) (*quesma_api.Result, error) { + foundSchema, found := sr.FindSchema(schema.IndexName(index)) + if !found { + return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil + } + + hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema) + mappings := elasticsearch.GenerateMappings(hierarchicalSchema) + + return getIndexResult(index, mappings) +} + +func HandleTermsEnum(ctx context.Context, indexPattern string, body types.JSON, lm clickhouse.LogManagerIFace, sr schema.Registry, dependencies quesma_api.Dependencies) (*quesma_api.Result, error) { + if responseBody, err := terms_enum.HandleTermsEnum(ctx, indexPattern, body, lm, sr, dependencies.DebugInfoCollector()); err != nil { + return nil, err + } else { + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + } +} + +func HandleClusterHealth() (*quesma_api.Result, error) { + return ElasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil +} + +func HandleIndexRefresh() (*quesma_api.Result, error) { + return ElasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, http.StatusOK), nil +} + +func HandleGetIndexMapping(sr schema.Registry, index string) (*quesma_api.Result, error) { + foundSchema, found := sr.FindSchema(schema.IndexName(index)) + if !found { + return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil + } + + hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema) + mappings := elasticsearch.GenerateMappings(hierarchicalSchema) + + return getIndexMappingResult(index, mappings) +} diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index 9ea9f1fc4..a0e7f37be 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -505,6 +505,11 @@ type countResult struct { Count int64 `json:"count"` } +// ElasticsearchQueryResult is a low-effort way to export widely used func without too much refactoring +func ElasticsearchQueryResult(body string, statusCode int) *quesma_api.Result { + return elasticsearchQueryResult(body, statusCode) +} + func elasticsearchQueryResult(body string, statusCode int) *quesma_api.Result { return &quesma_api.Result{Body: body, Meta: map[string]any{ // TODO copy paste from the original request @@ -578,6 +583,11 @@ func bulkInsertResult(ctx context.Context, ops []bulk.BulkItem, err error) (*que return elasticsearchInsertResult(string(body), http.StatusOK), nil } +// ElasticsearchInsertResult is a low-effort way to export widely used func without too much refactoring +func ElasticsearchInsertResult(body string, statusCode int) *quesma_api.Result { + return elasticsearchInsertResult(body, statusCode) +} + func elasticsearchInsertResult(body string, statusCode int) *quesma_api.Result { return &quesma_api.Result{Body: body, Meta: map[string]any{ // TODO copy paste from the original request diff --git a/quesma/quesma/router_v2.go b/quesma/quesma/router_v2.go index 9678403ce..95aeda744 100644 --- a/quesma/quesma/router_v2.go +++ b/quesma/quesma/router_v2.go @@ -17,7 +17,6 @@ import ( "quesma/quesma/errors" "quesma/quesma/functionality/bulk" "quesma/quesma/functionality/doc" - "quesma/quesma/functionality/terms_enum" "quesma/quesma/types" "quesma/schema" "quesma/table_resolver" @@ -154,11 +153,11 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm // This is current limitation of the router. router.Register(routes.ClusterHealthPath, method("GET"), func(_ context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { - return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil + return HandleClusterHealth() }) router.Register(routes.IndexRefreshPath, and(method("POST"), matchedExactQueryPath(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { - return elasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, http.StatusOK), nil + return HandleIndexRefresh() }) router.Register(routes.ResolveIndexPath, method("GET"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { @@ -223,42 +222,18 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm }) router.Register(routes.IndexMappingPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { - + index := req.Params["index"] switch req.Method { - case "GET": - index := req.Params["index"] - - foundSchema, found := sr.FindSchema(schema.IndexName(index)) - if !found { - return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil - } - - hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema) - mappings := elasticsearch.GenerateMappings(hierarchicalSchema) - - return getIndexMappingResult(index, mappings) - + return HandleGetIndexMapping(sr, index) case "PUT": - index := req.Params["index"] - - err := elasticsearch.IsValidIndexName(index) - if err != nil { - return nil, err - } - - body, err := types.ExpectJSON(req.ParsedBody) - if err != nil { + if body, err := types.ExpectJSON(req.ParsedBody); err != nil { return nil, err + } else { + return HandlePutIndex(index, body, sr) } - - columns := elasticsearch.ParseMappings("", body) - sr.UpdateDynamicConfiguration(schema.IndexName(index), schema.Table{Columns: columns}) - return putIndexResult(index) } - return nil, errors.New("unsupported method") - }) router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { @@ -282,25 +257,15 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm cfg.IndexConfig, sr, lm) }) router.Register(routes.TermsEnumPath, and(method("POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { - - if strings.Contains(req.Params["index"], ",") { + indexPattern := req.Params["index"] + if strings.Contains(indexPattern, ",") { return nil, errors.New("multi index terms enum is not yet supported") - } else { - - var body types.JSON - switch b := req.ParsedBody.(type) { - case types.JSON: - body = b - default: - return nil, errors.New("invalid request body, expecting JSON") - } - - if responseBody, err := terms_enum.HandleTermsEnum(ctx, req.Params["index"], body, lm, sr, dependencies.DebugInfoCollector()); err != nil { - return nil, err - } else { - return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil - } } + body, err := types.ExpectJSON(req.ParsedBody) + if err != nil { + return nil, errors.New("invalid request body, expecting JSON") + } + return HandleTermsEnum(ctx, indexPattern, body, lm, sr, dependencies) }) router.Register(routes.EQLSearch, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { @@ -321,52 +286,20 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm }) router.Register(routes.IndexPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { - + index := req.Params["index"] switch req.Method { - case "GET": - index := req.Params["index"] - - foundSchema, found := sr.FindSchema(schema.IndexName(index)) - if !found { - return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil - } - - hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema) - mappings := elasticsearch.GenerateMappings(hierarchicalSchema) - - return getIndexResult(index, mappings) - + return HandleGetIndex(sr, index) case "PUT": - - index := req.Params["index"] if req.Body == "" { - logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index) - return putIndexResult(index) + return HandlePutIndex(index, types.JSON{}, sr) } - - err := elasticsearch.IsValidIndexName(index) - if err != nil { - return nil, err - } - - body, err := types.ExpectJSON(req.ParsedBody) - if err != nil { + if body, err := types.ExpectJSON(req.ParsedBody); err != nil { return nil, err + } else { + return HandlePutIndex(index, body, sr) } - - mappings, ok := body["mappings"] - if !ok { - logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, req.Body) - return putIndexResult(index) - } - columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{})) - - sr.UpdateDynamicConfiguration(schema.IndexName(index), schema.Table{Columns: columns}) - - return putIndexResult(index) } - return nil, errors.New("unsupported method") })