Skip to content

Commit

Permalink
feat: Multiple docs with nil value on unique-indexed field (#2276)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #2180

## Description

Allow multiple docs with nil value on unique-indexed field.
This change also included few minor query optimizations that involve
secondary indexes.

Explain request is slightly adjusted not to increase field fetches count
upon fetching indexes.
  • Loading branch information
islamaliev authored Feb 7, 2024
1 parent cb08c18 commit 86b59c6
Show file tree
Hide file tree
Showing 15 changed files with 969 additions and 229 deletions.
11 changes: 11 additions & 0 deletions client/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package client

import (
"encoding/json"
"errors"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -468,6 +469,16 @@ func (doc *Document) GetValue(field string) (*FieldValue, error) {
}
}

// TryGetValue returns the value for a given field, if it exists.
// If the field does not exist then return nil and an error.
func (doc *Document) TryGetValue(field string) (*FieldValue, error) {
val, err := doc.GetValue(field)
if err != nil && errors.Is(err, ErrFieldNotExist) {
return nil, nil
}
return val, err
}

// GetValueWithField gets the Value type from a given Field type
func (doc *Document) GetValueWithField(f Field) (*FieldValue, error) {
doc.mu.RLock()
Expand Down
9 changes: 7 additions & 2 deletions db/fetcher/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
errFailedToGetDagNode string = "failed to get DAG Node"
errMissingMapper string = "missing document mapper"
errInvalidInOperatorValue string = "invalid _in/_nin value"
errInvalidIndexFilterCondition string = "invalid index filter condition"
errInvalidFilterOperator string = "invalid filter operator is provided"
)

var (
Expand All @@ -44,7 +44,7 @@ var (
ErrMissingMapper = errors.New(errMissingMapper)
ErrSingleSpanOnly = errors.New("spans must contain only a single entry")
ErrInvalidInOperatorValue = errors.New(errInvalidInOperatorValue)
ErrInvalidIndexFilterCondition = errors.New(errInvalidIndexFilterCondition)
ErrInvalidFilterOperator = errors.New(errInvalidFilterOperator)
)

// NewErrFieldIdNotFound returns an error indicating that the given FieldId was not found.
Expand Down Expand Up @@ -97,3 +97,8 @@ func NewErrVFetcherFailedToGetDagLink(inner error) error {
func NewErrFailedToGetDagNode(inner error) error {
return errors.Wrap(errFailedToGetDagNode, inner)
}

// NewErrInvalidFilterOperator returns an error indicating that the given filter operator is invalid.
func NewErrInvalidFilterOperator(operator string) error {
return errors.New(errInvalidFilterOperator, errors.NewKV("Operator", operator))
}
11 changes: 8 additions & 3 deletions db/fetcher/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,28 @@ func (f *IndexFetcher) FetchNext(ctx context.Context) (EncodedDocument, ExecInfo
return nil, f.execInfo, nil
}

// This CBOR-specific value will be gone soon once we implement
// our own encryption package
hasNilField := false
const cborNil = 0xf6
for i, indexedField := range f.indexedFields {
property := &encProperty{
Desc: indexedField,
Raw: res.key.FieldValues[i],
}
if len(res.key.FieldValues[i]) == 1 && res.key.FieldValues[i][0] == cborNil {
hasNilField = true
}

f.doc.properties[indexedField] = property
}

if f.indexDesc.Unique {
if f.indexDesc.Unique && !hasNilField {
f.doc.id = res.value
} else {
f.doc.id = res.key.FieldValues[len(res.key.FieldValues)-1]
}

f.execInfo.FieldsFetched++

if f.docFetcher != nil && len(f.docFields) > 0 {
targetKey := base.MakeDataStoreKeyWithCollectionAndDocID(f.col.Description(), string(f.doc.id))
spans := core.NewSpans(core.NewSpan(targetKey, targetKey.PrefixEnd()))
Expand Down
204 changes: 121 additions & 83 deletions db/fetcher/indexer_iterators.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,14 @@ func (i *queryResultIterator) Close() error {
}

type eqPrefixIndexIterator struct {
indexKey core.IndexDataStoreKey
keyFieldValue []byte
execInfo *ExecInfo
matchers []valueMatcher
indexKey core.IndexDataStoreKey
execInfo *ExecInfo
matchers []valueMatcher

queryResultIterator
}

func (i *eqPrefixIndexIterator) Init(ctx context.Context, store datastore.DSReaderWriter) error {
i.indexKey.FieldValues = [][]byte{i.keyFieldValue}
resultIter, err := store.Query(ctx, query.Query{
Prefix: i.indexKey.ToString(),
})
Expand Down Expand Up @@ -166,11 +164,11 @@ func (i *eqSingleIndexIterator) Close() error {

type inIndexIterator struct {
indexIterator
keyFieldValues [][]byte
nextValIndex int
ctx context.Context
store datastore.DSReaderWriter
hasIterator bool
inValues [][]byte
nextValIndex int
ctx context.Context
store datastore.DSReaderWriter
hasIterator bool
}

func (i *inIndexIterator) nextIterator() (bool, error) {
Expand All @@ -181,15 +179,15 @@ func (i *inIndexIterator) nextIterator() (bool, error) {
}
}

if i.nextValIndex >= len(i.keyFieldValues) {
if i.nextValIndex >= len(i.inValues) {
return false, nil
}

switch fieldIter := i.indexIterator.(type) {
case *eqPrefixIndexIterator:
fieldIter.keyFieldValue = i.keyFieldValues[i.nextValIndex]
fieldIter.indexKey.FieldValues[0] = i.inValues[i.nextValIndex]
case *eqSingleIndexIterator:
fieldIter.keyFieldValues[0] = i.keyFieldValues[i.nextValIndex]
fieldIter.keyFieldValues[0] = i.inValues[i.nextValIndex]
}
err := i.indexIterator.Init(i.ctx, i.store)
if err != nil {
Expand Down Expand Up @@ -443,7 +441,7 @@ func createValueMatcher(op string, filterVal any) (valueMatcher, error) {
return &anyMatcher{}, nil
}

return nil, ErrInvalidIndexFilterCondition
return nil, NewErrInvalidFilterOperator(op)
}

func createValueMatchers(conditions []fieldFilterCond) ([]valueMatcher, error) {
Expand Down Expand Up @@ -494,15 +492,20 @@ func (f *IndexFetcher) determineFieldFilterConditions() []fieldFilterCond {

// isUniqueFetchByFullKey checks if the only index key can be fetched by the full index key.
//
// This method ignores the first condition because it's expected to be called only
// This method ignores the first condition (unless it's nil) because it's expected to be called only
// when the first field is used as a prefix in the index key. So we only check if the
// rest of the conditions are _eq.
func isUniqueFetchByFullKey(indexDesc *client.IndexDescription, conditions []fieldFilterCond) bool {
// we need to check length of conditions because full key fetch is only possible
// if all fields are specified in the filter
// if all fields of the index are specified in the filter
res := indexDesc.Unique && len(conditions) == len(indexDesc.Fields)

// first condition is not required to be _eq, but if is, val must be not nil
res = res && (conditions[0].op != opEq || conditions[0].val != nil)

// for the rest it must be _eq and val must be not nil
for i := 1; i < len(conditions); i++ {
res = res && conditions[i].op == opEq
res = res && (conditions[i].op == opEq && conditions[i].val != nil)
}
return res
}
Expand All @@ -520,9 +523,104 @@ func getFieldsBytes(conditions []fieldFilterCond) ([][]byte, error) {
return result, nil
}

// newPrefixIndexIterator creates a new eqPrefixIndexIterator for fetching indexed data.
// It can modify the input matchers slice.
func (f *IndexFetcher) newPrefixIndexIterator(
fieldConditions []fieldFilterCond,
matchers []valueMatcher,
) (*eqPrefixIndexIterator, error) {
keyFieldValues := make([][]byte, 0, len(fieldConditions))
for i := range fieldConditions {
if fieldConditions[i].op != opEq {
// prefix can be created only for subsequent _eq conditions
// if we encounter any other condition, we built the longest prefix we could
break
}

fieldVal := client.NewFieldValue(client.LWW_REGISTER, fieldConditions[i].val)

keyValueBytes, err := fieldVal.Bytes()
if err != nil {
return nil, err
}

keyFieldValues = append(keyFieldValues, keyValueBytes)
}

// iterators for _eq filter already iterate over keys with first field value
// matching the filter value, so we can skip the first matcher
if len(matchers) > 1 {
matchers[0] = &anyMatcher{}
}

indexKey := f.newIndexDataStoreKey()
indexKey.FieldValues = keyFieldValues
return &eqPrefixIndexIterator{
indexKey: indexKey,
execInfo: &f.execInfo,
matchers: matchers,
}, nil
}

// newInIndexIterator creates a new inIndexIterator for fetching indexed data.
// It can modify the input matchers slice.
func (f *IndexFetcher) newInIndexIterator(
fieldConditions []fieldFilterCond,
matchers []valueMatcher,
) (*inIndexIterator, error) {
inArr, ok := fieldConditions[0].val.([]any)
if !ok {
return nil, ErrInvalidInOperatorValue
}
keyFieldArr := make([][]byte, 0, len(inArr))
for _, v := range inArr {
fieldVal := client.NewFieldValue(client.LWW_REGISTER, v)
keyFieldBytes, err := fieldVal.Bytes()
if err != nil {
return nil, err
}
keyFieldArr = append(keyFieldArr, keyFieldBytes)
}

// iterators for _in filter already iterate over keys with first field value
// matching the filter value, so we can skip the first matcher
if len(matchers) > 1 {
matchers[0] = &anyMatcher{}
}

var iter indexIterator
if isUniqueFetchByFullKey(&f.indexDesc, fieldConditions) {
keyFieldValues, e := getFieldsBytes(fieldConditions[1:])
if e != nil {
return nil, e
}
keyFieldValues = append([][]byte{{}}, keyFieldValues...)
iter = &eqSingleIndexIterator{
indexKey: f.newIndexDataStoreKey(),
execInfo: &f.execInfo,
keyFieldValues: keyFieldValues,
}
} else {
indexKey := f.newIndexDataStoreKey()
indexKey.FieldValues = [][]byte{{}}
iter = &eqPrefixIndexIterator{
indexKey: indexKey,
execInfo: &f.execInfo,
matchers: matchers,
}
}
return &inIndexIterator{
indexIterator: iter,
inValues: keyFieldArr,
}, nil
}

func (f *IndexFetcher) newIndexDataStoreKey() core.IndexDataStoreKey {
return core.IndexDataStoreKey{CollectionID: f.col.ID(), IndexID: f.indexDesc.ID}
}

func (f *IndexFetcher) createIndexIterator() (indexIterator, error) {
fieldConditions := f.determineFieldFilterConditions()
indexDataStoreKey := core.IndexDataStoreKey{CollectionID: f.col.ID(), IndexID: f.indexDesc.ID}

matchers, err := createValueMatchers(fieldConditions)
if err != nil {
Expand All @@ -537,82 +635,22 @@ func (f *IndexFetcher) createIndexIterator() (indexIterator, error) {
return nil, err
}
return &eqSingleIndexIterator{
indexKey: indexDataStoreKey,
indexKey: f.newIndexDataStoreKey(),
keyFieldValues: keyFieldsBytes,
execInfo: &f.execInfo,
}, nil
} else {
fieldVal := client.NewFieldValue(client.LWW_REGISTER, fieldConditions[0].val)

keyValueBytes, err := fieldVal.Bytes()
if err != nil {
return nil, err
}

// iterators for _eq filter already iterate over keys with first field value
// matching the filter value, so we can skip the first matcher
if len(matchers) > 1 {
matchers[0] = &anyMatcher{}
}

return &eqPrefixIndexIterator{
indexKey: indexDataStoreKey,
keyFieldValue: keyValueBytes,
execInfo: &f.execInfo,
matchers: matchers,
}, nil
return f.newPrefixIndexIterator(fieldConditions, matchers)
}
case opIn:
inArr, ok := fieldConditions[0].val.([]any)
if !ok {
return nil, errors.New("invalid _in/_nin value")
}
keyFieldArr := make([][]byte, 0, len(inArr))
for _, v := range inArr {
fieldVal := client.NewFieldValue(client.LWW_REGISTER, v)
keyFieldBytes, err := fieldVal.Bytes()
if err != nil {
return nil, err
}
keyFieldArr = append(keyFieldArr, keyFieldBytes)
}

// iterators for _in filter already iterate over keys with first field value
// matching the filter value, so we can skip the first matcher
if len(matchers) > 1 {
matchers[0] = &anyMatcher{}
}

var iter indexIterator
if isUniqueFetchByFullKey(&f.indexDesc, fieldConditions) {
restFieldsVals, e := getFieldsBytes(fieldConditions[1:])
if e != nil {
return nil, e
}
restFieldsVals = append([][]byte{{}}, restFieldsVals...)
iter = &eqSingleIndexIterator{
indexKey: indexDataStoreKey,
execInfo: &f.execInfo,
keyFieldValues: restFieldsVals,
}
} else {
iter = &eqPrefixIndexIterator{
indexKey: indexDataStoreKey,
execInfo: &f.execInfo,
matchers: matchers,
}
}
return &inIndexIterator{
indexIterator: iter,
keyFieldValues: keyFieldArr,
}, nil
return f.newInIndexIterator(fieldConditions, matchers)
case opGt, opGe, opLt, opLe, opNe, opNin, opLike, opNlike:
return &scanningIndexIterator{
indexKey: indexDataStoreKey,
indexKey: f.newIndexDataStoreKey(),
matchers: matchers,
execInfo: &f.execInfo,
}, nil
}

return nil, errors.New("invalid index filter condition")
return nil, NewErrInvalidFilterOperator(fieldConditions[0].op)
}
Loading

0 comments on commit 86b59c6

Please sign in to comment.