From 42a7218bbc937248c1bef459c49eb448403816cf Mon Sep 17 00:00:00 2001 From: Krzysztof Kiewicz Date: Tue, 3 Sep 2024 17:34:10 +0200 Subject: [PATCH] Fix suggestions #1 --- quesma/queryparser/pancake_json_rendering.go | 12 ++++++------ quesma/queryparser/pancake_pipelines.go | 16 ++++++++-------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/quesma/queryparser/pancake_json_rendering.go b/quesma/queryparser/pancake_json_rendering.go index de20de4a3..ffbf953d2 100644 --- a/quesma/queryparser/pancake_json_rendering.go +++ b/quesma/queryparser/pancake_json_rendering.go @@ -13,14 +13,14 @@ import ( ) type pancakeJSONRenderer struct { - ctx context.Context - pipelinesProcessor pancakePipelinesProcessor + ctx context.Context + pipeline pancakePipelinesProcessor } func newPancakeJSONRenderer(ctx context.Context) *pancakeJSONRenderer { return &pancakeJSONRenderer{ - ctx: ctx, - pipelinesProcessor: pancakePipelinesProcessor{ctx: ctx}, + ctx: ctx, + pipeline: pancakePipelinesProcessor{ctx: ctx}, } } @@ -204,7 +204,7 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer, } // pipeline aggregations of metric type behave just like metric - for metricPipelineAggrName, metricPipelineAggrResult := range p.pipelinesProcessor.calculateThisLayerMetricPipelines(layer, rows) { + for metricPipelineAggrName, metricPipelineAggrResult := range p.pipeline.currentPipelineMetricAggregations(layer, rows) { result[metricPipelineAggrName] = metricPipelineAggrResult // TODO: maybe add metadata also here? probably not needed } @@ -236,7 +236,7 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer, hasSubaggregations := len(remainingLayers) > 1 if hasSubaggregations { nextLayer := remainingLayers[1] - pipelineBucketsPerAggregation := p.pipelinesProcessor.calculateThisLayerBucketPipelines(layer, nextLayer, bucketRows, rows, rowIndexes) + pipelineBucketsPerAggregation := p.pipeline.currentPipelineBucketAggregations(layer, nextLayer, bucketRows, rows, rowIndexes) // Add subAggregations (both normal and pipeline) bucketArrRaw, ok := buckets["buckets"] diff --git a/quesma/queryparser/pancake_pipelines.go b/quesma/queryparser/pancake_pipelines.go index dafd5d6a1..22911db4c 100644 --- a/quesma/queryparser/pancake_pipelines.go +++ b/quesma/queryparser/pancake_pipelines.go @@ -33,7 +33,7 @@ func (p pancakePipelinesProcessor) selectPipelineRows(pipeline model.PipelineQue return } -func (p pancakePipelinesProcessor) calculateThisLayerMetricPipelines(layer *pancakeModelLayer, +func (p pancakePipelinesProcessor) currentPipelineMetricAggregations(layer *pancakeModelLayer, rows []model.QueryResultRow) (resultPerPipeline map[string]model.JsonMap) { resultPerPipeline = make(map[string]model.JsonMap) @@ -44,7 +44,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerMetricPipelines(layer *panc continue } - thisPipelineResults := p.processSingleMetricPipeline(layer, pipeline, rows) + thisPipelineResults := p.calcSingleMetricPipeline(layer, pipeline, rows) errorMsg := fmt.Sprintf("calculateThisLayerMetricPipelines, pipeline: %s", pipeline.internalName) resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, thisPipelineResults, errorMsg) @@ -53,7 +53,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerMetricPipelines(layer *panc return } -func (p pancakePipelinesProcessor) processSingleMetricPipeline(layer *pancakeModelLayer, +func (p pancakePipelinesProcessor) calcSingleMetricPipeline(layer *pancakeModelLayer, pipeline *pancakeModelPipelineAggregation, rows []model.QueryResultRow) (resultPerPipeline map[string]model.JsonMap) { resultPerPipeline = make(map[string]model.JsonMap) @@ -63,7 +63,7 @@ func (p pancakePipelinesProcessor) processSingleMetricPipeline(layer *pancakeMod resultPerPipeline[pipeline.name] = pipeline.queryType.TranslateSqlResponseToJson(resultRows, 0) // TODO: fill level? for _, pipelineChild := range layer.findPipelineChildren(pipeline) { - childResults := p.processSingleMetricPipeline(layer, pipelineChild, resultRows) + childResults := p.calcSingleMetricPipeline(layer, pipelineChild, resultRows) errorMsg := fmt.Sprintf("processSingleMetricPipeline, pipeline: %s, pipelineChild: %s", pipeline.internalName, pipelineChild.internalName) resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, childResults, errorMsg) @@ -75,7 +75,7 @@ func (p pancakePipelinesProcessor) processSingleMetricPipeline(layer *pancakeMod // input parameters: bucketRows is a subset of rows (it both has <= columns, and <= rows). // If e.g. rowIndexes = [2, 5], then bucketRows = [rows[2], rows[5]] (with maybe some columns removed) // We need rows and rowIndexes to fetch proper metric column from rows. -func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, nextLayer *pancakeModelLayer, bucketRows []model.QueryResultRow, +func (p pancakePipelinesProcessor) currentPipelineBucketAggregations(layer, nextLayer *pancakeModelLayer, bucketRows []model.QueryResultRow, rows []model.QueryResultRow, rowIndexes []int) (resultRowsPerPipeline map[string][]model.QueryResultRow) { resultRowsPerPipeline = make(map[string][]model.QueryResultRow) @@ -103,7 +103,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, next bucketRowsTransformedIfNeeded = bucketRows } - childResults := p.processSingleBucketPipeline(nextLayer, childPipeline, bucketRowsTransformedIfNeeded) + childResults := p.calcSinglePipelineBucket(nextLayer, childPipeline, bucketRowsTransformedIfNeeded) for pipelineName, pipelineResults := range childResults { if _, alreadyExists := resultRowsPerPipeline[pipelineName]; alreadyExists { // sanity check logger.ErrorWithCtx(p.ctx).Msgf("pipeline %s already exists in resultsPerPipeline", pipelineName) @@ -119,7 +119,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, next return } -func (p pancakePipelinesProcessor) processSingleBucketPipeline(layer *pancakeModelLayer, pipeline *pancakeModelPipelineAggregation, +func (p pancakePipelinesProcessor) calcSinglePipelineBucket(layer *pancakeModelLayer, pipeline *pancakeModelPipelineAggregation, bucketRows []model.QueryResultRow) (resultRowsPerPipeline map[string][]model.QueryResultRow) { resultRowsPerPipeline = make(map[string][]model.QueryResultRow) @@ -128,7 +128,7 @@ func (p pancakePipelinesProcessor) processSingleBucketPipeline(layer *pancakeMod resultRowsPerPipeline[pipeline.name] = currentPipelineResults for _, pipelineChild := range layer.findPipelineChildren(pipeline) { - childPipelineResults := p.processSingleBucketPipeline(layer, pipelineChild, currentPipelineResults) + childPipelineResults := p.calcSinglePipelineBucket(layer, pipelineChild, currentPipelineResults) for name, results := range childPipelineResults { if _, alreadyExists := resultRowsPerPipeline[name]; alreadyExists { // sanity check logger.ErrorWithCtx(p.ctx).Msgf("pipeline %s already exists in resultsPerPipeline", name)