Skip to content

Commit

Permalink
First pass on fetcher refactor with corekv Iterator
Browse files Browse the repository at this point in the history
With the new design, we get to remove the dual Iterator structure
(the first isn't really an iterator, but a handle to create
iterators). Now there is just a single iterator `kvIter`.

We also don't need the previous complex `Iterable` interface that was
defined in the `datastore` package. The corekv Iterator natively
supports both Prefix and Range (start, end) iteration controlled by
the `corekv.IterOptions`.

Again, this is a "plain text" refactor without assuming anything.
  • Loading branch information
jsimnz committed Feb 23, 2024
1 parent de908df commit 99df53a
Showing 1 changed file with 32 additions and 46 deletions.
78 changes: 32 additions & 46 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"github.com/bits-and-blooms/bitset"
dsq "github.com/ipfs/go-datastore/query"

"github.com/sourcenetwork/corekv"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/datastore/iterable"
"github.com/sourcenetwork/defradb/db/base"
"github.com/sourcenetwork/defradb/planner/mapper"
"github.com/sourcenetwork/defradb/request/graphql/parser"
Expand Down Expand Up @@ -120,8 +120,7 @@ type DocumentFetcher struct {
initialized bool

kv *keyValue
kvIter iterable.Iterator
kvResultsIter dsq.Results
kvIter corekv.Iterator
kvEnd bool
isReadingDocument bool

Expand Down Expand Up @@ -181,14 +180,9 @@ func (df *DocumentFetcher) init(
return ErrMissingMapper
}

if df.kvResultsIter != nil {
if err := df.kvResultsIter.Close(); err != nil {
return err
}
}
df.kvResultsIter = nil
if df.kvIter != nil {
if err := df.kvIter.Close(); err != nil {
// TODO: Fix context callsite
if err := df.kvIter.Close(context.TODO()); err != nil {
return err
}
}
Expand Down Expand Up @@ -296,27 +290,19 @@ func (df *DocumentFetcher) startNextSpan(ctx context.Context) (bool, error) {
}

var err error
if df.kvIter == nil {
df.kvIter, err = df.txn.Datastore().GetIterator(dsq.Query{
Orders: df.order,
})
}
if err != nil {
return false, err
}

if df.kvResultsIter != nil {
err = df.kvResultsIter.Close()
span := df.spans.Value[nextSpanIndex]
if df.kvIter != nil {
err = df.kvIter.Close(ctx)
if err != nil {
return false, err
}
}
df.kvIter = df.txn.Datastore().Iterator(ctx, corekv.IterOptions{
Start: span.Start().Bytes(),
End: span.End().Bytes(),
Reverse: df.reverse,
})

span := df.spans.Value[nextSpanIndex]
df.kvResultsIter, err = df.kvIter.IteratePrefix(ctx, span.Start().ToDS(), span.End().ToDS())
if err != nil {
return false, err
}
df.curSpanIndex = nextSpanIndex

_, _, err = df.nextKey(ctx, false)
Expand Down Expand Up @@ -357,7 +343,7 @@ func (df *DocumentFetcher) nextKey(ctx context.Context, seekNext bool) (spanDone

df.kvEnd = spanDone
if df.kvEnd {
err = df.kvResultsIter.Close()
err = df.kvIter.Close(ctx)
if err != nil {
return false, false, err
}
Expand Down Expand Up @@ -389,7 +375,7 @@ func (df *DocumentFetcher) nextKV() (iterDone bool, kv *keyValue, err error) {

kv = &keyValue{
Key: dsKey,
Value: res.Value,
Value: res.value,
}
return false, kv, nil
}
Expand Down Expand Up @@ -422,32 +408,38 @@ func (df *DocumentFetcher) seekKV(key string) (bool, *keyValue, error) {
// equal or greater (first), return a formatted kv
kv := &keyValue{
Key: dsKey,
Value: res.Value, // @todo make lazy
Value: res.value, // @todo make lazy
}
return false, kv, nil
}
}
}

type result struct {
key []byte
value []byte
}

// nextKV is a lower-level utility compared to nextKey. The differences are as follows:
// - It directly interacts with the KVIterator.
// - Returns true if the entire iterator/span is exhausted
// - Returns a kv pair instead of internally updating
func (df *DocumentFetcher) nextKVRaw() (bool, core.DataStoreKey, dsq.Result, error) {
res, available := df.kvResultsIter.NextSync()
func (df *DocumentFetcher) nextKVRaw() (bool, core.DataStoreKey, result, error) {
df.kvIter.Next()
available := df.kvIter.Valid()
if !available {
return true, core.DataStoreKey{}, res, nil
}
err := res.Error
if err != nil {
return true, core.DataStoreKey{}, res, err
return true, core.DataStoreKey{}, result{}, nil
}

dsKey, err := core.NewDataStoreKey(res.Key)
dsKey, err := core.NewDataStoreKey(string(df.kvIter.Key()))
if err != nil {
return true, core.DataStoreKey{}, res, err
return true, core.DataStoreKey{}, result{}, err
}

res := result{
key: df.kvIter.Key(),
value: df.kvIter.Value(),
}
return false, dsKey, res, nil
}

Expand Down Expand Up @@ -657,14 +649,8 @@ func (df *DocumentFetcher) fetchNext(ctx context.Context) (EncodedDocument, Exec
// Close closes the DocumentFetcher.
func (df *DocumentFetcher) Close() error {
if df.kvIter != nil {
err := df.kvIter.Close()
if err != nil {
return err
}
}

if df.kvResultsIter != nil {
err := df.kvResultsIter.Close()
// TODO WIRE UP CONTEXT
err := df.kvIter.Close(context.TODO())
if err != nil {
return err
}
Expand Down

0 comments on commit 99df53a

Please sign in to comment.