Skip to content

Commit

Permalink
Remove quesma/query_processor (move to quesma/model) (#1026)
Browse files Browse the repository at this point in the history
Single-file module which depended only on `model`

Related: #1017
  • Loading branch information
mieciu authored Nov 21, 2024
1 parent 642dee3 commit 8e18e9e
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 15 deletions.
3 changes: 1 addition & 2 deletions quesma/model/pipeline_aggregations/average_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"quesma/logger"
"quesma/model"
"quesma/queryprocessor"
"quesma/util"
)

Expand All @@ -32,7 +31,7 @@ func (query AverageBucket) CalculateResultWhenMissing(parentRows []model.QueryRe
if len(parentRows) == 0 {
return resultRows // maybe null?
}
qp := queryprocessor.NewQueryProcessor(query.ctx)
qp := model.NewQueryProcessor(query.ctx)
parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
// in calculateSingleAvgBucket we calculate the avg of all current_keys with the same parent_cols
// so we need to split into buckets based on parent_cols
Expand Down
3 changes: 1 addition & 2 deletions quesma/model/pipeline_aggregations/max_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"quesma/logger"
"quesma/model"
"quesma/queryprocessor"
"quesma/util"
)

Expand Down Expand Up @@ -47,7 +46,7 @@ func (query MaxBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
if len(parentRows) == 0 {
return resultRows // maybe null?
}
qp := queryprocessor.NewQueryProcessor(query.ctx)
qp := model.NewQueryProcessor(query.ctx)
parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
// in calculateSingleAvgBucket we calculate the avg of all current_keys with the same parent_cols
// so we need to split into buckets based on parent_cols
Expand Down
3 changes: 1 addition & 2 deletions quesma/model/pipeline_aggregations/min_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"quesma/logger"
"quesma/model"
"quesma/queryprocessor"
"quesma/util"
)

Expand Down Expand Up @@ -43,7 +42,7 @@ func (query MinBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
if len(parentRows) == 0 {
return resultRows // maybe null?
}
qp := queryprocessor.NewQueryProcessor(query.ctx)
qp := model.NewQueryProcessor(query.ctx)
parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
// in calculateSingleAvgBucket we calculate the avg of all current_keys with the same parent_cols
// so we need to split into buckets based on parent_cols
Expand Down
3 changes: 1 addition & 2 deletions quesma/model/pipeline_aggregations/sum_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"quesma/logger"
"quesma/model"
"quesma/queryprocessor"
"quesma/util"
"time"
)
Expand Down Expand Up @@ -44,7 +43,7 @@ func (query SumBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
if len(parentRows) == 0 {
return resultRows // maybe null?
}
qp := queryprocessor.NewQueryProcessor(query.ctx)
qp := model.NewQueryProcessor(query.ctx)
parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
// in calculateSingleAvgBucket we calculate the avg of all current_keys with the same parent_cols
// so we need to split into buckets based on parent_cols
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package queryprocessor
package model

import (
"context"
"quesma/model"
"reflect"
)

Expand All @@ -17,7 +16,7 @@ func NewQueryProcessor(ctx context.Context) QueryProcessor {
}

// Returns whether row1 and row2 have the same values for the first level fields
func (qp *QueryProcessor) sameGroupByFields(row1, row2 model.QueryResultRow, level int) bool {
func (qp *QueryProcessor) sameGroupByFields(row1, row2 QueryResultRow, level int) bool {

isArray := func(val interface{}) bool {
if val == nil {
Expand Down Expand Up @@ -49,18 +48,18 @@ func (qp *QueryProcessor) sameGroupByFields(row1, row2 model.QueryResultRow, lev
// Splits ResultSet into buckets, based on the first level fields
// E.g. if level == 0, we split into buckets based on the first field,
// e.g. [row(1, ...), row(1, ...), row(2, ...), row(2, ...), row(3, ...)] -> [[row(1, ...), row(1, ...)], [row(2, ...), row(2, ...)], [row(3, ...)]]
func (qp *QueryProcessor) SplitResultSetIntoBuckets(ResultSet []model.QueryResultRow, level int) [][]model.QueryResultRow {
func (qp *QueryProcessor) SplitResultSetIntoBuckets(ResultSet []QueryResultRow, level int) [][]QueryResultRow {
if len(ResultSet) == 0 {
return [][]model.QueryResultRow{{}}
return [][]QueryResultRow{{}}
}

lastRow := ResultSet[0]
buckets := [][]model.QueryResultRow{{lastRow}}
buckets := [][]QueryResultRow{{lastRow}}
for _, row := range ResultSet[1:] {
if qp.sameGroupByFields(row, lastRow, level) {
buckets[len(buckets)-1] = append(buckets[len(buckets)-1], row)
} else {
buckets = append(buckets, []model.QueryResultRow{row})
buckets = append(buckets, []QueryResultRow{row})
}
lastRow = row
}
Expand Down

0 comments on commit 8e18e9e

Please sign in to comment.