Skip to content

Commit

Permalink
Handling Origin Time Delayed view (#423)
Browse files Browse the repository at this point in the history
This PR fixes `Origin Time Delayed` view.
It adds 2 aggregations:
- geotile grid (bucket one)
- centroid (metric one)
- and a partial implementation of the `geo_bounding_box` query (which is not
crucial for this view; it just filters out coordinates outside the bounding
box)

There are some TODOs (and missing tests); however, since this functionality
seems to be orthogonal to other features, I would prefer to handle the remaining
issues separately.

<img width="1317" alt="image"
src="https://github.com/QuesmaOrg/quesma/assets/102958445/0cb5aad4-bef6-40ad-ab10-2c4f8f4ac482">
  • Loading branch information
pdelewski authored Jul 9, 2024
1 parent 4c8b8fb commit bd2fa7f
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 69 deletions.
51 changes: 51 additions & 0 deletions quesma/model/bucket_aggregations/geotile_grid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
"context"
"quesma/logger"
"quesma/model"
"strconv"
)

// GeoTileGrid implements the Elasticsearch `geotile_grid` bucket aggregation,
// which groups geo points into map-tile buckets keyed as "zoom/x/y".
type GeoTileGrid struct {
	ctx context.Context // request context, used for error logging
}

// NewGeoTileGrid returns a GeoTileGrid aggregation bound to the given context.
func NewGeoTileGrid(ctx context.Context) GeoTileGrid {
	return GeoTileGrid{ctx: ctx}
}

// IsBucketAggregation reports that geotile_grid is a bucket (group-by)
// aggregation, as opposed to a metric one.
func (query GeoTileGrid) IsBucketAggregation() bool {
	return true
}

// TranslateSqlResponseToJson renders SQL result rows of a geotile_grid
// aggregation as Elasticsearch-style buckets. Each row is expected to carry
// at least 3 leading columns — zoom, x_tile, y_tile (the GROUP BY key) — with
// the document count in the last column. The bucket key has the "zoom/x/y" form.
func (query GeoTileGrid) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
	if len(rows) > 0 && len(rows[0].Cols) < 3 {
		logger.ErrorWithCtx(query.ctx).Msgf(
			"unexpected number of columns in geotile_grid aggregation response, len(rows[0].Cols): "+
				"%d, level: %d", len(rows[0].Cols), level,
		)
		// Bail out early: indexing Cols[0..2] below would panic with fewer
		// than 3 columns. (The original code logged and then panicked.)
		return nil
	}
	var response []model.JsonMap
	for _, row := range rows {
		// zoom/x/y arrive as float64 because the SQL side computes them with
		// FLOOR(...) expressions — assumption based on the generated query;
		// TODO confirm for all ClickHouse result encodings.
		zoom := int64(row.Cols[0].Value.(float64))
		x := int64(row.Cols[1].Value.(float64))
		y := int64(row.Cols[2].Value.(float64))
		key := strconv.FormatInt(zoom, 10) + "/" + strconv.FormatInt(x, 10) + "/" + strconv.FormatInt(y, 10)
		response = append(response, model.JsonMap{
			"key":       key,
			"doc_count": row.LastColValue(),
		})
	}
	return response
}

// String returns the Elasticsearch name of this aggregation.
func (query GeoTileGrid) String() string {
	return "geotile_grid"
}

// PostprocessResults returns the rows unchanged; geotile_grid needs no
// post-processing of the database result.
func (query GeoTileGrid) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
	return rowsFromDB
}
41 changes: 41 additions & 0 deletions quesma/model/metrics_aggregations/geo_cetroid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package metrics_aggregations

import (
"context"
"quesma/model"
)

// GeoCentroid implements the Elasticsearch `geo_centroid` metric aggregation,
// which computes the weighted centroid (avg lat/lon) of a set of geo points.
type GeoCentroid struct {
	ctx context.Context // request context, kept for parity with sibling aggregations
}

// NewGeoCentroid returns a GeoCentroid aggregation bound to the given context.
func NewGeoCentroid(ctx context.Context) GeoCentroid {
	return GeoCentroid{ctx: ctx}
}

// IsBucketAggregation reports that geo_centroid is a metric aggregation,
// not a bucket (group-by) one.
func (query GeoCentroid) IsBucketAggregation() bool {
	return false
}

