Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More router_v2 portability (terms_enum + minor additions) #1153

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"quesma/end_user_errors"
"quesma/logger"
"quesma/model"
"quesma/persistence"
"quesma/quesma/config"
"quesma/quesma/recovery"
Expand Down Expand Up @@ -69,6 +70,8 @@ type (

type LogManagerIFace interface {
ResolveIndexPattern(ctx context.Context, schema schema.Registry, pattern string) (results []string, err error)
FindTable(tableName string) (result *Table)
ProcessQuery(ctx context.Context, table *Table, query *model.Query) (rows []model.QueryResultRow, performanceResult PerformanceResult, err error)
}

func NewTableMap() *TableMap {
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/functionality/terms_enum/terms_enum.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"
)

func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm *clickhouse.LogManager,
func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm clickhouse.LogManagerIFace,
schemaRegistry schema.Registry, qmc diag.DebugInfoCollector) ([]byte, error) {
if indices, err := lm.ResolveIndexPattern(ctx, schemaRegistry, index); err != nil || len(indices) != 1 { // multi index terms enum is not yet supported
errorMsg := fmt.Sprintf("terms enum failed - could not resolve table name for index: %s", index)
Expand All @@ -37,7 +37,7 @@ func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm *cli
}
}

func handleTermsEnumRequest(ctx context.Context, body types.JSON, lm *clickhouse.LogManager, qt *queryparser.ClickhouseQueryTranslator,
func handleTermsEnumRequest(ctx context.Context, body types.JSON, lm clickhouse.LogManagerIFace, qt *queryparser.ClickhouseQueryTranslator,
qmc diag.DebugInfoCollector) (result []byte, err error) {
startTime := time.Now()

Expand Down
60 changes: 60 additions & 0 deletions quesma/quesma/route_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"net/http"
"quesma/clickhouse"
"quesma/elasticsearch"
"quesma/logger"
"quesma/queryparser"
"quesma/quesma/config"
quesma_errors "quesma/quesma/errors"
"quesma/quesma/functionality/field_capabilities"
"quesma/quesma/functionality/resolve"
"quesma/quesma/functionality/terms_enum"
"quesma/quesma/types"
"quesma/schema"
quesma_api "quesma_v2/core"
Expand Down Expand Up @@ -121,3 +123,61 @@ func HandleFieldCaps(ctx context.Context, indexPattern string, allowNoIndices, i
}
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}

func HandlePutIndex(index string, reqBody types.JSON, sr schema.Registry) (*quesma_api.Result, error) {
if len(reqBody) == 0 {
logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index)
return putIndexResult(index)
}

mappings, ok := reqBody["mappings"]
if !ok {
logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, reqBody)
return putIndexResult(index)
}
columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{}))

sr.UpdateDynamicConfiguration(schema.IndexName(index), schema.Table{Columns: columns})

return putIndexResult(index)
}

func HandleGetIndex(sr schema.Registry, index string) (*quesma_api.Result, error) {
foundSchema, found := sr.FindSchema(schema.IndexName(index))
if !found {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
}

hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)

return getIndexResult(index, mappings)
}

func HandleTermsEnum(ctx context.Context, indexPattern string, body types.JSON, lm clickhouse.LogManagerIFace, sr schema.Registry, dependencies quesma_api.Dependencies) (*quesma_api.Result, error) {
if responseBody, err := terms_enum.HandleTermsEnum(ctx, indexPattern, body, lm, sr, dependencies.DebugInfoCollector()); err != nil {
return nil, err
} else {
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}
}

func HandleClusterHealth() (*quesma_api.Result, error) {
return ElasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil
}

func HandleIndexRefresh() (*quesma_api.Result, error) {
return ElasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, http.StatusOK), nil
}

func HandleGetIndexMapping(sr schema.Registry, index string) (*quesma_api.Result, error) {
foundSchema, found := sr.FindSchema(schema.IndexName(index))
if !found {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
}

hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)

return getIndexMappingResult(index, mappings)
}
10 changes: 10 additions & 0 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,11 @@ type countResult struct {
Count int64 `json:"count"`
}

// ElasticsearchQueryResult is a low-effort way to export widely used func without too much refactoring
func ElasticsearchQueryResult(body string, statusCode int) *quesma_api.Result {
return elasticsearchQueryResult(body, statusCode)
}

