Skip to content

Commit

Permalink
Fix array handling (#757)
Browse files Browse the repository at this point in the history
  • Loading branch information
nablaone authored Sep 12, 2024
1 parent bdfac95 commit be27360
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 35 deletions.
30 changes: 12 additions & 18 deletions quesma/quesma/schema_array_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package quesma

import (
"fmt"
"quesma/logger"
"quesma/model"
"quesma/schema"
Expand Down Expand Up @@ -49,13 +48,9 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) model.ExprVisitor {
column, ok := e.Left.(model.ColumnRef)
if ok {
dbType := resolver.dbColumnType(column.ColumnName)

if strings.HasPrefix(dbType, "Array") {

op := strings.ToUpper(e.Op)

switch {

case (op == "ILIKE" || op == "LIKE") && dbType == "Array(String)":

variableName := "x"
Expand All @@ -66,7 +61,7 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) model.ExprVisitor {
return model.NewFunction("has", e.Left, e.Right.Accept(b).(model.Expr))

default:
logger.Warn().Msgf("Unhandled array infix operation %s, column %v (%v)", e.Op, column.ColumnName, dbType)
logger.Error().Msgf("Unhandled array infix operation %s, column %v (%v)", e.Op, column.ColumnName, dbType)
}
}
}
Expand All @@ -79,23 +74,22 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) model.ExprVisitor {
}

visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, e model.FunctionExpr) interface{} {
if len(e.Args) == 1 {

if len(e.Args) > 0 {
arg := e.Args[0]
column, ok := arg.(model.ColumnRef)
if ok {
dbType := resolver.dbColumnType(column.ColumnName)
if strings.HasPrefix(dbType, "Array") {
switch {

case e.Name == "sumOrNull" && dbType == "Array(Int64)":
fnName := model.LiteralExpr{Value: fmt.Sprintf("'%s'", e.Name)}
wrapped := model.NewFunction("arrayReduce", fnName, column)
wrapped = model.NewFunction(e.Name, wrapped)
return wrapped

default:
logger.Warn().Msgf("Unhandled array function %s, column %v (%v)", e.Name, column.ColumnName, dbType)

if strings.HasPrefix(e.Name, "sum") {
// here we apply -Array combinator to the sum function
// https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators#-array
//
// TODO this can be rewritten to transform all aggregate functions as well
//
e.Name = strings.ReplaceAll(e.Name, "sum", "sumArray")
} else {
logger.Error().Msgf("Unhandled array function %s, column %v (%v)", e.Name, column.ColumnName, dbType)
}
}
}
Expand Down
35 changes: 26 additions & 9 deletions quesma/quesma/schema_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,15 +224,7 @@ func (s *SchemaCheckPass) applyArrayTransformations(indexSchema schema.Schema, q

// check if the query has array columns

selectCommand := query.SelectCommand

var allColumns []model.ColumnRef
for _, expr := range selectCommand.Columns {
allColumns = append(allColumns, model.GetUsedColumns(expr)...)
}
if selectCommand.WhereClause != nil {
allColumns = append(allColumns, model.GetUsedColumns(selectCommand.WhereClause)...)
}
allColumns := model.GetUsedColumns(query.SelectCommand)

hasArrayColumn := false
for _, col := range allColumns {
Expand Down Expand Up @@ -559,3 +551,28 @@ func (g *GeoIpResultTransformer) Transform(result [][]model.QueryResultRow) ([][
}
return result, nil
}

// ArrayResultTransformer is a transformer that transforms array columns into string representation
type ArrayResultTransformer struct {
}

func (g *ArrayResultTransformer) Transform(result [][]model.QueryResultRow) ([][]model.QueryResultRow, error) {

for i, rows := range result {

for j, row := range rows {
for k, col := range row.Cols {

if ary, ok := col.Value.([]string); ok {
aryStr := make([]string, 0, len(ary))
for _, el := range ary {
aryStr = append(aryStr, fmt.Sprintf("%v", el))
}
result[i][j].Cols[k].Value = fmt.Sprintf("[%s]", strings.Join(aryStr, ","))
}
}
}

}
return result, nil
}
2 changes: 1 addition & 1 deletion quesma/quesma/schema_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ func Test_arrayType(t *testing.T) {
FromClause: model.NewTableRef("kibana_sample_data_ecommerce"),
Columns: []model.Expr{
model.NewColumnRef("order_date"),
model.NewFunction("sumOrNull", model.NewFunction("arrayReduce", model.NewLiteral("'sumOrNull'"), model.NewColumnRef("products::quantity"))),
model.NewFunction("sumArrayOrNull", model.NewColumnRef("products::quantity")),
},
GroupBy: []model.Expr{model.NewColumnRef("order_date")},
},
Expand Down
26 changes: 19 additions & 7 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,17 +765,29 @@ func (q *QueryRunner) findNonexistingProperties(query *model.Query, table *click

func (q *QueryRunner) postProcessResults(table *clickhouse.Table, results [][]model.QueryResultRow) ([][]model.QueryResultRow, error) {

transformer := &replaceColumNamesWithFieldNames{}
pipeline := []struct {
name string
transformer model.ResultTransformer
}{
{"replaceColumNamesWithFieldNames", &replaceColumNamesWithFieldNames{}},
{"geoIpResultTransformer", &GeoIpResultTransformer{schemaRegistry: q.schemaRegistry, fromTable: table.Name}},
{"arrayResultTransformer", &ArrayResultTransformer{}},
}

res, err := transformer.Transform(results)
var err error
for _, t := range pipeline {

if err != nil {
return nil, err
// TODO we should check if the transformer is applicable here
// for example if the schema doesn't hava array fields, we should skip the arrayResultTransformer
// these transformers can be cpu and mem consuming

results, err = t.transformer.Transform(results)
if err != nil {
return nil, fmt.Errorf("resuls transformer %s has failed: %w", t.name, err)
}
}

// TODO this should be created in different place
geoIpTransformer := GeoIpResultTransformer{schemaRegistry: q.schemaRegistry, fromTable: table.Name}
return geoIpTransformer.Transform(res)
return results, nil
}

func pushPrimaryInfo(qmc *ui.QuesmaManagementConsole, Id string, QueryResp []byte, startTime time.Time) {
Expand Down

0 comments on commit be27360

Please sign in to comment.