Skip to content

Commit

Permalink
Query column as expression 4 (#243)
Browse files Browse the repository at this point in the history
TL;DR
* Change all our left expressions from plain strings to
`model.SelectColumn` (`FromClause` and a couple of complex SQLs). We're
currently thinking what this `FromClause` should ultimately be, so
that'll probably change, but still it's a step forward.
* Add pretty print for errors in all tests with `sqlmock`
---------------------------------------------
Some older story:

So I basically wanted to improve my last `GROUP BY` change just a little
bit, by making just a few expressions like that
```
Select := fmt.Sprintf("count(if(%s<=%f, 1, NULL))/count(*)*100", strconv.Quote(getFirstFieldName()), cutValue)
query.Columns = append(query.Columns, model.SelectColumn{Expression: aexp.SQL{Query: Select}})
```
properly typed, instead of using `aexp.SQL{}`. But the scope of this PR
kind of spiraled out of control, as those small changes made some other
part of code not compile, and that repeated a lot of times.

But it's not that bad: all these changes convert code from an untyped
`string` field representation to our typed system. The most notable
change is `Query.FromClause` also becomes of `model.SelectColumn` type
instead of `string`. I didn't expect that, but that makes sense. This
string often looked like this:
```
query.FromClause = fmt.Sprintf(
	"(SELECT %s, ROW_NUMBER() OVER (PARTITION BY %s ORDER BY %s %s) AS %s FROM %s WHERE %s)",
		fieldsAsString, partitionBy,
		strconv.Quote(metricsAggr.SortBy), metricsAggr.Order,
		model.RowNumberColumnName, query.FromClause, b.whereBuilder.WhereClauseAsString(),
)
```
so it's probably a good idea to make it typed as well. I think that
maybe using `aexp.AExp` as its type might be a bit more natural than
`model.SelectColumn`, but even if you agree, that's a small, not very
important change that can be done in another PR.

---------

Co-authored-by: Jacek Migdal <[email protected]>
  • Loading branch information
trzysiek and jakozaur authored Jun 7, 2024
1 parent 2eeb5c7 commit 905f4b3
Show file tree
Hide file tree
Showing 31 changed files with 668 additions and 609 deletions.
16 changes: 7 additions & 9 deletions quesma/clickhouse/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"mitmproxy/quesma/concurrent"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/types"
"mitmproxy/quesma/util"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -204,8 +205,7 @@ func TestProcessInsertQuery(t *testing.T) {
for index2, config := range configs {
for index3, lm := range logManagers(config) {
t.Run("case insertTest["+strconv.Itoa(index1)+"], config["+strconv.Itoa(index2)+"], logManager["+strconv.Itoa(index3)+"]", func(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
db, mock := util.InitSqlMockWithPrettyPrint(t)
lm.lm.chDb = db
defer db.Close()

Expand All @@ -224,7 +224,7 @@ func TestProcessInsertQuery(t *testing.T) {
mock.ExpectExec(expectedInserts[2*index1+1]).WillReturnResult(sqlmock.NewResult(1, 1))
}

err = lm.lm.ProcessInsertQuery(ctx, tableName, []types.JSON{types.MustJSON(tt.insertJson)})
err := lm.lm.ProcessInsertQuery(ctx, tableName, []types.JSON{types.MustJSON(tt.insertJson)})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand All @@ -247,16 +247,15 @@ func TestInsertVeryBigIntegers(t *testing.T) {
// big integer as a schema field
for i, bigInt := range bigInts {
t.Run("big integer schema field: "+bigInt, func(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
db, mock := util.InitSqlMockWithPrettyPrint(t)
lm := NewLogManagerEmpty()
lm.chDb = db
defer db.Close()

mock.ExpectExec(`CREATE TABLE IF NOT EXISTS "` + tableName).WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(expectedInsertJsons[i]).WillReturnResult(sqlmock.NewResult(0, 0))

err = lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))})
err := lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand All @@ -274,8 +273,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {

for i, bigInt := range bigInts {
t.Run("big integer attribute field: "+bigInt, func(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
db, mock := util.InitSqlMockWithPrettyPrint(t)
lm := NewLogManagerEmpty()
lm.chDb = db
var ptr = atomic.Pointer[TableMap]{}
Expand All @@ -288,7 +286,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {

bigIntAsInt, _ := strconv.ParseInt(bigInt, 10, 64)
fmt.Printf(`{"severity":"sev","int": %d}\n`, bigIntAsInt)
err = lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %d}`, bigIntAsInt))})
err := lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %d}`, bigIntAsInt))})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down
7 changes: 7 additions & 0 deletions quesma/clickhouse/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ func (t *Table) GetDateTimeType(ctx context.Context, fieldName string) DateTimeT
return Invalid
}

// GetDateTimeTypeFromSelectColumn resolves the DateTimeType of the column
// referenced by col. Only a plain table-column expression can be resolved;
// any other expression kind (function, literal, composite, ...) yields Invalid.
func (t *Table) GetDateTimeTypeFromSelectColumn(ctx context.Context, col model.SelectColumn) DateTimeType {
	switch exp := col.Expression.(type) {
	case aexp.TableColumnExp:
		return t.GetDateTimeType(ctx, exp.ColumnName)
	default:
		return Invalid
	}
}

// applyIndexConfig applies full text search and alias configuration to the table
func (t *Table) applyIndexConfig(configuration config.QuesmaConfiguration) {
for _, c := range t.Cols {
Expand Down
22 changes: 17 additions & 5 deletions quesma/clickhouse/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"fmt"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/queryparser/aexp"
"strings"
"time"
)
Expand Down Expand Up @@ -112,15 +114,25 @@ func PrettyJson(jsonStr string) string {
// TimestampGroupBy returns string to be used in the select part of Clickhouse query, when grouping by timestamp interval.
// e.g.
// - timestampGroupBy("@timestamp", DateTime64, 30 seconds) --> toInt64(toUnixTimestamp64Milli(`@timestamp`)/30000)
// - timestampGroupBy("@timestamp", DateTime, 30 seconds) --> toInt64(toUnixTimestamp(`@timestamp`)/30.0)
func TimestampGroupBy(timestampFieldName string, typ DateTimeType, groupByInterval time.Duration) string {
// - timestampGroupBy("@timestamp", DateTime, 30 seconds) --> toInt64(toUnixTimestamp(`@timestamp`)/30)
func TimestampGroupBy(timestampField model.SelectColumn, typ DateTimeType, groupByInterval time.Duration) aexp.AExp {

createAExp := func(innerFuncName string, interval int64) aexp.AExp {
return aexp.Function("toInt64", aexp.NewComposite(
aexp.Function(innerFuncName, timestampField.Expression),
aexp.String("/"),
aexp.Literal(interval),
))
}

switch typ {
case DateTime64:
return fmt.Sprintf("toInt64(toUnixTimestamp64Milli(`%s`)/%d)", timestampFieldName, groupByInterval.Milliseconds())
// as string: fmt.Sprintf("toInt64(toUnixTimestamp(`%s`)/%f)", timestampFieldName, groupByInterval.Seconds())
return createAExp("toUnixTimestamp64Milli", groupByInterval.Milliseconds())
case DateTime:
return fmt.Sprintf("toInt64(toUnixTimestamp(`%s`)/%f)", timestampFieldName, groupByInterval.Seconds())
return createAExp("toUnixTimestamp", groupByInterval.Milliseconds()/1000)
default:
logger.Error().Msgf("invalid timestamp fieldname: %s", timestampFieldName)
return "invalid"
return aexp.Literal("invalid") // maybe create new type InvalidExpr?
}
}
43 changes: 25 additions & 18 deletions quesma/model/bucket_aggregations/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/queryparser/aexp"
wc "mitmproxy/quesma/queryparser/where_clause"
"strconv"
"strings"
Expand All @@ -30,28 +31,34 @@ func (interval Interval) String() string {
}

// ToSQLSelectQuery returns count(...) where ... is a condition for the interval, just like we want it in SQL's SELECT
func (interval Interval) ToSQLSelectQuery(quotedFieldName string) string {
var sqlLeft, sqlRight, sql string
func (interval Interval) ToSQLSelectQuery(col model.SelectColumn) model.SelectColumn {
var sqlLeft, sqlRight, sql aexp.AExp
if !interval.IsOpeningBoundInfinite() {
sqlLeft = quotedFieldName + ">=" + strconv.FormatFloat(interval.Begin, 'f', -1, 64)
sqlLeft = aexp.Infix(col.Expression, ">=", aexp.Literal(interval.Begin))
}
if !interval.IsClosingBoundInfinite() {
sqlRight = quotedFieldName + "<" + strconv.FormatFloat(interval.End, 'f', -1, 64)
sqlRight = aexp.Infix(col.Expression, "<", aexp.Literal(interval.End))
}
switch {
case sqlLeft != "" && sqlRight != "":
sql = sqlLeft + " AND " + sqlRight
case sqlLeft != "":
case sqlLeft != nil && sqlRight != nil:
sql = aexp.Infix(sqlLeft, "AND", sqlRight)
case sqlLeft != nil:
sql = sqlLeft
case sqlRight != "":
case sqlRight != nil:
sql = sqlRight
default:
return "count()"
return model.SelectColumn{Expression: aexp.Function("count")}
}
return "count(if(" + sql + ", 1, NULL))"
// count(if(sql, 1, NULL))
return model.SelectColumn{Expression: aexp.Function("count", aexp.Function("if", sql, aexp.Literal(1), aexp.String("NULL")))}
}

func (interval Interval) ToWhereClause(fieldName string) wc.Statement { // returns a condition for the interval, just like we want it in SQL's WHERE
func (interval Interval) ToWhereClause(field model.SelectColumn) wc.Statement { // returns a condition for the interval, just like we want it in SQL's WHERE
fieldName := field.SQL() // TODO a) this should be improved b) unify SelectColumn and ColumnRef?
if unquoted, err := strconv.Unquote(fieldName); err == nil {
fieldName = unquoted
}

var sqlLeft, sqlRight wc.Statement
if !interval.IsOpeningBoundInfinite() {
sqlLeft = wc.NewInfixOp(wc.NewColumnRef(fieldName), ">=", wc.NewLiteral(strconv.FormatFloat(interval.Begin, 'f', -1, 64)))
Expand Down Expand Up @@ -100,20 +107,20 @@ func (interval Interval) floatToString(number float64) string {
}

type Range struct {
ctx context.Context
QuotedFieldName string
Intervals []Interval
ctx context.Context
Col model.SelectColumn
Intervals []Interval
// defines what response should look like
// https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-range-aggregation.html#_keyed_response_4
Keyed bool
}

func NewRange(ctx context.Context, quotedFieldName string, intervals []Interval, keyed bool) Range {
return Range{ctx, quotedFieldName, intervals, keyed}
func NewRange(ctx context.Context, col model.SelectColumn, intervals []Interval, keyed bool) Range {
return Range{ctx, col, intervals, keyed}
}

func NewRangeWithDefaultKeyed(ctx context.Context, quotedFieldName string, intervals []Interval) Range {
return Range{ctx, quotedFieldName, intervals, keyedDefaultValue}
func NewRangeWithDefaultKeyed(ctx context.Context, col model.SelectColumn, intervals []Interval) Range {
return Range{ctx, col, intervals, keyedDefaultValue}
}

func (query Range) IsBucketAggregation() bool {
Expand Down
61 changes: 57 additions & 4 deletions quesma/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ type (
Columns []SelectColumn // Columns to select, including aliases
GroupBy []SelectColumn // if not empty, we do GROUP BY GroupBy...
OrderBy []SelectColumn // if not empty, we do ORDER BY OrderBy...
FromClause SelectColumn // usually just "tableName", or databaseName."tableName". Sometimes a subquery e.g. (SELECT ...)
WhereClause where_clause.Statement // "WHERE ..." until next clause like GROUP BY/ORDER BY, etc.
Limit int // LIMIT clause, noLimit (0) means no limit

FromClause string // usually just "tableName", or databaseName."tableName". Sometimes a subquery e.g. (SELECT ...)
CanParse bool // true <=> query is valid
CanParse bool // true <=> query is valid

// Eventually we should merge this two
QueryInfoType SearchQueryType
Type QueryType
TableName string

Highlighter Highlighter
NoDBQuery bool // true <=> we don't need query to DB here, true in some pipeline aggregations
Expand Down Expand Up @@ -95,6 +96,14 @@ func NewSortByCountColumn(desc bool) SelectColumn {
return SelectColumn{Expression: aexp.NewComposite(aexp.Count(), aexp.String(order))}
}

// NewSelectColumnTableField builds a SelectColumn that refers to a single
// table column by name.
func NewSelectColumnTableField(fieldName string) SelectColumn {
	columnExp := aexp.TableColumn(fieldName)
	return SelectColumn{Expression: columnExp}
}

// NewSelectColumnString builds a SelectColumn whose expression is the raw
// SQL string s, emitted verbatim.
func NewSelectColumnString(s string) SelectColumn {
	rawExp := aexp.StringExp{Value: s}
	return SelectColumn{Expression: rawExp}
}

func (c SelectColumn) SQL() string {

if c.Expression == nil {
Expand Down Expand Up @@ -126,7 +135,6 @@ var NoMetadataField JsonMap = nil

// returns string with SQL query
func (q *Query) String(ctx context.Context) string {

var sb strings.Builder
sb.WriteString("SELECT ")
if q.IsDistinct {
Expand All @@ -147,7 +155,7 @@ func (q *Query) String(ctx context.Context) string {
sb.WriteString(strings.Join(columns, ", "))

sb.WriteString(" FROM ")
sb.WriteString(q.FromClause)
sb.WriteString(q.FromClause.SQL())

if q.WhereClause != nil {
sb.WriteString(" WHERE ")
Expand Down Expand Up @@ -279,6 +287,51 @@ func (q *Query) ApplyAliases(cfg map[string]config.IndexConfiguration, resolvedT
}
}

// NewSelectColumnSubselectWithRowNumber builds the typed equivalent of the old
// string-based subselect:
//
//	(SELECT <selectFields>, ROW_NUMBER() OVER (PARTITION BY <groupByFields>
//	 [ORDER BY <orderByField> ASC|DESC]) AS <RowNumberColumnName>
//	 FROM <q.FromClause> [WHERE <whereClause>])
//
// TODO change whereClause type string -> some typed
func (q *Query) NewSelectColumnSubselectWithRowNumber(selectFields []SelectColumn, groupByFields []SelectColumn,
	whereClause string, orderByField string, orderByDesc bool) SelectColumn {

	// Capacity: up to 2 tokens per select/group-by field, plus a handful of fixed tokens.
	parts := make([]aexp.AExp, 0, 2*(len(selectFields)+len(groupByFields))+6)

	parts = append(parts, aexp.String("SELECT"))
	for _, field := range selectFields {
		// Trailing comma after every select field: it separates the last one
		// from the ROW_NUMBER() expression that follows.
		parts = append(parts, field.Expression, aexp.String(","))
	}

	// ROW_NUMBER is emitted as a raw string rather than a SelectColumn: it is
	// not part of the schema, and the simpler representation suffices for now.
	parts = append(parts, aexp.String("ROW_NUMBER() OVER (PARTITION BY"))
	for i, field := range groupByFields {
		if i > 0 {
			parts = append(parts, aexp.String(","))
		}
		parts = append(parts, field.Expression)
	}

	if orderByField != "" {
		parts = append(parts,
			aexp.String("ORDER BY"),
			NewSortColumn(orderByField, orderByDesc).Expression,
		)
	}

	parts = append(parts,
		aexp.String(") AS"),
		aexp.Literal(RowNumberColumnName),
		aexp.String("FROM"),
		q.FromClause.Expression,
	)

	if whereClause != "" {
		parts = append(parts, aexp.String("WHERE "+whereClause))
	}

	// The empty-name Function wraps the composite — presumably rendering the
	// enclosing parentheses of the subselect; TODO confirm against aexp renderer.
	return SelectColumn{Expression: aexp.Function("", aexp.NewComposite(parts...))}
}

// Aggregator is always initialized as "empty", so with SplitOverHowManyFields == 0, Keyed == false, Filters == false.
// It's updated after construction, during further processing of aggregations.
type Aggregator struct {
Expand Down
2 changes: 1 addition & 1 deletion quesma/queryparser/aexp/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type AExpVisitor interface {
VisitFunction(e FunctionExp) interface{}
VisitMultiFunction(e MultiFunctionExp) interface{}
VisitLiteral(l LiteralExp) interface{}
VisitString(l StringExp) interface{}
VisitString(e StringExp) interface{}
VisitComposite(e CompositeExp) interface{}
VisitInfix(e InfixExp) interface{}
VisitSQL(s SQL) interface{}
Expand Down
Loading

0 comments on commit 905f4b3

Please sign in to comment.