func elasticsearchQueryResult(body string, statusCode int) *quesma_api.Result {
return &quesma_api.Result{Body: body, Meta: map[string]any{
// TODO copy paste from the original request
Expand Down Expand Up @@ -578,6 +583,11 @@ func bulkInsertResult(ctx context.Context, ops []bulk.BulkItem, err error) (*que
return elasticsearchInsertResult(string(body), http.StatusOK), nil
}

// ElasticsearchInsertResult is a low-effort way to export widely used func without too much refactoring
func ElasticsearchInsertResult(body string, statusCode int) *quesma_api.Result {
return elasticsearchInsertResult(body, statusCode)
}

func elasticsearchInsertResult(body string, statusCode int) *quesma_api.Result {
return &quesma_api.Result{Body: body, Meta: map[string]any{
// TODO copy paste from the original request
Expand Down
107 changes: 20 additions & 87 deletions quesma/quesma/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"quesma/quesma/errors"
"quesma/quesma/functionality/bulk"
"quesma/quesma/functionality/doc"
"quesma/quesma/functionality/terms_enum"
"quesma/quesma/types"
"quesma/schema"
"quesma/table_resolver"
Expand Down Expand Up @@ -154,11 +153,11 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
// This is current limitation of the router.

router.Register(routes.ClusterHealthPath, method("GET"), func(_ context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil
return HandleClusterHealth()
})

router.Register(routes.IndexRefreshPath, and(method("POST"), matchedExactQueryPath(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
return elasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, http.StatusOK), nil
return HandleIndexRefresh()
})

router.Register(routes.ResolveIndexPath, method("GET"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
Expand Down Expand Up @@ -223,42 +222,18 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
})

router.Register(routes.IndexMappingPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {

index := req.Params["index"]
switch req.Method {

case "GET":
index := req.Params["index"]

foundSchema, found := sr.FindSchema(schema.IndexName(index))
if !found {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
}

hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)

return getIndexMappingResult(index, mappings)

return HandleGetIndexMapping(sr, index)
case "PUT":
index := req.Params["index"]

err := elasticsearch.IsValidIndexName(index)
if err != nil {
return nil, err
}

body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
if body, err := types.ExpectJSON(req.ParsedBody); err != nil {
return nil, err
} else {
return HandlePutIndex(index, body, sr)
}

columns := elasticsearch.ParseMappings("", body)
sr.UpdateDynamicConfiguration(schema.IndexName(index), schema.Table{Columns: columns})
return putIndexResult(index)
}

return nil, errors.New("unsupported method")

})

router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
Expand All @@ -282,25 +257,15 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
cfg.IndexConfig, sr, lm)
})
router.Register(routes.TermsEnumPath, and(method("POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {

if strings.Contains(req.Params["index"], ",") {
indexPattern := req.Params["index"]
if strings.Contains(indexPattern, ",") {
return nil, errors.New("multi index terms enum is not yet supported")
} else {

var body types.JSON
switch b := req.ParsedBody.(type) {
case types.JSON:
body = b
default:
return nil, errors.New("invalid request body, expecting JSON")
}

if responseBody, err := terms_enum.HandleTermsEnum(ctx, req.Params["index"], body, lm, sr, dependencies.DebugInfoCollector()); err != nil {
return nil, err
} else {
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
}
}
body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
return nil, errors.New("invalid request body, expecting JSON")
}
return HandleTermsEnum(ctx, indexPattern, body, lm, sr, dependencies)
})

router.Register(routes.EQLSearch, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
Expand All @@ -321,52 +286,20 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
})

router.Register(routes.IndexPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {

index := req.Params["index"]
switch req.Method {

case "GET":
index := req.Params["index"]

foundSchema, found := sr.FindSchema(schema.IndexName(index))
if !found {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
}

hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)

return getIndexResult(index, mappings)

return HandleGetIndex(sr, index)
case "PUT":

index := req.Params["index"]
if req.Body == "" {
logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index)
return putIndexResult(index)
return HandlePutIndex(index, types.JSON{}, sr)
}

err := elasticsearch.IsValidIndexName(index)
if err != nil {
return nil, err
}

body, err := types.ExpectJSON(req.ParsedBody)
if err != nil {
if body, err := types.ExpectJSON(req.ParsedBody); err != nil {
return nil, err
} else {
return HandlePutIndex(index, body, sr)
}

mappings, ok := body["mappings"]
if !ok {
logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, req.Body)
return putIndexResult(index)
}
columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{}))

sr.UpdateDynamicConfiguration(schema.IndexName(index), schema.Table{Columns: columns})

return putIndexResult(index)
}

return nil, errors.New("unsupported method")
})

Expand Down
Loading