From a13faa98fcbe8fb5ff8da7a3c2433e982a4bf33f Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Thu, 12 Dec 2024 00:28:44 -0500 Subject: [PATCH] WIP PR FIXUP - Convert deleted fetcher to multi fetcher --- internal/db/fetcher/deleted.go | 95 ---------------------------- internal/db/fetcher/multi.go | 111 +++++++++++++++++++++++++++++++++ internal/db/fetcher/wrapper.go | 12 ++-- 3 files changed, 117 insertions(+), 101 deletions(-) delete mode 100644 internal/db/fetcher/deleted.go create mode 100644 internal/db/fetcher/multi.go diff --git a/internal/db/fetcher/deleted.go b/internal/db/fetcher/deleted.go deleted file mode 100644 index eeb127f479..0000000000 --- a/internal/db/fetcher/deleted.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2024 Democratized Data Foundation -// -// Use of this software is governed by the Business Source License -// included in the file licenses/BSL.txt. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0, included in the file -// licenses/APL.txt. - -package fetcher - -import ( - "errors" - - "github.com/sourcenetwork/immutable" -) - -// deleted is a fetcher that orchastrates the fetching of deleted and active documents. -type deleted struct { - activeFetcher fetcher - activeDocID immutable.Option[string] - - deletedFetcher fetcher - deletedDocID immutable.Option[string] - - currentFetcher fetcher -} - -var _ fetcher = (*deleted)(nil) - -func newDeletedFetcher( - activeFetcher fetcher, - deletedFetcher fetcher, -) *deleted { - return &deleted{ - activeFetcher: activeFetcher, - deletedFetcher: deletedFetcher, - } -} - -func (f *deleted) NextDoc() (immutable.Option[string], error) { - if !f.activeDocID.HasValue() { - var err error - f.activeDocID, err = f.activeFetcher.NextDoc() - if err != nil { - return immutable.None[string](), err - } - } - - if !f.deletedDocID.HasValue() { - var err error - f.deletedDocID, err = f.deletedFetcher.NextDoc() - if err != nil { - return immutable.None[string](), err - } - } - - if !f.activeDocID.HasValue() || (f.deletedDocID.HasValue() && f.deletedDocID.Value() < f.activeDocID.Value()) { - f.currentFetcher = f.deletedFetcher - return f.deletedDocID, nil - } - - f.currentFetcher = f.activeFetcher - return f.activeDocID, nil -} - -func (f *deleted) GetFields() (immutable.Option[EncodedDocument], error) { - doc, err := f.currentFetcher.GetFields() - if err != nil { - return immutable.None[EncodedDocument](), err - } - - if f.activeFetcher == f.currentFetcher { - f.activeDocID = immutable.None[string]() - } else { - f.deletedDocID = immutable.None[string]() - } - - return doc, nil -} - -func (f *deleted) Close() error { - activeErr := f.activeFetcher.Close() - if activeErr != nil { - deletedErr := f.deletedFetcher.Close() - if deletedErr != nil { - return errors.Join(activeErr, deletedErr) - } - - return activeErr - } - - return f.deletedFetcher.Close() -} diff --git a/internal/db/fetcher/multi.go b/internal/db/fetcher/multi.go new file mode 100644 index 0000000000..be519b5c5e --- /dev/null +++ b/internal/db/fetcher/multi.go @@ -0,0 +1,111 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package fetcher + +import ( + "errors" + + "github.com/sourcenetwork/immutable" +) + +// multi is a fetcher that orchastrates the fetching of documents via multiple child fetchers. +// +// The documents are yielded ordered by docID independently of which child fetcher they are sourced from. +type multi struct { + children []fetcherDocID + + currentFetcherIndex int +} + +var _ fetcher = (*multi)(nil) + +func newMultiFetcher( + children []fetcher, +) *multi { + fetcherDocIDs := make([]fetcherDocID, len(children)) + + for i, fetcher := range children { + fetcherDocIDs[i] = fetcherDocID{ + fetcher: fetcher, + } + } + + return &multi{ + children: fetcherDocIDs, + } +} + +type fetcherDocID struct { + fetcher fetcher + docID immutable.Option[string] +} + +func (f *multi) NextDoc() (immutable.Option[string], error) { + selectedFetcherIndex := -1 + var selectedDocID immutable.Option[string] + + for i := 0; i < len(f.children); { + if !f.children[i].docID.HasValue() { + docID, err := f.children[i].fetcher.NextDoc() + if err != nil { + return immutable.None[string](), err + } + f.children[i].docID = docID + } + + if !f.children[i].docID.HasValue() { + err := f.children[i].fetcher.Close() + if err != nil { + return immutable.None[string](), err + } + + f.children = append(f.children[:i], f.children[i+1:]...) + continue + } + + if !selectedDocID.HasValue() || (f.children[i].docID.Value() < selectedDocID.Value()) { + selectedFetcherIndex = i + selectedDocID = f.children[i].docID + } + + i++ + } + + f.currentFetcherIndex = selectedFetcherIndex + return selectedDocID, nil +} + +func (f *multi) GetFields() (immutable.Option[EncodedDocument], error) { + doc, err := f.children[f.currentFetcherIndex].fetcher.GetFields() + if err != nil { + return immutable.None[EncodedDocument](), err + } + + f.children[f.currentFetcherIndex].docID = immutable.None[string]() + + return doc, nil +} + +func (f *multi) Close() error { + errs := []error{} + for _, child := range f.children { + err := child.fetcher.Close() + if err != nil { + errs = append(errs, err) + } + } + + if len(errs) != 0 { + return errors.Join(errs...) + } + + return nil +} diff --git a/internal/db/fetcher/wrapper.go b/internal/db/fetcher/wrapper.go index 1272b303d5..905d0a1aab 100644 --- a/internal/db/fetcher/wrapper.go +++ b/internal/db/fetcher/wrapper.go @@ -120,8 +120,8 @@ func (f *wrapper) Start(ctx context.Context, prefixes ...keys.Walkable) error { var execInfo ExecInfo f.execInfo = &execInfo - var fetcher fetcher - fetcher, err = newPrefixFetcher(ctx, f.txn, dsPrefixes, f.col, fieldsByID, client.Active, &execInfo) + var top fetcher + top, err = newPrefixFetcher(ctx, f.txn, dsPrefixes, f.col, fieldsByID, client.Active, &execInfo) if err != nil { return nil } @@ -132,18 +132,18 @@ func (f *wrapper) Start(ctx context.Context, prefixes ...keys.Walkable) error { return nil } - fetcher = newDeletedFetcher(fetcher, deletedFetcher) + top = newMultiFetcher([]fetcher{top, deletedFetcher}) } if f.acp.HasValue() { - fetcher = newPermissionedFetcher(ctx, f.identity, f.acp.Value(), f.col, fetcher) + top = newPermissionedFetcher(ctx, f.identity, f.acp.Value(), f.col, top) } if f.filter != nil { - fetcher = newFilteredFetcher(f.filter, f.docMapper, fetcher) + top = newFilteredFetcher(f.filter, f.docMapper, top) } - f.fetcher = fetcher + f.fetcher = top return nil }