Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Multiple docs with nil value on unique-indexed field #2276

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions client/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package client

import (
"encoding/json"
"errors"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -468,6 +469,16 @@ func (doc *Document) GetValue(field string) (*FieldValue, error) {
}
}

// TryGetValue returns the value for a given field, if it exists.
// If the field does not exist then return nil and an error.
func (doc *Document) TryGetValue(field string) (*FieldValue, error) {
val, err := doc.GetValue(field)
if err != nil && errors.Is(err, ErrFieldNotExist) {
return nil, nil
}
return val, err
}

// GetValueWithField gets the Value type from a given Field type
func (doc *Document) GetValueWithField(f Field) (*FieldValue, error) {
doc.mu.RLock()
Expand Down
9 changes: 7 additions & 2 deletions db/fetcher/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
errFailedToGetDagNode string = "failed to get DAG Node"
errMissingMapper string = "missing document mapper"
errInvalidInOperatorValue string = "invalid _in/_nin value"
errInvalidIndexFilterCondition string = "invalid index filter condition"
errInvalidFilterOperator string = "invalid filter operator is provided"
)

var (
Expand All @@ -44,7 +44,7 @@
ErrMissingMapper = errors.New(errMissingMapper)
ErrSingleSpanOnly = errors.New("spans must contain only a single entry")
ErrInvalidInOperatorValue = errors.New(errInvalidInOperatorValue)
ErrInvalidIndexFilterCondition = errors.New(errInvalidIndexFilterCondition)
ErrInvalidFilterOperator = errors.New(errInvalidFilterOperator)
)

// NewErrFieldIdNotFound returns an error indicating that the given FieldId was not found.
Expand Down Expand Up @@ -97,3 +97,8 @@
func NewErrFailedToGetDagNode(inner error) error {
return errors.Wrap(errFailedToGetDagNode, inner)
}

// NewErrInvalidFilterOperator returns an error indicating that the given filter operator is invalid.
func NewErrInvalidFilterOperator(operator string) error {
return errors.New(errInvalidFilterOperator, errors.NewKV("Operator", operator))

Check warning on line 103 in db/fetcher/errors.go

View check run for this annotation

Codecov / codecov/patch

db/fetcher/errors.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}
11 changes: 8 additions & 3 deletions db/fetcher/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,23 +125,28 @@ func (f *IndexFetcher) FetchNext(ctx context.Context) (EncodedDocument, ExecInfo
return nil, f.execInfo, nil
}

// This CBOR-specific value will be gone soon once we implement
// our own encryption package
hasNilField := false
const cborNil = 0xf6
for i, indexedField := range f.indexedFields {
property := &encProperty{
Desc: indexedField,
Raw: res.key.FieldValues[i],
}
if len(res.key.FieldValues[i]) == 1 && res.key.FieldValues[i][0] == cborNil {
hasNilField = true
}

f.doc.properties[indexedField] = property
}

