Reimplementing ResolveField with schema and moving it to QueryParser (#453)

This PR:
- Moves `ResolveField` to `QueryParser` to reduce coupling
- Reimplements it, taking the schema into account (see the sketch below)
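
A minimal, self-contained sketch of the new resolution order (simplified stand-in types, not the actual `schema.Registry`/`schema.Schema` API): the field name is looked up in the schema's fields, then in its aliases, and falls back to the original name when neither matches — which is what the new `QueryParser.ResolveField` in the diff does against the schema registry.

```go
package main

import "fmt"

// Simplified stand-ins for the schema types used by the real ResolveField.
type Schema struct {
	Fields  map[string]string // field name -> internal (column) name
	Aliases map[string]string // alias -> target field name
}

// resolveField mirrors the resolution order of the new implementation:
// schema fields first, then aliases, falling back to the original name.
func resolveField(s Schema, fieldName string) string {
	field := fieldName
	if internal, ok := s.Fields[fieldName]; ok {
		field = internal
	}
	if target, ok := s.Aliases[fieldName]; ok {
		field = target
	}
	return field
}

func main() {
	s := Schema{
		Fields:  map[string]string{"timestamp": "@timestamp"},
		Aliases: map[string]string{"ts": "timestamp"},
	}
	fmt.Println(resolveField(s, "timestamp")) // "@timestamp"
	fmt.Println(resolveField(s, "ts"))        // "timestamp"
	fmt.Println(resolveField(s, "unknown"))   // "unknown" (fallback)
}
```

Call sites change accordingly: `cw.Table.ResolveField(cw.Ctx, field)` becomes `cw.ResolveField(cw.Ctx, field)`, and table lookups such as `GetDateTimeType` and `GetFieldInfo` now receive the already-resolved name.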
pdelewski authored Jul 5, 2024
1 parent b961b09 commit 0d2d5dc
Showing 7 changed files with 81 additions and 56 deletions.
31 changes: 1 addition & 30 deletions quesma/clickhouse/table.go
@@ -161,7 +161,6 @@ func (t *Table) FullTableName() string {
// GetDateTimeType returns type of a field (currently DateTime/DateTime64), if it's a DateTime type. Invalid otherwise.
// Timestamp from config defaults to DateTime64.
func (t *Table) GetDateTimeType(ctx context.Context, fieldName string) DateTimeType {
fieldName = t.ResolveField(ctx, fieldName)
if col, ok := t.Cols[fieldName]; ok {
typeName := col.Type.String()
// hasPrefix, not equal, because we can have DateTime64(3) and we want to catch it
@@ -178,13 +177,6 @@ func (t *Table) GetDateTimeType(ctx context.Context, fieldName string) DateTimeT
return Invalid
}

func (t *Table) GetDateTimeTypeFromSelectClause(ctx context.Context, expr model.Expr) DateTimeType {
if ref, ok := expr.(model.ColumnRef); ok {
return t.GetDateTimeType(ctx, ref.ColumnName)
}
return Invalid
}

// applyIndexConfig applies full text search and alias configuration to the table
func (t *Table) applyIndexConfig(configuration config.QuesmaConfiguration) {
for _, c := range t.Cols {
@@ -209,27 +201,7 @@ func (t *Table) applyIndexConfig(configuration config.QuesmaConfiguration) {

}

// deprecated
func (t *Table) ResolveField(ctx context.Context, fieldName string) (field string) {
// Alias resolution should occur *after* the query is parsed, not during the parsing
field = fieldName
if t.aliases != nil {
if alias, ok := t.aliases[fieldName]; ok {
field = alias
}
}

if field != "*" && field != "_all" && field != "_doc" && field != "_id" && field != "_index" {
if _, ok := t.Cols[field]; !ok {
logger.DebugWithCtx(ctx).Msgf("field '%s' referenced, but not found in table '%s'", fieldName, t.Name)
}
}

return
}

func (t *Table) HasColumn(ctx context.Context, fieldName string) bool {
fieldName = t.ResolveField(ctx, fieldName)
return t.Cols[fieldName] != nil
}

@@ -269,8 +241,7 @@ func (t *Table) GetAttributesList() []Attribute {
// TODO Won't work with tuples, e.g. trying to access via tupleName.tupleField will return NotExists,
// instead of some other response. Fix this when needed (we seem to not need tuples right now)
func (t *Table) GetFieldInfo(ctx context.Context, fieldName string) FieldInfo {
resolvedFieldName := t.ResolveField(ctx, fieldName)
col, ok := t.Cols[resolvedFieldName]
col, ok := t.Cols[fieldName]
if !ok {
return NotExists
}
2 changes: 1 addition & 1 deletion quesma/queryparser/aggregation_date_range_parser.go
@@ -14,7 +14,7 @@ func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(dateRange QueryMa

if field, exists := dateRange["field"]; exists {
if fieldNameRaw, ok := field.(string); ok {
fieldName = cw.Table.ResolveField(cw.Ctx, fieldNameRaw)
fieldName = cw.ResolveField(cw.Ctx, fieldNameRaw)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("field specified for date range aggregation is not a string. Using empty. Querymap: %v", dateRange)
}
10 changes: 6 additions & 4 deletions quesma/queryparser/aggregation_parser.go
@@ -528,10 +528,11 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
for k, v := range queryMap {
if slices.Contains(metricsAggregations, k) {
field, isFromScript := cw.parseFieldFieldMaybeScript(v, k)

return metricsAggregation{
AggrType: k,
Fields: []model.Expr{field},
FieldType: cw.Table.GetDateTimeTypeFromSelectClause(cw.Ctx, field),
FieldType: cw.GetDateTimeTypeFromSelectClause(cw.Ctx, field),
IsFieldNameCompound: isFromScript,
}, true
}
@@ -543,10 +544,11 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
logger.WarnWithCtx(cw.Ctx).Msgf("percentiles is not a map, but %T, value: %v. Using empty map.", percentile, percentile)
}
field, keyed, percentiles := cw.parsePercentilesAggregation(percentileMap)

return metricsAggregation{
AggrType: "quantile",
Fields: []model.Expr{field},
FieldType: cw.Table.GetDateTimeTypeFromSelectClause(cw.Ctx, field),
FieldType: cw.GetDateTimeTypeFromSelectClause(cw.Ctx, field),
Percentiles: percentiles,
Keyed: keyed,
}, true
@@ -909,7 +911,7 @@ func (cw *ClickhouseQueryTranslator) parseFieldField(shouldBeMap any, aggregatio
}
if fieldRaw, ok := Map["field"]; ok {
if field, ok := fieldRaw.(string); ok {
return model.NewColumnRef(cw.Table.ResolveField(cw.Ctx, field)) // model.NewSelectColumnTableField(cw.Table.ResolveField(cw.Ctx, field)) // remove this resolve? we do all transforms after parsing is done?
return model.NewColumnRef(cw.ResolveField(cw.Ctx, field)) // model.NewSelectColumnTableField(cw.Table.ResolveField(cw.Ctx, field)) // remove this resolve? we do all transforms after parsing is done?
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("field is not a string, but %T, value: %v", fieldRaw, fieldRaw)
}
@@ -939,7 +941,7 @@ func (cw *ClickhouseQueryTranslator) parseFieldFieldMaybeScript(shouldBeMap any,
// maybe "field" field
if fieldRaw, ok := Map["field"]; ok {
if field, ok := fieldRaw.(string); ok {
return model.NewColumnRef(cw.Table.ResolveField(cw.Ctx, field)), true // remove this resolve? we do all transforms after parsing is done?
return model.NewColumnRef(cw.ResolveField(cw.Ctx, field)), true // remove this resolve? we do all transforms after parsing is done?
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("field is not a string, but %T, value: %v", fieldRaw, fieldRaw)
}
82 changes: 65 additions & 17 deletions quesma/queryparser/query_parser.go
@@ -3,6 +3,7 @@
package queryparser

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
@@ -13,6 +14,7 @@ import (
"quesma/model/typical_queries"
"quesma/queryparser/lucene"
"quesma/quesma/types"
"quesma/schema"
"quesma/util"
"strconv"
"strings"
@@ -252,7 +254,7 @@ func (cw *ClickhouseQueryTranslator) parseMetadata(queryMap QueryMap) QueryMap {
}

func (cw *ClickhouseQueryTranslator) ParseAutocomplete(indexFilter *QueryMap, fieldName string, prefix *string, caseIns bool) model.SimpleQuery {
fieldName = cw.Table.ResolveField(cw.Ctx, fieldName)
fieldName = cw.ResolveField(cw.Ctx, fieldName)
canParse := true
stmts := make([]model.Expr, 0)
if indexFilter != nil {
Expand Down Expand Up @@ -561,7 +563,7 @@ func (cw *ClickhouseQueryTranslator) parseMatch(queryMap QueryMap, matchPhrase b
}

for fieldName, v := range queryMap {
fieldName = cw.Table.ResolveField(cw.Ctx, fieldName)
fieldName = cw.ResolveField(cw.Ctx, fieldName)
// (fieldName, v) = either e.g. ("message", "this is a test")
// or ("message", map["query": "this is a test", ...]). Here we only care about "query" until we find a case where we need more.
vUnNested := v
Expand Down Expand Up @@ -661,7 +663,7 @@ func (cw *ClickhouseQueryTranslator) parsePrefix(queryMap QueryMap) model.Simple
}

for fieldName, v := range queryMap {
fieldName = cw.Table.ResolveField(cw.Ctx, fieldName)
fieldName = cw.ResolveField(cw.Ctx, fieldName)
switch vCasted := v.(type) {
case string:
simpleStat := model.NewInfixExpr(model.NewColumnRef(fieldName), "iLIKE", model.NewLiteral("'"+vCasted+"%'"))
@@ -691,7 +693,7 @@ func (cw *ClickhouseQueryTranslator) parseWildcard(queryMap QueryMap) model.Simp
}

for fieldName, v := range queryMap {
fieldName = cw.Table.ResolveField(cw.Ctx, fieldName)
fieldName = cw.ResolveField(cw.Ctx, fieldName)
if vAsMap, ok := v.(QueryMap); ok {
if value, ok := vAsMap["value"]; ok {
if valueAsString, ok := value.(string); ok {
@@ -781,7 +783,7 @@ func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQ
}

for field, v := range queryMap {
field = cw.Table.ResolveField(cw.Ctx, field)
field = cw.ResolveField(cw.Ctx, field)
stmts := make([]model.Expr, 0)
if _, ok := v.(QueryMap); !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("invalid range type: %T, value: %v", v, v)
@@ -797,7 +799,7 @@ func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQ
v := v.(QueryMap)[op]
var timeFormatFuncName string
var finalLHS, valueToCompare model.Expr
fieldType := cw.Table.GetDateTimeType(cw.Ctx, field)
fieldType := cw.Table.GetDateTimeType(cw.Ctx, cw.ResolveField(cw.Ctx, field))
vToPrint := sprint(v)
valueToCompare = model.NewLiteral(vToPrint)
finalLHS = model.NewColumnRef(field)
@@ -875,7 +877,7 @@ func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQ

// parseDateTimeString returns string used to parse DateTime in Clickhouse (depends on column type)
func (cw *ClickhouseQueryTranslator) parseDateTimeString(table *clickhouse.Table, field, dateTime string) (string, string) {
typ := table.GetDateTimeType(cw.Ctx, field)
typ := table.GetDateTimeType(cw.Ctx, cw.ResolveField(cw.Ctx, field))
switch typ {
case clickhouse.DateTime64:
return "parseDateTime64BestEffort('" + dateTime + "')", "parseDateTime64BestEffort"
@@ -900,10 +902,10 @@ func (cw *ClickhouseQueryTranslator) parseExists(queryMap QueryMap) model.Simple
logger.WarnWithCtx(cw.Ctx).Msgf("invalid exists type: %T, value: %v", v, v)
return model.NewSimpleQuery(nil, false)
}
fieldName = cw.Table.ResolveField(cw.Ctx, fieldName)
fieldName = cw.ResolveField(cw.Ctx, fieldName)
fieldNameQuoted := strconv.Quote(fieldName)

switch cw.Table.GetFieldInfo(cw.Ctx, fieldName) {
switch cw.Table.GetFieldInfo(cw.Ctx, cw.ResolveField(cw.Ctx, fieldName)) {
case clickhouse.ExistsAndIsBaseType:
sql = model.NewInfixExpr(model.NewColumnRef(fieldName), "IS", model.NewLiteral("NOT NULL"))
case clickhouse.ExistsAndIsArray:
@@ -923,7 +925,7 @@ func (cw *ClickhouseQueryTranslator) parseExists(queryMap QueryMap) model.Simple
}
sql = model.Or(stmts)
default:
logger.WarnWithCtx(cw.Ctx).Msgf("invalid field type: %T for exists: %s", cw.Table.GetFieldInfo(cw.Ctx, fieldName), fieldName)
logger.WarnWithCtx(cw.Ctx).Msgf("invalid field type: %T for exists: %s", cw.Table.GetFieldInfo(cw.Ctx, cw.ResolveField(cw.Ctx, fieldName)), fieldName)
}
}
return model.NewSimpleQuery(sql, true)
@@ -1002,7 +1004,7 @@ func (cw *ClickhouseQueryTranslator) extractFields(fields []interface{}) []strin
if fieldStr == "*" {
return cw.Table.GetFulltextFields()
}
fieldStr = cw.Table.ResolveField(cw.Ctx, fieldStr)
fieldStr = cw.ResolveField(cw.Ctx, fieldStr)
result = append(result, fieldStr)
}
return result
@@ -1093,7 +1095,7 @@ func (cw *ClickhouseQueryTranslator) isItFacetsRequest(queryMap QueryMap) (model
return model.NewSearchQueryInfoNormal(), false
}
fieldName = strings.TrimSuffix(fieldName, ".keyword")
fieldName = cw.Table.ResolveField(cw.Ctx, fieldName)
fieldName = cw.ResolveField(cw.Ctx, fieldName)

secondNestingMap, ok := queryMap["sampler"].(QueryMap)
if !ok {
@@ -1183,7 +1185,7 @@ func (cw *ClickhouseQueryTranslator) isItListRequest(queryMap QueryMap) (model.S
}
}

resolvedField := cw.Table.ResolveField(cw.Ctx, fieldName)
resolvedField := cw.ResolveField(cw.Ctx, fieldName)
if resolvedField == "*" {
return model.SearchQueryInfo{Typ: model.ListAllFields, RequestedFields: []string{"*"}, FieldName: "*", I1: 0, I2: size}, true
}
@@ -1229,11 +1231,11 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns

// sortMap has only 1 key, so we can just iterate over it
for k, v := range sortMap {
if strings.HasPrefix(k, "_") && cw.Table.GetFieldInfo(cw.Ctx, k) == clickhouse.NotExists {
if strings.HasPrefix(k, "_") && cw.Table.GetFieldInfo(cw.Ctx, cw.ResolveField(cw.Ctx, k)) == clickhouse.NotExists {
// we're skipping ELK internal fields, like "_doc", "_id", etc.
continue
}
fieldName := cw.Table.ResolveField(cw.Ctx, k)
fieldName := cw.ResolveField(cw.Ctx, k)
switch v := v.(type) {
case QueryMap:
if order, ok := v["order"]; ok {
@@ -1263,7 +1265,7 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns
return sortColumns
case map[string]interface{}:
for fieldName, fieldValue := range sortMaps {
if strings.HasPrefix(fieldName, "_") && cw.Table.GetFieldInfo(cw.Ctx, fieldName) == clickhouse.NotExists {
if strings.HasPrefix(fieldName, "_") && cw.Table.GetFieldInfo(cw.Ctx, cw.ResolveField(cw.Ctx, fieldName)) == clickhouse.NotExists {
// TODO Elastic internal fields will need to be supported in the future
continue
}
@@ -1280,7 +1282,7 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns

case map[string]string:
for fieldName, fieldValue := range sortMaps {
if strings.HasPrefix(fieldName, "_") && cw.Table.GetFieldInfo(cw.Ctx, fieldName) == clickhouse.NotExists {
if strings.HasPrefix(fieldName, "_") && cw.Table.GetFieldInfo(cw.Ctx, cw.ResolveField(cw.Ctx, fieldName)) == clickhouse.NotExists {
// TODO Elastic internal fields will need to be supported in the future
continue
}
@@ -1310,6 +1312,45 @@ func createSortColumn(fieldName, ordering string) (model.OrderByExpr, error) {
}
}

// ResolveField resolves a field name to its internal name.
// For now it's part of QueryParser; however, it can become
// part of the transformation pipeline in the future.
// What prevents us from moving it to the transformation pipeline now is that
// we would need to annotate this field somehow in the AST to be able
// to distinguish it from other fields.
func (cw *ClickhouseQueryTranslator) ResolveField(ctx context.Context, fieldName string) (field string) {
// Alias resolution should occur *after* the query is parsed, not during the parsing
if cw.SchemaRegistry == nil {
logger.Error().Msg("Schema registry is not set")
field = fieldName
return
}
schemaInstance, exists := cw.SchemaRegistry.FindSchema(schema.TableName(cw.Table.Name))
if !exists {
logger.Error().Msgf("Schema fot table %s not found", cw.Table.Name)
field = fieldName
return
}
if value, ok := schemaInstance.Fields[schema.FieldName(fieldName)]; ok {
field = value.InternalPropertyName.AsString()
} else {
// fallback to original field name
logger.DebugWithCtx(ctx).Msgf("field '%s' referenced, but not found in schema", fieldName)
field = fieldName
}

// Check aliases
if value, ok := schemaInstance.Aliases[schema.FieldName(fieldName)]; ok {
field = value.AsString()
}

if field != "*" && field != "_all" && field != "_doc" && field != "_id" && field != "_index" {
if _, ok := schemaInstance.Fields[schema.FieldName(field)]; !ok {
logger.DebugWithCtx(ctx).Msgf("field '%s' referenced, but not found in schema", fieldName)
}
}
return
}
func (cw *ClickhouseQueryTranslator) parseSizeExists(queryMap QueryMap) (size int, ok bool) {
sizeRaw, exists := queryMap["size"]
if !exists {
@@ -1333,3 +1374,10 @@ func (cw *ClickhouseQueryTranslator) parseSize(queryMap QueryMap, defaultSize in
return defaultSize
}
}

func (cw *ClickhouseQueryTranslator) GetDateTimeTypeFromSelectClause(ctx context.Context, expr model.Expr) clickhouse.DateTimeType {
if ref, ok := expr.(model.ColumnRef); ok {
return cw.Table.GetDateTimeType(ctx, cw.ResolveField(ctx, ref.ColumnName))
}
return clickhouse.Invalid
}
2 changes: 1 addition & 1 deletion quesma/queryparser/query_translator.go
@@ -493,7 +493,7 @@ func (cw *ClickhouseQueryTranslator) createHistogramPartOfQuery(queryMap QueryMa
if err != nil {
logger.ErrorWithCtx(cw.Ctx).Msg(err.Error())
}
dateTimeType := cw.Table.GetDateTimeTypeFromSelectClause(cw.Ctx, field)
dateTimeType := cw.GetDateTimeTypeFromSelectClause(cw.Ctx, field)
if dateTimeType == clickhouse.Invalid {
logger.ErrorWithCtx(cw.Ctx).Msgf("invalid date type for field %+v. Using DateTime64 as default.", field)
dateTimeType = defaultDateTimeType
2 changes: 1 addition & 1 deletion quesma/queryparser/top_metrics_aggregation_parser.go
@@ -70,7 +70,7 @@ func (cw *ClickhouseQueryTranslator) getFieldNames(fields []interface{}) (exprs
for _, field := range fields {
if fName, ok := field.(QueryMap)["field"]; ok {
if fieldName, ok := fName.(string); ok {
exprs = append(exprs, model.NewColumnRef(cw.Table.ResolveField(cw.Ctx, fieldName)))
exprs = append(exprs, model.NewColumnRef(cw.ResolveField(cw.Ctx, fieldName)))
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("field %v is not a string (type: %T). Might be correct, might not. Check it out.", fName, fName)
}
8 changes: 6 additions & 2 deletions quesma/quesma/search.go
@@ -240,7 +240,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin

if canParse {
if len(queries) > 0 && query_util.IsNonAggregationQuery(queries[0]) {
if properties := q.findNonexistingProperties(queries[0], table); len(properties) > 0 {
if properties := q.findNonexistingProperties(queries[0], table, queryTranslator); len(properties) > 0 {
logger.DebugWithCtx(ctx).Msgf("properties %s not found in table %s", properties, table.Name)
if elasticsearch.IsIndexPattern(indexPattern) {
return queryparser.EmptySearchResponse(ctx), nil
@@ -636,7 +636,7 @@ func (q *QueryRunner) Close() {
logger.Info().Msg("queryRunner Stopped")
}

func (q *QueryRunner) findNonexistingProperties(query *model.Query, table *clickhouse.Table) []string {
func (q *QueryRunner) findNonexistingProperties(query *model.Query, table *clickhouse.Table, queryTranslator IQueryTranslator) []string {
// this is not fully correct, but we keep it backward compatible
var results = make([]string, 0)
var allReferencedFields = make([]string, 0)
@@ -648,6 +648,10 @@ func (q *QueryRunner) findNonexistingProperties(query *model.Query, table *click
allReferencedFields = append(allReferencedFields, query.SelectCommand.OrderByFieldNames()...)

for _, property := range allReferencedFields {
queryTranslatorValue, ok := queryTranslator.(*queryparser.ClickhouseQueryTranslator)
if ok {
property = queryTranslatorValue.ResolveField(q.executionCtx, property)
}
if property != "*" && !table.HasColumn(q.executionCtx, property) {
results = append(results, property)
}