// TranslateSqlResponseToJson renders the SQL result of a geo_centroid
// aggregation as the Elasticsearch response shape:
// {"count": ..., "location": {"lat": ..., "lon": ...}}.
// It reads fixed column positions produced by the query builder:
// Cols[3] = avg lat, Cols[4] = avg lon, Cols[5] = count.
// TODO(review): the hard-coded offsets assume the parent query contributes
// exactly 3 leading columns — confirm against aggregation_parser.
func (query GeoCentroid) TranslateSqlResponseToJson(rows []model.QueryResultRow, level int) []model.JsonMap {
	// Guard against empty or malformed results: the original code indexed
	// rows[0].Cols[3..5] unconditionally and panicked on an empty result set.
	if len(rows) == 0 || len(rows[0].Cols) < 6 {
		return []model.JsonMap{}
	}
	location := model.JsonMap{
		"lat": rows[0].Cols[3].Value,
		"lon": rows[0].Cols[4].Value,
	}
	return []model.JsonMap{
		{
			"count":    rows[0].Cols[5].Value,
			"location": location,
		},
	}
}

// String returns the Elasticsearch name of this aggregation.
func (query GeoCentroid) String() string {
	return "geo_centroid"
}

// PostprocessResults returns the rows unchanged; geo_centroid needs no
// post-processing of the database result.
func (query GeoCentroid) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
	return rowsFromDB
}
6 changes: 1 addition & 5 deletions quesma/model/used_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,7 @@ func (v *usedColumns) VisitWindowFunction(f WindowFunction) interface{} {
}

func (v *usedColumns) VisitParenExpr(p ParenExpr) interface{} {
var exprs []Expr
for _, expr := range p.Exprs {
exprs = append(exprs, expr.Accept(v).(Expr))
}
return NewParenExpr(exprs...)
return p
}

func (v *usedColumns) VisitLambdaExpr(e LambdaExpr) interface{} {
Expand Down
89 changes: 86 additions & 3 deletions quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio
}

query := b.buildAggregationCommon(metadata)

switch metricsAggr.AggrType {
case "sum", "min", "max", "avg":
query.SelectCommand.Columns = append(query.SelectCommand.Columns, model.NewFunction(metricsAggr.AggrType+"OrNull", getFirstExpression()))
Expand Down Expand Up @@ -300,7 +299,19 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio
addColumn("varSamp")
addColumn("stddevPop")
addColumn("stddevSamp")

case "geo_centroid":
firstExpr := getFirstExpression()
if col, ok := firstExpr.(model.ColumnRef); ok {
colName := col.ColumnName
// TODO we have create columns according to the schema
latColumn := model.NewColumnRef(colName + "::lat")
lonColumn := model.NewColumnRef(colName + "::lon")
castLat := model.NewFunction("CAST", latColumn, model.NewLiteral(fmt.Sprintf("'%s'", "Float")))
castLon := model.NewFunction("CAST", lonColumn, model.NewLiteral(fmt.Sprintf("'%s'", "Float")))
query.SelectCommand.Columns = append(query.SelectCommand.Columns, model.NewFunction("avgOrNull", castLat))
query.SelectCommand.Columns = append(query.SelectCommand.Columns, model.NewFunction("avgOrNull", castLon))
query.SelectCommand.Columns = append(query.SelectCommand.Columns, model.NewFunction("count"))
}
default:
logger.WarnWithCtx(b.ctx).Msgf("unknown metrics aggregation: %s", metricsAggr.AggrType)
return nil
Expand Down Expand Up @@ -330,6 +341,8 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio
query.Type = metrics_aggregations.NewValueCount(b.ctx)
case "percentile_ranks":
query.Type = metrics_aggregations.NewPercentileRanks(b.ctx, metricsAggr.Keyed)
case "geo_centroid":
query.Type = metrics_aggregations.NewGeoCentroid(b.ctx)
}
return query
}
Expand Down Expand Up @@ -524,7 +537,7 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
// full list: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-Aggregations-metrics.html
// shouldn't be hard to handle others, if necessary

