Skip to content

Commit

Permalink
Optimizer - Rewrite query using a materialized view (#540)
Browse files Browse the repository at this point in the history
This PR adds an optimizer that rewrites queries and replaces the
original table with a materialized view. The rewrite rule is taken from
the configuration.

Suppose we've got the following table:
```
CREATE some_index (
  `date` DateTime64(3),
   `type` String,
   ... 
)
ENGINE=MergeTree()
ORDER BY (`date`)
```

We've discovered our dashboard generates a lot of queries with common
expressions:
```
(type ILIKE '%foo%')
```

We may create a materialized view and query it instead of the original
table.
Notice that the materialized view is updated on INSERT, it must be
created before ingest.
```
-- a table that holds materialized view data 
CREATE table some_index_foo_dest (
  `date` DateTime64(3),
   `type` String,
   ... 
)

-- an index 

CREATE MATERIALIZED VIEW some_index_foo_mv to some_index_foo_dest AS
SELECT  * 
FROM some_index 
WHERE 
type ILIKE '%foo%'
```

Now we can configure the optimizer itself.
```
-- part of config.yaml
 some_index:
    enabled: true
    optimizers:
      materialized_view_replace:
        enabled: true
        properties:
          table: "some_index"
          condition: "(type ILIKE '%foo%')"
          view: "some_index_foo_mv"
```

So every query like
```
select count(*) from some_index where ((type ILIKE '%foo%') AND (`date` <= ..... AND `date` >= .....))
```
Will be rewritten to:
```
select count(*) from some_index where (TRUE AND (`date` <= ..... AND `date` >= .....))
```


Limitations:
- we can have only one rewrite rule per index/table
- a condition in the configuration must be the same as a `string`
representation of the replaced expression
- setup and configuration are  complex

---------

Signed-off-by: Rafał Strzaliński <[email protected]>
  • Loading branch information
nablaone authored Jul 18, 2024
1 parent 33b52e7 commit f70435e
Show file tree
Hide file tree
Showing 7 changed files with 510 additions and 34 deletions.
2 changes: 1 addition & 1 deletion quesma/optimize/cache_group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *cacheGroupByQueries) IsEnabledByDefault() bool {
return false
}

