Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Nov 28, 2024
1 parent 32be1e0 commit 4279b43
Show file tree
Hide file tree
Showing 21 changed files with 781 additions and 686 deletions.
2 changes: 1 addition & 1 deletion internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (c *collection) newFetcher() fetcher.Fetcher {
if c.fetcherFactory != nil {
innerFetcher = c.fetcherFactory()
} else {
innerFetcher = new(fetcher.DocumentFetcher)
innerFetcher = fetcher.NewDocumentFetcher()
}

return lens.NewFetcher(innerFetcher, c.db.LensRegistry())
Expand Down
94 changes: 94 additions & 0 deletions internal/db/fetcher/deleted.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package fetcher

import (
"errors"

"github.com/sourcenetwork/immutable"
)

type deleted struct {
activeFetcher fetcher
activeDocID immutable.Option[string]

deletedFetcher fetcher
deletedDocID immutable.Option[string]

currentFetcher fetcher
}

var _ fetcher = (*deleted)(nil)

func newDeletedFetcher(
activeFetcher fetcher,
deletedFetcher fetcher,
) *deleted {
return &deleted{
activeFetcher: activeFetcher,
deletedFetcher: deletedFetcher,
}
}

func (f *deleted) NextDoc() (immutable.Option[string], error) {
if !f.activeDocID.HasValue() {
var err error
f.activeDocID, err = f.activeFetcher.NextDoc()
if err != nil {
return immutable.None[string](), err
}
}

if !f.deletedDocID.HasValue() {
var err error
f.deletedDocID, err = f.deletedFetcher.NextDoc()
if err != nil {
return immutable.None[string](), err
}
}

if !f.activeDocID.HasValue() || (f.deletedDocID.HasValue() && f.deletedDocID.Value() < f.activeDocID.Value()) {
f.currentFetcher = f.deletedFetcher
return f.deletedDocID, nil
}

f.currentFetcher = f.activeFetcher
return f.activeDocID, nil
}

func (f *deleted) GetFields() (immutable.Option[EncodedDocument], error) {
doc, err := f.currentFetcher.GetFields()
if err != nil {
return immutable.None[EncodedDocument](), err
}

if f.activeFetcher == f.currentFetcher {
f.activeDocID = immutable.None[string]()
} else {
f.deletedDocID = immutable.None[string]()
}

return doc, nil
}

func (f *deleted) Close() error {
activeErr := f.activeFetcher.Close()
if activeErr != nil {
deletedErr := f.deletedFetcher.Close()
if deletedErr != nil {
return errors.Join(activeErr, deletedErr)
}

return activeErr
}

return f.deletedFetcher.Close()
}
198 changes: 198 additions & 0 deletions internal/db/fetcher/document.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package fetcher

import (
"bytes"
"context"

dsq "github.com/ipfs/go-datastore/query"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore/iterable"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
"github.com/sourcenetwork/immutable"
)

type document struct {
col client.Collection
fieldsByID map[uint32]client.FieldDefinition

kvResultsIter dsq.Results
currentKV immutable.Option[keyValue]
nextKV immutable.Option[keyValue]
status client.DocumentStatus

execInfo *ExecInfo
}

var _ fetcher = (*document)(nil)

func newDocumentFetcher(
ctx context.Context,
col client.Collection,
fields []client.FieldDefinition, // todo - rip out (you parameterized this (that was for ACP - might need both - consider ripping out the GetFields param))
kvIter iterable.Iterator,
prefix *keys.DataStoreKey,
status client.DocumentStatus,
execInfo *ExecInfo,
) (*document, error) {
if len(fields) == 0 {
fields = col.Definition().GetFields()
}

fieldsByID := make(map[uint32]client.FieldDefinition, len(fields))
for _, field := range fields {
fieldsByID[uint32(field.ID)] = field
}

if status == client.Active {
p := prefix.WithValueFlag()
prefix = &p
} else if status == client.Deleted {
p := prefix.WithDeletedFlag()
prefix = &p
}

kvResultsIter, err := kvIter.IteratePrefix(ctx, prefix.ToDS(), prefix.PrefixEnd().ToDS())
if err != nil {
return nil, err
}

return &document{
col: col,
fieldsByID: fieldsByID,
kvResultsIter: kvResultsIter,
status: status,
execInfo: execInfo,
}, nil
}

// keyValue is a KV store response containing the resulting core.Key and byte array value.
type keyValue struct {
Key keys.DataStoreKey
Value []byte
}

func (f *document) NextDoc() (immutable.Option[string], error) {
if f.nextKV.HasValue() {
docID := f.nextKV.Value().Key.DocID
f.currentKV = f.nextKV

f.nextKV = immutable.None[keyValue]()
f.execInfo.DocsFetched++

return immutable.Some(docID), nil
}

for {
res, ok := f.kvResultsIter.NextSync()
if res.Error != nil {
return immutable.None[string](), res.Error
}
if !ok {
return immutable.None[string](), nil
}

dsKey, err := keys.NewDataStoreKey(res.Key)
if err != nil {
return immutable.None[string](), err
}

if dsKey.DocID != f.currentKV.Value().Key.DocID {
f.currentKV = immutable.Some(keyValue{
Key: dsKey,
Value: res.Value,
})
break
}
}

f.execInfo.DocsFetched++

return immutable.Some(f.currentKV.Value().Key.DocID), nil
}

func (f *document) GetFields() (immutable.Option[EncodedDocument], error) {
doc := encodedDocument{}
doc.id = []byte(f.currentKV.Value().Key.DocID)
doc.status = f.status
doc.properties = map[client.FieldDefinition]*encProperty{}

err := f.appendKv(&doc, f.currentKV.Value())
if err != nil {
return immutable.None[EncodedDocument](), err
}

for {
res, ok := f.kvResultsIter.NextSync()
if !ok {
break
}

dsKey, err := keys.NewDataStoreKey(res.Key)
if err != nil {
return immutable.None[EncodedDocument](), err
}

kv := keyValue{
Key: dsKey,
Value: res.Value,
}

if dsKey.DocID != f.currentKV.Value().Key.DocID {
f.nextKV = immutable.Some(kv)
break
}

err = f.appendKv(&doc, kv)
if err != nil {
return immutable.None[EncodedDocument](), err
}
}

return immutable.Some[EncodedDocument](&doc), nil
}

func (f *document) appendKv(doc *encodedDocument, kv keyValue) error {
if kv.Key.FieldID == keys.DATASTORE_DOC_VERSION_FIELD_ID {
doc.schemaVersionID = string(kv.Value)
return nil
}

// we have to skip the object marker
if bytes.Equal(kv.Value, []byte{base.ObjectMarker}) {
return nil
}

fieldID, err := kv.Key.FieldIDAsUint()
if err != nil {
return err
}

fieldDesc, ok := f.fieldsByID[fieldID]
if !ok {
return nil
}

f.execInfo.FieldsFetched++

doc.properties[fieldDesc] = &encProperty{
Desc: fieldDesc,
Raw: kv.Value,
}

return nil
}

func (f *document) Close() error {
return f.kvResultsIter.Close()
}
Loading

0 comments on commit 4279b43

Please sign in to comment.