Skip to content

Commit

Permalink
WIP PR FIXUP - Convert deleted fetcher to multi fetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Dec 12, 2024
1 parent f03a867 commit 3d757ec
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 101 deletions.
95 changes: 0 additions & 95 deletions internal/db/fetcher/deleted.go

This file was deleted.

99 changes: 99 additions & 0 deletions internal/db/fetcher/multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package fetcher

import (
"errors"

"github.com/sourcenetwork/immutable"
)

// multi is a fetcher that orchastrates the fetching of documents via multiple child fetchers.
//
// The documents are yielded ordered by docID independently of which child fetcher they are sourced from.
type multi struct {
children []fetcherDocID

currentFetcherIndex int
}

var _ fetcher = (*multi)(nil)

func newMultiFetcher(
children []fetcher,
) *multi {
fetcherDocIDs := make([]fetcherDocID, len(children))

for i, fetcher := range children {
fetcherDocIDs[i] = fetcherDocID{
fetcher: fetcher,
}
}

return &multi{
children: fetcherDocIDs,
}
}

type fetcherDocID struct {
fetcher fetcher
docID immutable.Option[string]
}

func (f *multi) NextDoc() (immutable.Option[string], error) {
selectedFetcherIndex := -1
var selectedDocID immutable.Option[string]

for i, child := range f.children {
if !child.docID.HasValue() {
docID, err := child.fetcher.NextDoc()
if err != nil {
return immutable.None[string](), err
}
child.docID = docID
}

if !selectedDocID.HasValue() || (child.docID.HasValue() && child.docID.Value() < selectedDocID.Value()) {
selectedFetcherIndex = i
selectedDocID = child.docID
}
}

f.currentFetcherIndex = selectedFetcherIndex
return selectedDocID, nil
}

func (f *multi) GetFields() (immutable.Option[EncodedDocument], error) {
doc, err := f.children[f.currentFetcherIndex].fetcher.GetFields()
if err != nil {
return immutable.None[EncodedDocument](), err
}

f.children[f.currentFetcherIndex].docID = immutable.None[string]()

return doc, nil
}

func (f *multi) Close() error {
errs := []error{}
for _, child := range f.children {
err := child.fetcher.Close()
if err != nil {
errs = append(errs, err)
}
}

if len(errs) != 0 {
return errors.Join(errs...)
}

return nil
}
12 changes: 6 additions & 6 deletions internal/db/fetcher/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ func (f *wrapper) Start(ctx context.Context, prefixes ...keys.Walkable) error {
var execInfo ExecInfo
f.execInfo = &execInfo

var fetcher fetcher
fetcher, err = newPrefixFetcher(ctx, f.txn, dsPrefixes, f.col, fieldsByID, client.Active, &execInfo)
var top fetcher
top, err = newPrefixFetcher(ctx, f.txn, dsPrefixes, f.col, fieldsByID, client.Active, &execInfo)
if err != nil {
return nil
}
Expand All @@ -132,18 +132,18 @@ func (f *wrapper) Start(ctx context.Context, prefixes ...keys.Walkable) error {
return nil
}

fetcher = newDeletedFetcher(fetcher, deletedFetcher)
top = newMultiFetcher([]fetcher{top, deletedFetcher})
}

if f.acp.HasValue() {
fetcher = newPermissionedFetcher(ctx, f.identity, f.acp.Value(), f.col, fetcher)
top = newPermissionedFetcher(ctx, f.identity, f.acp.Value(), f.col, top)
}

if f.filter != nil {
fetcher = newFilteredFetcher(f.filter, f.docMapper, fetcher)
top = newFilteredFetcher(f.filter, f.docMapper, top)
}

f.fetcher = fetcher
f.fetcher = top
return nil
}

Expand Down

0 comments on commit 3d757ec

Please sign in to comment.