Skip to content

Commit

Permalink
Algorithm implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
trzysiek committed Dec 27, 2024
1 parent 7afebeb commit de58597
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 57 deletions.
134 changes: 77 additions & 57 deletions quesma/quesma/schema_search_after_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
type (
searchAfterStrategy interface {
// validateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParameterParsed []model.Expr, err error)
validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParamParsed []model.Expr, err error)
transform(query *model.Query, searchAfterParameterParsed []model.Expr) (*model.Query, error)
}
searchAfterStrategyType int
Expand Down Expand Up @@ -52,49 +52,64 @@ func (s searchAfterStrategyType) String() string {
// pkHashes []string // md5 for now, should be improved to shorten hashes lengths
// searchAfter any

type searchAfterStrategyBulletproof struct{} // TODO, don't look!
type searchAfterStrategyBulletproof struct {
}

// validateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
func (s searchAfterStrategyBulletproof) validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParameterParsed []model.Expr, err error) {
logger.Debug().Msgf("searchAfter: %v", query.SearchAfter)
if query.SearchAfter == nil {
return nil, nil
func (s searchAfterStrategyBulletproof) validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParamsParsed []model.Expr, err error) {
sortParamsParsed, err := validateAndParseCommonOnlySortParams(query, indexSchema)
if err != nil {
return sortParamsParsed, err
}

asArray, ok := query.SearchAfter.([]any)
if !ok {
return nil, fmt.Errorf("search_after must be an array, got: %v", query.SearchAfter)
searchAfter, isArr := query.SearchAfter.([]any)
if !isArr {
return make([]model.Expr, 0), nil
}

if len(asArray) != len(query.SelectCommand.OrderBy) {
return nil, fmt.Errorf("len(search_after) != len(sortFields), search_after: %v, sortFields: %v", asArray, query.SelectCommand.OrderBy)
searchAfterParamsParsed = make([]model.Expr, 0, len(searchAfter))
searchAfterParamsParsed = append(searchAfterParamsParsed, sortParamsParsed...)
sortParamsNr := len(sortParamsParsed)
for i := sortParamsNr; i < len(searchAfter); i++ {
searchAfterParamsParsed = append(searchAfterParamsParsed, model.NewLiteral(util.SingleQuoteIfString(searchAfter[i])))
}

return nil, nil
/*
return searchAfterParamsParsed, nil
}

var timestampMs int64
if shouldBeTimestamp, ok := util.ExtractNumeric64Maybe(asArray[0]); ok {
if shouldBeTimestamp >= 0 && util.IsFloat64AnInt64(shouldBeTimestamp) {
timestampMs = int64(shouldBeTimestamp)
} else {
return empty, fmt.Errorf("for Bulletproof strategy, search_after[0] must be a unix timestamp in milliseconds")
}
} else {
return empty, fmt.Errorf("for Bulletproof strategy, search_after must be an integer")
func (s searchAfterStrategyBulletproof) transform(query *model.Query, searchAfterParsed []model.Expr) (*model.Query, error) {
// If all order by's would be DESC, we would add to the where clause:
// tuple(sortField1, sortField2, ...) > tuple(searchAfter1, searchAfter2, ...)
// OR (tuple(sortField1, sortField2, ...) == tuple(searchAfter1, searchAfter2, ...)
// AND primary_key NOT IN (searchAfterPrimaryKey1, searchAfterPrimaryKey2, ...))

// Because some fields might be ASC, we need to swap sortField_i with searchAfter_i
fmt.Println("searchAfterParsed", searchAfterParsed)
sortFieldsNr := len(query.SelectCommand.OrderBy)
lhs := model.NewTupleExpr(make([]model.Expr, sortFieldsNr)...)
rhs := model.NewTupleExpr(make([]model.Expr, sortFieldsNr)...)
for i, searchAfterValue := range searchAfterParsed {
lhs.Exprs[i] = searchAfterValue
rhs.Exprs[i] = query.SelectCommand.OrderBy[i].Expr
if query.SelectCommand.OrderBy[i].Direction == model.AscOrder {
lhs.Exprs[i], rhs.Exprs[i] = rhs.Exprs[i], lhs.Exprs[i] // swap
}
}

return searchAfterParsedBulletproof{timestampMs: timestampMs, pkHashes: make([]string, 0)}, nil // TODO add parsing pk hashes
newWhereClause1 := model.NewInfixExpr(lhs, ">", rhs)
pkField := query.Schema.GetPrimaryKey()
if len(searchAfterParsed) == sortFieldsNr || pkField == nil {
// It means we have 0 primary keys -> we just imitate basicAndFast strategy
query.SelectCommand.WhereClause = model.And([]model.Expr{query.SelectCommand.WhereClause, newWhereClause1})
return query, nil
}

*/
}
notInTuple := model.NewTupleExpr(searchAfterParsed[sortFieldsNr:]...)
newWhereClause2_1 := model.NewInfixExpr(lhs, "=", rhs)
newWhereClause2_2 := model.NewInfixExpr(model.NewColumnRef(pkField.AsString()), "NOT IN", notInTuple)

func (s searchAfterStrategyBulletproof) transform(query *model.Query, searchAfterParsed []model.Expr) (*model.Query, error) {
//timestampRangeClause := NewInfixExpr(s.timestampField, "<=", NewFunction("fromUnixTimestamp64Milli", NewLiteral(searchAfter.timestampMs))) // TODO fix this for Hydrolix...
logger.Debug().Msgf("search_after: %v, query before: %v", query.SearchAfter, model.AsString(query.SelectCommand))
//query.SelectCommand.WhereClause = And([]Expr{query.SelectCommand.WhereClause, timestampRangeClause})
//query.SelectCommand.Limit += len(s.pkHashes)
logger.Debug().Msgf("query after: %v", model.AsString(query.SelectCommand))
newWhereClauseFull := model.Or([]model.Expr{newWhereClause1, model.And([]model.Expr{newWhereClause2_1, newWhereClause2_2})})
query.SelectCommand.WhereClause = model.And([]model.Expr{query.SelectCommand.WhereClause, newWhereClauseFull})
return query, nil
}

Expand All @@ -105,7 +120,7 @@ func (s searchAfterStrategyBulletproof) transform(query *model.Query, searchAfte
type searchAfterStrategyJustDiscardTheParameter struct{}

// validateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
func (s searchAfterStrategyJustDiscardTheParameter) validateAndParse(*model.Query, schema.Schema) (searchAfterParameterParsed []model.Expr, err error) {
func (s searchAfterStrategyJustDiscardTheParameter) validateAndParse(*model.Query, schema.Schema) (searchAfterParamParsed []model.Expr, err error) {
return nil, nil
}

Expand All @@ -120,7 +135,32 @@ func (s searchAfterStrategyJustDiscardTheParameter) transform(query *model.Query
type searchAfterStrategyBasicAndFast struct{}

// validateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
func (s searchAfterStrategyBasicAndFast) validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParsed []model.Expr, err error) {
func (s searchAfterStrategyBasicAndFast) validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParamParsed []model.Expr, err error) {
return validateAndParseCommonOnlySortParams(query, indexSchema)
}

func (s searchAfterStrategyBasicAndFast) transform(query *model.Query, searchAfterParsed []model.Expr) (*model.Query, error) {
// If all order by's would be DESC, we would add to the where clause:
// tuple(sortField1, sortField2, ...) > tuple(searchAfter1, searchAfter2, ...)
// But because some fields might be ASC, we need to swap sortField_i with searchAfter_i
fmt.Println("searchAfterParsed", searchAfterParsed)
sortFieldsNr := len(searchAfterParsed)
lhs := model.NewTupleExpr(make([]model.Expr, sortFieldsNr)...)
rhs := model.NewTupleExpr(make([]model.Expr, sortFieldsNr)...)
for i, searchAfterValue := range searchAfterParsed {
lhs.Exprs[i] = searchAfterValue
rhs.Exprs[i] = query.SelectCommand.OrderBy[i].Expr
if query.SelectCommand.OrderBy[i].Direction == model.AscOrder {
lhs.Exprs[i], rhs.Exprs[i] = rhs.Exprs[i], lhs.Exprs[i] // swap
}
}

newWhereClause := model.NewInfixExpr(lhs, ">", rhs)
query.SelectCommand.WhereClause = model.And([]model.Expr{query.SelectCommand.WhereClause, newWhereClause})
return query, nil
}

func validateAndParseCommonOnlySortParams(query *model.Query, indexSchema schema.Schema) (searchAfterParamParsed []model.Expr, err error) {
if query.SearchAfter == nil {
return nil, nil
}
Expand All @@ -134,7 +174,7 @@ func (s searchAfterStrategyBasicAndFast) validateAndParse(query *model.Query, in
}

sortFieldsNr := len(asArray)
searchAfterParsed = make([]model.Expr, sortFieldsNr)
searchAfterParamParsed = make([]model.Expr, sortFieldsNr)
for i, searchAfterValue := range asArray {
column, ok := query.SelectCommand.OrderBy[i].Expr.(model.ColumnRef)
if !ok {
Expand All @@ -150,39 +190,19 @@ func (s searchAfterStrategyBasicAndFast) validateAndParse(query *model.Query, in
if number, isNumber := util.ExtractNumeric64Maybe(searchAfterValue); isNumber {
if number >= 0 && util.IsFloat64AnInt64(number) {
// this param will always be timestamp in milliseconds, as we create it like this while rendering hits
searchAfterParsed[i] = model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(int64(number)))
searchAfterParamParsed[i] = model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(int64(number)))
} else {
return nil, fmt.Errorf("for basicAndFast strategy, search_after must be a unix timestamp in milliseconds")
}
} else {
return nil, fmt.Errorf("for basicAndFast strategy, search_after must be a number")
}
} else {
searchAfterParsed[i] = model.NewLiteral(util.SingleQuoteIfString(searchAfterValue))
}
}

return searchAfterParsed, nil
}

func (s searchAfterStrategyBasicAndFast) transform(query *model.Query, searchAfterParsed []model.Expr) (*model.Query, error) {
// If all order by's would be DESC, we would add to the where clause:
// tuple(sortField1, sortField2, ...) > tuple(searchAfter1, searchAfter2, ...)
// But because some fields might be ASC, we need to swap sortField_i with searchAfter_i
sortFieldsNr := len(searchAfterParsed)
lhs := model.NewTupleExpr(make([]model.Expr, sortFieldsNr)...)
rhs := model.NewTupleExpr(make([]model.Expr, sortFieldsNr)...)
for i, searchAfterValue := range searchAfterParsed {
lhs.Exprs[i] = searchAfterValue
rhs.Exprs[i] = query.SelectCommand.OrderBy[i].Expr
if query.SelectCommand.OrderBy[i].Direction == model.AscOrder {
lhs.Exprs[i], rhs.Exprs[i] = rhs.Exprs[i], lhs.Exprs[i] // swap
searchAfterParamParsed[i] = model.NewLiteral(util.SingleQuoteIfString(searchAfterValue))
}
}

newWhereClause := model.NewInfixExpr(lhs, ">", rhs)
query.SelectCommand.WhereClause = model.And([]model.Expr{query.SelectCommand.WhereClause, newWhereClause})
return query, nil
return searchAfterParamParsed, nil
}

func (s *SchemaCheckPass) applySearchAfterParameter(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {
Expand Down
5 changes: 5 additions & 0 deletions quesma/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type (
Schema struct {
Fields map[FieldName]Field
Aliases map[FieldName]FieldName
primaryKey *FieldName // nil if no primary key
ExistsInDataSource bool

// DatabaseName is the name of the database/schema in the data source,
Expand Down Expand Up @@ -81,3 +82,7 @@ func (s Schema) ResolveField(fieldName string) (Field, bool) {
field, exists := s.Fields[FieldName(fieldName)]
return field, exists
}

func (s Schema) GetPrimaryKey() *FieldName {
return s.primaryKey
}

0 comments on commit de58597

Please sign in to comment.