Skip to content

Commit

Permalink
Json rendering refactor (#522)
Browse files Browse the repository at this point in the history
Changes to render logic:
1. Unify all of the JSON rendering logic inside
`TranslateSqlResponseToJson`:
- this allows generating buckets as well as top-level values
- we always return `model.JsonMap` and no longer assume a one-element
array
2. As a proof of concept, `doc_count_error_upper_bound` is implemented for
`terms` in the new architecture.
3. Query translation logic got much simpler.
4. This required a lot of tiny fixes across the codebase.

---------

Signed-off-by: Jacek Migdal <[email protected]>
Co-authored-by: Grzegorz Piwowarek <[email protected]>
  • Loading branch information
jakozaur and pivovarit authored Jul 13, 2024
1 parent 9c185fa commit c84fd2a
Show file tree
Hide file tree
Showing 46 changed files with 172 additions and 149 deletions.
6 changes: 4 additions & 2 deletions quesma/model/bucket_aggregations/dateRange.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (query DateRange) IsBucketAggregation() bool {
return true
}

func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
if len(rows) != 1 {
logger.ErrorWithCtx(query.ctx).Msgf("unexpected number of rows in date_range aggregation response, len: %d", len(rows))
return nil
Expand All @@ -104,7 +104,9 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow, l
response = append(response, responseForInterval)
columnIdx = nextColumnIdx
}
return response
return model.JsonMap{
"buckets": response,
}
}

func (query DateRange) String() string {
Expand Down
6 changes: 4 additions & 2 deletions quesma/model/bucket_aggregations/date_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (query *DateHistogram) IsBucketAggregation() bool {
return true
}

func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
if len(rows) > 0 && len(rows[0].Cols) < 2 {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected number of columns in date_histogram aggregation response, len(rows[0].Cols): "+
Expand All @@ -77,7 +77,9 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
"key_as_string": intervalStart,
})
}
return response
return model.JsonMap{
"buckets": response,
}
}

func (query *DateHistogram) String() string {
Expand Down
8 changes: 5 additions & 3 deletions quesma/model/bucket_aggregations/date_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ func TestTranslateSqlResponseToJson(t *testing.T) {
{Cols: []model.QueryResultCol{model.NewQueryResultCol("key", int64(56962370)), model.NewQueryResultCol("doc_count", 14)}},
}
interval := "30s"
expectedResponse := []model.JsonMap{
{"key": int64(56962398) * 30_000, "doc_count": 8, "key_as_string": "2024-02-25T14:39:00.000"},
{"key": int64(56962370) * 30_000, "doc_count": 14, "key_as_string": "2024-02-25T14:25:00.000"},
expectedResponse := model.JsonMap{
"buckets": []model.JsonMap{
{"key": int64(56962398) * 30_000, "doc_count": 8, "key_as_string": "2024-02-25T14:39:00.000"},
{"key": int64(56962370) * 30_000, "doc_count": 14, "key_as_string": "2024-02-25T14:25:00.000"},
},
}
response := (&DateHistogram{interval: interval, intervalType: DateHistogramFixedInterval}).TranslateSqlResponseToJson(resultRows, 1)
assert.Equal(t, expectedResponse, response)
Expand Down
6 changes: 3 additions & 3 deletions quesma/model/bucket_aggregations/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (query Filters) IsBucketAggregation() bool {
return true
}

func (query Filters) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query Filters) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
var value any = nil
if len(rows) > 0 {
if len(rows[0].Cols) > 0 {
Expand All @@ -43,9 +43,9 @@ func (query Filters) TranslateSqlResponseToJson(rows []model.QueryResultRow, lev
logger.ErrorWithCtx(query.ctx).Msgf("unexpected number of columns in filters aggregation response, len(rows[0].Cols): %d, level: %d", len(rows[0].Cols), level)
}
}
return []model.JsonMap{{
return model.JsonMap{
"doc_count": value,
}}
}
}

func (query Filters) String() string {
Expand Down
6 changes: 4 additions & 2 deletions quesma/model/bucket_aggregations/geotile_grid.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (query GeoTileGrid) IsBucketAggregation() bool {
return true
}

func (query GeoTileGrid) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query GeoTileGrid) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
if len(rows) > 0 && len(rows[0].Cols) < 3 {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected number of columns in geotile_grid aggregation response, len(rows[0].Cols): "+
Expand All @@ -39,7 +39,9 @@ func (query GeoTileGrid) TranslateSqlResponseToJson(rows []model.QueryResultRow,
"doc_count": row.LastColValue(),
})
}
return response
return model.JsonMap{
"buckets": response,
}
}

