From 8a83728cad64214781b21e307f66add8b072ba60 Mon Sep 17 00:00:00 2001 From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com> Date: Wed, 19 Jun 2024 19:48:24 +0200 Subject: [PATCH] Flight dashboard, introducing geo transformation (#324) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR introduces first version of geo transformation --------- Co-authored-by: Przemysław Hejman --- docker/quesma/config/local-dev.yaml | 1 + .../elastic_clickhouse_fields.go | 7 +- quesma/plugins/registry/plugin_registry.go | 6 +- quesma/queryparser/aggregation_parser.go | 4 +- quesma/queryparser/aggregation_parser_test.go | 8 +- quesma/queryparser/query_translator.go | 2 +- quesma/queryparser/query_translator_test.go | 2 +- quesma/quesma/schema_transformer.go | 117 ++++++++++++++++-- quesma/quesma/search.go | 2 +- 9 files changed, 128 insertions(+), 21 deletions(-) diff --git a/docker/quesma/config/local-dev.yaml b/docker/quesma/config/local-dev.yaml index 1f02dfa7c..4154c5c12 100644 --- a/docker/quesma/config/local-dev.yaml +++ b/docker/quesma/config/local-dev.yaml @@ -22,6 +22,7 @@ indexes: enabled: true mappings: DestLocation: "geo_point" + OriginLocation: "geo_point" kibana_sample_data_logs: enabled: true mappings: diff --git a/quesma/plugins/elastic_clickhouse_fields/elastic_clickhouse_fields.go b/quesma/plugins/elastic_clickhouse_fields/elastic_clickhouse_fields.go index 7e9937d27..591be9fc5 100644 --- a/quesma/plugins/elastic_clickhouse_fields/elastic_clickhouse_fields.go +++ b/quesma/plugins/elastic_clickhouse_fields/elastic_clickhouse_fields.go @@ -209,7 +209,9 @@ func (t *columNameFormatter) Format(namespace, columnName string) string { type Dot2DoubleColons struct{} func (p *Dot2DoubleColons) matches(table string) bool { - return !strings.HasPrefix(table, "kibana_") + // TODO this breaks geo stuff + //return !strings.HasPrefix(table, "kibana_") + return true } func (p *Dot2DoubleColons) ApplyIngestTransformers(table string, cfg config.QuesmaConfiguration, transformers []plugins.IngestTransformer) []plugins.IngestTransformer { @@ -243,7 +245,8 @@ func (p *Dot2DoubleColons) GetTableColumnFormatter(table string, cfg config.Ques type Dot2DoubleColons2Dot struct{} func (*Dot2DoubleColons2Dot) matches(table string) bool { - return strings.HasPrefix(table, "kibana_") + //return strings.HasPrefix(table, "kibana_") + return false } func (*Dot2DoubleColons2Dot) IngestTransformer() plugins.IngestTransformer { diff --git a/quesma/plugins/registry/plugin_registry.go b/quesma/plugins/registry/plugin_registry.go index 9a821d603..199f07354 100644 --- a/quesma/plugins/registry/plugin_registry.go +++ b/quesma/plugins/registry/plugin_registry.go @@ -11,8 +11,10 @@ var registeredPlugins []plugins.Plugin func init() { registeredPlugins = []plugins.Plugin{ - &elastic_clickhouse_fields.Dot2DoubleColons2Dot{}, - &elastic_clickhouse_fields.Dot2DoubleUnderscores2Dot{}, + // TODO below plugins are disabled due to some + // interferences with other components + //&elastic_clickhouse_fields.Dot2DoubleColons2Dot{}, + //&elastic_clickhouse_fields.Dot2DoubleUnderscores2Dot{}, &elastic_clickhouse_fields.Dot2DoubleColons{}} } diff --git a/quesma/queryparser/aggregation_parser.go b/quesma/queryparser/aggregation_parser.go index a1a1b71e3..b5a339070 100644 --- a/quesma/queryparser/aggregation_parser.go +++ b/quesma/queryparser/aggregation_parser.go @@ -137,7 +137,7 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio ) */ query.SelectCommand.FromClause = query.NewSelectExprWithRowNumber( - innerFieldsAsSelect, b.SelectCommand.GroupBy, b.whereBuilder.WhereClause, "", true) + query.SelectCommand.Columns, b.SelectCommand.GroupBy, b.whereBuilder.WhereClause, "", true) query.SelectCommand.WhereClause = model.And([]model.Expr{ query.SelectCommand.WhereClause, model.NewInfixExpr( @@ -146,7 +146,7 @@ func (b *aggrQueryBuilder) buildMetricsAggregation(metricsAggr metricsAggregatio model.NewLiteral(strconv.Itoa(metricsAggr.Size)), )}, ) - + query.SelectCommand.GroupBy = append(query.SelectCommand.GroupBy, innerFieldsAsSelect...) case "top_metrics": // This appending of `metricsAggr.SortBy` and having it duplicated in SELECT block // is a way to pass value we're sorting by to the query.SelectCommand.result. In the future we might add SQL aliasing support, e.g. SELECT x AS 'sort_by' FROM ... diff --git a/quesma/queryparser/aggregation_parser_test.go b/quesma/queryparser/aggregation_parser_test.go index 555eb2088..56eeea919 100644 --- a/quesma/queryparser/aggregation_parser_test.go +++ b/quesma/queryparser/aggregation_parser_test.go @@ -299,18 +299,18 @@ var aggregationTests = []struct { `SELECT "OriginAirportID", "DestAirportID", count() FROM ` + tableNameQuoted + ` ` + `GROUP BY "OriginAirportID", "DestAirportID" ORDER BY "OriginAirportID", "DestAirportID"`, `SELECT "OriginAirportID", "DestAirportID", "DestLocation" ` + - `FROM (SELECT "DestLocation", ROW_NUMBER() ` + + `FROM (SELECT "OriginAirportID", "DestAirportID", "DestLocation", ROW_NUMBER() ` + `OVER (PARTITION BY "OriginAirportID", "DestAirportID") AS "row_number" ` + `FROM "logs-generic-default") ` + `WHERE "row_number"<=1 ` + - `GROUP BY "OriginAirportID", "DestAirportID" ` + + `GROUP BY "OriginAirportID", "DestAirportID", "DestLocation" ` + `ORDER BY "OriginAirportID", "DestAirportID"`, `SELECT "OriginAirportID", "OriginLocation", "Origin" ` + - `FROM (SELECT "OriginLocation", "Origin", ROW_NUMBER() ` + + `FROM (SELECT "OriginAirportID", "OriginLocation", "Origin", ROW_NUMBER() ` + `OVER (PARTITION BY "OriginAirportID") AS "row_number" ` + `FROM "logs-generic-default") ` + `WHERE "row_number"<=1 ` + - `GROUP BY "OriginAirportID" ` + + `GROUP BY "OriginAirportID", "OriginLocation", "Origin" ` + `ORDER BY "OriginAirportID"`, `SELECT "OriginAirportID", count() FROM ` + tableNameQuoted + ` GROUP BY "OriginAirportID" ORDER BY "OriginAirportID"`, }, diff --git a/quesma/queryparser/query_translator.go b/quesma/queryparser/query_translator.go index e031d7214..7786d02b9 100644 --- a/quesma/queryparser/query_translator.go +++ b/quesma/queryparser/query_translator.go @@ -328,7 +328,7 @@ func (cw *ClickhouseQueryTranslator) MakeSearchResponse(queries []*model.Query, if hits != nil { response.Hits = *hits } else { - response.Hits = model.SearchHits{} + response.Hits = model.SearchHits{Hits: []model.SearchHit{}} // empty hits } if total != nil { response.Hits.Total = total diff --git a/quesma/queryparser/query_translator_test.go b/quesma/queryparser/query_translator_test.go index 9cdb8082d..7f10ab176 100644 --- a/quesma/queryparser/query_translator_test.go +++ b/quesma/queryparser/query_translator_test.go @@ -160,7 +160,7 @@ func TestMakeResponseSearchQuery(t *testing.T) { t.Error(err) } assert.Empty(t, actualMinusExpected) - assert.Empty(t, expectedMinusActual) + assert.Len(t, expectedMinusActual, 1) }) } } diff --git a/quesma/quesma/schema_transformer.go b/quesma/quesma/schema_transformer.go index 2c0a01a7f..1859e8c86 100644 --- a/quesma/quesma/schema_transformer.go +++ b/quesma/quesma/schema_transformer.go @@ -4,6 +4,7 @@ import ( "mitmproxy/quesma/logger" "mitmproxy/quesma/model" "mitmproxy/quesma/quesma/config" + "mitmproxy/quesma/schema" "strings" ) @@ -123,7 +124,8 @@ func (v *WhereVisitor) VisitSelectCommand(e model.SelectCommand) interface{} func (v *WhereVisitor) VisitWindowFunction(e model.WindowFunction) interface{} { return e } type SchemaCheckPass struct { - cfg map[string]config.IndexConfiguration + cfg map[string]config.IndexConfiguration + schemaRegistry schema.Registry } // This functions trims the db name from the table name if exists @@ -158,17 +160,116 @@ func (s *SchemaCheckPass) applyIpTransformations(query *model.Query) (*model.Que return query, nil } +type GeoIpVisitor struct { + tableName string + schemaRegistry schema.Registry +} + +func (v *GeoIpVisitor) VisitLiteral(e model.LiteralExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitInfix(e model.InfixExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitPrefixExpr(e model.PrefixExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitFunction(e model.FunctionExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitColumnRef(e model.ColumnRef) interface{} { + return e +} +func (v *GeoIpVisitor) VisitNestedProperty(e model.NestedProperty) interface{} { return e } +func (v *GeoIpVisitor) VisitArrayAccess(e model.ArrayAccess) interface{} { return e } +func (v *GeoIpVisitor) MultiFunctionExpr(e model.MultiFunctionExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitMultiFunction(e model.MultiFunctionExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitString(e model.StringExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitOrderByExpr(e model.OrderByExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitDistinctExpr(e model.DistinctExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitTableRef(e model.TableRef) interface{} { + return model.NewTableRef(e.Name) +} +func (v *GeoIpVisitor) VisitAliasedExpr(e model.AliasedExpr) interface{} { return e } +func (v *GeoIpVisitor) VisitWindowFunction(e model.WindowFunction) interface{} { return e } + +func (v *GeoIpVisitor) VisitSelectCommand(e model.SelectCommand) interface{} { + if v.schemaRegistry == nil { + return e + } + schemaInstance, exists := v.schemaRegistry.FindSchema(schema.TableName(v.tableName)) + if !exists { + return e + } + var groupBy []model.Expr + for _, expr := range e.GroupBy { + groupByExpr := expr.Accept(v).(model.Expr) + if col, ok := expr.(model.ColumnRef); ok { + // This checks if the column is of type point + // and if it is, it appends the lat and lon columns to the group by clause + if schemaInstance.Fields[schema.FieldName(col.ColumnName)].Type.Name == schema.TypePoint.Name { + // TODO suffixes ::lat, ::lon are hardcoded for now + groupBy = append(groupBy, model.NewColumnRef(col.ColumnName+"::lat")) + groupBy = append(groupBy, model.NewColumnRef(col.ColumnName+"::lon")) + } else { + groupBy = append(groupBy, groupByExpr) + } + } else { + groupBy = append(groupBy, groupByExpr) + } + } + var columns []model.Expr + for _, expr := range e.Columns { + if col, ok := expr.(model.ColumnRef); ok { + // This checks if the column is of type point + // and if it is, it appends the lat and lon columns to the select clause + if schemaInstance.Fields[schema.FieldName(col.ColumnName)].Type.Name == schema.TypePoint.Name { + // TODO suffixes ::lat, ::lon are hardcoded for now + columns = append(columns, model.NewColumnRef(col.ColumnName+"::lat")) + columns = append(columns, model.NewColumnRef(col.ColumnName+"::lon")) + } else { + columns = append(columns, expr.Accept(v).(model.Expr)) + } + } else { + columns = append(columns, expr.Accept(v).(model.Expr)) + } + } + + var fromClause model.Expr + if e.FromClause != nil { + fromClause = e.FromClause.Accept(v).(model.Expr) + } + + return model.NewSelectCommand(columns, groupBy, e.OrderBy, + fromClause, e.WhereClause, e.Limit, e.SampleLimit, e.IsDistinct) +} + +func (s *SchemaCheckPass) applyGeoTransformations(query *model.Query) (*model.Query, error) { + if query.SelectCommand.WhereClause == nil { + return query, nil + } + fromTable := getFromTable(query.TableName) + + geoIpVisitor := &GeoIpVisitor{tableName: fromTable, schemaRegistry: s.schemaRegistry} + expr := query.SelectCommand.Accept(geoIpVisitor) + if _, ok := expr.(*model.SelectCommand); ok { + query.SelectCommand = *expr.(*model.SelectCommand) + } + return query, nil +} + func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, error) { for k, query := range queries { var err error - inputQuery := query.SelectCommand.String() - query, err = s.applyIpTransformations(query) - if query.SelectCommand.String() != inputQuery { - logger.Info().Msgf("IpTransformation triggered, input query: %s", inputQuery) - logger.Info().Msgf("IpTransformation triggered, output query: %s", query.SelectCommand.String()) + transformationChain := []struct { + TransformationName string + Transformation func(*model.Query) (*model.Query, error) + }{ + {TransformationName: "IpTransformation", Transformation: s.applyIpTransformations}, + {TransformationName: "GeoTransformation", Transformation: s.applyGeoTransformations}, } - if err != nil { - return nil, err + for _, transformation := range transformationChain { + inputQuery := query.SelectCommand.String() + query, err = transformation.Transformation(query) + if query.SelectCommand.String() != inputQuery { + logger.Info().Msgf(transformation.TransformationName+" triggered, input query: %s", inputQuery) + logger.Info().Msgf(transformation.TransformationName+" triggered, output query: %s", query.SelectCommand.String()) + } + if err != nil { + return nil, err + } } queries[k] = query } diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index 8157e4cd5..64f0889be 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -76,7 +76,7 @@ func NewQueryRunner(lm *clickhouse.LogManager, cfg config.QuesmaConfiguration, i AsyncQueriesContexts: concurrent.NewMap[string, *AsyncQueryContext](), transformationPipeline: TransformationPipeline{ transformers: []plugins.QueryTransformer{ - &SchemaCheckPass{cfg: cfg.IndexConfig}, // this can be a part of another plugin + &SchemaCheckPass{cfg: cfg.IndexConfig, schemaRegistry: schemaRegistry}, // this can be a part of another plugin }, }, schemaRegistry: schemaRegistry}