metricsAggregations := []string{"sum", "avg", "min", "max", "cardinality", "value_count", "stats"}
metricsAggregations := []string{"sum", "avg", "min", "max", "cardinality", "value_count", "stats", "geo_centroid"}
for k, v := range queryMap {
if slices.Contains(metricsAggregations, k) {
field, isFromScript := cw.parseFieldFieldMaybeScript(v, k)
Expand Down Expand Up @@ -899,6 +912,76 @@ func (cw *ClickhouseQueryTranslator) tryBucketAggregation(currentAggr *aggrQuery
delete(queryMap, "date_range")
return success, 0, nil
}
if geoTileGridRaw, ok := queryMap["geotile_grid"]; ok {
geoTileGrid, ok := geoTileGridRaw.(QueryMap)
if !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("geotile_grid is not a map, but %T, value: %v", geoTileGridRaw, geoTileGridRaw)
}
var precision float64
precisionRaw, ok := geoTileGrid["precision"]
if ok {
switch cutValueTyped := precisionRaw.(type) {
case float64:
precision = cutValueTyped
}
}
field := cw.parseFieldField(geoTileGrid, "geotile_grid")
currentAggr.Type = bucket_aggregations.NewGeoTileGrid(cw.Ctx)

// That's bucket (group by) formula for geotile_grid
// zoom/x/y
// SELECT precision as zoom,
// FLOOR(((toFloat64("Location::lon") + 180.0) / 360.0) * POWER(2, zoom)) AS x_tile,
// FLOOR(
// (
// 1 - LOG(TAN(RADIANS(toFloat64("Location::lat"))) + (1 / COS(RADIANS(toFloat64("Location::lat"))))) / PI()
// ) / 2.0 * POWER(2, zoom)
// ) AS y_tile, count()
// FROM
// kibana_sample_data_flights Group by zoom, x_tile, y_tile

// TODO columns names should be created according to the schema
var lon = model.AsString(field)
lon = strings.Trim(lon, "\"")
lon = lon + "::lon"
var lat = model.AsString(field)
lat = strings.Trim(lat, "\"")
lat = lat + "::lat"
toFloatFunLon := model.NewFunction("toFloat64", model.NewColumnRef(lon))
var infixX model.Expr
infixX = model.NewParenExpr(model.NewInfixExpr(toFloatFunLon, "+", model.NewLiteral(180.0)))
infixX = model.NewParenExpr(model.NewInfixExpr(infixX, "/", model.NewLiteral(360.0)))
infixX = model.NewInfixExpr(infixX, "*",
model.NewFunction("POWER", model.NewLiteral(2), model.NewLiteral("zoom")))
xTile := model.NewAliasedExpr(model.NewFunction("FLOOR", infixX), "x_tile")
toFloatFunLat := model.NewFunction("toFloat64", model.NewColumnRef(lat))
radians := model.NewFunction("RADIANS", toFloatFunLat)
tan := model.NewFunction("TAN", radians)
cos := model.NewFunction("COS", radians)
Log := model.NewFunction("LOG", model.NewInfixExpr(tan, "+",
model.NewParenExpr(model.NewInfixExpr(model.NewLiteral(1), "/", cos))))

FloorContent := model.NewInfixExpr(
model.NewInfixExpr(
model.NewParenExpr(
model.NewInfixExpr(model.NewInfixExpr(model.NewLiteral(1), "-", Log), "/",
model.NewLiteral("PI()"))), "/",
model.NewLiteral(2.0)), "*",
model.NewFunction("POWER", model.NewLiteral(2), model.NewLiteral("zoom")))
yTile := model.NewAliasedExpr(
model.NewFunction("FLOOR", FloorContent), "y_tile")

currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, model.NewAliasedExpr(model.NewLiteral(precision), "zoom"))
currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, xTile)
currentAggr.SelectCommand.Columns = append(currentAggr.SelectCommand.Columns, yTile)

currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, model.NewColumnRef("zoom"))
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, model.NewColumnRef("x_tile"))
currentAggr.SelectCommand.GroupBy = append(currentAggr.SelectCommand.GroupBy, model.NewColumnRef("y_tile"))

