From 4d7f14ca3a7cf7caea32c187558825ea35d8d0b7 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Wed, 27 Nov 2024 15:41:08 -0500 Subject: [PATCH] WIP --- internal/db/fetcher/deleted.go | 74 ++++++++++++++++ internal/db/fetcher/document.go | 126 ++++++++++++++++++++++++++++ internal/db/fetcher/fetcher.go | 6 ++ internal/db/fetcher/filtered.go | 67 +++++++++++++++ internal/db/fetcher/permissioned.go | 68 +++++++++++++++ internal/db/fetcher/wrapper.go | 64 ++++++++++++++ 6 files changed, 405 insertions(+) create mode 100644 internal/db/fetcher/deleted.go create mode 100644 internal/db/fetcher/document.go create mode 100644 internal/db/fetcher/filtered.go create mode 100644 internal/db/fetcher/permissioned.go create mode 100644 internal/db/fetcher/wrapper.go diff --git a/internal/db/fetcher/deleted.go b/internal/db/fetcher/deleted.go new file mode 100644 index 0000000000..d74ce3c674 --- /dev/null +++ b/internal/db/fetcher/deleted.go @@ -0,0 +1,74 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/immutable" +) + +type deleted struct { + activeFetcher fetcher + activeDocID immutable.Option[string] + + deletedFetcher fetcher + deletedDocID immutable.Option[string] + + currentFetcher fetcher +} + +var _ fetcher = (*deleted)(nil) + +func (f *deleted) NextDoc() (immutable.Option[string], error) { + if !f.activeDocID.HasValue() { + var err error + f.activeDocID, err = f.activeFetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + } + + if !f.deletedDocID.HasValue() { + var err error + f.deletedDocID, err = f.deletedFetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + } + + if f.deletedDocID.Value() < f.activeDocID.Value() { + f.currentFetcher = f.deletedFetcher + return f.deletedDocID, nil + } + + f.currentFetcher = f.activeFetcher + return f.activeDocID, nil +} + +func (f *deleted) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + doc, err := f.currentFetcher.GetFields(fields...) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if f.activeFetcher == f.currentFetcher { + f.activeDocID = immutable.None[string]() + } else { + f.deletedDocID = immutable.None[string]() + } + + return doc, nil +} + +func (f *deleted) Close() { + f.activeFetcher.Close() + f.deletedFetcher.Close() +} diff --git a/internal/db/fetcher/document.go b/internal/db/fetcher/document.go new file mode 100644 index 0000000000..cfea4819eb --- /dev/null +++ b/internal/db/fetcher/document.go @@ -0,0 +1,126 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + dsq "github.com/ipfs/go-datastore/query" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/immutable" +) + +type document struct { + txn datastore.Txn + prefixes []keys.DataStoreKey + col client.Collection + fieldsByID map[uint32]client.FieldDefinition + + currentPrefix int // todo - move out + kvResultsIter dsq.Results + currentKV immutable.Option[keyValue] +} + +var _ fetcher = (*document)(nil) + +func newDocumentFetcher() { + +} + +func (f *document) NextDoc() (immutable.Option[string], error) { + if f.currentKV.HasValue() { + docID := f.currentKV.Value().Key.DocID + f.currentKV = immutable.None[keyValue]() + return immutable.Some(docID), nil + } + + res, ok := f.kvResultsIter.NextSync() + if res.Error != nil { + return immutable.None[string](), res.Error + } + if !ok { + return immutable.None[string](), nil + } + + dsKey, err := keys.NewDataStoreKey(res.Key) + if err != nil { + return immutable.None[string](), err + } + + f.currentKV = immutable.Some(keyValue{ + Key: dsKey, + Value: res.Value, + }) + + return immutable.Some(dsKey.DocID), nil +} + +func (f *document) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + results := make([]keyValue, 0, len(f.col.Schema().Fields)) + results = append(results, f.currentKV.Value()) + + res, ok := f.kvResultsIter.NextSync() + for ok { + if !ok { + break + } + + dsKey, err := keys.NewDataStoreKey(res.Key) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + kv := keyValue{ + Key: dsKey, + Value: res.Value, + } + + if dsKey.DocID != f.currentKV.Value().Key.DocID { + f.currentKV = immutable.Some(kv) + break + } + + results = append(results, kv) + res, ok = f.kvResultsIter.NextSync() + } + + encodedDoc := encodedDocument{} + encodedDoc.id = []byte(results[0].Key.DocID) + encodedDoc.status = client.Active //todo :) + + for _, r := range results { + if r.Key.FieldID == keys.DATASTORE_DOC_VERSION_FIELD_ID { + encodedDoc.schemaVersionID = string(r.Value) + continue + } + + fieldID, err := r.Key.FieldIDAsUint() + if err != nil { + return immutable.None[EncodedDocument](), err + } + + fieldDesc, ok := f.fieldsByID[fieldID] + if !ok { + return immutable.None[EncodedDocument](), nil + } + + encodedDoc.properties[fieldDesc] = &encProperty{ + Desc: fieldDesc, + Raw: r.Value, + } + } + + return immutable.Some[EncodedDocument](&encodedDoc), nil +} + +func (f *document) Close() { + +} diff --git a/internal/db/fetcher/fetcher.go b/internal/db/fetcher/fetcher.go index 877cbfa7c8..ffd881fbf8 100644 --- a/internal/db/fetcher/fetcher.go +++ b/internal/db/fetcher/fetcher.go @@ -77,6 +77,12 @@ type Fetcher interface { Close() error } +type fetcher interface { + NextDoc() (immutable.Option[string], error) + GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) + Close() +} + // keyValue is a KV store response containing the resulting core.Key and byte array value. type keyValue struct { Key keys.DataStoreKey diff --git a/internal/db/fetcher/filtered.go b/internal/db/fetcher/filtered.go new file mode 100644 index 0000000000..ea5d1c8579 --- /dev/null +++ b/internal/db/fetcher/filtered.go @@ -0,0 +1,67 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/planner/mapper" +) + +type filtered struct { + ctx context.Context + + filter *mapper.Filter + mapping *core.DocumentMapping + + fetcher fetcher +} + +var _ fetcher = (*filtered)(nil) + +func (f *filtered) NextDoc() (immutable.Option[string], error) { + return f.fetcher.NextDoc() +} + +func (f *filtered) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + doc, err := f.fetcher.GetFields(fields...) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if !doc.HasValue() { + return immutable.None[EncodedDocument](), nil + } + + decodedDoc, err := DecodeToDoc(doc.Value(), f.mapping, false) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + passedFilter, err := mapper.RunFilter(decodedDoc, f.filter) + if err != nil { + return immutable.None[EncodedDocument](), err + } + + if !passedFilter { + return immutable.None[EncodedDocument](), nil + } + + return doc, nil +} + +func (f *filtered) Close() { + f.fetcher.Close() +} diff --git a/internal/db/fetcher/permissioned.go b/internal/db/fetcher/permissioned.go new file mode 100644 index 0000000000..bd6bbad889 --- /dev/null +++ b/internal/db/fetcher/permissioned.go @@ -0,0 +1,68 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/acp" + acpIdentity "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/db/permission" +) + +type permissioned struct { + ctx context.Context + + identity immutable.Option[acpIdentity.Identity] + acp acp.ACP + col client.Collection + + fetcher fetcher +} + +var _ fetcher = (*permissioned)(nil) + +func (f *permissioned) NextDoc() (immutable.Option[string], error) { + docID, err := f.fetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + + if !docID.HasValue() { + return immutable.None[string](), nil + } + + hasPermission, err := permission.CheckAccessOfDocOnCollectionWithACP( + f.ctx, + f.identity, + f.acp, + f.col, + acp.ReadPermission, + docID.Value(), + ) + + if !hasPermission { + return f.NextDoc() + } + + return docID, nil +} + +func (f *permissioned) GetFields(fields ...client.FieldID) (immutable.Option[EncodedDocument], error) { + return f.GetFields(fields...) +} + +func (f *permissioned) Close() { + f.fetcher.Close() +} diff --git a/internal/db/fetcher/wrapper.go b/internal/db/fetcher/wrapper.go new file mode 100644 index 0000000000..954df50e81 --- /dev/null +++ b/internal/db/fetcher/wrapper.go @@ -0,0 +1,64 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/acp" + acpIdentity "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/defradb/internal/planner/mapper" +) + +type wrapper struct { + identity immutable.Option[acpIdentity.Identity] + acp immutable.Option[acp.ACP] + col client.Collection + + fetcher fetcher +} + +var _ Fetcher = (*wrapper)(nil) + +func (f *wrapper) Init( + ctx context.Context, + identity immutable.Option[acpIdentity.Identity], + txn datastore.Txn, + acp immutable.Option[acp.ACP], + col client.Collection, + fields []client.FieldDefinition, + filter *mapper.Filter, + docMapper *core.DocumentMapping, + showDeleted bool, +) error { + f.identity = identity + f.acp = acp + + return f.docFetcher.Init(ctx, identity, txn, acp, col, fields, filter, docMapper, showDeleted) +} + +func (f *wrapper) Start(ctx context.Context, prefixes ...keys.Walkable) error { + return f.docFetcher.Start(ctx, prefixes...) +} + +func (f *wrapper) FetchNext(ctx context.Context) (EncodedDocument, ExecInfo, error) { + +} + +func (f *wrapper) Close() error { + return f.fetcher.Close() +}