Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Consistent version query semantics #3477

Merged
merged 6 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/data_format_changes/i3477-version-query-consistency.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Consistent version query semantics
Updated semantics for commit/version/time-travel queries when using both DocID and CID.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: There is no data breaking change in this PR, if the change detector failed it is because the integration test definitions were changed, not the data on disk. Please change the wording to reflect this.

51 changes: 40 additions & 11 deletions internal/planner/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/client/request"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/db/fetcher"
Expand Down Expand Up @@ -84,7 +85,11 @@ func (n *dagScanNode) Init() error {
}
}

return n.fetcher.Start(n.planner.ctx, n.planner.txn, n.prefix, n.commitSelect.FieldID)
// only need the head fetcher for non cid specific queries
if !n.commitSelect.Cid.HasValue() {
return n.fetcher.Start(n.planner.ctx, n.planner.txn, n.prefix, n.commitSelect.FieldID)
}
return nil
}

func (n *dagScanNode) Start() error {
Expand Down Expand Up @@ -123,7 +128,10 @@ func (n *dagScanNode) Prefixes(prefixes []keys.Walkable) {
}

func (n *dagScanNode) Close() error {
return n.fetcher.Close()
if !n.commitSelect.Cid.HasValue() {
return n.fetcher.Close()
}
return nil
}

func (n *dagScanNode) Source() planNode { return nil }
Expand Down Expand Up @@ -183,7 +191,14 @@ func (n *dagScanNode) Next() (bool, error) {
if len(n.queuedCids) > 0 {
currentCid = n.queuedCids[0]
n.queuedCids = n.queuedCids[1:(len(n.queuedCids))]
} else {
} else if n.commitSelect.Cid.HasValue() && len(n.visitedNodes) == 0 {
cid, err := cid.Parse(n.commitSelect.Cid.Value())
if err != nil {
return false, err
}

currentCid = &cid
} else if !n.commitSelect.Cid.HasValue() {
cid, err := n.fetcher.FetchNext()
if err != nil || cid == nil {
return false, err
Expand All @@ -192,6 +207,8 @@ func (n *dagScanNode) Next() (bool, error) {
currentCid = cid
// Reset the depthVisited for each head yielded by headset
n.depthVisited = 0
} else {
return false, nil
}

// skip already visited CIDs
Expand All @@ -207,7 +224,7 @@ func (n *dagScanNode) Next() (bool, error) {
// clear the cid after
block, err := store.Get(n.planner.ctx, *currentCid)
if err != nil {
return false, err
return false, errors.Join(ErrMissingCID, err)
}

dagBlock, err := coreblock.GetFromBytes(block.RawData())
Expand All @@ -220,6 +237,18 @@ func (n *dagScanNode) Next() (bool, error) {
return false, err
}

// if this is a time travel query or a latestCommits
// (cid + undefined depth + docId) then we need to make sure the
// target block actually belongs to the doc, since we are
// bypassing the HeadFetcher for the first cid
currentDocID := n.commitSelect.DocumentMapping.FirstOfName(currentValue, request.DocIDArgName)
if n.commitSelect.Cid.HasValue() &&
len(n.visitedNodes) == 0 &&
n.commitSelect.DocID.HasValue() &&
currentDocID != n.commitSelect.DocID.Value() {
return false, ErrIncorrectCIDForDocId
}

// the dagscan node can traverse into the merkle dag
// based on the specified depth limit.
// The default query operation 'latestCommit' only cares about
Expand All @@ -230,7 +259,13 @@ func (n *dagScanNode) Next() (bool, error) {
// HEAD paths.
n.depthVisited++
n.visitedNodes[currentCid.String()] = true // mark the current node as "visited"
if !n.commitSelect.Depth.HasValue() || n.depthVisited < n.commitSelect.Depth.Value() {

// the default behavior for depth is:
// doc ID, max depth
// just doc ID + CID, 0 depth
// doc ID + CID + depth, use depth
if (!n.commitSelect.Depth.HasValue() && !n.commitSelect.Cid.HasValue()) ||
(n.commitSelect.Depth.HasValue() && n.depthVisited < n.commitSelect.Depth.Value()) {
// Insert the newly fetched cids into the slice of queued items, in reverse order
// so that the last new cid will be at the front of the slice
n.queuedCids = append(make([]*cid.Cid, len(dagBlock.Heads)), n.queuedCids...)
Expand All @@ -240,12 +275,6 @@ func (n *dagScanNode) Next() (bool, error) {
}
}

if n.commitSelect.Cid.HasValue() && currentCid.String() != n.commitSelect.Cid.Value() {
// If a specific cid has been requested, and the current item does not
// match, keep searching.
return n.Next()
}

n.currentValue = currentValue
return true, nil
}
Expand Down
2 changes: 2 additions & 0 deletions internal/planner/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ var (
ErrUnknownExplainRequestType = errors.New("can not explain request of unknown type")
ErrUpsertMultipleDocuments = errors.New("cannot upsert multiple matching documents")
ErrMismatchLengthOnSimilarity = errors.New("source and vector must be of the same length")
ErrIncorrectCIDForDocId = errors.New("cid does not belong to document")
ErrMissingCID = errors.New("missing cid")
)

func NewErrUnknownDependency(name string) error {
Expand Down
5 changes: 4 additions & 1 deletion internal/planner/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package planner

import (
"math"
"slices"
"strings"

Expand Down Expand Up @@ -405,9 +406,11 @@ func (n *selectNode) initFields(selectReq *mapper.Select) ([]aggregateNode, []*s
// commit. Instead, _version references the CID
// of that Target version we are querying.
// So instead of a LatestCommit subquery, we need
// a OneCommit subquery, with the supplied parameters.
// a commits query with max depth starting from the
// target CID version
commitSlct.DocID = immutable.Some(selectReq.DocIDs.Value()[0]) // @todo check length
commitSlct.Cid = selectReq.Cid
commitSlct.Depth = immutable.Some(uint64(math.MaxUint64))
}

commitPlan := n.planner.DAGScan(commitSlct)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func TestQueryCommitsBranchables_WithCidAndDocIDParam(t *testing.T) {
Results: map[string]any{
"commits": []map[string]any{},
},
ExpectedError: "cid does not belong to document",
},
},
}
Expand Down
12 changes: 10 additions & 2 deletions tests/integration/query/commits/with_cid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,23 @@ func TestQueryCommitsWithCidForFieldCommit(t *testing.T) {
"age": 21
}`,
},
testUtils.UpdateDoc{
Doc: `{
"name": "Johnn"
}`,
},
testUtils.Request{
Request: `query {
commits(
cid: "bafyreia2vlbfkcbyogdjzmbqcjneabwwwtw7ti2xbd7yor5mbu2sk4pcoy"
cid: "bafyreiexx65zeu6rln4yiw7lav4up5bnfnbkti4kguw3vdencwddqhv45e"
) {
cid
}
}`,
Results: map[string]any{
"commits": []map[string]any{
{
"cid": "bafyreia2vlbfkcbyogdjzmbqcjneabwwwtw7ti2xbd7yor5mbu2sk4pcoy",
"cid": "bafyreiexx65zeu6rln4yiw7lav4up5bnfnbkti4kguw3vdencwddqhv45e",
},
},
},
Expand Down Expand Up @@ -115,6 +120,7 @@ func TestQueryCommitsWithInvalidCid(t *testing.T) {
Results: map[string]any{
"commits": []map[string]any{},
},
ExpectedError: "invalid cid",
},
},
}
Expand Down Expand Up @@ -145,6 +151,7 @@ func TestQueryCommitsWithInvalidShortCid(t *testing.T) {
Results: map[string]any{
"commits": []map[string]any{},
},
ExpectedError: "invalid cid",
},
},
}
Expand Down Expand Up @@ -175,6 +182,7 @@ func TestQueryCommitsWithUnknownCid(t *testing.T) {
Results: map[string]any{
"commits": []map[string]any{},
},
ExpectedError: "missing cid",
},
},
}
Expand Down
52 changes: 51 additions & 1 deletion tests/integration/query/commits/with_doc_id_cid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestQueryCommitsWithDocIDAndCidForDifferentDoc(t *testing.T) {
Results: map[string]any{
"commits": []map[string]any{},
},
ExpectedError: "missing cid",
},
},
}
Expand Down Expand Up @@ -70,14 +71,15 @@ func TestQueryCommitsWithDocIDAndCidForDifferentDocWithUpdate(t *testing.T) {
Request: ` {
commits(
docID: "bae-not-this-doc",
cid: "bafybeica4js2abwqjjrz7dcialbortbz32uxp7ufxu7yljbwvmhjqqxzny"
cid: "bafyreiale6qsjc7qewod3c6h2odwamfwcf7vt4zlqtw7ldcm57xdkgxja4"
Comment on lines -73 to +74
Copy link
Member Author

@jsimnz jsimnz Feb 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just highlighting this for reviewers. This isnt required by the new changes to the commit query semantics. Its actually fixing a unconsequential bug which is using the incorrect CID for the goal of the test. The test wanted to test field IDs, but the original CID was a composite CID.

) {
cid
}
}`,
Results: map[string]any{
"commits": []map[string]any{},
},
ExpectedError: "cid does not belong to document",
},
},
}
Expand Down Expand Up @@ -126,3 +128,51 @@ func TestQueryCommitsWithDocIDAndCidWithUpdate(t *testing.T) {

testUtils.ExecuteTestCase(t, test)
}

func TestQueryCommitsWithDocIDAndCidWithUpdateAndDepth(t *testing.T) {
test := testUtils.TestCase{
Description: "Simple all commits query with docID and cid, with update",
Actions: []any{
updateUserCollectionSchema(),
testUtils.CreateDoc{
CollectionID: 0,
Doc: `{
"name": "John",
"age": 21
}`,
},
testUtils.UpdateDoc{
CollectionID: 0,
DocID: 0,
Doc: `{
"age": 22
}`,
},
// depth is pretty arbitrary here, as long as its big enough to cover the updates
// from the target cid (ie >=2)
testUtils.Request{
Request: ` {
commits(
docID: "bae-c9fb0fa4-1195-589c-aa54-e68333fb90b3",
cid: "bafyreiale6qsjc7qewod3c6h2odwamfwcf7vt4zlqtw7ldcm57xdkgxja4",
depth: 5
) {
cid
}
}`,
Results: map[string]any{
"commits": []map[string]any{
{
"cid": "bafyreiale6qsjc7qewod3c6h2odwamfwcf7vt4zlqtw7ldcm57xdkgxja4",
},
{
"cid": "bafyreia2vlbfkcbyogdjzmbqcjneabwwwtw7ti2xbd7yor5mbu2sk4pcoy",
},
},
},
},
},
}

testUtils.ExecuteTestCase(t, test)
}
11 changes: 11 additions & 0 deletions tests/integration/query/simple/with_cid_doc_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,12 +234,23 @@ func TestQuerySimpleWithUpdateAndMiddleCidAndDocID(t *testing.T) {
docID: "bae-6845cfdf-cb0f-56a3-be3a-b5a67be5fbdc"
) {
name
_version {
cid
}
}
}`,
Results: map[string]any{
"Users": []map[string]any{
{
"name": "Johnn",
"_version": []map[string]any{
{
"cid": "bafyreig2j5zwcozovwzrxr7ivfnptlj7urlabzjbv4lls64hlkh6jmhfim",
},
{
"cid": "bafyreib7afkd5hepl45wdtwwpai433bhnbd3ps5m2rv3masctda7b6mmxe",
},
},
},
},
},
Expand Down
Loading