delete(queryMap, "geotile_grid")
return success, 3, err
}
if _, ok := queryMap["sampler"]; ok {
currentAggr.Type = metrics_aggregations.NewCount(cw.Ctx)
delete(queryMap, "sampler")
Expand Down
56 changes: 56 additions & 0 deletions quesma/queryparser/query_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ func (cw *ClickhouseQueryTranslator) parseQueryMap(queryMap QueryMap) model.Simp
"query_string": cw.parseQueryString,
"simple_query_string": cw.parseQueryString,
"regexp": cw.parseRegexp,
"geo_bounding_box": cw.parseGeoBoundingBox,
}
for k, v := range queryMap {
if f, ok := parseMap[k]; ok {
Expand Down Expand Up @@ -875,6 +876,7 @@ func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQ
}

// parseDateTimeString returns string used to parse DateTime in Clickhouse (depends on column type)

func (cw *ClickhouseQueryTranslator) parseDateTimeString(table *clickhouse.Table, field, dateTime string) (string, string) {
typ := table.GetDateTimeType(cw.Ctx, cw.ResolveField(cw.Ctx, field))
switch typ {
Expand Down Expand Up @@ -913,6 +915,15 @@ func (cw *ClickhouseQueryTranslator) parseExists(queryMap QueryMap) model.Simple
model.NewLiteral("size0"),
), "=", model.NewLiteral("0"))
case clickhouse.NotExists:
// TODO this is a workaround for the case when the field is a point
schemaInstance, exists := cw.SchemaRegistry.FindSchema(schema.TableName(cw.Table.Name))
if exists {
if value, ok := schemaInstance.Fields[schema.FieldName(fieldName)]; ok {
if value.Type.Equal(schema.TypePoint) {
return model.NewSimpleQuery(sql, true)
}
}
}
attrs := cw.Table.GetAttributesList()
stmts := make([]model.Expr, len(attrs))
for i, a := range attrs {
Expand Down Expand Up @@ -1375,3 +1386,48 @@ func (cw *ClickhouseQueryTranslator) GetDateTimeTypeFromSelectClause(ctx context
}
return clickhouse.Invalid
}

// parseGeoBoundingBox parses an Elasticsearch `geo_bounding_box` query.
// Only the corners-as-array form ([lat, lon]) is currently handled. The
// abstract GEO_BOUNDING_BOX(field, topLeftLat, topLeftLon, bottomRightLat,
// bottomRightLon) function is built but intentionally NOT added to the WHERE
// clause yet (see TODO below), so the returned query matches all rows.
func (cw *ClickhouseQueryTranslator) parseGeoBoundingBox(queryMap QueryMap) model.SimpleQuery {
	stmts := make([]model.Expr, 0)
	for field, v := range queryMap {
		// TODO handle lat lon as array case for now
		// Generate following where statement, assuming that field
		// is equal to "Location"
		// GEO_BOUNDING_BOX("Location", top_left_lat, top_left_lon, bottom_right_lat, bottom_right_lon))
		// GEO_BOUNDING_BOX here is an abstract geo function that will be mapped
		// later to specific Clickhouse (or any other db function in the future)
		// it takes 5 arguments: field, topLeftLat, topLeftLon, bottomRightLat, bottomRightLon
		corners, ok := v.(QueryMap)
		if !ok {
			// The previous unchecked v.(QueryMap) assertion panicked on
			// malformed input; warn and bail out instead.
			logger.WarnWithCtx(cw.Ctx).Msgf("geo_bounding_box field value is not a map, but %T, value: %v", v, v)
			return model.NewSimpleQuery(nil, false)
		}
		// Corner expressions are per-field: declaring them inside the loop
		// prevents accumulation when the query names several fields.
		bottomRightExpressions := make([]model.Expr, 0, 2)
		topLeftExpressions := make([]model.Expr, 0, 2)
		if bottomRight, ok := corners["bottom_right"]; ok {
			// Length check: a short array previously caused an index panic.
			if bottomRightCornerAsArray, ok := bottomRight.([]interface{}); ok && len(bottomRightCornerAsArray) >= 2 {
				bottomRightExpressions = append(bottomRightExpressions, model.NewLiteral(fmt.Sprintf("%v", bottomRightCornerAsArray[0])))
				bottomRightExpressions = append(bottomRightExpressions, model.NewLiteral(fmt.Sprintf("%v", bottomRightCornerAsArray[1])))
			}
		} else {
			logger.WarnWithCtx(cw.Ctx).Msgf("no bottom_right in geo_bounding_box query: %v", queryMap)
			return model.NewSimpleQuery(nil, false)
		}
		if topLeft, ok := corners["top_left"]; ok {
			if topLeftCornerAsArray, ok := topLeft.([]interface{}); ok && len(topLeftCornerAsArray) >= 2 {
				topLeftExpressions = append(topLeftExpressions, model.NewLiteral(fmt.Sprintf("%v", topLeftCornerAsArray[0])))
				topLeftExpressions = append(topLeftExpressions, model.NewLiteral(fmt.Sprintf("%v", topLeftCornerAsArray[1])))
			}
		} else {
			logger.WarnWithCtx(cw.Ctx).Msgf("no top_left in geo_bounding_box query: %v", queryMap)
			return model.NewSimpleQuery(nil, false)
		}
		args := make([]model.Expr, 0, 5)
		args = append(args, model.NewColumnRef(field))
		args = append(args, topLeftExpressions...)
		args = append(args, bottomRightExpressions...)
		fun := model.NewFunction("GEO_BOUNDING_BOX", args...)
		_ = fun
		// TODO uncomment when GEO_BOUNDING_BOX is implemented
		// it requires additional transformation to update field names
		//stmts = append(stmts, fun)
	}
	return model.NewSimpleQuery(model.And(stmts), true)
}
4 changes: 3 additions & 1 deletion quesma/queryparser/query_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,10 @@ func (cw *ClickhouseQueryTranslator) makeTotalCount(queries []*model.Query, resu
totalCount += int(v)
case int:
totalCount += v
case int64:
totalCount += int(v)
default:
logger.ErrorWithCtx(cw.Ctx).Msgf("Unknown type of count %v", v)
logger.ErrorWithCtx(cw.Ctx).Msgf("Unknown type of count %v %t", v, v)
}
}
}
Expand Down
Loading

0 comments on commit bd2fa7f

Please sign in to comment.