func (query GeoTileGrid) String() string {
Expand Down
6 changes: 4 additions & 2 deletions quesma/model/bucket_aggregations/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (query Histogram) IsBucketAggregation() bool {
return true
}

func (query Histogram) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query Histogram) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
if len(rows) > 0 && len(rows[0].Cols) < 2 {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected number of columns in histogram aggregation response, len(rows[0].Cols): "+
Expand All @@ -37,7 +37,9 @@ func (query Histogram) TranslateSqlResponseToJson(rows []model.QueryResultRow, l
"doc_count": row.Cols[level].Value,
})
}
return response
return model.JsonMap{
"buckets": response,
}
}

func (query Histogram) String() string {
Expand Down
8 changes: 6 additions & 2 deletions quesma/model/bucket_aggregations/multi_terms.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ func (query MultiTerms) IsBucketAggregation() bool {
return true
}

func (query MultiTerms) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) (response []model.JsonMap) {
func (query MultiTerms) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
minimumExpectedColNr := query.fieldsNr + 1 // +1 for doc_count. Can be more, if this MultiTerms has parent aggregations, but never fewer.
if len(rows) > 0 && len(rows[0].Cols) < minimumExpectedColNr {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected number of columns in terms aggregation response, len: %d, expected (at least): %d, rows[0]: %v", len(rows[0].Cols), minimumExpectedColNr, rows[0])
}
var response []model.JsonMap
const delimiter = '|' // between keys in key_as_string
for _, row := range rows {
startIndex := len(row.Cols) - query.fieldsNr - 1
Expand All @@ -55,7 +56,10 @@ func (query MultiTerms) TranslateSqlResponseToJson(rows []model.QueryResultRow,
}
response = append(response, bucket)
}
return
return model.JsonMap{
"doc_count_error_upper_bound": 0,
"buckets": response,
}
}

func (query MultiTerms) String() string {
Expand Down
12 changes: 7 additions & 5 deletions quesma/model/bucket_aggregations/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,33 +122,35 @@ func (query Range) IsBucketAggregation() bool {
return true
}

func (query Range) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query Range) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
if len(rows) != 1 {
logger.ErrorWithCtx(query.ctx).Msgf("unexpected %d of rows in range aggregation response. Expected 1.", len(rows))
return nil
return model.JsonMap{}
}
startIteration := len(rows[0].Cols) - 1 - len(query.Intervals)
endIteration := len(rows[0].Cols) - 1
if startIteration >= endIteration || startIteration < 0 {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected column nr in aggregation response, startIteration: %d, endIteration: %d", startIteration, endIteration)
return nil
return model.JsonMap{}
}
if query.Keyed {
var response = make(model.JsonMap)
for i, col := range rows[0].Cols[startIteration:endIteration] {
responseForInterval := query.responseForInterval(query.Intervals[i], col.Value)
response[query.Intervals[i].String()] = responseForInterval
}
return []model.JsonMap{response}
return response
} else {
var response []model.JsonMap
for i, col := range rows[0].Cols[startIteration:endIteration] {
responseForInterval := query.responseForInterval(query.Intervals[i], col.Value)
responseForInterval["key"] = query.Intervals[i].String()
response = append(response, responseForInterval)
}
return response
return model.JsonMap{
"buckets": response,
}
}
}

