Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve (date)_range just like filters #970

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions quesma/queryparser/pancake_sql_query_generation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func TestPancakeQueryGeneration(t *testing.T) {
if filters(test.TestName) {
t.Skip("Fix filters")
}
if test.TestName == "complex sum_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Sum Bucket (Bucket: Date Histogram, Metric: Average), Buckets: X-Asis: Histogram(file:opensearch-visualize/pipeline_agg_req,nr:22)" {
t.Skip("error: filter(s)/range/dataRange aggregation must be the last bucket aggregation")

if test.TestName == "Line, Y-axis: Min, Buckets: Date Range, X-Axis: Terms, Split Chart: Date Histogram(file:kibana-visualize/agg_req,nr:9)" {
t.Skip("Date range is broken, fix in progress (PR #971)")
Comment on lines +58 to +59
Copy link
Member Author

@trzysiek trzysiek Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a new test that fails. Unskipping another one above/below as this PR fixes it.

}

if test.TestName == "Terms with order by top metrics(file:kibana-visualize/agg_req,nr:8)" {
Expand All @@ -67,10 +68,6 @@ func TestPancakeQueryGeneration(t *testing.T) {
t.Skip("Was skipped before. Wrong key in max_bucket, should be an easy fix")
}

if test.TestName == "complex sum_bucket. Reproduce: Visualize -> Vertical Bar: Metrics: Sum Bucket (Bucket: Date Histogram, Metric: Average), Buckets: X-Asis: Histogram(file:opensearch-visualize/pipeline_agg_req,nr:24)" {
t.Skip("Was skipped before, no expected results")
}

// TODO: add test for filter(s) both at the beginning and end of aggregation tree

fmt.Println("i:", i, "test:", test.TestName)
Expand Down
41 changes: 27 additions & 14 deletions quesma/queryparser/pancake_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,48 +408,61 @@ func (a *pancakeTransformer) aggregationTreeToPancakes(topLevel pancakeAggregati
// TODO: if both top_hits/top_metrics, and filters, it probably won't work...
// Care: order of these two functions is unfortunately important.
// Should be fixed after this TODO
newFiltersPancakes := a.createFiltersPancakes(&newPancake)
newCombinatorPancakes := a.createCombinatorPancakes(&newPancake)
additionalTopHitPancakes, err := a.createTopHitAndTopMetricsPancakes(&newPancake)
if err != nil {
return nil, err
}

pancakeResults = append(pancakeResults, additionalTopHitPancakes...)
pancakeResults = append(pancakeResults, newFiltersPancakes...)
pancakeResults = append(pancakeResults, newCombinatorPancakes...)
}

return
}

// createFiltersPancakes only does something, if first layer aggregation is Filters.
// It creates new pancakes for each filter in that aggregation, and updates `pancake` to have only first filter.
func (a *pancakeTransformer) createFiltersPancakes(pancake *pancakeModel) (newPancakes []*pancakeModel) {
func (a *pancakeTransformer) createCombinatorPancakes(pancake *pancakeModel) (newPancakes []*pancakeModel) {
if len(pancake.layers) == 0 || pancake.layers[0].nextBucketAggregation == nil {
return
}

firstLayer := pancake.layers[0]
filters, isFilters := firstLayer.nextBucketAggregation.queryType.(bucket_aggregations.Filters)
canSimplyAddFilterToWhereClause := len(firstLayer.currentMetricAggregations) == 0 && len(firstLayer.currentPipelineAggregations) == 0
areNewPancakesReallyNeeded := len(pancake.layers) > 1 // if there is only one layer, it's better to get it done with combinators.
combinator, isCombinator := firstLayer.nextBucketAggregation.queryType.(bucket_aggregations.CombinatorAggregationInterface)
if !isCombinator {
return
}

noMoreBucket := len(pancake.layers) <= 1 || (len(pancake.layers) == 2 && pancake.layers[1].nextBucketAggregation == nil)
noMetricOnFirstLayer := len(firstLayer.currentMetricAggregations) == 0 && len(firstLayer.currentPipelineAggregations) == 0
canSimplyAddCombinatorToWhereClause := noMoreBucket && noMetricOnFirstLayer
if canSimplyAddCombinatorToWhereClause {
return
}

Comment on lines +433 to 443
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code should be a bit clearer now — feel free to disagree.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I'm not sure that's always correct once we account for size — it might be, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK — we add size as a metric aggregation, so this should be fine.

if !isFilters || !canSimplyAddFilterToWhereClause || !areNewPancakesReallyNeeded || len(filters.Filters) == 0 {
areNewPancakesReallyNeeded := len(pancake.layers) > 1 // if there is only one layer above combinator, it easily can be done with 1 pancake, no need for more
groups := combinator.CombinatorGroups()
if !areNewPancakesReallyNeeded || len(groups) == 0 {
return
}

// First create N-1 new pancakes, each with different filter
for i := 1; i < len(filters.Filters); i++ {
combinatorSplit := combinator.CombinatorSplit()
combinatorGroups := combinator.CombinatorGroups()
// First create N-1 new pancakes [1...N), each with different filter
// (important to update the first (0th) pancake at the end)
for i := 1; i < len(groups); i++ {
newPancake := pancake.Clone()
bucketAggr := newPancake.layers[0].nextBucketAggregation.ShallowClone()
bucketAggr.queryType = filters.NewFiltersSingleFilter(i)
bucketAggr.queryType = combinatorSplit[i]
newPancake.layers[0] = newPancakeModelLayer(&bucketAggr)
newPancake.whereClause = model.And([]model.Expr{newPancake.whereClause, filters.Filters[i].Sql.WhereClause})
newPancake.whereClause = model.And([]model.Expr{newPancake.whereClause, combinatorGroups[i].WhereClause})
newPancakes = append(newPancakes, newPancake)
}

// Then update original to have 1 filter as well
pancake.layers[0].nextBucketAggregation.queryType = filters.NewFiltersSingleFilter(0)
pancake.whereClause = model.And([]model.Expr{pancake.whereClause, filters.Filters[0].Sql.WhereClause})
// Update original
pancake.layers[0].nextBucketAggregation.queryType = combinatorSplit[0]
pancake.whereClause = model.And([]model.Expr{pancake.whereClause, combinatorGroups[0].WhereClause})

return
}
Loading
Loading