diff --git a/internal/db/fetcher/deleted.go b/internal/db/fetcher/deleted.go new file mode 100644 index 0000000000..d74ce3c674 --- /dev/null +++ b/internal/db/fetcher/deleted.go @@ -0,0 +1,74 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/immutable" +) + +type deleted struct { + activeFetcher fetcher + activeDocID immutable.Option[string] + + deletedFetcher fetcher + deletedDocID immutable.Option[string] + + currentFetcher fetcher +} + +var _ fetcher = (*deleted)(nil) + +func (f *deleted) NextDoc() (immutable.Option[string], error) { + if !f.activeDocID.HasValue() { + var err error + f.activeDocID, err = f.activeFetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + } + + if !f.deletedDocID.HasValue() { + var err error + f.deletedDocID, err = f.deletedFetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + } + + if f.deletedDocID.Value() < f.activeDocID.Value() { + f.currentFetcher = f.deletedFetcher + return f.deletedDocID, nil + } + + f.currentFetcher = f.activeFetcher + return f.activeDocID, nil +} + +func (f *deleted) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + doc, err := f.currentFetcher.GetFields(fields...) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if f.activeFetcher == f.currentFetcher { + f.activeDocID = immutable.None[string]() + } else { + f.deletedDocID = immutable.None[string]() + } + + return doc, nil +} + +func (f *deleted) Close() { + f.activeFetcher.Close() + f.deletedFetcher.Close() +} diff --git a/internal/db/fetcher/document.go b/internal/db/fetcher/document.go new file mode 100644 index 0000000000..93b188e9e4 --- /dev/null +++ b/internal/db/fetcher/document.go @@ -0,0 +1,148 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + dsq "github.com/ipfs/go-datastore/query" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore/iterable" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/immutable" +) + +type document struct { + col client.Collection + fieldsByID map[uint32]client.FieldDefinition + + kvResultsIter dsq.Results + currentKV immutable.Option[keyValue] + status client.DocumentStatus +} + +var _ fetcher = (*document)(nil) + +func newDocumentFetcher( + ctx context.Context, + col client.Collection, + fields []client.FieldDefinition, // todo - rip out (you parameterized this (that was for ACP - might need both - consider ripping out the GetFields param)) + kvIter iterable.Iterator, + prefix keys.DataStoreKey, +) (*document, error) { + if len(fields) == 0 { + fields = col.Definition().GetFields() + } + + fieldsByID := make(map[uint32]client.FieldDefinition, len(fields)) + for _, field := range fields { + fieldsByID[uint32(field.ID)] = field + } + + kvResultsIter, err := kvIter.IteratePrefix(ctx, prefix.ToDS(), prefix.PrefixEnd().ToDS()) + if err != nil { + return nil, err + } + + return &document{ + kvResultsIter: kvResultsIter, + }, nil +} + +func (f *document) NextDoc() (immutable.Option[string], error) { + if f.currentKV.HasValue() { + docID := f.currentKV.Value().Key.DocID + f.currentKV = immutable.None[keyValue]() + return immutable.Some(docID), nil + } + + res, ok := f.kvResultsIter.NextSync() + if res.Error != nil { + return immutable.None[string](), res.Error + } + if !ok { + return immutable.None[string](), nil + } + + dsKey, err := keys.NewDataStoreKey(res.Key) + if err != nil { + return immutable.None[string](), err + } + + f.currentKV = immutable.Some(keyValue{ + Key: dsKey, + Value: res.Value, + }) + + return immutable.Some(dsKey.DocID), nil +} + +func (f *document) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + results := make([]keyValue, 0, len(f.col.Schema().Fields)) + results = append(results, f.currentKV.Value()) + + res, ok := f.kvResultsIter.NextSync() + for ok { + if !ok { + break + } + + dsKey, err := keys.NewDataStoreKey(res.Key) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + kv := keyValue{ + Key: dsKey, + Value: res.Value, + } + + if dsKey.DocID != f.currentKV.Value().Key.DocID { + f.currentKV = immutable.Some(kv) + break + } + + results = append(results, kv) + res, ok = f.kvResultsIter.NextSync() + } + + encodedDoc := encodedDocument{} + encodedDoc.id = []byte(results[0].Key.DocID) + encodedDoc.status = f.status + + for _, r := range results { + if r.Key.FieldID == keys.DATASTORE_DOC_VERSION_FIELD_ID { + encodedDoc.schemaVersionID = string(r.Value) + continue + } + + fieldID, err := r.Key.FieldIDAsUint() + if err != nil { + return immutable.None[EncodedDocument](), err + } + + fieldDesc, ok := f.fieldsByID[fieldID] + if !ok { + return immutable.None[EncodedDocument](), nil + } + + encodedDoc.properties[fieldDesc] = &encProperty{ + Desc: fieldDesc, + Raw: r.Value, + } + } + + return immutable.Some[EncodedDocument](&encodedDoc), nil +} + +func (f *document) Close() error { + return f.kvResultsIter.Close() +} diff --git a/internal/db/fetcher/fetcher.go b/internal/db/fetcher/fetcher.go index 877cbfa7c8..5abaa7c54c 100644 --- a/internal/db/fetcher/fetcher.go +++ b/internal/db/fetcher/fetcher.go @@ -77,6 +77,12 @@ type Fetcher interface { Close() error } +type fetcher interface { + NextDoc() (immutable.Option[string], error) + GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) + Close() error +} + // keyValue is a KV store response containing the resulting core.Key and byte array value. type keyValue struct { Key keys.DataStoreKey diff --git a/internal/db/fetcher/filtered.go b/internal/db/fetcher/filtered.go new file mode 100644 index 0000000000..ea5d1c8579 --- /dev/null +++ b/internal/db/fetcher/filtered.go @@ -0,0 +1,67 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/planner/mapper" +) + +type filtered struct { + ctx context.Context + + filter *mapper.Filter + mapping *core.DocumentMapping + + fetcher fetcher +} + +var _ fetcher = (*filtered)(nil) + +func (f *filtered) NextDoc() (immutable.Option[string], error) { + return f.fetcher.NextDoc() +} + +func (f *filtered) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + doc, err := f.fetcher.GetFields(fields...) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if !doc.HasValue() { + return immutable.None[EncodedDocument](), nil + } + + decodedDoc, err := DecodeToDoc(doc.Value(), f.mapping, false) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + passedFilter, err := mapper.RunFilter(decodedDoc, f.filter) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if !passedFilter { + return immutable.None[EncodedDocument](), nil + } + + return doc, nil +} + +func (f *filtered) Close() { + f.fetcher.Close() +} diff --git a/internal/db/fetcher/permissioned.go b/internal/db/fetcher/permissioned.go new file mode 100644 index 0000000000..bd6bbad889 --- /dev/null +++ b/internal/db/fetcher/permissioned.go @@ -0,0 +1,68 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/acp" + acpIdentity "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/db/permission" +) + +type permissioned struct { + ctx context.Context + + identity immutable.Option[acpIdentity.Identity] + acp acp.ACP + col client.Collection + + fetcher fetcher +} + +var _ fetcher = (*permissioned)(nil) + +func (f *permissioned) NextDoc() (immutable.Option[string], error) { + docID, err := f.fetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + + if !docID.HasValue() { + return immutable.None[string](), nil + } + + hasPermission, err := permission.CheckAccessOfDocOnCollectionWithACP( + f.ctx, + f.identity, + f.acp, + f.col, + acp.ReadPermission, + docID.Value(), + ) + + if !hasPermission { + return f.NextDoc() + } + + return docID, nil +} + +func (f *permissioned) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + return f.GetFields(fields...) +} + +func (f *permissioned) Close() { + f.fetcher.Close() +} diff --git a/internal/db/fetcher/prefix.go b/internal/db/fetcher/prefix.go new file mode 100644 index 0000000000..520281d15e --- /dev/null +++ b/internal/db/fetcher/prefix.go @@ -0,0 +1,83 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + dsq "github.com/ipfs/go-datastore/query" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/datastore/iterable" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/immutable" +) + +type prefix struct { + kvIter iterable.Iterator + + prefixes []keys.DataStoreKey + currentPrefix int + + fetcher fetcher + + ctx context.Context + col client.Collection + fields []client.FieldDefinition +} + +var _ fetcher = (*prefix)(nil) + +func newPrefixFetcher(txn datastore.Txn) (*prefix, error) { + kvIter, err := txn.Datastore().GetIterator(dsq.Query{ + Orders: []dsq.Order{dsq.OrderByKey{}}, + }) + if err != nil { + return nil, err + } + + return &prefix{ + kvIter: kvIter, + currentPrefix: -1, + }, nil +} + +func (f *prefix) NextDoc() (immutable.Option[string], error) { + docID, err := f.fetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + + if !docID.HasValue() { + if len(f.prefixes) > f.currentPrefix { + if f.fetcher != nil { + f.fetcher.Close() + } + f.currentPrefix++ + + prefix := f.prefixes[f.currentPrefix] + + f.fetcher, err = newDocumentFetcher(f.ctx, f.col, f.fields, f.kvIter, prefix) + + return f.NextDoc() + } + } + + return docID, nil +} + +func (f *prefix) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + return f.fetcher.GetFields(fields...) +} + +func (f *prefix) Close() error { + return f.kvIter.Close() +} diff --git a/internal/db/fetcher/wrapper.go b/internal/db/fetcher/wrapper.go new file mode 100644 index 0000000000..954df50e81 --- /dev/null +++ b/internal/db/fetcher/wrapper.go @@ -0,0 +1,64 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/acp" + acpIdentity "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/defradb/internal/planner/mapper" +) + +type wrapper struct { + identity immutable.Option[acpIdentity.Identity] + acp immutable.Option[acp.ACP] + col client.Collection + + fetcher fetcher +} + +var _ Fetcher = (*wrapper)(nil) + +func (f *wrapper) Init( + ctx context.Context, + identity immutable.Option[acpIdentity.Identity], + txn datastore.Txn, + acp immutable.Option[acp.ACP], + col client.Collection, + fields []client.FieldDefinition, + filter *mapper.Filter, + docMapper *core.DocumentMapping, + showDeleted bool, +) error { + f.identity = identity + f.acp = acp + + return f.docFetcher.Init(ctx, identity, txn, acp, col, fields, filter, docMapper, showDeleted) +} + +func (f *wrapper) Start(ctx context.Context, prefixes ...keys.Walkable) error { + return f.docFetcher.Start(ctx, prefixes...) +} + +func (f *wrapper) FetchNext(ctx context.Context) (EncodedDocument, ExecInfo, error) { + +} + +func (f *wrapper) Close() error { + return f.fetcher.Close() +}