Skip to content

Commit

Permalink
Fix suggestions #1
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Sep 3, 2024
1 parent 6179a9c commit 42a7218
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 14 deletions.
12 changes: 6 additions & 6 deletions quesma/queryparser/pancake_json_rendering.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ import (
)

type pancakeJSONRenderer struct {
ctx context.Context
pipelinesProcessor pancakePipelinesProcessor
ctx context.Context
pipeline pancakePipelinesProcessor
}

func newPancakeJSONRenderer(ctx context.Context) *pancakeJSONRenderer {
return &pancakeJSONRenderer{
ctx: ctx,
pipelinesProcessor: pancakePipelinesProcessor{ctx: ctx},
ctx: ctx,
pipeline: pancakePipelinesProcessor{ctx: ctx},
}
}

Expand Down Expand Up @@ -204,7 +204,7 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
}

// pipeline aggregations of metric type behave just like metric
for metricPipelineAggrName, metricPipelineAggrResult := range p.pipelinesProcessor.calculateThisLayerMetricPipelines(layer, rows) {
for metricPipelineAggrName, metricPipelineAggrResult := range p.pipeline.currentPipelineMetricAggregations(layer, rows) {
result[metricPipelineAggrName] = metricPipelineAggrResult
// TODO: maybe add metadata also here? probably not needed
}
Expand Down Expand Up @@ -236,7 +236,7 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
hasSubaggregations := len(remainingLayers) > 1
if hasSubaggregations {
nextLayer := remainingLayers[1]
pipelineBucketsPerAggregation := p.pipelinesProcessor.calculateThisLayerBucketPipelines(layer, nextLayer, bucketRows, rows, rowIndexes)
pipelineBucketsPerAggregation := p.pipeline.currentPipelineBucketAggregations(layer, nextLayer, bucketRows, rows, rowIndexes)

// Add subAggregations (both normal and pipeline)
bucketArrRaw, ok := buckets["buckets"]
Expand Down
16 changes: 8 additions & 8 deletions quesma/queryparser/pancake_pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (p pancakePipelinesProcessor) selectPipelineRows(pipeline model.PipelineQue
return
}

func (p pancakePipelinesProcessor) calculateThisLayerMetricPipelines(layer *pancakeModelLayer,
func (p pancakePipelinesProcessor) currentPipelineMetricAggregations(layer *pancakeModelLayer,
rows []model.QueryResultRow) (resultPerPipeline map[string]model.JsonMap) {

resultPerPipeline = make(map[string]model.JsonMap)
Expand All @@ -44,7 +44,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerMetricPipelines(layer *panc
continue
}

thisPipelineResults := p.processSingleMetricPipeline(layer, pipeline, rows)
thisPipelineResults := p.calcSingleMetricPipeline(layer, pipeline, rows)

errorMsg := fmt.Sprintf("calculateThisLayerMetricPipelines, pipeline: %s", pipeline.internalName)
resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, thisPipelineResults, errorMsg)
Expand All @@ -53,7 +53,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerMetricPipelines(layer *panc
return
}

func (p pancakePipelinesProcessor) processSingleMetricPipeline(layer *pancakeModelLayer,
func (p pancakePipelinesProcessor) calcSingleMetricPipeline(layer *pancakeModelLayer,
pipeline *pancakeModelPipelineAggregation, rows []model.QueryResultRow) (resultPerPipeline map[string]model.JsonMap) {

resultPerPipeline = make(map[string]model.JsonMap)
Expand All @@ -63,7 +63,7 @@ func (p pancakePipelinesProcessor) processSingleMetricPipeline(layer *pancakeMod
resultPerPipeline[pipeline.name] = pipeline.queryType.TranslateSqlResponseToJson(resultRows, 0) // TODO: fill level?

for _, pipelineChild := range layer.findPipelineChildren(pipeline) {
childResults := p.processSingleMetricPipeline(layer, pipelineChild, resultRows)
childResults := p.calcSingleMetricPipeline(layer, pipelineChild, resultRows)

errorMsg := fmt.Sprintf("processSingleMetricPipeline, pipeline: %s, pipelineChild: %s", pipeline.internalName, pipelineChild.internalName)
resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, childResults, errorMsg)
Expand All @@ -75,7 +75,7 @@ func (p pancakePipelinesProcessor) processSingleMetricPipeline(layer *pancakeMod
// input parameters: bucketRows is a subset of rows (it both has <= columns, and <= rows).
// If e.g. rowIndexes = [2, 5], then bucketRows = [rows[2], rows[5]] (with maybe some columns removed)
// We need rows and rowIndexes to fetch proper metric column from rows.
func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, nextLayer *pancakeModelLayer, bucketRows []model.QueryResultRow,
func (p pancakePipelinesProcessor) currentPipelineBucketAggregations(layer, nextLayer *pancakeModelLayer, bucketRows []model.QueryResultRow,
rows []model.QueryResultRow, rowIndexes []int) (resultRowsPerPipeline map[string][]model.QueryResultRow) {

resultRowsPerPipeline = make(map[string][]model.QueryResultRow)
Expand Down Expand Up @@ -103,7 +103,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, next
bucketRowsTransformedIfNeeded = bucketRows
}

childResults := p.processSingleBucketPipeline(nextLayer, childPipeline, bucketRowsTransformedIfNeeded)
childResults := p.calcSinglePipelineBucket(nextLayer, childPipeline, bucketRowsTransformedIfNeeded)
for pipelineName, pipelineResults := range childResults {
if _, alreadyExists := resultRowsPerPipeline[pipelineName]; alreadyExists { // sanity check
logger.ErrorWithCtx(p.ctx).Msgf("pipeline %s already exists in resultsPerPipeline", pipelineName)
Expand All @@ -119,7 +119,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, next
return
}

func (p pancakePipelinesProcessor) processSingleBucketPipeline(layer *pancakeModelLayer, pipeline *pancakeModelPipelineAggregation,
func (p pancakePipelinesProcessor) calcSinglePipelineBucket(layer *pancakeModelLayer, pipeline *pancakeModelPipelineAggregation,
bucketRows []model.QueryResultRow) (resultRowsPerPipeline map[string][]model.QueryResultRow) {

resultRowsPerPipeline = make(map[string][]model.QueryResultRow)
Expand All @@ -128,7 +128,7 @@ func (p pancakePipelinesProcessor) processSingleBucketPipeline(layer *pancakeMod
resultRowsPerPipeline[pipeline.name] = currentPipelineResults

for _, pipelineChild := range layer.findPipelineChildren(pipeline) {
childPipelineResults := p.processSingleBucketPipeline(layer, pipelineChild, currentPipelineResults)
childPipelineResults := p.calcSinglePipelineBucket(layer, pipelineChild, currentPipelineResults)
for name, results := range childPipelineResults {
if _, alreadyExists := resultRowsPerPipeline[name]; alreadyExists { // sanity check
logger.ErrorWithCtx(p.ctx).Msgf("pipeline %s already exists in resultsPerPipeline", name)
Expand Down

0 comments on commit 42a7218

Please sign in to comment.