From 6eb7987afce6df696b5871d55cd91b5dd8bfb5a2 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Tue, 5 Nov 2024 14:05:06 -0500 Subject: [PATCH] Remove DatastoreKey-is-actually-headstore hacks This is the primary goal of the PR, as otherwise I'd have to extend the hacks even further in order to query collection commits. --- internal/core/data.go | 8 ++-- internal/db/fetcher/dag.go | 28 +------------- internal/db/fetcher/errors.go | 63 ++++++++++++++------------------ internal/db/fetcher/fetcher.go | 15 ++++++-- internal/db/fetcher/versioned.go | 40 +++++++------------- internal/keys/datastore_doc.go | 4 +- internal/keys/headstore_doc.go | 21 ++++++++++- internal/keys/key.go | 17 +++++++++ internal/planner/commit.go | 39 +++++++++++--------- internal/planner/scan.go | 7 ++-- internal/planner/select.go | 17 ++++++--- 11 files changed, 134 insertions(+), 125 deletions(-) diff --git a/internal/core/data.go b/internal/core/data.go index 4fdfe2b14d..d84186826c 100644 --- a/internal/core/data.go +++ b/internal/core/data.go @@ -19,14 +19,14 @@ import ( // Span is a range of keys from [Start, End). type Span struct { // Start represents the starting key of the Span. - Start keys.DataStoreKey + Start keys.Walkable // End represents the ending key of the Span. - End keys.DataStoreKey + End keys.Walkable } // NewSpan creates a new Span from the provided start and end keys. -func NewSpan(start, end keys.DataStoreKey) Span { +func NewSpan(start, end keys.Walkable) Span { return Span{ Start: start, End: end, @@ -122,7 +122,7 @@ func (this Span) Compare(other Span) SpanComparisonResult { return After } -func isAdjacent(this keys.DataStoreKey, other keys.DataStoreKey) bool { +func isAdjacent(this keys.Walkable, other keys.Walkable) bool { return len(this.ToString()) == len(other.ToString()) && (this.PrefixEnd().ToString() == other.ToString() || this.ToString() == other.PrefixEnd().ToString()) diff --git a/internal/db/fetcher/dag.go b/internal/db/fetcher/dag.go index 3fae50c52b..723b821a97 100644 --- a/internal/db/fetcher/dag.go +++ b/internal/db/fetcher/dag.go @@ -12,21 +12,17 @@ package fetcher import ( "context" - "sort" - "strings" "github.com/ipfs/go-cid" dsq "github.com/ipfs/go-datastore/query" "github.com/sourcenetwork/immutable" "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/keys" ) // HeadFetcher is a utility to incrementally fetch all the MerkleCRDT heads of a given doc/field. type HeadFetcher struct { - spans []core.Span fieldId immutable.Option[string] kvIter dsq.Results @@ -35,33 +31,13 @@ type HeadFetcher struct { func (hf *HeadFetcher) Start( ctx context.Context, txn datastore.Txn, - spans []core.Span, + prefix keys.HeadStoreKey, fieldId immutable.Option[string], ) error { - if len(spans) == 0 { - spans = []core.Span{ - core.NewSpan( - keys.DataStoreKey{}, - keys.DataStoreKey{}.PrefixEnd(), - ), - } - } - - if len(spans) > 1 { - // if we have multiple spans, we need to sort them by their start position - // so we can do a single iterative sweep - sort.Slice(spans, func(i, j int) bool { - // compare by strings if i < j. - // apply the '!= df.reverse' to reverse the sort - // if we need to - return (strings.Compare(spans[i].Start.ToString(), spans[j].Start.ToString()) < 0) - }) - } - hf.spans = spans hf.fieldId = fieldId q := dsq.Query{ - Prefix: hf.spans[0].Start.ToString(), + Prefix: prefix.ToString(), Orders: []dsq.Order{dsq.OrderByKey{}}, } diff --git a/internal/db/fetcher/errors.go b/internal/db/fetcher/errors.go index 22f0c8b182..0a8a7d21b4 100644 --- a/internal/db/fetcher/errors.go +++ b/internal/db/fetcher/errors.go @@ -18,39 +18,37 @@ import ( ) const ( - errFieldIdNotFound string = "unable to find SchemaFieldDescription for given FieldId" - errFailedToDecodeCIDForVFetcher string = "failed to decode CID for VersionedFetcher" - errFailedToSeek string = "seek failed" - errFailedToMergeState string = "failed merging state" - errVFetcherFailedToFindBlock string = "(version fetcher) failed to find block in blockstore" - errVFetcherFailedToGetBlock string = "(version fetcher) failed to get block in blockstore" - errVFetcherFailedToWriteBlock string = "(version fetcher) failed to write block to blockstore" - errVFetcherFailedToDecodeNode string = "(version fetcher) failed to decode protobuf" - errVFetcherFailedToGetDagLink string = "(version fetcher) failed to get node link from DAG" - errFailedToGetDagNode string = "failed to get DAG Node" - errMissingMapper string = "missing document mapper" - errInvalidInOperatorValue string = "invalid _in/_nin value" - errInvalidFilterOperator string = "invalid filter operator is provided" - errNotSupportedKindByIndex string = "kind is not supported by index" - errUnexpectedTypeValue string = "unexpected type value" + errFieldIdNotFound string = "unable to find SchemaFieldDescription for given FieldId" + errFailedToSeek string = "seek failed" + errFailedToMergeState string = "failed merging state" + errVFetcherFailedToFindBlock string = "(version fetcher) failed to find block in blockstore" + errVFetcherFailedToGetBlock string = "(version fetcher) failed to get block in blockstore" + errVFetcherFailedToWriteBlock string = "(version fetcher) failed to write block to blockstore" + errVFetcherFailedToDecodeNode string = "(version fetcher) failed to decode protobuf" + errVFetcherFailedToGetDagLink string = "(version fetcher) failed to get node link from DAG" + errFailedToGetDagNode string = "failed to get DAG Node" + errMissingMapper string = "missing document mapper" + errInvalidInOperatorValue string = "invalid _in/_nin value" + errInvalidFilterOperator string = "invalid filter operator is provided" + errNotSupportedKindByIndex string = "kind is not supported by index" + errUnexpectedTypeValue string = "unexpected type value" ) var ( - ErrFieldIdNotFound = errors.New(errFieldIdNotFound) - ErrFailedToDecodeCIDForVFetcher = errors.New(errFailedToDecodeCIDForVFetcher) - ErrFailedToSeek = errors.New(errFailedToSeek) - ErrFailedToMergeState = errors.New(errFailedToMergeState) - ErrVFetcherFailedToFindBlock = errors.New(errVFetcherFailedToFindBlock) - ErrVFetcherFailedToGetBlock = errors.New(errVFetcherFailedToGetBlock) - ErrVFetcherFailedToWriteBlock = errors.New(errVFetcherFailedToWriteBlock) - ErrVFetcherFailedToDecodeNode = errors.New(errVFetcherFailedToDecodeNode) - ErrVFetcherFailedToGetDagLink = errors.New(errVFetcherFailedToGetDagLink) - ErrFailedToGetDagNode = errors.New(errFailedToGetDagNode) - ErrMissingMapper = errors.New(errMissingMapper) - ErrSingleSpanOnly = errors.New("spans must contain only a single entry") - ErrInvalidInOperatorValue = errors.New(errInvalidInOperatorValue) - ErrInvalidFilterOperator = errors.New(errInvalidFilterOperator) - ErrUnexpectedTypeValue = errors.New(errUnexpectedTypeValue) + ErrFieldIdNotFound = errors.New(errFieldIdNotFound) + ErrFailedToSeek = errors.New(errFailedToSeek) + ErrFailedToMergeState = errors.New(errFailedToMergeState) + ErrVFetcherFailedToFindBlock = errors.New(errVFetcherFailedToFindBlock) + ErrVFetcherFailedToGetBlock = errors.New(errVFetcherFailedToGetBlock) + ErrVFetcherFailedToWriteBlock = errors.New(errVFetcherFailedToWriteBlock) + ErrVFetcherFailedToDecodeNode = errors.New(errVFetcherFailedToDecodeNode) + ErrVFetcherFailedToGetDagLink = errors.New(errVFetcherFailedToGetDagLink) + ErrFailedToGetDagNode = errors.New(errFailedToGetDagNode) + ErrMissingMapper = errors.New(errMissingMapper) + ErrSingleSpanOnly = errors.New("spans must contain only a single entry") + ErrInvalidInOperatorValue = errors.New(errInvalidInOperatorValue) + ErrInvalidFilterOperator = errors.New(errInvalidFilterOperator) + ErrUnexpectedTypeValue = errors.New(errUnexpectedTypeValue) ) // NewErrFieldIdNotFound returns an error indicating that the given FieldId was not found. @@ -58,11 +56,6 @@ func NewErrFieldIdNotFound(fieldId uint32) error { return errors.New(errFieldIdNotFound, errors.NewKV("FieldId", fieldId)) } -// NewErrFailedToDecodeCIDForVFetcher returns an error indicating that the given CID could not be decoded. -func NewErrFailedToDecodeCIDForVFetcher(inner error) error { - return errors.Wrap(errFailedToDecodeCIDForVFetcher, inner) -} - // NewErrFailedToSeek returns an error indicating that the given target could not be seeked to. func NewErrFailedToSeek(target any, inner error) error { return errors.Wrap(errFailedToSeek, inner, errors.NewKV("Target", target)) diff --git a/internal/db/fetcher/fetcher.go b/internal/db/fetcher/fetcher.go index e273b69318..0ca828c4b2 100644 --- a/internal/db/fetcher/fetcher.go +++ b/internal/db/fetcher/fetcher.go @@ -278,11 +278,20 @@ func (df *DocumentFetcher) start(ctx context.Context, spans []core.Span, withDel } else { valueSpans := make([]core.Span, len(spans)) for i, span := range spans { - // We can only handle value keys, so here we ensure we only read value keys if withDeleted { - valueSpans[i] = core.NewSpan(span.Start.WithDeletedFlag(), span.End.WithDeletedFlag()) + // DocumentFetcher only ever recieves document keys + //nolint:forcetypeassert + valueSpans[i] = core.NewSpan( + span.Start.(keys.DataStoreKey).WithDeletedFlag(), + span.End.(keys.DataStoreKey).WithDeletedFlag(), + ) } else { - valueSpans[i] = core.NewSpan(span.Start.WithValueFlag(), span.End.WithValueFlag()) + // DocumentFetcher only ever recieves document keys + //nolint:forcetypeassert + valueSpans[i] = core.NewSpan( + span.Start.(keys.DataStoreKey).WithValueFlag(), + span.End.(keys.DataStoreKey).WithValueFlag(), + ) } } diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index c2ea4e2cac..baa3acfcfb 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -89,8 +89,7 @@ type VersionedFetcher struct { root datastore.Rootstore store datastore.Txn - dsKey keys.DataStoreKey - version cid.Cid + dsKey keys.DataStoreKey queuedCids *list.List @@ -162,30 +161,25 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans ...core.Span) error return ErrSingleSpanOnly } - // For the VersionedFetcher, the spans needs to be in the format - // Span{Start: DocID, End: CID} - dk := spans[0].Start - cidRaw := spans[0].End - if dk.DocID == "" { + // VersionedFetcher only ever recieves a headstore key + //nolint:forcetypeassert + prefix := spans[0].Start.(keys.HeadStoreKey) + dk := prefix.DocID + cid := prefix.Cid + if dk == "" { return client.NewErrUninitializeProperty("Spans", "DocID") - } else if cidRaw.DocID == "" { // todo: dont abuse DataStoreKey/Span like this! + } else if !cid.Defined() { return client.NewErrUninitializeProperty("Spans", "CID") } - // decode cidRaw from core.Key to cid.Cid - // need to remove '/' prefix from the core.Key - - c, err := cid.Decode(cidRaw.DocID) - if err != nil { - return NewErrFailedToDecodeCIDForVFetcher(err) - } - vf.ctx = ctx - vf.dsKey = dk.WithCollectionRoot(vf.col.Description().RootID) - vf.version = c + vf.dsKey = keys.DataStoreKey{ + CollectionRootID: vf.col.Description().RootID, + DocID: dk, + } - if err := vf.seekTo(vf.version); err != nil { - return NewErrFailedToSeek(c, err) + if err := vf.seekTo(cid); err != nil { + return NewErrFailedToSeek(cid, err) } return vf.DocumentFetcher.Start(ctx) @@ -421,9 +415,3 @@ func (vf *VersionedFetcher) Close() error { return vf.DocumentFetcher.Close() } - -// NewVersionedSpan creates a new VersionedSpan from a DataStoreKey and a version CID. -func NewVersionedSpan(dsKey keys.DataStoreKey, version cid.Cid) core.Span { - // Todo: Dont abuse DataStoreKey for version cid! - return core.NewSpan(dsKey, keys.DataStoreKey{DocID: version.String()}) -} diff --git a/internal/keys/datastore_doc.go b/internal/keys/datastore_doc.go index cd8ac60917..cffa99f6fc 100644 --- a/internal/keys/datastore_doc.go +++ b/internal/keys/datastore_doc.go @@ -41,7 +41,7 @@ type DataStoreKey struct { FieldID string } -var _ Key = (*DataStoreKey)(nil) +var _ Walkable = (*DataStoreKey)(nil) // Creates a new DataStoreKey from a string as best as it can, // splitting the input using '/' as a field deliminator. It assumes @@ -167,7 +167,7 @@ func (k DataStoreKey) ToPrimaryDataStoreKey() PrimaryDataStoreKey { // PrefixEnd determines the end key given key as a prefix, that is the key that sorts precisely // behind all keys starting with prefix: "1" is added to the final byte and the carry propagated. // The special cases of nil and KeyMin always returns KeyMax. -func (k DataStoreKey) PrefixEnd() DataStoreKey { +func (k DataStoreKey) PrefixEnd() Walkable { newKey := k if k.FieldID != "" { diff --git a/internal/keys/headstore_doc.go b/internal/keys/headstore_doc.go index 5d3ec2306e..55809ab236 100644 --- a/internal/keys/headstore_doc.go +++ b/internal/keys/headstore_doc.go @@ -23,7 +23,7 @@ type HeadStoreKey struct { Cid cid.Cid } -var _ Key = (*HeadStoreKey)(nil) +var _ Walkable = (*HeadStoreKey)(nil) // Creates a new HeadStoreKey from a string as best as it can, // splitting the input using '/' as a field deliminator. It assumes @@ -92,3 +92,22 @@ func (k HeadStoreKey) Bytes() []byte { func (k HeadStoreKey) ToDS() ds.Key { return ds.NewKey(k.ToString()) } + +func (k HeadStoreKey) PrefixEnd() Walkable { + newKey := k + + if k.FieldID != "" { + newKey.FieldID = string(bytesPrefixEnd([]byte(k.FieldID))) + return newKey + } + if k.DocID != "" { + newKey.DocID = string(bytesPrefixEnd([]byte(k.DocID))) + return newKey + } + if k.Cid.Defined() { + newKey.Cid = cid.MustParse(bytesPrefixEnd(k.Cid.Bytes())) + return newKey + } + + return newKey +} diff --git a/internal/keys/key.go b/internal/keys/key.go index 893b9790b4..42e5935a1f 100644 --- a/internal/keys/key.go +++ b/internal/keys/key.go @@ -20,3 +20,20 @@ type Key interface { Bytes() []byte ToDS() ds.Key } + +// Walkable represents a key in the database that can be 'walked along' +// by prefixing the end of the key. +type Walkable interface { + Key + PrefixEnd() Walkable +} + +// PrettyPrint returns the human readable version of the given key. +func PrettyPrint(k Key) string { + switch typed := k.(type) { + case DataStoreKey: + return typed.PrettyPrint() + default: + return typed.ToString() + } +} diff --git a/internal/planner/commit.go b/internal/planner/commit.go index 0df6657295..ceecfc46cd 100644 --- a/internal/planner/commit.go +++ b/internal/planner/commit.go @@ -36,7 +36,7 @@ type dagScanNode struct { queuedCids []*cid.Cid fetcher fetcher.HeadFetcher - spans []core.Span + prefix keys.HeadStoreKey commitSelect *mapper.CommitSelect execInfo dagScanExecInfo @@ -67,20 +67,21 @@ func (n *dagScanNode) Kind() string { } func (n *dagScanNode) Init() error { - if len(n.spans) == 0 { + undefined := keys.HeadStoreKey{} + if n.prefix == undefined { if n.commitSelect.DocID.HasValue() { - dsKey := keys.DataStoreKey{}.WithDocID(n.commitSelect.DocID.Value()) + key := keys.HeadStoreKey{}.WithDocID(n.commitSelect.DocID.Value()) if n.commitSelect.FieldID.HasValue() { field := n.commitSelect.FieldID.Value() - dsKey = dsKey.WithFieldID(field) + key = key.WithFieldID(field) } - n.spans = []core.Span{core.NewSpan(dsKey, dsKey.PrefixEnd())} + n.prefix = key } } - return n.fetcher.Start(n.planner.ctx, n.planner.txn, n.spans, n.commitSelect.FieldID) + return n.fetcher.Start(n.planner.ctx, n.planner.txn, n.prefix, n.commitSelect.FieldID) } func (n *dagScanNode) Start() error { @@ -97,10 +98,6 @@ func (n *dagScanNode) Spans(spans []core.Span) { return } - // copy the input spans so that we may mutate freely - headSetSpans := make([]core.Span, len(spans)) - copy(headSetSpans, spans) - var fieldID string if n.commitSelect.FieldID.HasValue() { fieldID = n.commitSelect.FieldID.Value() @@ -108,13 +105,18 @@ func (n *dagScanNode) Spans(spans []core.Span) { fieldID = core.COMPOSITE_NAMESPACE } - for i, span := range headSetSpans { - if span.Start.FieldID != fieldID { - headSetSpans[i] = core.NewSpan(span.Start.WithFieldID(fieldID), keys.DataStoreKey{}) + for _, span := range spans { + var start keys.HeadStoreKey + switch s := span.Start.(type) { + case keys.DataStoreKey: + start = s.ToHeadStoreKey() + case keys.HeadStoreKey: + start = s } - } - n.spans = headSetSpans + n.prefix = start.WithFieldID(fieldID) + return + } } func (n *dagScanNode) Close() error { @@ -142,13 +144,14 @@ func (n *dagScanNode) simpleExplain() (map[string]any, error) { // Build the explanation of the spans attribute. spansExplainer := []map[string]any{} + undefinedHsKey := keys.HeadStoreKey{} // Note: n.headset is `nil` for single commit selection query, so must check for it. - for _, span := range n.spans { + if n.prefix != undefinedHsKey { spansExplainer = append( spansExplainer, map[string]any{ - "start": span.Start.ToString(), - "end": span.End.ToString(), + "start": n.prefix.ToString(), + "end": n.prefix.PrefixEnd().ToString(), }, ) } diff --git a/internal/planner/scan.go b/internal/planner/scan.go index a010e9de1b..c00cda401c 100644 --- a/internal/planner/scan.go +++ b/internal/planner/scan.go @@ -18,6 +18,7 @@ import ( "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/db/base" "github.com/sourcenetwork/defradb/internal/db/fetcher" + "github.com/sourcenetwork/defradb/internal/keys" "github.com/sourcenetwork/defradb/internal/lens" "github.com/sourcenetwork/defradb/internal/planner/filter" "github.com/sourcenetwork/defradb/internal/planner/mapper" @@ -263,10 +264,8 @@ func (n *scanNode) explainSpans() []map[string]any { spansExplainer := []map[string]any{} for _, span := range n.spans { spanExplainer := map[string]any{ - // These must be pretty printed as the explain results need to be returnable - // as json via some clients (e.g. http and cli) - "start": span.Start.PrettyPrint(), - "end": span.End.PrettyPrint(), + "start": keys.PrettyPrint(span.Start), + "end": keys.PrettyPrint(span.End), } spansExplainer = append(spansExplainer, spanExplainer) diff --git a/internal/planner/select.go b/internal/planner/select.go index 50e08e4103..e5b53cd997 100644 --- a/internal/planner/select.go +++ b/internal/planner/select.go @@ -18,7 +18,6 @@ import ( "github.com/sourcenetwork/defradb/client/request" "github.com/sourcenetwork/defradb/internal/core" "github.com/sourcenetwork/defradb/internal/db/base" - "github.com/sourcenetwork/defradb/internal/db/fetcher" "github.com/sourcenetwork/defradb/internal/keys" "github.com/sourcenetwork/defradb/internal/planner/mapper" ) @@ -264,11 +263,17 @@ func (n *selectNode) initSource() ([]aggregateNode, error) { if err != nil { return nil, err } - span := fetcher.NewVersionedSpan( - keys.DataStoreKey{DocID: n.selectReq.DocIDs.Value()[0]}, - c, - ) // @todo check len - origScan.Spans([]core.Span{span}) + origScan.Spans( + []core.Span{ + core.NewSpan( + keys.HeadStoreKey{ + DocID: n.selectReq.DocIDs.Value()[0], + Cid: c, + }, + keys.HeadStoreKey{}, + ), + }, + ) } else if n.selectReq.DocIDs.HasValue() { // If we *just* have a DocID(s), run a FindByDocID(s) optimization // if we have a FindByDocID filter, create a span for it