Skip to content

Commit

Permalink
Enable/disable optimizer via configuration (#530)
Browse files Browse the repository at this point in the history
We may enable/disable each optimizing transformer on global or index
scope.

Here is a configuration snippet that enables/disables existing
transformers:
```
# enable on the global scope
optimizers:
  cache_group_by_queries: true
  truncate_date: true

# enable on index scope
indexes:
  some-index:
    enabled: true
    optimizers:
       cache_group_by_queries: true
       truncate_date: true
```
  • Loading branch information
nablaone authored Jul 15, 2024
1 parent d77d241 commit 898cd02
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 18 deletions.
11 changes: 10 additions & 1 deletion quesma/optimize/cache_group_by.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ import "quesma/model"
type cacheGroupByQueries struct {
}

func (s *cacheGroupByQueries) Name() string {
return "cache_group_by_queries"
}

func (s *cacheGroupByQueries) IsEnabledByDefault() bool {
// this transformer can use a lot of memory on database side
return false
}

func (s *cacheGroupByQueries) Transform(queries []*model.Query) ([]*model.Query, error) {

for _, query := range queries {
Expand All @@ -30,7 +39,7 @@ func (s *cacheGroupByQueries) Transform(queries []*model.Query) ([]*model.Query,
// TODO add CTE here
if len(query.SelectCommand.GroupBy) > 0 {
query.OptimizeHints.Settings["use_query_cache"] = true
query.OptimizeHints.OptimizationsPerformed = append(query.OptimizeHints.OptimizationsPerformed, "cacheGroupByQueries")
query.OptimizeHints.OptimizationsPerformed = append(query.OptimizeHints.OptimizationsPerformed, s.Name())
}
}
return queries, nil
Expand Down
55 changes: 52 additions & 3 deletions quesma/optimize/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,68 @@ package optimize
import (
"quesma/model"
"quesma/plugins"
"quesma/quesma/config"
"time"
)

// OptimizeTransformer - an interface for query transformers that have a name.
type OptimizeTransformer interface {
plugins.QueryTransformer
Name() string // this name is used to enable/disable the transformer in the configuration
IsEnabledByDefault() bool // should return true for "not aggressive" transformers only
}

// OptimizePipeline - a transformer that optimizes queries
type OptimizePipeline struct {
optimizations []plugins.QueryTransformer
config config.QuesmaConfiguration
optimizations []OptimizeTransformer
}

func NewOptimizePipeline() plugins.QueryTransformer {
func NewOptimizePipeline(config config.QuesmaConfiguration) plugins.QueryTransformer {

return &OptimizePipeline{
optimizations: []plugins.QueryTransformer{
config: config,
optimizations: []OptimizeTransformer{
&truncateDate{truncateTo: 5 * time.Minute},
&cacheGroupByQueries{},
},
}
}

func (s *OptimizePipeline) getIndexName(queries []*model.Query) string {

// HACK - this is a temporary solution
// We should have struct:
// sth like this:
// type ExecutionPlan struct {
// IndexName string
// Queries []*model.Query
// ...
// }

return queries[0].TableName
}

func (s *OptimizePipeline) isEnabledFor(transformer OptimizeTransformer, queries []*model.Query) bool {

indexName := s.getIndexName(queries)

// first we check index specific settings
if indexCfg, ok := s.config.IndexConfig[indexName]; ok {
if enabled, ok := indexCfg.EnabledOptimizers[transformer.Name()]; ok {
return enabled
}
}

// then we check global settings
if enabled, ok := s.config.EnabledOptimizers[transformer.Name()]; ok {
return enabled
}

// default is not enabled
return transformer.IsEnabledByDefault()
}

func (s *OptimizePipeline) Transform(queries []*model.Query) ([]*model.Query, error) {

// add hints if not present
Expand All @@ -34,6 +78,11 @@ func (s *OptimizePipeline) Transform(queries []*model.Query) ([]*model.Query, er

// run optimizations on queries
for _, optimization := range s.optimizations {

if !s.isEnabledFor(optimization, queries) {
continue
}

var err error
queries, err = optimization.Transform(queries)
if err != nil {
Expand Down
47 changes: 42 additions & 5 deletions quesma/optimize/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/stretchr/testify/assert"
"quesma/model"
"quesma/quesma/config"
"testing"
)

Expand Down Expand Up @@ -37,6 +38,10 @@ func Test_cacheGroupBy(t *testing.T) {
// Add CTE here
}

cfg := config.QuesmaConfiguration{}
cfg.EnabledOptimizers = make(config.OptimizersConfiguration)
cfg.EnabledOptimizers["cache_group_by_queries"] = true

for _, tt := range tests {

t.Run(tt.name, func(t *testing.T) {
Expand All @@ -46,7 +51,7 @@ func Test_cacheGroupBy(t *testing.T) {
SelectCommand: tt.query,
},
}
pipeline := NewOptimizePipeline()
pipeline := NewOptimizePipeline(cfg)
optimized, err := pipeline.Transform(queries)
if err != nil {
t.Fatalf("error optimizing query: %v", err)
Expand Down Expand Up @@ -91,12 +96,14 @@ func Test_dateTrunc(t *testing.T) {
}

tests := []struct {
name string
query model.SelectCommand
expected model.SelectCommand
name string
tableName string
query model.SelectCommand
expected model.SelectCommand
}{
{
"select all",
"foo",
model.SelectCommand{
Columns: []model.Expr{model.NewColumnRef("*")},
FromClause: model.NewTableRef("foo"),
Expand All @@ -109,6 +116,7 @@ func Test_dateTrunc(t *testing.T) {

{
"select all where date ",
"foo",
model.SelectCommand{
Columns: []model.Expr{model.NewColumnRef("*")},
FromClause: model.NewTableRef("foo"),
Expand All @@ -123,6 +131,7 @@ func Test_dateTrunc(t *testing.T) {

{
"select all where and between dates (>24h)",
"foo",
model.SelectCommand{
Columns: []model.Expr{model.NewColumnRef("*")},
FromClause: model.NewTableRef("foo"),
Expand All @@ -137,6 +146,7 @@ func Test_dateTrunc(t *testing.T) {

{
"select all where and between dates (<24h)",
"foo",
model.SelectCommand{
Columns: []model.Expr{model.NewColumnRef("*")},
FromClause: model.NewTableRef("foo"),
Expand All @@ -151,6 +161,7 @@ func Test_dateTrunc(t *testing.T) {

{
"select a, count() from foo group by 1",
"foo",
model.SelectCommand{
Columns: []model.Expr{model.NewColumnRef("a"), model.NewFunction("count", model.NewColumnRef("*"))},
FromClause: model.NewTableRef("foo"),
Expand All @@ -162,19 +173,45 @@ func Test_dateTrunc(t *testing.T) {
GroupBy: []model.Expr{model.NewLiteral(1)},
},
},
{
"select all where and between dates (>24h), disabled index ",
"foo2",
model.SelectCommand{
Columns: []model.Expr{model.NewColumnRef("*")},
FromClause: model.NewTableRef("foo2"),
WhereClause: and(gt(col("a"), date("2024-01-06T10:08:53.675Z")), lt(col("a"), date("2024-06-06T13:10:53.675Z"))),
},
model.SelectCommand{
Columns: []model.Expr{model.NewColumnRef("*")},
FromClause: model.NewTableRef("foo2"),
WhereClause: and(gt(col("a"), date("2024-01-06T10:08:53.675Z")), lt(col("a"), date("2024-06-06T13:10:53.675Z"))),
},
},
// Add CTE here
}

cfg := config.QuesmaConfiguration{}
cfg.EnabledOptimizers = make(config.OptimizersConfiguration)
cfg.EnabledOptimizers["truncate_date"] = false

cfg.IndexConfig = make(map[string]config.IndexConfiguration)
cfg.IndexConfig["foo"] = config.IndexConfiguration{
EnabledOptimizers: config.OptimizersConfiguration{
"truncate_date": true,
},
}

for _, tt := range tests {

t.Run(tt.name, func(t *testing.T) {

queries := []*model.Query{
{
TableName: tt.tableName,
SelectCommand: tt.query,
},
}
pipeline := NewOptimizePipeline()
pipeline := NewOptimizePipeline(cfg)
optimized, err := pipeline.Transform(queries)

if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion quesma/optimize/trunc_date.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,17 @@ type truncateDate struct {
truncateTo time.Duration
}

func (s *truncateDate) Name() string {
return "truncate_date"
}

func (s *truncateDate) IsEnabledByDefault() bool {
// This optimization is not enabled by default.
// Tt returns different results than the original query
// So it should be used with caution
return false
}

func (s *truncateDate) Transform(queries []*model.Query) ([]*model.Query, error) {

for k, query := range queries {
Expand All @@ -277,7 +288,7 @@ func (s *truncateDate) Transform(queries []*model.Query) ([]*model.Query, error)
// this is just in case if there was no truncation, we keep the original query
if visitor.truncated && result != nil {
queries[k].SelectCommand = *result
query.OptimizeHints.OptimizationsPerformed = append(query.OptimizeHints.OptimizationsPerformed, "truncateDate")
query.OptimizeHints.OptimizationsPerformed = append(query.OptimizeHints.OptimizationsPerformed, s.Name())
}
}
return queries, nil
Expand Down
37 changes: 36 additions & 1 deletion quesma/quesma/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type QuesmaConfiguration struct {
PublicTcpPort network.Port `koanf:"port"`
IngestStatistics bool `koanf:"ingestStatistics"`
QuesmaInternalTelemetryUrl *Url `koanf:"internalTelemetryUrl"`
EnabledOptimizers OptimizersConfiguration `koanf:"optimizers"`
}

type LoggingConfiguration struct {
Expand All @@ -63,6 +64,8 @@ type RelationalDbConfiguration struct {
AdminUrl *Url `koanf:"adminUrl"`
}

type OptimizersConfiguration map[string]bool

func (c *RelationalDbConfiguration) IsEmpty() bool {
return c != nil && c.Url == nil && c.User == "" && c.Password == "" && c.Database == ""
}
Expand Down Expand Up @@ -233,6 +236,36 @@ func (c *QuesmaConfiguration) WritesToElasticsearch() bool {
return c.Mode != ClickHouse
}

func (c *QuesmaConfiguration) optimizersConfigAsString(s string, cfg OptimizersConfiguration) string {

var lines []string

lines = append(lines, fmt.Sprintf(" %s:", s))
for k, v := range cfg {
lines = append(lines, fmt.Sprintf(" %s: %v", k, v))
}

return strings.Join(lines, "\n")
}

func (c *QuesmaConfiguration) OptimizersConfigAsString() string {

var lines []string

lines = append(lines, "\n")

lines = append(lines, c.optimizersConfigAsString("Global", c.EnabledOptimizers))

for indexName, indexConfig := range c.IndexConfig {
if indexConfig.EnabledOptimizers != nil && len(indexConfig.EnabledOptimizers) > 0 {
lines = append(lines, c.optimizersConfigAsString(indexName, indexConfig.EnabledOptimizers))
}
}

lines = append(lines, "\n")
return strings.Join(lines, "\n")
}

func (c *QuesmaConfiguration) String() string {
var indexConfigs string
for _, idx := range c.IndexConfig {
Expand Down Expand Up @@ -297,7 +330,8 @@ Quesma Configuration:
Log Level: %v
Public TCP Port: %d
Ingest Statistics: %t,
Quesma Telemetry URL: %s`,
Quesma Telemetry URL: %s
Optimizers: %s`,
c.Mode.String(),
elasticUrl,
elasticsearchExtra,
Expand All @@ -311,6 +345,7 @@ Quesma Configuration:
c.PublicTcpPort,
c.IngestStatistics,
quesmaInternalTelemetryUrl,
c.OptimizersConfigAsString(),
)
}

Expand Down
3 changes: 2 additions & 1 deletion quesma/quesma/config/index_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type IndexConfiguration struct {
TimestampField *string `koanf:"timestampField"`
// this is hidden from the user right now
// deprecated
SchemaConfiguration *SchemaConfiguration `koanf:"static-schema"`
SchemaConfiguration *SchemaConfiguration `koanf:"static-schema"`
EnabledOptimizers OptimizersConfiguration `koanf:"optimizers"`
}

func (c IndexConfiguration) HasFullTextField(fieldName string) bool {
Expand Down
5 changes: 1 addition & 4 deletions quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,8 @@ func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, logManager *clickhous

queryRunner.DateMathRenderer = queryparser.DateMathExpressionFormatLiteral

// TODO this should be configurable somehow
//
// tests should not be run with optimization enabled by default
// TODO: Enable it in YAML
// queryRunner.EnableQueryOptimization()
queryRunner.EnableQueryOptimization(config)

router := configureRouter(config, schemaRegistry, logManager, quesmaManagementConsole, phoneHomeAgent, queryRunner)
return &Quesma{
Expand Down
4 changes: 2 additions & 2 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ type QueryRunner struct {
schemaRegistry schema.Registry
}

func (q *QueryRunner) EnableQueryOptimization() {
q.transformationPipeline.transformers = append(q.transformationPipeline.transformers, optimize.NewOptimizePipeline())
func (q *QueryRunner) EnableQueryOptimization(cfg config.QuesmaConfiguration) {
q.transformationPipeline.transformers = append(q.transformationPipeline.transformers, optimize.NewOptimizePipeline(cfg))
}

func NewQueryRunner(lm *clickhouse.LogManager, cfg config.QuesmaConfiguration, im elasticsearch.IndexManagement, qmc *ui.QuesmaManagementConsole, schemaRegistry schema.Registry) *QueryRunner {
Expand Down

0 comments on commit 898cd02

Please sign in to comment.