Expand Down
7 changes: 5 additions & 2 deletions quesma/model/bucket_aggregations/terms.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (query Terms) IsBucketAggregation() bool {
return true
}

func (query Terms) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query Terms) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
var response []model.JsonMap
if len(rows) > 0 && len(rows[0].Cols) < 2 {
logger.ErrorWithCtx(query.ctx).Msgf(
Expand All @@ -39,7 +39,10 @@ func (query Terms) TranslateSqlResponseToJson(rows []model.QueryResultRow, level
}
response = append(response, bucket)
}
return response
return model.JsonMap{
"doc_count_error_upper_bound": 0,
"buckets": response,
}
}

func (query Terms) String() string {
Expand Down
2 changes: 1 addition & 1 deletion quesma/model/metrics_aggregations/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (query Avg) IsBucketAggregation() bool {
return false
}

func (query Avg) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query Avg) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

Expand Down
2 changes: 1 addition & 1 deletion quesma/model/metrics_aggregations/cardinality.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (query Cardinality) IsBucketAggregation() bool {
return false
}

func (query Cardinality) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query Cardinality) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
return metricsTranslateSqlResponseToJson(query.ctx, rows, level)
}

Expand Down
10 changes: 5 additions & 5 deletions quesma/model/metrics_aggregations/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,20 @@ import (
"time"
)

func metricsTranslateSqlResponseToJson(ctx context.Context, rows []model.QueryResultRow, level int) []model.JsonMap {
func metricsTranslateSqlResponseToJson(ctx context.Context, rows []model.QueryResultRow, level int) model.JsonMap {
var value any = nil
if resultRowsAreFine(ctx, rows) {
value = rows[0].Cols[len(rows[0].Cols)-1].Value
}
return []model.JsonMap{{
return model.JsonMap{
"value": value,
}}
}
}

// metricsTranslateSqlResponseToJsonWithFieldTypeCheck is the same as metricsTranslateSqlResponseToJson for all types except DateTimes.
// With DateTimes, we need to return 2 values, instead of 1, that's the difference.
func metricsTranslateSqlResponseToJsonWithFieldTypeCheck(
ctx context.Context, rows []model.QueryResultRow, level int, fieldType clickhouse.DateTimeType) []model.JsonMap {
ctx context.Context, rows []model.QueryResultRow, level int, fieldType clickhouse.DateTimeType) model.JsonMap {
if fieldType == clickhouse.Invalid {
// if it's not a date, we do just a normal response
return metricsTranslateSqlResponseToJson(ctx, rows, level)
Expand All @@ -45,7 +45,7 @@ func metricsTranslateSqlResponseToJsonWithFieldTypeCheck(
if value != nil {
response["value_as_string"] = valueAsString
}
return []model.JsonMap{response}
return response
}

func resultRowsAreFine(ctx context.Context, rows []model.QueryResultRow) bool {
Expand Down
10 changes: 5 additions & 5 deletions quesma/model/metrics_aggregations/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ func (query Count) IsBucketAggregation() bool {
return false
}

func (query Count) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
var response []model.JsonMap
func (query Count) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msg("no rows returned for count aggregation")
return make(model.JsonMap, 0)
}
for _, row := range rows {
response = append(response, model.JsonMap{"doc_count": row.Cols[level].Value})
if len(rows) > 1 {
logger.WarnWithCtx(query.ctx).Msg("More than one row returned for count aggregation")
}
return response
return model.JsonMap{"doc_count": rows[0].Cols[level].Value}
}

func (query Count) String() string {
Expand Down
14 changes: 7 additions & 7 deletions quesma/model/metrics_aggregations/extended_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,21 @@ func (query ExtendedStats) IsBucketAggregation() bool {
return false
}

func (query ExtendedStats) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query ExtendedStats) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msg("no rows returned for stats aggregation")
return []model.JsonMap{{
return model.JsonMap{
"value": nil, // not completely sure if it's a good return value, but it looks fine to me. We should always get 1 row, not 0 anyway.
}}
}
}
if len(rows) > 1 {
logger.WarnWithCtx(query.ctx).Msgf("more than one row returned for stats aggregation, using only first. rows[0]: %+v, rows[1]: %+v", rows[0], rows[1])
}
if len(rows[0].Cols) < selectFieldsNr {
logger.WarnWithCtx(query.ctx).Msgf("not enough fields in the response for extended_stats aggregation. Expected at least %d, got %d. Got: %+v. Returning empty result.", selectFieldsNr, len(rows[0].Cols), rows[0])
return []model.JsonMap{{
return model.JsonMap{
"value": nil, // not completely sure if it's a good return value, but it looks fine to me. We should always get >= selectFieldsNr columns anyway.
}}
}
}

row := rows[0]
Expand All @@ -56,7 +56,7 @@ func (query ExtendedStats) TranslateSqlResponseToJson(rows []model.QueryResultRo
lowerSampling = avg - query.sigma*stdDevSampling
}

return []model.JsonMap{{
return model.JsonMap{
"count": query.getValue(row, "count"),
"min": query.getValue(row, "min"),
"max": query.getValue(row, "max"),
Expand All @@ -77,7 +77,7 @@ func (query ExtendedStats) TranslateSqlResponseToJson(rows []model.QueryResultRo
"upper_sampling": upperSampling,
"lower_sampling": lowerSampling,
},
}}
}
}

func (query ExtendedStats) String() string {
Expand Down
10 changes: 4 additions & 6 deletions quesma/model/metrics_aggregations/geo_cetroid.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,14 @@ func (query GeoCentroid) IsBucketAggregation() bool {
return false
}

func (query GeoCentroid) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query GeoCentroid) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
location := model.JsonMap{
"lat": rows[0].Cols[3].Value,
"lon": rows[0].Cols[4].Value,
}
return []model.JsonMap{
{
"count": rows[0].Cols[5].Value,
"location": location,
},
return model.JsonMap{
"count": rows[0].Cols[5].Value,
"location": location,
}
}

Expand Down
2 changes: 1 addition & 1 deletion quesma/model/metrics_aggregations/max.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (query Max) IsBucketAggregation() bool {
return false
}

func (query Max) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query Max) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

