Skip to content

Commit

Permalink
Cleanup the remaining fetchers.
Browse files Browse the repository at this point in the history
NOTE: There is a filter in the indexer iterator file that hasn't been
included, again focusing on the "plain text" refactoring.
  • Loading branch information
jsimnz committed Oct 1, 2024
1 parent 99df53a commit 244beeb
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 52 deletions.
34 changes: 11 additions & 23 deletions db/fetcher/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"strings"

"github.com/ipfs/go-cid"
dsq "github.com/ipfs/go-datastore/query"
"github.com/sourcenetwork/corekv"
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/core"
Expand All @@ -28,7 +28,7 @@ type HeadFetcher struct {
spans core.Spans
fieldId immutable.Option[string]

kvIter dsq.Results
kvIter corekv.Iterator
}

func (hf *HeadFetcher) Start(
Expand Down Expand Up @@ -59,35 +59,26 @@ func (hf *HeadFetcher) Start(
hf.spans = spans
hf.fieldId = fieldId

q := dsq.Query{
Prefix: hf.spans.Value[0].Start().ToString(),
Orders: []dsq.Order{dsq.OrderByKey{}},
}

var err error
if hf.kvIter != nil {
if err := hf.kvIter.Close(); err != nil {
if err := hf.kvIter.Close(ctx); err != nil {
return err
}
}
hf.kvIter, err = txn.Headstore().Query(ctx, q)
if err != nil {
return err
}
hf.kvIter = txn.Headstore().Iterator(ctx, corekv.IterOptions{
Prefix: hf.spans.Value[0].Start().Bytes(),
})

return nil
}

func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
res, available := hf.kvIter.NextSync()
if res.Error != nil {
return nil, res.Error
}
hf.kvIter.Next()
available := hf.kvIter.Valid()
if !available {
return nil, nil
}

headStoreKey, err := core.NewHeadStoreKey(res.Key)
headStoreKey, err := core.NewHeadStoreKey(string(hf.kvIter.Key()))
if err != nil {
return nil, err
}
Expand All @@ -101,9 +92,6 @@ func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
}

func (hf *HeadFetcher) Close() error {
if hf.kvIter == nil {
return nil
}

return hf.kvIter.Close()
hf.kvIter.Close(context.TODO())
return nil // clean up
}
41 changes: 21 additions & 20 deletions db/fetcher/indexer_iterators.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/fxamacker/cbor/v2"

"github.com/sourcenetwork/corekv"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/connor"
"github.com/sourcenetwork/defradb/core"
Expand Down Expand Up @@ -56,26 +57,30 @@ type indexIterResult struct {
}

type queryResultIterator struct {
resultIter query.Results
resultIter corekv.Iterator
}

func (i *queryResultIterator) Next() (indexIterResult, error) {
res, hasVal := i.resultIter.NextSync()
if res.Error != nil {
return indexIterResult{}, res.Error
}
i.resultIter.Next()
hasVal := i.resultIter.Valid()
if !hasVal {
return indexIterResult{}, nil
}
key, err := core.NewIndexDataStoreKey(res.Key)

key, err := core.NewIndexDataStoreKey(string(i.resultIter.Key()))
if err != nil {
return indexIterResult{}, err
}
return indexIterResult{key: key, value: res.Value, foundKey: true}, nil
return indexIterResult{
key: key,
value: i.resultIter.Value(),
foundKey: true,
}, nil
}

func (i *queryResultIterator) Close() error {
return i.resultIter.Close()
// ATTENTION WIRE UP CONTEXT
return i.resultIter.Close(context.TODO())
}

type eqPrefixIndexIterator struct {
Expand All @@ -88,12 +93,9 @@ type eqPrefixIndexIterator struct {

func (i *eqPrefixIndexIterator) Init(ctx context.Context, store datastore.DSReaderWriter) error {
i.indexKey.FieldValues = [][]byte{i.value}
resultIter, err := store.Query(ctx, query.Query{
Prefix: i.indexKey.ToString(),
resultIter := store.Iterator(ctx, corekv.IterOptions{
Prefix: i.indexKey.Bytes(),
})
if err != nil {
return err
}
i.resultIter = resultIter
return nil
}
Expand Down Expand Up @@ -139,7 +141,7 @@ func (i *eqSingleIndexIterator) Next() (indexIterResult, error) {
return indexIterResult{}, nil
}
i.indexKey.FieldValues = [][]byte{i.value}
val, err := i.store.Get(i.ctx, i.indexKey.ToDS())
val, err := i.store.Get(i.ctx, i.indexKey.Bytes())
if err != nil {
return indexIterResult{}, err
}
Expand Down Expand Up @@ -267,13 +269,12 @@ type scanningIndexIterator struct {
func (i *scanningIndexIterator) Init(ctx context.Context, store datastore.DSReaderWriter) error {
i.filter.matcher = &execInfoIndexMatcherDecorator{matcher: i.matcher, execInfo: i.execInfo}

iter, err := store.Query(ctx, query.Query{
Prefix: i.indexKey.ToString(),
Filters: []query.Filter{&i.filter},
// ATTENTION: DURING MY PLAIN TEXT REFACTOR
// I AM NOT INCLUDING THIS FILTER/MATCHER
// SYSTEM. WILL LOOK INTO IT.
iter := store.Iterator(ctx, corekv.IterOptions{
Prefix: i.indexKey.Bytes(),
})
if err != nil {
return err
}
i.resultIter = iter

return nil
Expand Down
14 changes: 5 additions & 9 deletions db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ import (

dag "github.com/ipfs/boxo/ipld/merkledag"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
format "github.com/ipfs/go-ipld-format"
"github.com/sourcenetwork/corekv/memory"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/datastore/memory"
"github.com/sourcenetwork/defradb/db/base"
"github.com/sourcenetwork/defradb/errors"
merklecrdt "github.com/sourcenetwork/defradb/merkle/crdt"
Expand Down Expand Up @@ -173,9 +172,9 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans core.Spans) error {
}

// Rootstore returns the rootstore of the VersionedFetcher.
func (vf *VersionedFetcher) Rootstore() ds.Datastore {
return vf.root
}
// func (vf *VersionedFetcher) Rootstore() ds.Datastore {
// return vf.root
// }

// Start a fetcher with the needed info (cid embedded in a span)

Expand Down Expand Up @@ -421,10 +420,7 @@ func (vf *VersionedFetcher) getDAGNode(c cid.Cid) (*dag.ProtoNode, error) {

// Close closes the VersionedFetcher.
func (vf *VersionedFetcher) Close() error {
if err := vf.root.Close(); err != nil {
return err
}

vf.root.Close()
return vf.DocumentFetcher.Close()
}

Expand Down

0 comments on commit 244beeb

Please sign in to comment.