if f.indexDesc.Unique {
if f.indexDesc.Unique && !hasNilField {
f.doc.id = res.value
} else {
f.doc.id = res.key.FieldValues[len(res.key.FieldValues)-1]
}

f.execInfo.FieldsFetched++

if f.docFetcher != nil && len(f.docFields) > 0 {
targetKey := base.MakeDataStoreKeyWithCollectionAndDocID(f.col.Description(), string(f.doc.id))
spans := core.NewSpans(core.NewSpan(targetKey, targetKey.PrefixEnd()))
Expand Down
204 changes: 121 additions & 83 deletions db/fetcher/indexer_iterators.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,16 +86,14 @@
}

type eqPrefixIndexIterator struct {
indexKey core.IndexDataStoreKey
keyFieldValue []byte
execInfo *ExecInfo
matchers []valueMatcher
indexKey core.IndexDataStoreKey
execInfo *ExecInfo
matchers []valueMatcher

queryResultIterator
}

func (i *eqPrefixIndexIterator) Init(ctx context.Context, store datastore.DSReaderWriter) error {
i.indexKey.FieldValues = [][]byte{i.keyFieldValue}
resultIter, err := store.Query(ctx, query.Query{
Prefix: i.indexKey.ToString(),
})
Expand Down Expand Up @@ -166,11 +164,11 @@

type inIndexIterator struct {
indexIterator
keyFieldValues [][]byte
nextValIndex int
ctx context.Context
store datastore.DSReaderWriter
hasIterator bool
inValues [][]byte
nextValIndex int
ctx context.Context
store datastore.DSReaderWriter
hasIterator bool
}

func (i *inIndexIterator) nextIterator() (bool, error) {
Expand All @@ -181,15 +179,15 @@
}
}

if i.nextValIndex >= len(i.keyFieldValues) {
if i.nextValIndex >= len(i.inValues) {
return false, nil
}

switch fieldIter := i.indexIterator.(type) {
case *eqPrefixIndexIterator:
fieldIter.keyFieldValue = i.keyFieldValues[i.nextValIndex]
fieldIter.indexKey.FieldValues[0] = i.inValues[i.nextValIndex]
case *eqSingleIndexIterator:
fieldIter.keyFieldValues[0] = i.keyFieldValues[i.nextValIndex]
fieldIter.keyFieldValues[0] = i.inValues[i.nextValIndex]
}
err := i.indexIterator.Init(i.ctx, i.store)
if err != nil {
Expand Down Expand Up @@ -443,7 +441,7 @@
return &anyMatcher{}, nil
}

return nil, ErrInvalidIndexFilterCondition
return nil, NewErrInvalidFilterOperator(op)

Check warning on line 444 in db/fetcher/indexer_iterators.go

View check run for this annotation

Codecov / codecov/patch

db/fetcher/indexer_iterators.go#L444

Added line #L444 was not covered by tests
}

func createValueMatchers(conditions []fieldFilterCond) ([]valueMatcher, error) {
Expand Down Expand Up @@ -494,15 +492,20 @@

// isUniqueFetchByFullKey checks if the only index key can be fetched by the full index key.
//
// This method ignores the first condition because it's expected to be called only
// This method ignores the first condition (unless it's nil) because it's expected to be called only
// when the first field is used as a prefix in the index key. So we only check if the
// rest of the conditions are _eq.
func isUniqueFetchByFullKey(indexDesc *client.IndexDescription, conditions []fieldFilterCond) bool {
// we need to check length of conditions because full key fetch is only possible
// if all fields are specified in the filter
// if all fields of the index are specified in the filter
res := indexDesc.Unique && len(conditions) == len(indexDesc.Fields)

// first condition is not required to be _eq, but if is, val must be not nil
res = res && (conditions[0].op != opEq || conditions[0].val != nil)

// for the rest it must be _eq and val must be not nil
for i := 1; i < len(conditions); i++ {
res = res && conditions[i].op == opEq
res = res && (conditions[i].op == opEq && conditions[i].val != nil)
}
return res
}
Expand All @@ -520,9 +523,104 @@
return result, nil
}

// newPrefixIndexIterator creates a new eqPrefixIndexIterator for fetching indexed data.
// It can modify the input matchers slice.
func (f *IndexFetcher) newPrefixIndexIterator(
fieldConditions []fieldFilterCond,
matchers []valueMatcher,
) (*eqPrefixIndexIterator, error) {
keyFieldValues := make([][]byte, 0, len(fieldConditions))
for i := range fieldConditions {
if fieldConditions[i].op != opEq {
// prefix can be created only for subsequent _eq conditions
// if we encounter any other condition, we built the longest prefix we could
break
islamaliev marked this conversation as resolved.
Show resolved Hide resolved
}

fieldVal := client.NewFieldValue(client.LWW_REGISTER, fieldConditions[i].val)

keyValueBytes, err := fieldVal.Bytes()
if err != nil {
return nil, err
}

Check warning on line 545 in db/fetcher/indexer_iterators.go

View check run for this annotation

Codecov / codecov/patch

db/fetcher/indexer_iterators.go#L544-L545

Added lines #L544 - L545 were not covered by tests

keyFieldValues = append(keyFieldValues, keyValueBytes)
}

// iterators for _eq filter already iterate over keys with first field value
// matching the filter value, so we can skip the first matcher
if len(matchers) > 1 {
matchers[0] = &anyMatcher{}
}

indexKey := f.newIndexDataStoreKey()
indexKey.FieldValues = keyFieldValues
return &eqPrefixIndexIterator{
indexKey: indexKey,
execInfo: &f.execInfo,
matchers: matchers,
}, nil
}

// newInIndexIterator creates a new inIndexIterator for fetching indexed data.
// It can modify the input matchers slice.
func (f *IndexFetcher) newInIndexIterator(
fieldConditions []fieldFilterCond,
matchers []valueMatcher,
) (*inIndexIterator, error) {
inArr, ok := fieldConditions[0].val.([]any)
if !ok {
return nil, ErrInvalidInOperatorValue
}

Check warning on line 574 in db/fetcher/indexer_iterators.go

View check run for this annotation

Codecov / codecov/patch

db/fetcher/indexer_iterators.go#L573-L574

Added lines #L573 - L574 were not covered by tests
keyFieldArr := make([][]byte, 0, len(inArr))
for _, v := range inArr {
fieldVal := client.NewFieldValue(client.LWW_REGISTER, v)
keyFieldBytes, err := fieldVal.Bytes()
if err != nil {
return nil, err
}

Check warning on line 581 in db/fetcher/indexer_iterators.go

View check run for this annotation

Codecov / codecov/patch

db/fetcher/indexer_iterators.go#L580-L581

Added lines #L580 - L581 were not covered by tests
keyFieldArr = append(keyFieldArr, keyFieldBytes)
}

// iterators for _in filter already iterate over keys with first field value
// matching the filter value, so we can skip the first matcher
if len(matchers) > 1 {
matchers[0] = &anyMatcher{}
islamaliev marked this conversation as resolved.
Show resolved Hide resolved
}

var iter indexIterator
if isUniqueFetchByFullKey(&f.indexDesc, fieldConditions) {
keyFieldValues, e := getFieldsBytes(fieldConditions[1:])
if e != nil {
return nil, e
}

Check warning on line 596 in db/fetcher/indexer_iterators.go

View check run for this annotation

Codecov / codecov/patch

db/fetcher/indexer_iterators.go#L595-L596

Added lines #L595 - L596 were not covered by tests
keyFieldValues = append([][]byte{{}}, keyFieldValues...)
iter = &eqSingleIndexIterator{
indexKey: f.newIndexDataStoreKey(),
execInfo: &f.execInfo,
keyFieldValues: keyFieldValues,
}
} else {
indexKey := f.newIndexDataStoreKey()
indexKey.FieldValues = [][]byte{{}}
iter = &eqPrefixIndexIterator{
indexKey: indexKey,
execInfo: &f.execInfo,
matchers: matchers,
}
}
return &inIndexIterator{
indexIterator: iter,
inValues: keyFieldArr,
}, nil
}

func (f *IndexFetcher) newIndexDataStoreKey() core.IndexDataStoreKey {
return core.IndexDataStoreKey{CollectionID: f.col.ID(), IndexID: f.indexDesc.ID}
}

func (f *IndexFetcher) createIndexIterator() (indexIterator, error) {
fieldConditions := f.determineFieldFilterConditions()
indexDataStoreKey := core.IndexDataStoreKey{CollectionID: f.col.ID(), IndexID: f.indexDesc.ID}

matchers, err := createValueMatchers(fieldConditions)
if err != nil {
Expand All @@ -537,82 +635,22 @@
return nil, err
}
return &eqSingleIndexIterator{
indexKey: indexDataStoreKey,
indexKey: f.newIndexDataStoreKey(),
keyFieldValues: keyFieldsBytes,
execInfo: &f.execInfo,
}, nil
} else {
fieldVal := client.NewFieldValue(client.LWW_REGISTER, fieldConditions[0].val)

keyValueBytes, err := fieldVal.Bytes()
if err != nil {
return nil, err
}

// iterators for _eq filter already iterate over keys with first field value
// matching the filter value, so we can skip the first matcher
if len(matchers) > 1 {
matchers[0] = &anyMatcher{}
}

return &eqPrefixIndexIterator{
indexKey: indexDataStoreKey,
keyFieldValue: keyValueBytes,
execInfo: &f.execInfo,
matchers: matchers,
}, nil
return f.newPrefixIndexIterator(fieldConditions, matchers)
}
case opIn:
inArr, ok := fieldConditions[0].val.([]any)
if !ok {
return nil, errors.New("invalid _in/_nin value")
}
keyFieldArr := make([][]byte, 0, len(inArr))
for _, v := range inArr {
fieldVal := client.NewFieldValue(client.LWW_REGISTER, v)
keyFieldBytes, err := fieldVal.Bytes()
if err != nil {
return nil, err
}
keyFieldArr = append(keyFieldArr, keyFieldBytes)
}

// iterators for _in filter already iterate over keys with first field value
// matching the filter value, so we can skip the first matcher
if len(matchers) > 1 {
matchers[0] = &anyMatcher{}
}

var iter indexIterator
if isUniqueFetchByFullKey(&f.indexDesc, fieldConditions) {
restFieldsVals, e := getFieldsBytes(fieldConditions[1:])
if e != nil {
return nil, e
}
restFieldsVals = append([][]byte{{}}, restFieldsVals...)
iter = &eqSingleIndexIterator{
indexKey: indexDataStoreKey,
execInfo: &f.execInfo,
keyFieldValues: restFieldsVals,
}
} else {
iter = &eqPrefixIndexIterator{
indexKey: indexDataStoreKey,
execInfo: &f.execInfo,
matchers: matchers,
}
}
return &inIndexIterator{
indexIterator: iter,
keyFieldValues: keyFieldArr,
}, nil
return f.newInIndexIterator(fieldConditions, matchers)
case opGt, opGe, opLt, opLe, opNe, opNin, opLike, opNlike:
return &scanningIndexIterator{
indexKey: indexDataStoreKey,
indexKey: f.newIndexDataStoreKey(),
matchers: matchers,
execInfo: &f.execInfo,
}, nil
}

return nil, errors.New("invalid index filter condition")
return nil, NewErrInvalidFilterOperator(fieldConditions[0].op)

Check warning on line 655 in db/fetcher/indexer_iterators.go

View check run for this annotation

Codecov / codecov/patch

db/fetcher/indexer_iterators.go#L655

Added line #L655 was not covered by tests
}
Loading
Loading