diff --git a/quesma/model/pipeline_aggregations/average_bucket.go b/quesma/model/pipeline_aggregations/average_bucket.go
index 832c15607..948c14b82 100644
--- a/quesma/model/pipeline_aggregations/average_bucket.go
+++ b/quesma/model/pipeline_aggregations/average_bucket.go
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"quesma/logger"
 	"quesma/model"
-	"quesma/queryprocessor"
 	"quesma/util"
 )
 
@@ -32,7 +31,7 @@ func (query AverageBucket) CalculateResultWhenMissing(parentRows []model.QueryRe
 	if len(parentRows) == 0 {
 		return resultRows // maybe null?
 	}
-	qp := queryprocessor.NewQueryProcessor(query.ctx)
+	qp := model.NewQueryProcessor(query.ctx)
 	parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
 	// in calculateSingleAvgBucket we calculate avg all current_keys with the same parent_cols
 	// so we need to split into buckets based on parent_cols
diff --git a/quesma/model/pipeline_aggregations/max_bucket.go b/quesma/model/pipeline_aggregations/max_bucket.go
index 0c4af6726..c8a2d587c 100644
--- a/quesma/model/pipeline_aggregations/max_bucket.go
+++ b/quesma/model/pipeline_aggregations/max_bucket.go
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"quesma/logger"
 	"quesma/model"
-	"quesma/queryprocessor"
 	"quesma/util"
 )
 
@@ -47,7 +46,7 @@ func (query MaxBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
 	if len(parentRows) == 0 {
 		return resultRows // maybe null?
 	}
-	qp := queryprocessor.NewQueryProcessor(query.ctx)
+	qp := model.NewQueryProcessor(query.ctx)
 	parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
 	// in calculateSingleAvgBucket we calculate avg all current_keys with the same parent_cols
 	// so we need to split into buckets based on parent_cols
diff --git a/quesma/model/pipeline_aggregations/min_bucket.go b/quesma/model/pipeline_aggregations/min_bucket.go
index 200233394..43f5f471e 100644
--- a/quesma/model/pipeline_aggregations/min_bucket.go
+++ b/quesma/model/pipeline_aggregations/min_bucket.go
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"quesma/logger"
 	"quesma/model"
-	"quesma/queryprocessor"
 	"quesma/util"
 )
 
@@ -43,7 +42,7 @@ func (query MinBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
 	if len(parentRows) == 0 {
 		return resultRows // maybe null?
 	}
-	qp := queryprocessor.NewQueryProcessor(query.ctx)
+	qp := model.NewQueryProcessor(query.ctx)
 	parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
 	// in calculateSingleAvgBucket we calculate avg all current_keys with the same parent_cols
 	// so we need to split into buckets based on parent_cols
diff --git a/quesma/model/pipeline_aggregations/sum_bucket.go b/quesma/model/pipeline_aggregations/sum_bucket.go
index 42be987d9..2bfafda05 100644
--- a/quesma/model/pipeline_aggregations/sum_bucket.go
+++ b/quesma/model/pipeline_aggregations/sum_bucket.go
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"quesma/logger"
 	"quesma/model"
-	"quesma/queryprocessor"
 	"quesma/util"
 	"time"
 )
@@ -44,7 +43,7 @@ func (query SumBucket) CalculateResultWhenMissing(parentRows []model.QueryResult
 	if len(parentRows) == 0 {
 		return resultRows // maybe null?
 	}
-	qp := queryprocessor.NewQueryProcessor(query.ctx)
+	qp := model.NewQueryProcessor(query.ctx)
 	parentFieldsCnt := len(parentRows[0].Cols) - 2 // -2, because row is [parent_cols..., current_key, current_value]
 	// in calculateSingleAvgBucket we calculate avg all current_keys with the same parent_cols
 	// so we need to split into buckets based on parent_cols
diff --git a/quesma/queryprocessor/query_processor.go b/quesma/model/query_processor.go
similarity index 82%
rename from quesma/queryprocessor/query_processor.go
rename to quesma/model/query_processor.go
index 57f6a1f44..4b43edf64 100644
--- a/quesma/queryprocessor/query_processor.go
+++ b/quesma/model/query_processor.go
@@ -1,10 +1,9 @@
 // Copyright Quesma, licensed under the Elastic License 2.0.
 // SPDX-License-Identifier: Elastic-2.0
-package queryprocessor
+package model
 
 import (
 	"context"
-	"quesma/model"
 	"reflect"
 )
 
@@ -17,7 +16,7 @@ func NewQueryProcessor(ctx context.Context) QueryProcessor {
 }
 
 // Returns if row1 and row2 have the same values for the first level fields
-func (qp *QueryProcessor) sameGroupByFields(row1, row2 model.QueryResultRow, level int) bool {
+func (qp *QueryProcessor) sameGroupByFields(row1, row2 QueryResultRow, level int) bool {
 	isArray := func(val interface{}) bool {
 		if val == nil {
@@ -49,18 +48,18 @@
 // Splits ResultSet into buckets, based on the first level fields
 // E.g. if level == 0, we split into buckets based on the first field,
 // e.g. [row(1, ...), row(1, ...), row(2, ...), row(2, ...), row(3, ...)] -> [[row(1, ...), row(1, ...)], [row(2, ...), row(2, ...)], [row(3, ...)]]
-func (qp *QueryProcessor) SplitResultSetIntoBuckets(ResultSet []model.QueryResultRow, level int) [][]model.QueryResultRow {
+func (qp *QueryProcessor) SplitResultSetIntoBuckets(ResultSet []QueryResultRow, level int) [][]QueryResultRow {
 	if len(ResultSet) == 0 {
-		return [][]model.QueryResultRow{{}}
+		return [][]QueryResultRow{{}}
 	}
 
 	lastRow := ResultSet[0]
-	buckets := [][]model.QueryResultRow{{lastRow}}
+	buckets := [][]QueryResultRow{{lastRow}}
 	for _, row := range ResultSet[1:] {
 		if qp.sameGroupByFields(row, lastRow, level) {
 			buckets[len(buckets)-1] = append(buckets[len(buckets)-1], row)
 		} else {
-			buckets = append(buckets, []model.QueryResultRow{row})
+			buckets = append(buckets, []QueryResultRow{row})
 		}
 		lastRow = row
 	}
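
For context, here is a minimal usage sketch (not part of the PR) of the relocated QueryProcessor, now reachable directly from the model package instead of the removed quesma/queryprocessor import. The column constructor model.NewQueryResultCol and the example rows are assumptions for illustration; only NewQueryProcessor, QueryResultRow.Cols, and SplitResultSetIntoBuckets appear in the diff above, so adjust to the actual model API if it differs.

```go
package main

import (
	"context"
	"fmt"

	"quesma/model"
)

func main() {
	qp := model.NewQueryProcessor(context.Background())

	// Three example rows; the first two share the value in their first column.
	// Assumes model.NewQueryResultCol(name, value) builds a QueryResultCol.
	rows := []model.QueryResultRow{
		{Cols: []model.QueryResultCol{model.NewQueryResultCol("key", "a"), model.NewQueryResultCol("value", 1)}},
		{Cols: []model.QueryResultCol{model.NewQueryResultCol("key", "a"), model.NewQueryResultCol("value", 2)}},
		{Cols: []model.QueryResultCol{model.NewQueryResultCol("key", "b"), model.NewQueryResultCol("value", 3)}},
	}

	// Per the comment on SplitResultSetIntoBuckets, level == 0 splits on the
	// first field, so this should yield two buckets: one for "a", one for "b".
	buckets := qp.SplitResultSetIntoBuckets(rows, 0)
	fmt.Println(len(buckets)) // expected: 2
}
```

The pipeline aggregations above use the same call with level derived from parentFieldsCnt, so sibling rows that share all parent columns end up in one bucket before the avg/min/max/sum is computed.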