Skip to content

Commit

Permalink
Fix one pipeline aggregation
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed May 17, 2024
1 parent 089ef0c commit e8ff2f5
Showing 1 changed file with 45 additions and 22 deletions.
67 changes: 45 additions & 22 deletions quesma/model/pipeline_aggregations/derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,36 +35,59 @@ func (query Derivative) TranslateSqlResponseToJson(rows []model.QueryResultRow,
return response
}

func (query Derivative) CalculateResultWhenMissing(rowIndex int, parentRows []model.QueryResultRow, previousResultsCurrentAggregation []model.QueryResultRow) model.QueryResultRow {
resultRow := parentRows[rowIndex].Copy() // result is the same as parent, with an exception of last element, which we'll change below
var resultValue any
if rowIndex == 0 {
resultValue = nil
} else {
previousValue := parentRows[rowIndex-1].Cols[len(parentRows[rowIndex-1].Cols)-1].Value
currentValue := parentRows[rowIndex].Cols[len(parentRows[rowIndex].Cols)-1].Value
currentValueAsFloat, ok := util.ExtractFloat64Maybe(currentValue)
if ok {
previousValueAsFloat, ok := util.ExtractFloat64Maybe(previousValue)
if ok {
resultValue = currentValueAsFloat - previousValueAsFloat
func (query Derivative) CalculateResultWhenMissing(qwa *model.Query, parentRows []model.QueryResultRow) []model.QueryResultRow {
resultRows := make([]model.QueryResultRow, 0, len(parentRows))
if len(parentRows) == 0 {
return resultRows
}

firstRow := parentRows[0].Copy()
firstRow.Cols[len(firstRow.Cols)-1].Value = nil
resultRows = append(resultRows, firstRow)
if _, firstRowValueIsFloat := util.ExtractFloat64Maybe(parentRows[0].LastColValue()); firstRowValueIsFloat {
for i, currentRow := range parentRows[1:] {
previousRow := parentRows[i]
previousValueRaw := previousRow.LastColValue()
previousValue, okPrevious := util.ExtractFloat64Maybe(previousValueRaw)

currentValueRaw := currentRow.LastColValue()
currentValue, okCurrent := util.ExtractFloat64Maybe(currentValueRaw)

var resultValue any
if okPrevious && okCurrent {
resultValue = currentValue - previousValue
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous value to float: %v, currentValue: %v", previousValue, currentValue)
logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping",
previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw)
resultValue = nil
}
} else {
previousValueAsInt, okPrevious := util.ExtractInt64Maybe(previousValue)
currentValueAsInt, okParent := util.ExtractInt64Maybe(currentValue)
if okPrevious && okParent {
resultValue = currentValueAsInt - previousValueAsInt
resultRow := currentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
resultRows = append(resultRows, resultRow)
}
} else { // cumulative sum must be on numeric, so if it's not float64, it should always be int
for i, currentRow := range parentRows[1:] {
previousRow := parentRows[i]
previousValueRaw := previousRow.LastColValue()
previousValue, okPrevious := util.ExtractInt64Maybe(previousValueRaw)

currentValueRaw := currentRow.LastColValue()
currentValue, okCurrent := util.ExtractInt64Maybe(currentValueRaw)

var resultValue any
if okPrevious && okCurrent {
resultValue = currentValue - previousValue
} else {
logger.WarnWithCtx(query.ctx).Msgf("could not convert previous or current value to int, previousValue: %v, currentValue: %v. Using nil as result", previousValue, currentValue)
logger.WarnWithCtx(query.ctx).Msgf("could not convert value to float: previousValue: %v, type: %T; currentValue: %v, type: %T. Skipping",
previousValueRaw, previousValueRaw, currentValueRaw, currentValueRaw)
resultValue = nil
}
resultRow := currentRow.Copy()
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
resultRows = append(resultRows, resultRow)
}
}
resultRow.Cols[len(resultRow.Cols)-1].Value = resultValue
return resultRow
return resultRows
}

func (query Derivative) PostprocessResults(rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
Expand Down

0 comments on commit e8ff2f5

Please sign in to comment.