Commit 6179a9c
trzysiek committed Sep 3, 2024
1 parent 63465ed commit 6179a9c
Showing 2 changed files with 26 additions and 10 deletions.
8 changes: 6 additions & 2 deletions quesma/queryparser/pancake_json_rendering.go
@@ -51,6 +51,8 @@ func (p *pancakeJSONRenderer) selectPrefixRows(prefix string, rows []model.QueryResultRow
return
}

+// rowIndexes - which row in the original result set corresponds to the first row of the bucket
+// It's needed for pipeline aggregations, as we might need to take some other columns from the original row to calculate them.
func (p *pancakeJSONRenderer) splitBucketRows(bucket *pancakeModelBucketAggregation, rows []model.QueryResultRow) (
buckets []model.QueryResultRow, subAggrs [][]model.QueryResultRow, rowIndexes []int) {

@@ -102,8 +104,10 @@ func (p *pancakeJSONRenderer) splitBucketRows(bucket *pancakeModelBucketAggregat
// We accomplish that by increasing limit by one during SQL query and then filtering out during JSON rendering.
// So we either filter out empty or last one if there is none.
// This can't be replaced by WHERE in generic case.
-func (p *pancakeJSONRenderer) potentiallyRemoveExtraBucket(layer *pancakeModelLayer,
-bucketRows []model.QueryResultRow,
+//
+// rowIndexes - which row in the original result set corresponds to the first row of the bucket
+// It's needed for pipeline aggregations, as we might need to take some other columns from the original row to calculate them.
+func (p *pancakeJSONRenderer) potentiallyRemoveExtraBucket(layer *pancakeModelLayer, bucketRows []model.QueryResultRow,
subAggrRows [][]model.QueryResultRow, rowIndexes []int) ([]model.QueryResultRow, [][]model.QueryResultRow, []int) {
// We filter out null
if layer.nextBucketAggregation.filterOurEmptyKeyBucket {
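For orientation, a minimal, hypothetical sketch (not code from this commit) of the idea documented above potentiallyRemoveExtraBucket: the SQL query fetches size+1 buckets, and JSON rendering then drops either the empty-key bucket or the surplus last one. The bucket type and trimExtraBucket are invented for illustration.

```go
package main

import "fmt"

type bucket struct {
	Key   any
	Count int
}

// trimExtraBucket is a hypothetical stand-in for the real logic: the SQL query
// asked for sizeLimit+1 buckets, so we drop the empty-key bucket if present,
// otherwise the surplus last bucket.
func trimExtraBucket(buckets []bucket, sizeLimit int, filterOutEmptyKey bool) []bucket {
	if filterOutEmptyKey {
		kept := make([]bucket, 0, len(buckets))
		for _, b := range buckets {
			if b.Key != nil {
				kept = append(kept, b)
			}
		}
		buckets = kept
	}
	if len(buckets) > sizeLimit {
		buckets = buckets[:sizeLimit] // no empty-key bucket, so drop the extra last one
	}
	return buckets
}

func main() {
	got := []bucket{{"a", 10}, {nil, 7}, {"b", 3}} // asked for 2+1 buckets
	fmt.Println(trimExtraBucket(got, 2, true))     // [{a 10} {b 3}]
}
```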
28 changes: 20 additions & 8 deletions quesma/queryparser/pancake_pipelines.go
@@ -45,8 +45,9 @@ func (p pancakePipelinesProcessor) calculateThisLayerMetricPipelines(layer *panc
}

thisPipelineResults := p.processSingleMetricPipeline(layer, pipeline, rows)
-resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, thisPipelineResults,
-fmt.Sprintf("calculateThisLayerMetricPipelines, pipeline: %s", pipeline.internalName))
+
+errorMsg := fmt.Sprintf("calculateThisLayerMetricPipelines, pipeline: %s", pipeline.internalName)
+resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, thisPipelineResults, errorMsg)
}

return
@@ -63,13 +64,17 @@ func (p pancakePipelinesProcessor) processSingleMetricPipeline(layer *pancakeMod

for _, pipelineChild := range layer.findPipelineChildren(pipeline) {
childResults := p.processSingleMetricPipeline(layer, pipelineChild, resultRows)
-resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, childResults,
-fmt.Sprintf("processSingleMetricPipeline, pipeline: %s, pipelineChild: %s", pipeline.internalName, pipelineChild.internalName))
+
+errorMsg := fmt.Sprintf("processSingleMetricPipeline, pipeline: %s, pipelineChild: %s", pipeline.internalName, pipelineChild.internalName)
+resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, childResults, errorMsg)
}

return
}

+// input parameters: bucketRows is a subset of rows (it both has <= columns, and <= rows).
+// If e.g. rowIndexes = [2, 5], then bucketRows = [rows[2], rows[5]] (with maybe some columns removed)
+// We need rows and rowIndexes to fetch proper metric column from rows.
func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, nextLayer *pancakeModelLayer, bucketRows []model.QueryResultRow,
rows []model.QueryResultRow, rowIndexes []int) (resultRowsPerPipeline map[string][]model.QueryResultRow) {
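A toy illustration of the rowIndexes contract in the comment above (names and types are hypothetical, not from the repository): bucketRows are derived from rows via rowIndexes, so a pipeline aggregation can reach back into the original row for a column that bucketRows no longer carries.

```go
package main

import "fmt"

type row []any // stand-in for model.QueryResultRow

func main() {
	rows := []row{
		{"2024-09-01", "error", 17},
		{"2024-09-01", "info", 40},
		{"2024-09-02", "error", 5},
	}
	rowIndexes := []int{0, 2}                     // first row of each bucket in the original result set
	bucketRows := []row{rows[0][:2], rows[2][:2]} // same rows, with the metric column dropped

	// A pipeline aggregation can still reach the dropped metric column via rowIndexes.
	for i := range bucketRows {
		metric := rows[rowIndexes[i]][2]
		fmt.Println(bucketRows[i], "-> metric from original row:", metric)
	}
}
```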

@@ -84,7 +89,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, next
needToAddProperMetricColumn := !childPipeline.queryType.IsCount() // If count, last column of bucketRows is already count we need.
if needToAddProperMetricColumn {
columnName := childPipeline.parentColumnName(p.ctx)
-bucketRows, oldColumnArr = p.addColumn(columnName, bucketRows, rows, rowIndexes)
+bucketRows, oldColumnArr = p.addProperPipelineColumn(columnName, bucketRows, rows, rowIndexes)
}

var bucketRowsTransformedIfNeeded []model.QueryResultRow
@@ -107,7 +112,7 @@ func (p pancakePipelinesProcessor) calculateThisLayerBucketPipelines(layer, next
}

if needToAddProperMetricColumn {
-bucketRows = p.restoreLastColumn(bucketRows, oldColumnArr)
+bucketRows = p.restoreOriginalColumn(bucketRows, oldColumnArr)
}
}

@@ -135,7 +140,13 @@ func (p pancakePipelinesProcessor) processSingleBucketPipeline(layer *pancakeMod
return
}

-func (p pancakePipelinesProcessor) addColumn(parentColumnName string, selectedRows, allRows []model.QueryResultRow,
+// returns:
+// - newSelectedRows: same as selectedRows, but with one column different if needed (value for this column is taken from
+//   allRows, which has >= columns than selectedRows, and should have the column we need)
+// - oldColumnArray: old value of the exchanged column, to be restored in restoreOriginalColumn after processing
+//
+// Use restoreOriginalColumn after processing to restore original values.
+func (p pancakePipelinesProcessor) addProperPipelineColumn(parentColumnName string, selectedRows, allRows []model.QueryResultRow,
selectedRowsIndexes []int) (newSelectedRows []model.QueryResultRow, oldColumnArray []any) {

if len(allRows) == 0 {
@@ -167,7 +178,8 @@ func (p pancakePipelinesProcessor) addColumn(parentColumnName string, selectedRo
return
}

-func (p pancakePipelinesProcessor) restoreLastColumn(rows []model.QueryResultRow, valuesToRestore []any) []model.QueryResultRow {
+// used after addProperPipelineColumn
+func (p pancakePipelinesProcessor) restoreOriginalColumn(rows []model.QueryResultRow, valuesToRestore []any) []model.QueryResultRow {
for i, row := range rows {
row.Cols[len(row.Cols)-1].Value = valuesToRestore[i]
}
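To make the addProperPipelineColumn / restoreOriginalColumn pair above easier to follow, here is a rough, self-contained sketch of the swap-and-restore pattern they describe. All names (col, qrow, swapLastColumn, restoreLastColumn) are invented for illustration; this is not the repository implementation.

```go
package main

import "fmt"

type col struct {
	Name  string
	Value any
}
type qrow struct{ Cols []col }

// swapLastColumn overwrites the last column of each selected row with the named
// column taken from the corresponding full row, returning the old values so they
// can be restored after the pipeline is processed.
func swapLastColumn(name string, selected, all []qrow, indexes []int) (old []any) {
	old = make([]any, len(selected))
	for i := range selected {
		last := len(selected[i].Cols) - 1
		old[i] = selected[i].Cols[last].Value
		for _, c := range all[indexes[i]].Cols {
			if c.Name == name {
				selected[i].Cols[last].Value = c.Value
			}
		}
	}
	return old
}

// restoreLastColumn puts the saved values back into the last column.
func restoreLastColumn(rows []qrow, old []any) {
	for i := range rows {
		rows[i].Cols[len(rows[i].Cols)-1].Value = old[i]
	}
}

func main() {
	all := []qrow{{Cols: []col{{"key", "a"}, {"count", 3}, {"avg_price", 9.5}}}}
	selected := []qrow{{Cols: []col{{"key", "a"}, {"count", 3}}}}

	old := swapLastColumn("avg_price", selected, all, []int{0})
	fmt.Println("while processing:", selected[0].Cols) // last column temporarily holds 9.5
	restoreLastColumn(selected, old)
	fmt.Println("after restore:   ", selected[0].Cols) // original value is back
}
```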
