From 99df53afc048fd5b0041c65f2bec86a21dfbca6b Mon Sep 17 00:00:00 2001 From: John-Alan Simmons Date: Thu, 22 Feb 2024 23:08:55 -0500 Subject: [PATCH] First pass on fetcher refactor with corekv Iterator With the new design, we get to remove the dual Iterator structure (the first isn't really an iterator, but a handle to create iterators). Now there is just a single iterator `kvIter`. We also don't need the previous complex `Iterable` interface that was defined in the `datastore` package. The corekv Iterator natively supports both Prefix and Range (start, end) iteration controlled by the `corekv.IterOptions`. Again, this is a "plain text" refactor without assuming anything. --- db/fetcher/fetcher.go | 78 ++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 46 deletions(-) diff --git a/db/fetcher/fetcher.go b/db/fetcher/fetcher.go index da7a0df1e1..c6876d218e 100644 --- a/db/fetcher/fetcher.go +++ b/db/fetcher/fetcher.go @@ -18,10 +18,10 @@ import ( "github.com/bits-and-blooms/bitset" dsq "github.com/ipfs/go-datastore/query" + "github.com/sourcenetwork/corekv" "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/core" "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/datastore/iterable" "github.com/sourcenetwork/defradb/db/base" "github.com/sourcenetwork/defradb/planner/mapper" "github.com/sourcenetwork/defradb/request/graphql/parser" @@ -120,8 +120,7 @@ type DocumentFetcher struct { initialized bool kv *keyValue - kvIter iterable.Iterator - kvResultsIter dsq.Results + kvIter corekv.Iterator kvEnd bool isReadingDocument bool @@ -181,14 +180,9 @@ func (df *DocumentFetcher) init( return ErrMissingMapper } - if df.kvResultsIter != nil { - if err := df.kvResultsIter.Close(); err != nil { - return err - } - } - df.kvResultsIter = nil if df.kvIter != nil { - if err := df.kvIter.Close(); err != nil { + // TODO: Fix context callsite + if err := df.kvIter.Close(context.TODO()); err != nil { return err } } @@ -296,27 +290,19 @@ func (df *DocumentFetcher) startNextSpan(ctx context.Context) (bool, error) { } var err error - if df.kvIter == nil { - df.kvIter, err = df.txn.Datastore().GetIterator(dsq.Query{ - Orders: df.order, - }) - } - if err != nil { - return false, err - } - - if df.kvResultsIter != nil { - err = df.kvResultsIter.Close() + span := df.spans.Value[nextSpanIndex] + if df.kvIter != nil { + err = df.kvIter.Close(ctx) if err != nil { return false, err } } + df.kvIter = df.txn.Datastore().Iterator(ctx, corekv.IterOptions{ + Start: span.Start().Bytes(), + End: span.End().Bytes(), + Reverse: df.reverse, + }) - span := df.spans.Value[nextSpanIndex] - df.kvResultsIter, err = df.kvIter.IteratePrefix(ctx, span.Start().ToDS(), span.End().ToDS()) - if err != nil { - return false, err - } df.curSpanIndex = nextSpanIndex _, _, err = df.nextKey(ctx, false) @@ -357,7 +343,7 @@ func (df *DocumentFetcher) nextKey(ctx context.Context, seekNext bool) (spanDone df.kvEnd = spanDone if df.kvEnd { - err = df.kvResultsIter.Close() + err = df.kvIter.Close(ctx) if err != nil { return false, false, err } @@ -389,7 +375,7 @@ func (df *DocumentFetcher) nextKV() (iterDone bool, kv *keyValue, err error) { kv = &keyValue{ Key: dsKey, - Value: res.Value, + Value: res.value, } return false, kv, nil } @@ -422,32 +408,38 @@ func (df *DocumentFetcher) seekKV(key string) (bool, *keyValue, error) { // equal or greater (first), return a formatted kv kv := &keyValue{ Key: dsKey, - Value: res.Value, // @todo make lazy + Value: res.value, // @todo make lazy } return false, kv, nil } } } +type result struct { + key []byte + value []byte +} + // nextKV is a lower-level utility compared to nextKey. The differences are as follows: // - It directly interacts with the KVIterator. // - Returns true if the entire iterator/span is exhausted // - Returns a kv pair instead of internally updating -func (df *DocumentFetcher) nextKVRaw() (bool, core.DataStoreKey, dsq.Result, error) { - res, available := df.kvResultsIter.NextSync() +func (df *DocumentFetcher) nextKVRaw() (bool, core.DataStoreKey, result, error) { + df.kvIter.Next() + available := df.kvIter.Valid() if !available { - return true, core.DataStoreKey{}, res, nil - } - err := res.Error - if err != nil { - return true, core.DataStoreKey{}, res, err + return true, core.DataStoreKey{}, result{}, nil } - dsKey, err := core.NewDataStoreKey(res.Key) + dsKey, err := core.NewDataStoreKey(string(df.kvIter.Key())) if err != nil { - return true, core.DataStoreKey{}, res, err + return true, core.DataStoreKey{}, result{}, err } + res := result{ + key: df.kvIter.Key(), + value: df.kvIter.Value(), + } return false, dsKey, res, nil } @@ -657,14 +649,8 @@ func (df *DocumentFetcher) fetchNext(ctx context.Context) (EncodedDocument, Exec // Close closes the DocumentFetcher. func (df *DocumentFetcher) Close() error { if df.kvIter != nil { - err := df.kvIter.Close() - if err != nil { - return err - } - } - - if df.kvResultsIter != nil { - err := df.kvResultsIter.Close() + // TODO WIRE UP CONTEXT + err := df.kvIter.Close(context.TODO()) if err != nil { return err }