Expand Down
2 changes: 1 addition & 1 deletion quesma/model/metrics_aggregations/min.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (query Min) IsBucketAggregation() bool {
return false
}

func (query Min) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query Min) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
return metricsTranslateSqlResponseToJsonWithFieldTypeCheck(query.ctx, rows, level, query.fieldType)
}

Expand Down
12 changes: 6 additions & 6 deletions quesma/model/metrics_aggregations/percentile_ranks.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ func (query PercentileRanks) IsBucketAggregation() bool {
return false
}

func (query PercentileRanks) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
func (query PercentileRanks) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msg("no rows in percentile ranks response")
return make([]model.JsonMap, 0)
return make(model.JsonMap, 0)
}
// I duplicate a lot of code in this if/else below,
// but I think it's worth it, as this function might get called a lot of times for a single query.
Expand Down Expand Up @@ -58,9 +58,9 @@ func (query PercentileRanks) TranslateSqlResponseToJson(rows []model.QueryResult
percentileRank.Value, percentileRank.Value)
}
}
return []model.JsonMap{{
return model.JsonMap{
"values": valueMap,
}}
}
} else {
buckets := make([]model.JsonMap, 0)
for _, percentileRank := range rows[0].Cols[level:] {
Expand Down Expand Up @@ -90,9 +90,9 @@ func (query PercentileRanks) TranslateSqlResponseToJson(rows []model.QueryResult
percentileRank.Value, percentileRank.Value)
}
}
return []model.JsonMap{{
return model.JsonMap{
"values": buckets,
}}
}
}
}

Expand Down
Loading

0 comments on commit c84fd2a

Please sign in to comment.