func (s *cacheGroupByQueries) Transform(queries []*model.Query) ([]*model.Query, error) {
func (s *cacheGroupByQueries) Transform(queries []*model.Query, properties map[string]string) ([]*model.Query, error) {

for _, query := range queries {

Expand Down
226 changes: 226 additions & 0 deletions quesma/optimize/materialized_view_replace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package optimize

import (
"quesma/logger"
"quesma/model"
"strings"
)

type materializedViewReplaceRule struct {
tableName string // table name that we want to replace
condition string // this is string representation of the condition that we want to replace
materializedView string // target
}

type materializedViewReplace struct {
}

// it checks if the WHERE clause is `AND` tree only
func (s *materializedViewReplace) validateWhere(expr model.Expr) bool {

var foundOR bool
var foundNOT bool

visitor := model.NewBaseVisitor()

visitor.OverrideVisitPrefixExpr = func(b *model.BaseExprVisitor, e model.PrefixExpr) interface{} {

if strings.ToUpper(e.Op) == "NOT" {
foundNOT = true
return e
}

b.VisitChildren(e.Args)
return e
}

visitor.OverrideVisitInfix = func(b *model.BaseExprVisitor, e model.InfixExpr) interface{} {
if strings.ToUpper(e.Op) == "OR" {
foundOR = true
return e
}
e.Left.Accept(b)
e.Right.Accept(b)
return e
}

expr.Accept(visitor)

if foundNOT {
return false
}

if foundOR {
return false
}

return true
}

func (s *materializedViewReplace) getTableName(tableName string) string {

res := strings.Replace(tableName, `"`, "", -1)
if strings.Contains(res, ".") {
parts := strings.Split(res, ".")
if len(parts) == 2 {
return parts[1]
}
}
return res
}

func (s *materializedViewReplace) matches(rule materializedViewReplaceRule, expr model.Expr) bool {
current := model.AsString(expr)
return rule.condition == current
}

func (s *materializedViewReplace) applyRule(rule materializedViewReplaceRule, expr model.Expr) (model.Expr, bool) {
if s.matches(rule, expr) {
return model.NewLiteral("TRUE"), true
}

return expr, false
}

func (s *materializedViewReplace) traverse(rule materializedViewReplaceRule, where model.Expr) (model.Expr, bool) {

var foundInNot bool
var replaced bool
var res model.Expr

visitor := model.NewBaseVisitor()

visitor.OverrideVisitInfix = func(b *model.BaseExprVisitor, e model.InfixExpr) interface{} {

// since we replace with "TRUE" we need to check if the operator is "AND"
if strings.ToUpper(e.Op) == "AND" {

left, leftReplaced := s.applyRule(rule, e.Left)
right, rightReplaced := s.applyRule(rule, e.Right)

if !leftReplaced {
left, leftReplaced = e.Left.Accept(b).(model.Expr)
}

if !rightReplaced {
right, rightReplaced = e.Right.Accept(b).(model.Expr)
}

if leftReplaced || rightReplaced {
replaced = true
}
return model.NewInfixExpr(left, e.Op, right)
}

return model.NewInfixExpr(e.Left.Accept(b).(model.Expr), e.Op, e.Right.Accept(b).(model.Expr))
}

res = where.Accept(visitor).(model.Expr)

if foundInNot {
return nil, false
}

return res, replaced
}

func (s *materializedViewReplace) replace(rule materializedViewReplaceRule, query model.SelectCommand) (*model.SelectCommand, bool) {

visitor := model.NewBaseVisitor()
var replaced bool

visitor.OverrideVisitSelectCommand = func(v *model.BaseExprVisitor, query model.SelectCommand) interface{} {

var ctes []*model.SelectCommand
if query.CTEs != nil {
ctes = make([]*model.SelectCommand, 0)
for _, cte := range query.CTEs {
ctes = append(ctes, cte.Accept(v).(*model.SelectCommand))
}
}

from := query.FromClause

if from != nil {
if table, ok := from.(model.TableRef); ok {

tableName := s.getTableName(table.Name) // todo: get table name from data

// if we match the table name
if rule.tableName == tableName { // config param

// we try to replace the where clause
newWhere, whereReplaced := s.applyRule(rule, query.WhereClause)

if !whereReplaced {
// if we have AND tree, we try to traverse it
if s.validateWhere(query.WhereClause) {
// here we try to traverse the whole tree
newWhere, whereReplaced = s.traverse(rule, query.WhereClause)
}
}

// if we replaced the where clause, we replace the from clause
if whereReplaced {
replaced = true
from = model.NewTableRef(rule.materializedView) // config param
return model.NewSelectCommand(query.Columns, query.GroupBy, query.OrderBy, from, newWhere, query.LimitBy, query.Limit, query.SampleLimit, query.IsDistinct, ctes)
}
}
} else {
from = query.FromClause.Accept(v).(model.Expr)
}
}

where := query.WhereClause.Accept(v).(model.Expr)

return model.NewSelectCommand(query.Columns, query.GroupBy, query.OrderBy, from, where, query.LimitBy, query.Limit, query.SampleLimit, query.IsDistinct, ctes)

}

newSelect := query.Accept(visitor).(*model.SelectCommand)

return newSelect, replaced
}

func (s *materializedViewReplace) readRule(properties map[string]string) materializedViewReplaceRule {
rule := materializedViewReplaceRule{
tableName: properties["table"],
condition: properties["condition"],
materializedView: properties["view"],
}
return rule
}

func (s *materializedViewReplace) Name() string {
return "materialized_view_replace"
}

func (s *materializedViewReplace) IsEnabledByDefault() bool {
return false
}

func (s *materializedViewReplace) Transform(queries []*model.Query, properties map[string]string) ([]*model.Query, error) {

//
// TODO add list of rules maybe
//
rule := s.readRule(properties)

for k, query := range queries {

result, replaced := s.replace(rule, query.SelectCommand)

// this is just in case if there was no truncation, we keep the original query
if result != nil && replaced {
logger.Info().Msgf(s.Name()+" triggered, input query: %s", query.SelectCommand.String())
logger.Info().Msgf(s.Name()+" triggered, output query: %s", (*result).String())

queries[k].SelectCommand = *result
query.OptimizeHints.OptimizationsPerformed = append(query.OptimizeHints.OptimizationsPerformed, s.Name())
}
}
return queries, nil
}
38 changes: 25 additions & 13 deletions quesma/optimize/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,18 @@
package optimize

import (
"fmt"
"quesma/model"
"quesma/plugins"
"quesma/quesma/config"
"strings"
"time"
)

// OptimizeTransformer - an interface for query transformers that have a name.
type OptimizeTransformer interface {
plugins.QueryTransformer
Transform(queries []*model.Query, properties map[string]string) ([]*model.Query, error)

Name() string // this name is used to enable/disable the transformer in the configuration
IsEnabledByDefault() bool // should return true for "not aggressive" transformers only
}
Expand All @@ -29,6 +32,7 @@ func NewOptimizePipeline(config config.QuesmaConfiguration) plugins.QueryTransfo
optimizations: []OptimizeTransformer{
&truncateDate{truncateTo: 5 * time.Minute},
&cacheGroupByQueries{},
&materializedViewReplace{},
},
}
}
Expand All @@ -44,27 +48,33 @@ func (s *OptimizePipeline) getIndexName(queries []*model.Query) string {
// ...
// }

return queries[0].TableName
// we assume here that table_name is the index name
tableName := queries[0].TableName
res := strings.Replace(tableName, `"`, "", -1)
if strings.Contains(res, ".") {
parts := strings.Split(res, ".")
if len(parts) == 2 {
return parts[1]
}
}
return res
}

func (s *OptimizePipeline) isEnabledFor(transformer OptimizeTransformer, queries []*model.Query) bool {
func (s *OptimizePipeline) findConfig(transformer OptimizeTransformer, queries []*model.Query) (bool, map[string]string) {

indexName := s.getIndexName(queries)

// first we check index specific settings
if indexCfg, ok := s.config.IndexConfig[indexName]; ok {
if enabled, ok := indexCfg.EnabledOptimizers[transformer.Name()]; ok {
return enabled
fmt.Println("Index specific settings found", indexName)
if optimizerCfg, ok := indexCfg.EnabledOptimizers[transformer.Name()]; ok {
fmt.Println("Optimizer specific settings found", transformer.Name(), optimizerCfg.Enabled)
return optimizerCfg.Enabled, optimizerCfg.Properties
}
}

// then we check global settings
if enabled, ok := s.config.EnabledOptimizers[transformer.Name()]; ok {
return enabled
}

// default is not enabled
return transformer.IsEnabledByDefault()
return transformer.IsEnabledByDefault(), make(map[string]string)
}

func (s *OptimizePipeline) Transform(queries []*model.Query) ([]*model.Query, error) {
Expand All @@ -79,12 +89,14 @@ func (s *OptimizePipeline) Transform(queries []*model.Query) ([]*model.Query, er
// run optimizations on queries
for _, optimization := range s.optimizations {

if !s.isEnabledFor(optimization, queries) {
enabled, properties := s.findConfig(optimization, queries)

if !enabled {
continue
}

var err error
queries, err = optimization.Transform(queries)
queries, err = optimization.Transform(queries, properties)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit f70435e

Please sign in to comment.