Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Nov 27, 2024
1 parent 32be1e0 commit 1cc8dae
Show file tree
Hide file tree
Showing 7 changed files with 644 additions and 0 deletions.
95 changes: 95 additions & 0 deletions internal/db/fetcher/deleted.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package fetcher

import (
"errors"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/immutable"
)

type deleted struct {
activeFetcher fetcher
activeDocID immutable.Option[string]

deletedFetcher fetcher
deletedDocID immutable.Option[string]

currentFetcher fetcher
}

var _ fetcher = (*deleted)(nil)

func newDeletedFetcher(
activeFetcher fetcher,
deletedFetcher fetcher,
) *deleted {
return &deleted{
activeFetcher: activeFetcher,
deletedFetcher: deletedFetcher,
}
}

func (f *deleted) NextDoc() (immutable.Option[string], error) {
if !f.activeDocID.HasValue() {
var err error
f.activeDocID, err = f.activeFetcher.NextDoc()
if err != nil {
return immutable.None[string](), err
}
}

if !f.deletedDocID.HasValue() {
var err error
f.deletedDocID, err = f.deletedFetcher.NextDoc()
if err != nil {
return immutable.None[string](), err
}
}

if f.deletedDocID.Value() < f.activeDocID.Value() {
f.currentFetcher = f.deletedFetcher
return f.deletedDocID, nil
}

f.currentFetcher = f.activeFetcher
return f.activeDocID, nil
}

func (f *deleted) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) {
doc, err := f.currentFetcher.GetFields(fields...)
if err != nil {
return immutable.None[EncodedDocument](), err
}

if f.activeFetcher == f.currentFetcher {
f.activeDocID = immutable.None[string]()
} else {
f.deletedDocID = immutable.None[string]()
}

return doc, nil
}

func (f *deleted) Close() error {
activeErr := f.activeFetcher.Close()
if activeErr != nil {
deletedErr := f.deletedFetcher.Close()
if deletedErr != nil {
return errors.Join(activeErr, deletedErr)
}

return activeErr
}

return f.deletedFetcher.Close()
}
150 changes: 150 additions & 0 deletions internal/db/fetcher/document.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package fetcher

import (
"context"

dsq "github.com/ipfs/go-datastore/query"
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore/iterable"
"github.com/sourcenetwork/defradb/internal/keys"
"github.com/sourcenetwork/immutable"
)

type document struct {
col client.Collection
fieldsByID map[uint32]client.FieldDefinition

kvResultsIter dsq.Results
currentKV immutable.Option[keyValue]
status client.DocumentStatus
}

var _ fetcher = (*document)(nil)

func newDocumentFetcher(
ctx context.Context,
col client.Collection,
fields []client.FieldDefinition, // todo - rip out (you parameterized this (that was for ACP - might need both - consider ripping out the GetFields param))
kvIter iterable.Iterator,
prefix *keys.DataStoreKey,
status client.DocumentStatus,
) (*document, error) {
if len(fields) == 0 {
fields = col.Definition().GetFields()
}

fieldsByID := make(map[uint32]client.FieldDefinition, len(fields))
for _, field := range fields {
fieldsByID[uint32(field.ID)] = field
}

kvResultsIter, err := kvIter.IteratePrefix(ctx, prefix.ToDS(), prefix.PrefixEnd().ToDS())
if err != nil {
return nil, err
}

return &document{
kvResultsIter: kvResultsIter,
status: status,
}, nil
}

func (f *document) NextDoc() (immutable.Option[string], error) {
if f.currentKV.HasValue() {
docID := f.currentKV.Value().Key.DocID
f.currentKV = immutable.None[keyValue]()
return immutable.Some(docID), nil
}

res, ok := f.kvResultsIter.NextSync()
if res.Error != nil {
return immutable.None[string](), res.Error
}
if !ok {
return immutable.None[string](), nil
}

dsKey, err := keys.NewDataStoreKey(res.Key)
if err != nil {
return immutable.None[string](), err
}

f.currentKV = immutable.Some(keyValue{
Key: dsKey,
Value: res.Value,
})

return immutable.Some(dsKey.DocID), nil
}

func (f *document) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) {
results := make([]keyValue, 0, len(f.col.Schema().Fields))
results = append(results, f.currentKV.Value())

res, ok := f.kvResultsIter.NextSync()
for ok {
if !ok {
break
}

dsKey, err := keys.NewDataStoreKey(res.Key)
if err != nil {
return immutable.None[EncodedDocument](), err
}

kv := keyValue{
Key: dsKey,
Value: res.Value,
}

if dsKey.DocID != f.currentKV.Value().Key.DocID {
f.currentKV = immutable.Some(kv)
break
}

results = append(results, kv)
res, ok = f.kvResultsIter.NextSync()
}

encodedDoc := encodedDocument{}
encodedDoc.id = []byte(results[0].Key.DocID)
encodedDoc.status = f.status

for _, r := range results {
if r.Key.FieldID == keys.DATASTORE_DOC_VERSION_FIELD_ID {
encodedDoc.schemaVersionID = string(r.Value)
continue
}

fieldID, err := r.Key.FieldIDAsUint()
if err != nil {
return immutable.None[EncodedDocument](), err
}

fieldDesc, ok := f.fieldsByID[fieldID]
if !ok {
return immutable.None[EncodedDocument](), nil
}

encodedDoc.properties[fieldDesc] = &encProperty{
Desc: fieldDesc,
Raw: r.Value,
}
}

return immutable.Some[EncodedDocument](&encodedDoc), nil
}

func (f *document) Close() error {
return f.kvResultsIter.Close()
}
6 changes: 6 additions & 0 deletions internal/db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ type Fetcher interface {
Close() error
}

type fetcher interface {
NextDoc() (immutable.Option[string], error)
GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error)
Close() error
}

// keyValue is a KV store response containing the resulting core.Key and byte array value.
type keyValue struct {
Key keys.DataStoreKey
Expand Down
75 changes: 75 additions & 0 deletions internal/db/fetcher/filtered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package fetcher

import (
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/planner/mapper"
)

type filtered struct {
filter *mapper.Filter
mapping *core.DocumentMapping

fetcher fetcher
}

var _ fetcher = (*filtered)(nil)

func newFilteredFetcher(
filter *mapper.Filter,
mapping *core.DocumentMapping,
fetcher fetcher,
) *filtered {
return &filtered{
filter: filter,
mapping: mapping,
fetcher: fetcher,
}
}

func (f *filtered) NextDoc() (immutable.Option[string], error) {
return f.fetcher.NextDoc()
}

func (f *filtered) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) {
doc, err := f.fetcher.GetFields(fields...)
if err != nil {
return immutable.None[EncodedDocument](), err
}

if !doc.HasValue() {
return immutable.None[EncodedDocument](), nil
}

decodedDoc, err := DecodeToDoc(doc.Value(), f.mapping, false)
if err != nil {
return immutable.None[EncodedDocument](), err
}

passedFilter, err := mapper.RunFilter(decodedDoc, f.filter)
if err != nil {
return immutable.None[EncodedDocument](), err
}

if !passedFilter {
return immutable.None[EncodedDocument](), nil
}

return doc, nil
}

func (f *filtered) Close() error {
return f.fetcher.Close()
}
Loading

0 comments on commit 1cc8dae

Please sign in to comment.