
Commit: Cleanup
trzysiek committed May 20, 2024
1 parent 42cb08b commit c0ae484
Showing 6 changed files with 15 additions and 58 deletions.
3 changes: 3 additions & 0 deletions quesma/model/pipeline_aggregations/average_bucket.go
@@ -43,6 +43,9 @@ func (query AverageBucket) CalculateResultWhenMissing(qwa *model.Query, parentRo
     parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
     // in calculateSingleAvgBucket we calculate the avg of all current_keys with the same parent_cols,
     // so we need to split into buckets based on parent_cols
+    if parentFieldsCnt < 0 {
+        logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
+    }
     for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
         resultRows = append(resultRows, query.calculateSingleAvgBucket(parentRowsOneBucket))
     }
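The guard added above (and repeated in min_bucket.go and sum_bucket.go below) keeps SplitResultSetIntoBuckets from being fed a negative column count, which would mean a parent row had fewer than the expected [parent_cols..., current_key, current_value] columns. A minimal, self-contained sketch of the bucket-splitting idea follows; it uses simplified stand-in types, not Quesma's actual model.QueryResultRow or queryprocessor code, and unlike the commit (which only logs a warning) it clamps a negative count to 0:

package main

import "fmt"

// Row is a simplified stand-in for model.QueryResultRow:
// Cols holds [parent_cols..., current_key, current_value].
type Row struct{ Cols []any }

// splitIntoBuckets groups consecutive rows whose first parentFieldsCnt
// columns are equal — what SplitResultSetIntoBuckets is described to do
// in the comments above.
func splitIntoBuckets(rows []Row, parentFieldsCnt int) [][]Row {
    if parentFieldsCnt < 0 {
        parentFieldsCnt = 0 // sketch choice: clamp instead of only warning
    }
    var buckets [][]Row
    for _, row := range rows {
        n := len(buckets)
        if n > 0 && sameParentCols(buckets[n-1][0], row, parentFieldsCnt) {
            buckets[n-1] = append(buckets[n-1], row)
        } else {
            buckets = append(buckets, []Row{row})
        }
    }
    return buckets
}

// sameParentCols assumes the parent columns hold comparable values.
func sameParentCols(a, b Row, cnt int) bool {
    for i := 0; i < cnt; i++ {
        if a.Cols[i] != b.Cols[i] {
            return false
        }
    }
    return true
}

func main() {
    rows := []Row{
        {Cols: []any{"2024-01", "key1", 10.0}},
        {Cols: []any{"2024-01", "key2", 20.0}},
        {Cols: []any{"2024-02", "key1", 30.0}},
    }
    // One parent column: the first two rows share a bucket, the third gets its own.
    fmt.Println(splitIntoBuckets(rows, 1))
}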
8 changes: 5 additions & 3 deletions quesma/model/pipeline_aggregations/min_bucket.go
@@ -33,10 +33,9 @@ func (query MinBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, l
     }
     if returnMap, ok := rows[0].LastColValue().(model.JsonMap); ok {
         return []model.JsonMap{returnMap}
-    } else {
-        logger.WarnWithCtx(query.ctx).Msgf("could not convert value to JsonMap: %v, type: %T", rows[0].LastColValue(), rows[0].LastColValue())
-        return []model.JsonMap{nil}
     }
+    logger.WarnWithCtx(query.ctx).Msgf("could not convert value to JsonMap: %v, type: %T", rows[0].LastColValue(), rows[0].LastColValue())
+    return []model.JsonMap{nil}
 }

 func (query MinBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
@@ -48,6 +47,9 @@ func (query MinBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows [
     parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
     // in calculateSingleMinBucket we calculate the min of all current_keys with the same parent_cols,
     // so we need to split into buckets based on parent_cols
+    if parentFieldsCnt < 0 {
+        logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
+    }
     for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
         resultRows = append(resultRows, query.calculateSingleMinBucket(parentRowsOneBucket))
     }
12 changes: 5 additions & 7 deletions quesma/model/pipeline_aggregations/sum_bucket.go
@@ -3,7 +3,6 @@ package pipeline_aggregations
 import (
     "context"
     "fmt"
-    "github.com/k0kubun/pp"
     "mitmproxy/quesma/logger"
     "mitmproxy/quesma/model"
     "mitmproxy/quesma/queryprocessor"
@@ -33,15 +32,12 @@ func (query SumBucket) TranslateSqlResponseToJson(rows []model.QueryResultRow, l
     }
     if returnMap, ok := rows[0].LastColValue().(model.JsonMap); ok {
         return []model.JsonMap{returnMap}
-    } else {
-        logger.WarnWithCtx(query.ctx).Msgf("could not convert value to JsonMap: %v, type: %T", rows[0].LastColValue(), rows[0].LastColValue())
-        return []model.JsonMap{nil}
     }
+    logger.WarnWithCtx(query.ctx).Msgf("could not convert value to JsonMap: %v, type: %T", rows[0].LastColValue(), rows[0].LastColValue())
+    return []model.JsonMap{nil}
 }

 func (query SumBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
-    fmt.Println("hoho")
-    pp.Println("parentRows", parentRows)
     resultRows := make([]model.QueryResultRow, 0)
     if len(parentRows) == 0 {
         return resultRows // maybe null?
@@ -50,10 +46,12 @@ func (query SumBucket) CalculateResultWhenMissing(qwa *model.Query, parentRows [
     parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
     // in calculateSingleSumBucket we calculate the sum of all current_keys with the same parent_cols,
     // so we need to split into buckets based on parent_cols
+    if parentFieldsCnt < 0 {
+        logger.WarnWithCtx(query.ctx).Msgf("parentFieldsCnt is less than 0: %d", parentFieldsCnt)
+    }
     for _, parentRowsOneBucket := range qp.SplitResultSetIntoBuckets(parentRows, parentFieldsCnt) {
        resultRows = append(resultRows, query.calculateSingleSumBucket(parentRowsOneBucket))
     }
-    pp.Println("resultRows", resultRows)
     return resultRows
 }
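calculateSingleSumBucket itself is not part of this diff, so the following is only a plausible sketch of what summing one bucket involves under the row layout described above (last column = current_value). The types and the function name sumBucket are stand-ins, not Quesma's implementation:

package main

import "fmt"

// Row mirrors the simplified stand-in used earlier; the last column holds the metric value.
type Row struct{ Cols []any }

// sumBucket adds up the last-column values of all rows in one bucket
// (rows sharing the same parent_cols).
func sumBucket(bucket []Row) float64 {
    var sum float64
    for _, row := range bucket {
        if v, ok := row.Cols[len(row.Cols)-1].(float64); ok {
            sum += v
        }
        // A non-float64 value would warrant the same logged fallback that
        // the TranslateSqlResponseToJson change above uses for JsonMap.
    }
    return sum
}

func main() {
    bucket := []Row{
        {Cols: []any{"2024-01", "key1", 10.0}},
        {Cols: []any{"2024-01", "key2", 20.0}},
    }
    fmt.Println(sumBucket(bucket)) // 30
}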
6 changes: 2 additions & 4 deletions quesma/queryparser/aggregation_parser_test.go
@@ -3,12 +3,10 @@ package queryparser
 import (
     "cmp"
     "context"
-    "fmt"
     "github.com/barkimedes/go-deepcopy"
     "github.com/stretchr/testify/assert"
     "mitmproxy/quesma/clickhouse"
     "mitmproxy/quesma/concurrent"
-    "mitmproxy/quesma/logger"
     "mitmproxy/quesma/model"
     "mitmproxy/quesma/quesma/config"
     "mitmproxy/quesma/testdata"
@@ -544,7 +542,7 @@ func sortAggregations(aggregations []model.Query) {
 }

 func Test2AggregationParserExternalTestcases(t *testing.T) {
-    logger.InitSimpleLoggerForTests()
+    // logger.InitSimpleLoggerForTests()
     table := clickhouse.Table{
         Cols: map[string]*clickhouse.Column{
             "@timestamp": {Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64")},
@@ -593,7 +591,7 @@ func Test2AggregationParserExternalTestcases(t *testing.T) {

     // Let's leave those commented debugs for now, they'll be useful in next PRs
     for j, aggregation := range aggregations {
-        fmt.Printf("--- Aggregation %d: %+v\n\n---SQL string: %s\n\n", j, aggregation, aggregation.String())
+        // fmt.Printf("--- Aggregation %d: %+v\n\n---SQL string: %s\n\n", j, aggregation, aggregation.String())
         test.ExpectedResults[j] = aggregation.Type.PostprocessResults(test.ExpectedResults[j])
         // fmt.Println("--- Group by: ", aggregation.GroupByFields)
         if test.ExpectedSQLs[j] != "NoDBQuery" {
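Since the test's own comment says the commented-out debug dumps will be useful in upcoming PRs, an alternative to toggling them by hand is Go's built-in test logging: t.Logf output is shown only under go test -v or when a test fails. A small self-contained sketch with placeholder data, not the real aggregations:

package queryparser

import "testing"

// TestDebugOutputSketch demonstrates t.Logf as a stand-in for the
// commented-out fmt.Printf debug dumps: its output appears only with
// `go test -v` (or on failure), so it could stay enabled permanently.
func TestDebugOutputSketch(t *testing.T) {
    aggregations := []string{"avg_bucket", "min_bucket", "sum_bucket"} // placeholder data
    for j, aggregation := range aggregations {
        t.Logf("--- Aggregation %d: %s", j, aggregation)
    }
}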
16 changes: 0 additions & 16 deletions quesma/queryparser/pipeline_aggregations.go
@@ -29,10 +29,6 @@ func (cw *ClickhouseQueryTranslator) parsePipelineAggregations(queryMap QueryMap
         delete(queryMap, "min_bucket")
         return
     }
-    if aggregationType, success = cw.parseMaxBucket(queryMap); success {
-        delete(queryMap, "max_bucket")
-        return
-    }
     if aggregationType, success = cw.parseSumBucket(queryMap); success {
         delete(queryMap, "sum_bucket")
         return
@@ -88,18 +84,6 @@ func (cw *ClickhouseQueryTranslator) parseMinBucket(queryMap QueryMap) (aggregat
     return pipeline_aggregations.NewMinBucket(cw.Ctx, bucketsPath), true
 }

-func (cw *ClickhouseQueryTranslator) parseMaxBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
-    maxBucketRaw, exists := queryMap["max_bucket"]
-    if !exists {
-        return
-    }
-    bucketsPath, ok := cw.parseBucketsPath(maxBucketRaw, "max_bucket")
-    if !ok {
-        return
-    }
-    return pipeline_aggregations.NewMinBucket(cw.Ctx, bucketsPath), true
-}
-
 func (cw *ClickhouseQueryTranslator) parseSumBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
     sumBucketRaw, exists := queryMap["sum_bucket"]
     if !exists {
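Note that the removed parseMaxBucket constructed pipeline_aggregations.NewMinBucket, so max_bucket requests were silently parsed as min_bucket; with the parser deleted, they fall through to the unsupported-query handling instead. If max_bucket support is reinstated, a corrected version would presumably mirror parseMinBucket. The sketch below assumes a NewMaxBucket constructor symmetric to NewMinBucket, which this commit does not show:

// Sketch only: NewMaxBucket is assumed, not part of this commit.
func (cw *ClickhouseQueryTranslator) parseMaxBucket(queryMap QueryMap) (aggregationType model.QueryType, success bool) {
    maxBucketRaw, exists := queryMap["max_bucket"]
    if !exists {
        return
    }
    bucketsPath, ok := cw.parseBucketsPath(maxBucketRaw, "max_bucket")
    if !ok {
        return
    }
    return pipeline_aggregations.NewMaxBucket(cw.Ctx, bucketsPath), true // assumed constructor
}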
28 changes: 0 additions & 28 deletions quesma/testdata/unsupported_requests.go
@@ -1186,34 +1186,6 @@ var UnsupportedQueriesTests = []UnsupportedQueryTestCase{
         }
     }`,
     },
-    { // [56]
-        TestName:  "pipeline aggregation: sum_bucket",
-        QueryType: "sum_bucket",
-        QueryRequestJson: `
-        {
-            "size": 0,
-            "aggs": {
-                "sales_per_month": {
-                    "date_histogram": {
-                        "field": "date",
-                        "calendar_interval": "month"
-                    },
-                    "aggs": {
-                        "sales": {
-                            "sum": {
-                                "field": "price"
-                            }
-                        }
-                    }
-                },
-                "sum_monthly_sales": {
-                    "sum_bucket": {
-                        "buckets_path": "sales_per_month>sales"
-                    }
-                }
-            }
-        }`,
-    },
     // random non-existing aggregation:
     { // [57]
         TestName: "non-existing aggregation: Augustus_Caesar",
