diff --git a/quesma/model/DUPA_query.go b/quesma/model/DUPA_query.go new file mode 100644 index 000000000..47c56dd44 --- /dev/null +++ b/quesma/model/DUPA_query.go @@ -0,0 +1,405 @@ +package model + +/* +import ( + "context" + "fmt" + "mitmproxy/quesma/logger" + "sort" + "strconv" + "strings" +) + +const RowNumberColumnName = "row_number" +const EmptyFieldSelection = "''" // we can query SELECT '', that's why such quotes +const CountShortName = "cnt" + +type Highlighter struct { + Tokens []string + Fields map[string]bool + + PreTags []string + PostTags []string +} + +// implements String() (now) and MakeResponse() interface (in the future (?)) +type Query struct { + IsDistinct bool // true <=> query is SELECT DISTINCT + Fields []string // Fields in 'SELECT Fields FROM ...' + NonSchemaFields []string // Fields that are not in schema, but are in 'SELECT ...', e.g. count() + WhereClause string // "WHERE ..." until next clause like GROUP BY/ORDER BY, etc. + GroupByFields []string // if not empty, we do GROUP BY GroupByFields... They are quoted if they are column names, unquoted if non-schema. So no quotes need to be added. + OrderBy []string // ORDER BY fields + SuffixClauses []string // LIMIT, etc. + FromClause string // usually just "tableName", or databaseName."tableName". Sometimes a subquery e.g. (SELECT ...) + TableName string + SubQueries []subQuery + OrderByCount bool + CanParse bool // true <=> query is valid + QueryInfo SearchQueryInfo + Highlighter Highlighter + NoDBQuery bool // true <=> we don't need query to DB here, true in some pipeline aggregations + Parent string // parent aggregation name, used in some pipeline aggregations + Aggregators []Aggregator // keeps names of aggregators, e.g. "0", "1", "2", "suggestions". Needed for JSON response. + Type QueryType + SubSelect string + Metadata JsonMap +} + +type subQuery struct { + sql string + innerJoin string + name string +} + +func newSubQuery(sql, innerJoin, name string) subQuery { + return subQuery{sql: sql, innerJoin: innerJoin, name: name} +} + +var NoMetadataField JsonMap = nil + +// returns string with * in SELECT +func (q *Query) String() string { + return q.stringCommon(q.allFields(), true) +} + +// returns string with SQL query +// colNames - list of columns (schema fields) for SELECT +func (q *Query) StringFromColumns(colNames []string, printSubQueries bool) string { + return q.stringCommon(colNames, printSubQueries) +} + +func (q *Query) stringCommon(selectSchemaFields []string, printSubQueries bool) string { + var sb strings.Builder + if len(q.SubQueries) > 0 && printSubQueries { + sb.WriteString("WITH ") + for i, sq := range q.SubQueries { + sb.WriteString(sq.name + " AS (" + sq.sql + ")") + if i < len(q.SubQueries)-1 { + sb.WriteString(", ") + } + } + sb.WriteString(" ") + } + sb.WriteString("SELECT ") + if q.IsDistinct { + sb.WriteString("DISTINCT ") + } + sb.WriteString(strings.Join(selectSchemaFields, ", ")) + sb.WriteString(" FROM " + q.FromClause + " ") //where + q.WhereClause + " ") + for i, sq := range q.SubQueries { + sb.WriteString("INNER JOIN " + sq.name + " ON " + sq.innerJoin + " ") + if i < len(q.SubQueries)-1 { + sb.WriteString("AND ") + } + } + if len(q.WhereClause) > 0 { + sb.WriteString("WHERE " + q.WhereClause + " ") + } + lastLetterIsSpace := true + if len(q.GroupByFields) > 0 { + sb.WriteString("GROUP BY ") + for i, field := range q.GroupByFields { + sb.WriteString(field) + if i < len(q.GroupByFields)-1 { + sb.WriteString(", ") + } + } + lastLetterIsSpace = false + } + if len(q.OrderBy) > 0 { + if !lastLetterIsSpace { + sb.WriteString(" ") + } + sb.WriteString("ORDER BY ") + for i, field := range q.OrderBy { + sb.WriteString(field) + if i < len(q.OrderBy)-1 { + sb.WriteString(", ") + } + } + } + if len(q.SuffixClauses) > 0 { + sb.WriteString(" " + strings.Join(q.SuffixClauses, " ")) + } + return sb.String() +} + +func (q *Query) IsWildcard() bool { + return len(q.Fields) == 1 && q.Fields[0] == "*" +} + +func (q *Query) allFields() []string { + fields := make([]string, 0, len(q.Fields)+len(q.NonSchemaFields)) + for _, field := range q.Fields { + if field == "*" { + fields = append(fields, "*") + } else { + fields = append(fields, strconv.Quote(field)) + } + } + for _, field := range q.NonSchemaFields { + fields = append(fields, field) + } + return fields +} + +func (q *Query) AddSubQueryFromCurrentState(ctx context.Context, subqueryNr int) { + queryName := q.subQueryName(subqueryNr) + + selectFields := make([]string, 0, len(q.Fields)+len(q.NonSchemaFields)+1) + for _, schemaField := range q.Fields { + if schemaField == "*" { + logger.WarnWithCtx(ctx).Msgf("Query with * shouldn't happen here. Skipping (query: %+v)", q) + continue + } + selectFields = append(selectFields, fmt.Sprintf(`"%s" AS "%s_%s"`, schemaField, queryName, schemaField)) + } + for i, nonSchemaField := range q.NonSchemaFields { + selectFields = append(selectFields, fmt.Sprintf(`%s AS "%s_ns_%d"`, nonSchemaField, queryName, i)) + } + selectFields = append(selectFields, fmt.Sprintf("count() AS %s", strconv.Quote(q.subQueryCountFieldName(subqueryNr)))) + sql := q.StringFromColumns(selectFields, false) + innerJoinParts := make([]string, 0, len(q.GroupByFields)) + for _, field := range q.Fields { + innerJoinParts = append(innerJoinParts, fmt.Sprintf(`"%s" = "%s_%s"`, field, queryName, field)) + // FIXME add support for non-schema fields + } + innerJoin := strings.Join(innerJoinParts, " AND ") + q.SubQueries = append(q.SubQueries, newSubQuery(sql, innerJoin, queryName)) +} + +func (q *Query) ReplaceLastOrderByWithSubQuery(subqueryNr int) { + queryName := q.subQueryName(subqueryNr) + idxToReplace := len(q.OrderBy) - 2 + replaceString := queryName + "_" + CountShortName + " desc" + q.OrderBy[idxToReplace] = replaceString +} + +func (q *Query) subQueryName(nr int) string { + return "subQuery" + strconv.Itoa(nr) +} + +func (q *Query) subQueryCountFieldName(nr int) string { + return q.subQueryName(nr) + "_" + CountShortName +} + +// CopyAggregationFields copies all aggregation fields from qwa to q +func (q *Query) CopyAggregationFields(qwa Query) { + q.GroupByFields = make([]string, len(qwa.GroupByFields)) + copy(q.GroupByFields, qwa.GroupByFields) + + q.Fields = make([]string, len(qwa.Fields)) + copy(q.Fields, qwa.Fields) + + q.NonSchemaFields = make([]string, len(qwa.NonSchemaFields)) + copy(q.NonSchemaFields, qwa.NonSchemaFields) + + q.SuffixClauses = make([]string, len(qwa.SuffixClauses)) + copy(q.SuffixClauses, qwa.SuffixClauses) + + q.Aggregators = make([]Aggregator, len(qwa.Aggregators)) + copy(q.Aggregators, qwa.Aggregators) +} + +// RemoveEmptyGroupBy removes EmptyFieldSelection from GroupByFields +func (q *Query) RemoveEmptyGroupBy() { + nonEmptyFields := make([]string, 0) + for _, field := range q.GroupByFields { + if field != EmptyFieldSelection { + nonEmptyFields = append(nonEmptyFields, field) + } + } + q.GroupByFields = nonEmptyFields +} + +// TrimKeywordFromFields trims .keyword from fields and group by fields +// In future probably handle it in a better way +func (q *Query) TrimKeywordFromFields(ctx context.Context) { + for i := range q.Fields { + if strings.HasSuffix(q.Fields[i], `.keyword"`) { + logger.WarnWithCtx(ctx).Msgf("trimming .keyword from field %s", q.Fields[i]) + q.Fields[i] = strings.TrimSuffix(q.Fields[i], `.keyword"`) + q.Fields[i] += `"` + } + } + for i := range q.GroupByFields { + if strings.HasSuffix(q.GroupByFields[i], `.keyword"`) { + logger.WarnWithCtx(ctx).Msgf("trimming .keyword from group by field %s", q.GroupByFields[i]) + q.GroupByFields[i] = strings.TrimSuffix(q.GroupByFields[i], `.keyword"`) + q.GroupByFields[i] += `"` + } + } + for i := range q.NonSchemaFields { + if strings.HasSuffix(q.NonSchemaFields[i], `.keyword"`) { + logger.WarnWithCtx(ctx).Msgf("trimming .keyword from group by field %s", q.GroupByFields[i]) + q.NonSchemaFields[i] = strings.TrimSuffix(q.NonSchemaFields[i], `.keyword"`) + q.NonSchemaFields[i] += `"` + } + } +} + +// Name returns the name of this aggregation (specifically, the last aggregator) +// So for nested aggregation {"a": {"b": {"c": this aggregation}}}, it returns "c". +// In some queries aggregations are referenced by full name, so "a>b>c", but so far this implementation seems sufficient. +func (q *Query) Name() string { + if len(q.Aggregators) == 0 { + return "" + } + return q.Aggregators[len(q.Aggregators)-1].Name +} + +// HasParentAggregation returns true <=> this aggregation has a parent aggregation, so there's no query to the DB, +// and results are calculated based on parent aggregation's results. +func (q *Query) HasParentAggregation() bool { + return q.NoDBQuery && len(q.Parent) > 0 // first condition should be enough, second just in case +} + +// IsChild returns true <=> this aggregation is a child of maybeParent (so maybeParent is its parent). +func (q *Query) IsChild(maybeParent Query) bool { + return q.HasParentAggregation() && q.Parent == maybeParent.Name() +} + +type Aggregator struct { + Name string + Empty bool // is this aggregator empty, so no buckets + Keyed bool // determines how results are returned in response's JSON + Filters bool // if true, this aggregator is a filters aggregator +} + +func NewAggregatorEmpty(name string) Aggregator { + return Aggregator{Name: name, Empty: true} +} + +type SearchQueryType int + +const ( + Facets SearchQueryType = iota + FacetsNumeric + ListByField + ListAllFields + CountAsync + Normal + None +) + +const DefaultSizeListQuery = 1000 // we use LIMIT 1000 in some simple list queries (SELECT ...) + +func (queryType SearchQueryType) String() string { + return []string{"Facets", "FacetsNumeric", "ListByField", "ListAllFields", "CountAsync", "Normal", "None"}[queryType] +} + +type SearchQueryInfo struct { + Typ SearchQueryType + // to be used as replacement for FieldName + RequestedFields []string + // deprecated + FieldName string + Interval string + I1 int + I2 int + Size int // how many hits to return +} + +func NewSearchQueryInfoNone() SearchQueryInfo { + return SearchQueryInfo{Typ: None} +} + +func (h *Highlighter) ShouldHighlight(columnName string) bool { + _, ok := h.Fields[columnName] + return ok +} + +func (h *Highlighter) HighlightValue(value string) []string { + + //https://www.elastic.co/guide/en/elasticsearch/reference/current/highlighting.html + // https://medium.com/@andre.luiz1987/using-highlighting-elasticsearch-9ccd698f08 + + // paranoia check for empty tags + if len(h.PreTags) < 1 && len(h.PostTags) < 1 { + return []string{} + } + + type match struct { + start int + end int + } + + var matches []match + + lowerValue := strings.ToLower(value) + length := len(lowerValue) + + // find all matches + for _, token := range h.Tokens { + + if token == "" { + continue + } + + pos := 0 + for pos < length { + // token are lower cased already + idx := strings.Index(lowerValue[pos:], token) + if idx == -1 { + break + } + + start := pos + idx + end := start + len(token) + + matches = append(matches, match{start, end}) + pos = end + } + } + + if len(matches) == 0 { + return []string{} + } + + // sort matches by start position + sort.Slice(matches, func(i, j int) bool { + return matches[i].start < matches[j].start + }) + + var mergedMatches []match + + // merge overlapping matches + for i := 0; i < len(matches); i++ { + lastMerged := len(mergedMatches) - 1 + + if len(mergedMatches) > 0 && matches[i].start <= mergedMatches[len(mergedMatches)-1].end { + mergedMatches[lastMerged].end = max(matches[i].end, mergedMatches[lastMerged].end) + } else { + mergedMatches = append(mergedMatches, matches[i]) + } + } + + // populate highlights + var highlights []string + for _, m := range mergedMatches { + highlights = append(highlights, h.PreTags[0]+value[m.start:m.end]+h.PostTags[0]) + } + + return highlights +} + +func (h *Highlighter) SetTokens(tokens []string) { + + uniqueTokens := make(map[string]bool) + for _, token := range tokens { + uniqueTokens[strings.ToLower(token)] = true + } + + h.Tokens = make([]string, 0, len(uniqueTokens)) + for token := range uniqueTokens { + h.Tokens = append(h.Tokens, token) + } + + // longer tokens firsts + sort.Slice(h.Tokens, func(i, j int) bool { + return len(h.Tokens[i]) > len(h.Tokens[j]) + }) + +} +*/ diff --git a/quesma/model/bucket_aggregations/terms.go b/quesma/model/bucket_aggregations/terms.go index 44446f8f0..4602e811f 100644 --- a/quesma/model/bucket_aggregations/terms.go +++ b/quesma/model/bucket_aggregations/terms.go @@ -2,17 +2,21 @@ package bucket_aggregations import ( "context" + "fmt" "mitmproxy/quesma/logger" "mitmproxy/quesma/model" ) +const DefaultSize = 10 + type Terms struct { ctx context.Context + size int significant bool // true <=> significant_terms, false <=> terms } -func NewTerms(ctx context.Context, significant bool) Terms { - return Terms{ctx: ctx, significant: significant} +func NewTerms(ctx context.Context, size int, significant bool) Terms { + return Terms{ctx: ctx, size: size, significant: significant} } func (query Terms) IsBucketAggregation() bool { @@ -41,10 +45,11 @@ func (query Terms) TranslateSqlResponseToJson(rows []model.QueryResultRow, level } func (query Terms) String() string { - if !query.significant { - return "terms" + var namePrefix string + if query.significant { + namePrefix = "significant_" } - return "significant_terms" + return fmt.Sprintf("%sterms(size=%d)", namePrefix, query.size) } func (query Terms) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow { diff --git a/quesma/model/query.go b/quesma/model/query.go index 61bab1d3e..5dae75a76 100644 --- a/quesma/model/query.go +++ b/quesma/model/query.go @@ -41,6 +41,7 @@ type Query struct { Aggregators []Aggregator // keeps names of aggregators, e.g. "0", "1", "2", "suggestions". Needed for JSON response. Type QueryType SortFields []SortField // fields to sort by + SubSelect string // dictionary to add as 'meta' field in the response. // WARNING: it's probably not passed everywhere where it's needed, just in one place. // But it works for the test + our dashboards, so let's fix it later if necessary. diff --git a/quesma/queryparser/aggregation_parser.go b/quesma/queryparser/aggregation_parser.go index c47e12a5f..5cf8ebc1e 100644 --- a/quesma/queryparser/aggregation_parser.go +++ b/quesma/queryparser/aggregation_parser.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "github.com/barkimedes/go-deepcopy" + "github.com/k0kubun/pp" "mitmproxy/quesma/clickhouse" "mitmproxy/quesma/logger" "mitmproxy/quesma/model" @@ -48,6 +49,16 @@ type metricsAggregation struct { const metricsAggregationDefaultFieldType = clickhouse.Invalid +func (b *aggrQueryBuilder) applyTermsSubSelect(terms bucket_aggregations.Terms) { + termsField := b.Query.GroupByFields[len(b.Query.GroupByFields)-1] + pp.Println(b, terms, termsField, b.Query.String()) + whereLimitStmt := fmt.Sprintf("%s IN (%s)", termsField, b.String()) + fmt.Println("WHERE LIMIT STMT:", whereLimitStmt) + fmt.Println("where before:", b.whereBuilder.Sql.Stmt) + b.whereBuilder = combineWheres(b.whereBuilder, newSimpleQuery(NewSimpleStatement(whereLimitStmt), true)) + fmt.Println("where after:", b.whereBuilder.Sql.Stmt) +} + func (b *aggrQueryBuilder) buildAggregationCommon(metadata model.JsonMap) model.Query { query := b.Query query.WhereClause = b.whereBuilder.Sql.Stmt @@ -348,7 +359,7 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil if filter, ok := filterRaw.(QueryMap); ok { filterOnThisLevel = true currentAggr.Type = metrics_aggregations.NewCount(cw.Ctx) - currentAggr.whereBuilder = cw.combineWheres( + currentAggr.whereBuilder = combineWheres( currentAggr.whereBuilder, cw.parseQueryMap(filter), ) @@ -394,7 +405,7 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil } for _, filter := range filters { currentAggr.Type = bucket_aggregations.NewFilters(cw.Ctx) - currentAggr.whereBuilder = cw.combineWheres(currentAggr.whereBuilder, filter.sql) + currentAggr.whereBuilder = combineWheres(currentAggr.whereBuilder, filter.sql) currentAggr.Aggregators = append(currentAggr.Aggregators, model.NewAggregatorEmpty(filter.name)) *resultAccumulator = append(*resultAccumulator, currentAggr.buildBucketAggregation(metadata)) if aggs, ok := queryMap["aggs"].(QueryMap); ok { @@ -416,10 +427,21 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil aggsHandledSeparately := isRange || isFilters if aggs, ok := queryMap["aggs"]; ok && !aggsHandledSeparately { + pp.Println(currentAggr.Type) + var terms bucket_aggregations.Terms + var isTerms bool + var oldWhere = currentAggr.whereBuilder + if terms, isTerms = currentAggr.Type.(bucket_aggregations.Terms); isTerms { + fmt.Println("jestem") + currentAggr.applyTermsSubSelect(terms) + } err = cw.parseAggregationNames(currentAggr, aggs.(QueryMap), resultAccumulator) if err != nil { return err } + if isTerms { + currentAggr.whereBuilder = oldWhere + } } delete(queryMap, "aggs") // no-op if no "aggs" @@ -632,25 +654,20 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery } for _, termsType := range []string{"terms", "significant_terms"} { if terms, ok := queryMap[termsType]; ok { - currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms") + var size int + if sizeRaw, exists := terms.(QueryMap)["size"]; exists { + size = (int)(sizeRaw.(float64)) + } else { + size = bucket_aggregations.DefaultSize + } + currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, size, termsType == "significant_terms") + fieldName := strconv.Quote(cw.parseFieldField(terms, termsType)) - isEmptyGroupBy := len(currentAggr.GroupByFields) == 0 currentAggr.GroupByFields = append(currentAggr.GroupByFields, fieldName) currentAggr.NonSchemaFields = append(currentAggr.NonSchemaFields, fieldName) - size := 10 - if _, ok := queryMap["aggs"]; isEmptyGroupBy && !ok { // we can do limit only it terms are not nested - if jsonMap, ok := terms.(QueryMap); ok { - if sizeRaw, ok := jsonMap["size"]; ok { - if sizeParsed, ok := sizeRaw.(float64); ok { - size = int(sizeParsed) - } else { - logger.WarnWithCtx(cw.Ctx).Msgf("size is not an float64, but %T, value: %v. Using default", sizeRaw, sizeRaw) - } - } - } - currentAggr.SuffixClauses = append(currentAggr.SuffixClauses, "ORDER BY count() DESC") - currentAggr.SuffixClauses = append(currentAggr.SuffixClauses, fmt.Sprintf("LIMIT %d", size)) - } + currentAggr.SuffixClauses = append(currentAggr.SuffixClauses, fmt.Sprintf("LIMIT %d", size)) + currentAggr.SubSelect = currentAggr.Query.String() + fmt.Println("SUB:", currentAggr.SubSelect) delete(queryMap, termsType) return success, 1, 1, nil } @@ -706,7 +723,7 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery } if boolRaw, ok := queryMap["bool"]; ok { if Bool, ok := boolRaw.(QueryMap); ok { - currentAggr.whereBuilder = cw.combineWheres(currentAggr.whereBuilder, cw.parseBool(Bool)) + currentAggr.whereBuilder = combineWheres(currentAggr.whereBuilder, cw.parseBool(Bool)) } else { logger.WarnWithCtx(cw.Ctx).Msgf("bool is not a map, but %T, value: %v. Skipping", boolRaw, boolRaw) } @@ -817,13 +834,13 @@ func (cw *ClickhouseQueryTranslator) parseFilters(filtersMap QueryMap) []filter return filters } -func (cw *ClickhouseQueryTranslator) combineWheres(where1, where2 SimpleQuery) SimpleQuery { +func combineWheres(where1, where2 SimpleQuery) SimpleQuery { combined := SimpleQuery{ Sql: and([]Statement{where1.Sql, where2.Sql}), CanParse: where1.CanParse && where2.CanParse, } if len(where1.FieldName) > 0 && len(where2.FieldName) > 0 && where1.FieldName != where2.FieldName { - logger.WarnWithCtx(cw.Ctx).Msgf("combining 2 where clauses with different field names: %s, %s, where queries: %v %v", where1.FieldName, where2.FieldName, where1, where2) + logger.Warn().Msgf("combining 2 where clauses with different field names: %s, %s, where queries: %v %v", where1.FieldName, where2.FieldName, where1, where2) } if len(where1.FieldName) > 0 { combined.FieldName = where1.FieldName diff --git a/quesma/queryparser/range_aggregation.go b/quesma/queryparser/range_aggregation.go index b0c68b847..274ab5185 100644 --- a/quesma/queryparser/range_aggregation.go +++ b/quesma/queryparser/range_aggregation.go @@ -85,7 +85,7 @@ func (cw *ClickhouseQueryTranslator) processRangeAggregation(currentAggr *aggrQu // Range aggregation with subaggregations should be a quite rare case, so I'm leaving that for later. whereBeforeNesting := currentAggr.whereBuilder for _, interval := range Range.Intervals { - currentAggr.whereBuilder = cw.combineWheres( + currentAggr.whereBuilder = combineWheres( currentAggr.whereBuilder, newSimpleQuery(NewSimpleStatement(interval.ToWhereClause(Range.QuotedFieldName)), true), )