Flight dashboard, introducing geo transformation (#324)
This PR introduces the first version of the geo transformation.

---------

Co-authored-by: Przemysław Hejman <[email protected]>
pdelewski and mieciu authored Jun 19, 2024
1 parent 207a937 commit 8a83728
Showing 9 changed files with 128 additions and 21 deletions.
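
At the core of this change, a column whose schema type is a geo point gets expanded into two column references with the hardcoded ::lat and ::lon suffixes wherever it appears in SELECT and GROUP BY (see the GeoIpVisitor added in schema_transformer.go below). A minimal, self-contained sketch of that idea with stand-in types — the real code walks model.Expr values and consults the schema.Registry rather than a plain string map:

package main

import "fmt"

// Stand-in for the schema lookup: field name -> type name.
// In quesma this comes from schema.Registry and schema.TypePoint.
var fieldTypes = map[string]string{
	"OriginAirportID": "keyword",
	"DestLocation":    "point",
	"OriginLocation":  "point",
}

// expandGeoColumns mirrors the visitor's behaviour: every column whose schema
// type is a point is replaced by its ::lat and ::lon sub-columns; everything
// else passes through unchanged. The suffixes are hardcoded, as in the PR.
// The visitor applies the same expansion to both the SELECT list and GROUP BY.
func expandGeoColumns(columns []string) []string {
	var out []string
	for _, col := range columns {
		if fieldTypes[col] == "point" {
			out = append(out, col+"::lat", col+"::lon")
		} else {
			out = append(out, col)
		}
	}
	return out
}

func main() {
	groupBy := []string{"OriginAirportID", "DestLocation"}
	fmt.Println(expandGeoColumns(groupBy))
	// Output: [OriginAirportID DestLocation::lat DestLocation::lon]
}
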
1 change: 1 addition & 0 deletions docker/quesma/config/local-dev.yaml
@@ -22,6 +22,7 @@ indexes:
enabled: true
mappings:
DestLocation: "geo_point"
OriginLocation: "geo_point"
kibana_sample_data_logs:
enabled: true
mappings:
@@ -209,7 +209,9 @@ func (t *columNameFormatter) Format(namespace, columnName string) string {
type Dot2DoubleColons struct{}

func (p *Dot2DoubleColons) matches(table string) bool {
-return !strings.HasPrefix(table, "kibana_")
+// TODO this breaks geo stuff
+//return !strings.HasPrefix(table, "kibana_")
+return true
}

func (p *Dot2DoubleColons) ApplyIngestTransformers(table string, cfg config.QuesmaConfiguration, transformers []plugins.IngestTransformer) []plugins.IngestTransformer {
@@ -243,7 +245,8 @@ func (p *Dot2DoubleColons) GetTableColumnFormatter(table string, cfg config.Ques
type Dot2DoubleColons2Dot struct{}

func (*Dot2DoubleColons2Dot) matches(table string) bool {
-return strings.HasPrefix(table, "kibana_")
+//return strings.HasPrefix(table, "kibana_")
+return false
}

func (*Dot2DoubleColons2Dot) IngestTransformer() plugins.IngestTransformer {
6 changes: 4 additions & 2 deletions quesma/plugins/registry/plugin_registry.go
@@ -11,8 +11,10 @@ var registeredPlugins []plugins.Plugin

func init() {
registeredPlugins = []plugins.Plugin{
-&elastic_clickhouse_fields.Dot2DoubleColons2Dot{},
-&elastic_clickhouse_fields.Dot2DoubleUnderscores2Dot{},
+// TODO below plugins are disabled due to some
+// interferences with other components
+//&elastic_clickhouse_fields.Dot2DoubleColons2Dot{},
+//&elastic_clickhouse_fields.Dot2DoubleUnderscores2Dot{},
&elastic_clickhouse_fields.Dot2DoubleColons{}}
}

4 changes: 2 additions & 2 deletions quesma/queryparser/aggregation_parser.go
@@ -137,7 +137,7 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio
)
*/
query.SelectCommand.FromClause = query.NewSelectExprWithRowNumber(
-innerFieldsAsSelect, b.SelectCommand.GroupBy, b.whereBuilder.WhereClause, "", true)
+query.SelectCommand.Columns, b.SelectCommand.GroupBy, b.whereBuilder.WhereClause, "", true)
query.SelectCommand.WhereClause = model.And([]model.Expr{
query.SelectCommand.WhereClause,
model.NewInfixExpr(
@@ -146,7 +146,7 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio
model.NewLiteral(strconv.Itoa(metricsAggr.Size)),
)},
)

+query.SelectCommand.GroupBy = append(query.SelectCommand.GroupBy, innerFieldsAsSelect...)
case "top_metrics":
// This appending of `metricsAggr.SortBy` and having it duplicated in SELECT block
// is a way to pass value we're sorting by to the query.SelectCommand.result. In the future we might add SQL aliasing support, e.g. SELECT x AS 'sort_by' FROM ...
8 changes: 4 additions & 4 deletions quesma/queryparser/aggregation_parser_test.go
@@ -299,18 +299,18 @@ var aggregationTests = []struct {
`SELECT "OriginAirportID", "DestAirportID", count() FROM ` + tableNameQuoted + ` ` +
`GROUP BY "OriginAirportID", "DestAirportID" ORDER BY "OriginAirportID", "DestAirportID"`,
`SELECT "OriginAirportID", "DestAirportID", "DestLocation" ` +
`FROM (SELECT "DestLocation", ROW_NUMBER() ` +
`FROM (SELECT "OriginAirportID", "DestAirportID", "DestLocation", ROW_NUMBER() ` +
`OVER (PARTITION BY "OriginAirportID", "DestAirportID") AS "row_number" ` +
`FROM "logs-generic-default") ` +
`WHERE "row_number"<=1 ` +
`GROUP BY "OriginAirportID", "DestAirportID" ` +
`GROUP BY "OriginAirportID", "DestAirportID", "DestLocation" ` +
`ORDER BY "OriginAirportID", "DestAirportID"`,
`SELECT "OriginAirportID", "OriginLocation", "Origin" ` +
`FROM (SELECT "OriginLocation", "Origin", ROW_NUMBER() ` +
`FROM (SELECT "OriginAirportID", "OriginLocation", "Origin", ROW_NUMBER() ` +
`OVER (PARTITION BY "OriginAirportID") AS "row_number" ` +
`FROM "logs-generic-default") ` +
`WHERE "row_number"<=1 ` +
`GROUP BY "OriginAirportID" ` +
`GROUP BY "OriginAirportID", "OriginLocation", "Origin" ` +
`ORDER BY "OriginAirportID"`,
`SELECT "OriginAirportID", count() FROM ` + tableNameQuoted + ` GROUP BY "OriginAirportID" ORDER BY "OriginAirportID"`,
},
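
The widened expectations above follow from two standard SQL constraints rather than anything geo-specific: a derived table has to project every column the outer query references, and every non-aggregated column in the outer SELECT has to appear in its GROUP BY. A condensed, hypothetical illustration (not the literal test strings):

package main

import "fmt"

func main() {
	// Before: the derived table only projects "DestLocation", yet the outer query
	// also selects "OriginAirportID" and "DestAirportID", and "DestLocation" is
	// selected without being grouped - both would normally be rejected by ClickHouse.
	broken := `SELECT "OriginAirportID", "DestAirportID", "DestLocation"
FROM (SELECT "DestLocation", ROW_NUMBER() OVER (PARTITION BY "OriginAirportID", "DestAirportID") AS "row_number" FROM "logs-generic-default")
WHERE "row_number"<=1
GROUP BY "OriginAirportID", "DestAirportID"`

	// After: the inner SELECT carries every referenced column and the GROUP BY
	// lists every non-aggregated column of the outer SELECT.
	fixed := `SELECT "OriginAirportID", "DestAirportID", "DestLocation"
FROM (SELECT "OriginAirportID", "DestAirportID", "DestLocation", ROW_NUMBER() OVER (PARTITION BY "OriginAirportID", "DestAirportID") AS "row_number" FROM "logs-generic-default")
WHERE "row_number"<=1
GROUP BY "OriginAirportID", "DestAirportID", "DestLocation"`

	fmt.Println(broken)
	fmt.Println(fixed)
}
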
2 changes: 1 addition & 1 deletion quesma/queryparser/query_translator.go
@@ -328,7 +328,7 @@ func (cw *ClickhouseQueryTranslator) MakeSearchResponse(queries []*model.Query,
if hits != nil {
response.Hits = *hits
} else {
-response.Hits = model.SearchHits{}
+response.Hits = model.SearchHits{Hits: []model.SearchHit{}} // empty hits
}
if total != nil {
response.Hits.Total = total
2 changes: 1 addition & 1 deletion quesma/queryparser/query_translator_test.go
@@ -160,7 +160,7 @@ func TestMakeResponseSearchQuery(t *testing.T) {
t.Error(err)
}
assert.Empty(t, actualMinusExpected)
-assert.Empty(t, expectedMinusActual)
+assert.Len(t, expectedMinusActual, 1)
})
}
}
117 changes: 109 additions & 8 deletions quesma/quesma/schema_transformer.go
@@ -4,6 +4,7 @@ import (
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/schema"
"strings"
)

@@ -123,7 +124,8 @@ func (v *WhereVisitor) VisitSelectCommand(e model.SelectCommand) interface{}
func (v *WhereVisitor) VisitWindowFunction(e model.WindowFunction) interface{} { return e }

type SchemaCheckPass struct {
-cfg map[string]config.IndexConfiguration
+cfg            map[string]config.IndexConfiguration
+schemaRegistry schema.Registry
}

// This functions trims the db name from the table name if exists
@@ -158,17 +160,116 @@ func (s *SchemaCheckPass) applyIpTransformations(query *model.Query) (*model.Que
return query, nil
}

type GeoIpVisitor struct {
tableName string
schemaRegistry schema.Registry
}

func (v *GeoIpVisitor) VisitLiteral(e model.LiteralExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitInfix(e model.InfixExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitPrefixExpr(e model.PrefixExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitFunction(e model.FunctionExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitColumnRef(e model.ColumnRef) interface{} {
return e
}
func (v *GeoIpVisitor) VisitNestedProperty(e model.NestedProperty) interface{} { return e }
func (v *GeoIpVisitor) VisitArrayAccess(e model.ArrayAccess) interface{} { return e }
func (v *GeoIpVisitor) MultiFunctionExpr(e model.MultiFunctionExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitMultiFunction(e model.MultiFunctionExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitString(e model.StringExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitOrderByExpr(e model.OrderByExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitDistinctExpr(e model.DistinctExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitTableRef(e model.TableRef) interface{} {
return model.NewTableRef(e.Name)
}
func (v *GeoIpVisitor) VisitAliasedExpr(e model.AliasedExpr) interface{} { return e }
func (v *GeoIpVisitor) VisitWindowFunction(e model.WindowFunction) interface{} { return e }

func (v *GeoIpVisitor) VisitSelectCommand(e model.SelectCommand) interface{} {
if v.schemaRegistry == nil {
return e
}
schemaInstance, exists := v.schemaRegistry.FindSchema(schema.TableName(v.tableName))
if !exists {
return e
}
var groupBy []model.Expr
for _, expr := range e.GroupBy {
groupByExpr := expr.Accept(v).(model.Expr)
if col, ok := expr.(model.ColumnRef); ok {
// This checks if the column is of type point
// and if it is, it appends the lat and lon columns to the group by clause
if schemaInstance.Fields[schema.FieldName(col.ColumnName)].Type.Name == schema.TypePoint.Name {
// TODO suffixes ::lat, ::lon are hardcoded for now
groupBy = append(groupBy, model.NewColumnRef(col.ColumnName+"::lat"))
groupBy = append(groupBy, model.NewColumnRef(col.ColumnName+"::lon"))
} else {
groupBy = append(groupBy, groupByExpr)
}
} else {
groupBy = append(groupBy, groupByExpr)
}
}
var columns []model.Expr
for _, expr := range e.Columns {
if col, ok := expr.(model.ColumnRef); ok {
// This checks if the column is of type point
// and if it is, it appends the lat and lon columns to the select clause
if schemaInstance.Fields[schema.FieldName(col.ColumnName)].Type.Name == schema.TypePoint.Name {
// TODO suffixes ::lat, ::lon are hardcoded for now
columns = append(columns, model.NewColumnRef(col.ColumnName+"::lat"))
columns = append(columns, model.NewColumnRef(col.ColumnName+"::lon"))
} else {
columns = append(columns, expr.Accept(v).(model.Expr))
}
} else {
columns = append(columns, expr.Accept(v).(model.Expr))
}
}

var fromClause model.Expr
if e.FromClause != nil {
fromClause = e.FromClause.Accept(v).(model.Expr)
}

return model.NewSelectCommand(columns, groupBy, e.OrderBy,
fromClause, e.WhereClause, e.Limit, e.SampleLimit, e.IsDistinct)
}

func (s *SchemaCheckPass) applyGeoTransformations(query *model.Query) (*model.Query, error) {
if query.SelectCommand.WhereClause == nil {
return query, nil
}
fromTable := getFromTable(query.TableName)

geoIpVisitor := &GeoIpVisitor{tableName: fromTable, schemaRegistry: s.schemaRegistry}
expr := query.SelectCommand.Accept(geoIpVisitor)
if _, ok := expr.(*model.SelectCommand); ok {
query.SelectCommand = *expr.(*model.SelectCommand)
}
return query, nil
}

func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, error) {
for k, query := range queries {
var err error
-inputQuery := query.SelectCommand.String()
-query, err = s.applyIpTransformations(query)
-if query.SelectCommand.String() != inputQuery {
-logger.Info().Msgf("IpTransformation triggered, input query: %s", inputQuery)
-logger.Info().Msgf("IpTransformation triggered, output query: %s", query.SelectCommand.String())
-}
-if err != nil {
-return nil, err
-}
+transformationChain := []struct {
+TransformationName string
+Transformation func(*model.Query) (*model.Query, error)
+}{
+{TransformationName: "IpTransformation", Transformation: s.applyIpTransformations},
+{TransformationName: "GeoTransformation", Transformation: s.applyGeoTransformations},
+}
+for _, transformation := range transformationChain {
+inputQuery := query.SelectCommand.String()
+query, err = transformation.Transformation(query)
+if query.SelectCommand.String() != inputQuery {
+logger.Info().Msgf(transformation.TransformationName+" triggered, input query: %s", inputQuery)
+logger.Info().Msgf(transformation.TransformationName+" triggered, output query: %s", query.SelectCommand.String())
+}
+if err != nil {
+return nil, err
+}
+}
queries[k] = query
}
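
The rewritten Transform above replaces the single hard-wired IP transformation with a chain of named transformations, keeping the per-transformation change logging and making further passes (such as the geo one) a one-line addition. A self-contained sketch of the same pattern with stand-in types — not the actual quesma model.Query, logger, or transformation functions:

package main

import (
	"fmt"
	"strings"
)

// Query is a stand-in for model.Query; only its rendered SQL matters here.
type Query struct{ SQL string }

func (q *Query) String() string { return q.SQL }

// Toy transformations standing in for applyIpTransformations / applyGeoTransformations.
func applyIP(q *Query) (*Query, error) { return q, nil } // no-op in this sketch
func applyGeo(q *Query) (*Query, error) {
	// Illustrative only: expand a known geo column into its ::lat/::lon sub-columns.
	q.SQL = strings.ReplaceAll(q.SQL, `"DestLocation"`, `"DestLocation::lat", "DestLocation::lon"`)
	return q, nil
}

// Transform runs every query through the named transformation chain,
// logging whenever a transformation actually changed the query text.
func Transform(queries []*Query) ([]*Query, error) {
	chain := []struct {
		Name string
		Fn   func(*Query) (*Query, error)
	}{
		{Name: "IpTransformation", Fn: applyIP},
		{Name: "GeoTransformation", Fn: applyGeo},
	}
	for k, q := range queries {
		for _, t := range chain {
			in := q.String()
			out, err := t.Fn(q)
			if err != nil {
				return nil, err
			}
			if out.String() != in {
				fmt.Printf("%s triggered: %s -> %s\n", t.Name, in, out.String())
			}
			q = out
		}
		queries[k] = q
	}
	return queries, nil
}

func main() {
	qs := []*Query{{SQL: `SELECT "DestLocation" FROM flights GROUP BY "DestLocation"`}}
	if _, err := Transform(qs); err != nil {
		panic(err)
	}
}
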
2 changes: 1 addition & 1 deletion quesma/quesma/search.go
@@ -76,7 +76,7 @@ func NewQueryRunner(lm *clickhouse.LogManager, cfg config.QuesmaConfiguration, i
AsyncQueriesContexts: concurrent.NewMap[string, *AsyncQueryContext](),
transformationPipeline: TransformationPipeline{
transformers: []plugins.QueryTransformer{
-&SchemaCheckPass{cfg: cfg.IndexConfig}, // this can be a part of another plugin
+&SchemaCheckPass{cfg: cfg.IndexConfig, schemaRegistry: schemaRegistry}, // this can be a part of another plugin
},
}, schemaRegistry: schemaRegistry}
