Skip to content

Commit

Permalink
Jacek fixes (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
jakozaur authored and trzysiek committed May 5, 2024
1 parent d4c15d9 commit ae2d25c
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 26 deletions.
13 changes: 13 additions & 0 deletions docker/cloudflared.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Compose service running a Cloudflare Tunnel client (cloudflared) so local
# services can be exposed through Cloudflare without opening inbound ports.
version: "3.7"
services:
  cloudflared:
    image: cloudflare/cloudflared:latest
    env_file:
      - .env                # must define CLOUDFLARED_TOKEN
    restart: unless-stopped
    command:
      - "tunnel"
      - "--no-autoupdate"   # upgrades come from the image tag, not in-place self-update
      - "run"
      - "--token"
      - ${CLOUDFLARED_TOKEN}
13 changes: 10 additions & 3 deletions docker/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ services:
- QUESMA_logging_path=/var/quesma/logs
- QUESMA_CONFIG_FILE=/config/local-dev.yaml
depends_on:
clickhouse:
condition: service_healthy
clean-clickhouse:
condition: service_completed_successfully
opensearch:
condition: service_healthy
ports:
Expand Down Expand Up @@ -127,4 +127,11 @@ services:
interval: 1s
start_period: 1m
timeout: 1s
# TODO: add clean, skip for now
clean-clickhouse:
build: clean-clickhouse
depends_on:
clickhouse:
condition: service_healthy
restart: "no"
volumes:
- ./mitmproxy:/var/mitmproxy
8 changes: 7 additions & 1 deletion docker/quesma/config/local-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,10 @@ indexes:
timestamp:
source: "@timestamp"
target: "created_at"
fullTextFields: [ "body", "title" ]
fullTextFields: [ "body", "title" ]
opensearch_dashboards_sample_data_ecommerce:
enabled: true
opensearch_dashboards_sample_data_flights:
enabled: true
opensearch_dashboards_sample_data_logs:
enabled: true
15 changes: 10 additions & 5 deletions quesma/model/bucket_aggregations/terms.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,21 @@ package bucket_aggregations

import (
"context"
"fmt"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
)

const DefaultSize = 10

// Terms models both the "terms" and "significant_terms" Elasticsearch bucket
// aggregations; the significant flag selects which of the two it represents.
type Terms struct {
	ctx         context.Context
	size        int  // maximum number of buckets to return ("size" in the request; DefaultSize when absent)
	significant bool // true <=> significant_terms, false <=> terms
}

func NewTerms(ctx context.Context, significant bool) Terms {
return Terms{ctx: ctx, significant: significant}
func NewTerms(ctx context.Context, size int, significant bool) Terms {
return Terms{ctx: ctx, size: size, significant: significant}
}

func (query Terms) IsBucketAggregation() bool {
Expand Down Expand Up @@ -41,8 +45,9 @@ func (query Terms) TranslateSqlResponseToJson(rows []model.QueryResultRow, level
}

func (query Terms) String() string {
if !query.significant {
return "terms"
var namePrefix string
if query.significant {
namePrefix = "significant_"
}
return "significant_terms"
return fmt.Sprintf("%sterms(size=%d)", namePrefix, query.size)
}
2 changes: 1 addition & 1 deletion quesma/model/metrics_aggregations/top_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (query TopMetrics) TranslateSqlResponseToJson(rows []model.QueryResultRow,
sortVal := row.Cols[lastIndex].Value
for _, col := range valuesForMetrics[level:] {
colName, _ := strings.CutPrefix(col.ColName, "windowed_")
metrics[colName] = col.ExtractValue(context.TODO()) // CHANGE IT AFTER PART 2 MERGE!! ENTER REAL CONTEXT FROM THE query
metrics[colName] = col.ExtractValue(query.ctx) // CHANGE IT AFTER PART 2 MERGE!! ENTER REAL CONTEXT FROM THE query
}
elem := model.JsonMap{
"sort": []interface{}{sortVal},
Expand Down
6 changes: 5 additions & 1 deletion quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package model

import (
"context"
"fmt"
"mitmproxy/quesma/logger"
"strconv"
"strings"
Expand All @@ -20,6 +21,7 @@ type Query struct {
SuffixClauses []string // ORDER BY, etc.
FromClause string // usually just "tableName", or databaseName."tableName". Sometimes a subquery e.g. (SELECT ...)
CanParse bool // true <=> query is valid
SubSelect string
}

var NoMetadataField JsonMap = nil
Expand Down Expand Up @@ -63,7 +65,7 @@ func (q *Query) String() string {
if len(q.WhereClause) == 0 {
where = ""
}
sb.WriteString(" FROM " + q.FromClause + where + q.WhereClause + " " + strings.Join(q.SuffixClauses, " "))
sb.WriteString(" FROM " + q.FromClause + where + q.WhereClause + " ") // + strings.Join(q.SuffixClauses, " "))
if len(q.GroupByFields) > 0 {
sb.WriteString(" GROUP BY (")
for i, field := range q.GroupByFields {
Expand All @@ -83,6 +85,8 @@ func (q *Query) String() string {
}
sb.WriteString(")")
}
sb.WriteString(" " + strings.Join(q.SuffixClauses, " "))
fmt.Println("QUERY SUBSELECT:", q.SubSelect)
return sb.String()
}

Expand Down
5 changes: 2 additions & 3 deletions quesma/model/query_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type QueryResultCol struct {
}

type QueryResultRow struct {
ctx context.Context
Index string
Cols []QueryResultCol
}
Expand Down Expand Up @@ -110,13 +109,13 @@ func (c QueryResultCol) ExtractValue(ctx context.Context) any {
return c.Value
}

func (r QueryResultRow) String() string {
func (r QueryResultRow) String(ctx context.Context) string {
str := strings.Builder{}
str.WriteString(util.Indent(1) + "{\n")
numCols := len(r.Cols)
i := 0
for _, col := range r.Cols {
str.WriteString(util.Indent(2) + col.String(r.ctx))
str.WriteString(util.Indent(2) + col.String(ctx))
if i < numCols-1 {
str.WriteString(",")
}
Expand Down
44 changes: 38 additions & 6 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"github.com/barkimedes/go-deepcopy"
"github.com/k0kubun/pp"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/model/bucket_aggregations"
Expand Down Expand Up @@ -40,6 +41,16 @@ type metricsAggregation struct {
Order string // Only for top_metrics
}

// applyTermsSubSelect narrows the builder's WHERE clause for sub-aggregations
// under a terms bucket: it takes the most recently appended GROUP BY field
// (the terms key) and constrains it with "<field> IN (<sub-select>)", where
// the sub-select is this builder's own SQL (presumably already carrying the
// terms LIMIT — TODO confirm against tryBucketAggregation). The new condition
// is AND-ed onto the existing where clause via combineWheres. The terms
// parameter is currently unused but kept for call-site compatibility.
func (b *aggrQueryBuilder) applyTermsSubSelect(terms bucket_aggregations.Terms) {
	termsField := b.Query.GroupByFields[len(b.Query.GroupByFields)-1]
	whereLimitStmt := fmt.Sprintf("%s IN (%s)", termsField, b.String())
	b.whereBuilder = combineWheres(b.whereBuilder, newSimpleQuery(NewSimpleStatement(whereLimitStmt), true))
}

func (b *aggrQueryBuilder) buildAggregationCommon(metadata model.JsonMap) model.QueryWithAggregation {
query := b.QueryWithAggregation
query.WhereClause = b.whereBuilder.Sql.Stmt
Expand Down Expand Up @@ -317,7 +328,7 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil
if filter, ok := filterRaw.(QueryMap); ok {
filterOnThisLevel = true
currentAggr.Type = metrics_aggregations.NewCount(cw.Ctx)
currentAggr.whereBuilder = cw.combineWheres(
currentAggr.whereBuilder = combineWheres(
currentAggr.whereBuilder,
cw.parseQueryMap(filter),
)
Expand Down Expand Up @@ -360,7 +371,7 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil
}
for _, filter := range filters {
currentAggr.Type = bucket_aggregations.NewFilters(cw.Ctx)
currentAggr.whereBuilder = cw.combineWheres(currentAggr.whereBuilder, filter.sql)
currentAggr.whereBuilder = combineWheres(currentAggr.whereBuilder, filter.sql)
currentAggr.Aggregators = append(currentAggr.Aggregators, model.NewAggregatorEmpty(filter.name))
*resultAccumulator = append(*resultAccumulator, currentAggr.buildBucketAggregation(metadata))
if aggs, ok := queryMap["aggs"].(QueryMap); ok {
Expand All @@ -379,7 +390,18 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil

aggsHandledSeparately := isRange || isFilters
if aggs, ok := queryMap["aggs"]; ok && !aggsHandledSeparately {
pp.Println(currentAggr.Type)
var terms bucket_aggregations.Terms
var isTerms bool
var oldWhere = currentAggr.whereBuilder
if terms, isTerms = currentAggr.Type.(bucket_aggregations.Terms); isTerms {
fmt.Println("jestem")
currentAggr.applyTermsSubSelect(terms)
}
cw.parseAggregationNames(currentAggr, aggs.(QueryMap), resultAccumulator)
if isTerms {
currentAggr.whereBuilder = oldWhere
}
}
delete(queryMap, "aggs") // no-op if no "aggs"

Expand Down Expand Up @@ -561,10 +583,20 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
}
for _, termsType := range []string{"terms", "significant_terms"} {
if terms, ok := queryMap[termsType]; ok {
currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms")
var size int
if sizeRaw, exists := terms.(QueryMap)["size"]; exists {
size = (int)(sizeRaw.(float64))
} else {
size = bucket_aggregations.DefaultSize
}
currentAggr.Type = bucket_aggregations.NewTerms(cw.Ctx, size, termsType == "significant_terms")

fieldName := strconv.Quote(cw.parseFieldField(terms, termsType))
currentAggr.GroupByFields = append(currentAggr.GroupByFields, fieldName)
currentAggr.NonSchemaFields = append(currentAggr.NonSchemaFields, fieldName)
currentAggr.SuffixClauses = append(currentAggr.SuffixClauses, fmt.Sprintf("LIMIT %d", size))
currentAggr.SubSelect = currentAggr.QueryWithAggregation.String()
fmt.Println("SUB:", currentAggr.SubSelect)
delete(queryMap, termsType)
return success, 1, 1
}
Expand Down Expand Up @@ -616,7 +648,7 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
}
if boolRaw, ok := queryMap["bool"]; ok {
if Bool, ok := boolRaw.(QueryMap); ok {
currentAggr.whereBuilder = cw.combineWheres(currentAggr.whereBuilder, cw.parseBool(Bool))
currentAggr.whereBuilder = combineWheres(currentAggr.whereBuilder, cw.parseBool(Bool))
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("bool is not a map, but %T, value: %v. Skipping", boolRaw, boolRaw)
}
Expand Down Expand Up @@ -715,13 +747,13 @@ func (cw *ClickhouseQueryTranslator) parseFilters(filtersMap QueryMap) []filter
return filters
}

func (cw *ClickhouseQueryTranslator) combineWheres(where1, where2 SimpleQuery) SimpleQuery {
func combineWheres(where1, where2 SimpleQuery) SimpleQuery {
combined := SimpleQuery{
Sql: and([]Statement{where1.Sql, where2.Sql}),
CanParse: where1.CanParse && where2.CanParse,
}
if len(where1.FieldName) > 0 && len(where2.FieldName) > 0 && where1.FieldName != where2.FieldName {
logger.WarnWithCtx(cw.Ctx).Msgf("combining 2 where clauses with different field names: %s, %s, where queries: %v %v", where1.FieldName, where2.FieldName, where1, where2)
logger.Warn().Msgf("combining 2 where clauses with different field names: %s, %s, where queries: %v %v", where1.FieldName, where2.FieldName, where1, where2)
}
if len(where1.FieldName) > 0 {
combined.FieldName = where1.FieldName
Expand Down
12 changes: 8 additions & 4 deletions quesma/queryparser/aggregation_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package queryparser
import (
"cmp"
"context"
"fmt"
"github.com/stretchr/testify/assert"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/concurrent"
Expand Down Expand Up @@ -560,6 +561,9 @@ func Test2AggregationParserExternalTestcases(t *testing.T) {
allTests = append(allTests, testdata.PipelineAggregationTests...)
for i, test := range allTests {
t.Run(test.TestName+"("+strconv.Itoa(i)+")", func(t *testing.T) {
if i != 2 {
t.Skip()
}
if i == 26 {
t.Skip("Need a (most likely) small fix to top_hits.")
}
Expand All @@ -579,10 +583,10 @@ func Test2AggregationParserExternalTestcases(t *testing.T) {

// Let's leave those commented debugs for now, they'll be useful in next PRs
for j, aggregation := range aggregations {
// fmt.Println("--- Aggregation "+strconv.Itoa(j)+":", aggregation)
// fmt.Println()
// fmt.Println("--- SQL string ", aggregation.String())
// fmt.Println()
fmt.Println("--- Aggregation "+strconv.Itoa(j)+":", aggregation)
fmt.Println()
fmt.Println("--- SQL string ", aggregation.String())
fmt.Println()
// fmt.Println("--- Group by: ", aggregation.GroupByFields)
if test.ExpectedSQLs[j] != "TODO" {
util.AssertSqlEqual(t, test.ExpectedSQLs[j], aggregation.String())
Expand Down
2 changes: 1 addition & 1 deletion quesma/queryparser/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (cw *ClickhouseQueryTranslator) makeSearchResponseNormal(ResultSet []model.
for i, row := range ResultSet {
hits[i] = model.SearchHit{
Index: row.Index,
Source: []byte(row.String()),
Source: []byte(row.String(cw.Ctx)),
Fields: make(map[string][]interface{}),
Highlight: make(map[string][]string),
}
Expand Down
2 changes: 1 addition & 1 deletion quesma/queryparser/range_aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (cw *ClickhouseQueryTranslator) processRangeAggregation(currentAggr *aggrQu
// Range aggregation with subaggregations should be a quite rare case, so I'm leaving that for later.
whereBeforeNesting := currentAggr.whereBuilder
for _, interval := range Range.Intervals {
currentAggr.whereBuilder = cw.combineWheres(
currentAggr.whereBuilder = combineWheres(
currentAggr.whereBuilder,
newSimpleQuery(NewSimpleStatement(interval.ToWhereClause(Range.QuotedFieldName)), true),
)
Expand Down

0 comments on commit ae2d25c

Please sign in to comment.