fix: Merge retry logic #2719

Merged (16 commits, Jun 17, 2024)
events/dag_sync.go (3 changes: 2 additions & 1 deletion)

@@ -14,7 +14,6 @@ import (
"sync"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/immutable"
)

@@ -23,6 +22,8 @@ type DAGMergeChannel = immutable.Option[Channel[DAGMerge]]

// DAGMerge is a notification that a merge can be performed up to the provided CID.
type DAGMerge struct {
// DocID is the unique identifier for the document being merged.
DocID string
// Cid is the id of the composite commit that formed this update in the DAG.
Cid cid.Cid
// SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated.
internal/db/merge.go (107 changes: 70 additions & 37 deletions)

@@ -13,6 +13,7 @@
import (
"container/list"
"context"
"sync"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/linking"
@@ -34,6 +35,7 @@
)

func (db *db) handleMerges(ctx context.Context, merges events.Subscription[events.DAGMerge]) {
queue := newMergeQueue()
for {
select {
case <-ctx.Done():
@@ -43,28 +45,37 @@
return
}
go func() {
err := db.executeMerge(ctx, merge)
// ensure only one merge per docID
queue.add(merge.DocID)
Collaborator: suggestion: If the DocID is now part of the message, we can get rid of the getDocIDFromBlock function. The reason I hadn't included DocID before is that I wanted to guarantee that there was no mistake with it, like the wrong DocID for the provided CID. That was probably a bad reason when I think about it.

defer queue.done(merge.DocID)

var err error
// retry merge up to max txn retries
for i := 0; i < db.MaxTxnRetries(); i++ {
Collaborator: thought: If you use the merge queue, I expect that a retry will never be needed.

Member Author: If a user updates a doc while a merge is in progress it could still happen.

Collaborator: 😅 very true

Member (quoting the above): "If a user updates a doc while a merge is in progress it could still happen." It'd be super nice to document that here, as a reminder.

err = db.executeMerge(ctx, merge)
if errors.Is(err, badger.ErrTxnConflict) {
continue // retry merge
}
break // merge success or error
}

if err != nil {
log.ErrorContextE(
ctx,
"Failed to execute merge",
err,
corelog.String("CID", merge.Cid.String()),
corelog.String("Error", err.Error()),
)
corelog.Any("Error", err),
corelog.Any("Event", merge))

}
if merge.Wg != nil {
merge.Wg.Done()
}
}()
}
}
}
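The thread above asks for the retry rationale to be written down next to the loop. A hedged sketch of what that inline note could look like, using only names already present in this diff (the comment wording is illustrative, not part of the PR):

```go
// mergeQueue serialises merges per DocID, so two incoming merges for the
// same document never race with each other. A retry can still be needed,
// though: a local update committed by a user while the merge transaction
// is open surfaces as badger.ErrTxnConflict when the merge commits.
for i := 0; i < db.MaxTxnRetries(); i++ {
	err = db.executeMerge(ctx, merge)
	if errors.Is(err, badger.ErrTxnConflict) {
		continue // a concurrent local update invalidated the txn; retry
	}
	break // merge succeeded, or failed with a non-conflict error
}
```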

func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error {
defer func() {
// Notify the caller that the merge is complete.
if dagMerge.Wg != nil {
dagMerge.Wg.Done()
}
}()
ctx, txn, err := ensureContextTxn(ctx, db, false)
if err != nil {
return err
@@ -100,35 +111,57 @@
return err
}

for retry := 0; retry < db.MaxTxnRetries(); retry++ {
err := mp.mergeComposites(ctx)
if err != nil {
return err
}
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err
}
err = txn.Commit(ctx)
if err != nil {
if errors.Is(err, badger.ErrTxnConflict) {
txn, err = db.NewTxn(ctx, false)
if err != nil {
return err
}
ctx = SetContextTxn(ctx, txn)
mp.txn = txn
mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage())
// Reset the CRDTs to avoid reusing the old transaction.
mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT)
continue
}
return err
}
break
err = mp.mergeComposites(ctx)
if err != nil {
return err

}

return nil
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err

}

return txn.Commit(ctx)
}

// mergeQueue is synchronization source to ensure that concurrent
// document merges do not cause transaction conflicts.
type mergeQueue struct {
docs map[string]chan struct{}
mu sync.Mutex
}

func newMergeQueue() *mergeQueue {
return &mergeQueue{
docs: make(map[string]chan struct{}),
}
}

// add adds a docID to the queue. If the docID is already in the queue, it will
// wait for the docID to be removed from the queue. For every add call, done must
// be called to remove the docID from the queue. Otherwise, subsequent add calls will
// block forever.
func (dq *mergeQueue) add(docID string) {
dq.mu.Lock()
done, ok := dq.docs[docID]
if !ok {
dq.docs[docID] = make(chan struct{})
}
dq.mu.Unlock()
if ok {
<-done
dq.add(docID)
}
}

func (dq *mergeQueue) done(docID string) {
dq.mu.Lock()
defer dq.mu.Unlock()
done, ok := dq.docs[docID]
if ok {
delete(dq.docs, docID)
close(done)
}
}
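A minimal usage sketch of mergeQueue (hypothetical caller, not part of this diff): every add for a given docID must be paired with a done, and a second add for the same docID blocks until the first caller's done runs, which is how handleMerges serialises merges per document above.

```go
func mergeQueueUsageSketch() {
	q := newMergeQueue()
	var wg sync.WaitGroup

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The second goroutine blocks here until the first calls done.
			q.add("doc-1")
			defer q.done("doc-1") // always pair add with done, or later adds block forever
			// ... perform the merge for doc-1 ...
		}()
	}
	wg.Wait()
}
```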

type mergeProcessor struct {
internal/db/merge_test.go (23 changes: 23 additions & 0 deletions)

@@ -13,6 +13,7 @@ package db
import (
"context"
"testing"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/ipld/go-ipld-prime"
@@ -292,3 +293,25 @@ func encodeValue(val any) []byte {
}
return b
}

func TestMergeQueue(t *testing.T) {
q := newMergeQueue()

testDocID := "test"

q.add(testDocID)
go q.add(testDocID)
// give time for the goroutine to block
time.Sleep(10 * time.Millisecond)
require.Len(t, q.docs, 1)
q.done(testDocID)
// give time for the goroutine to add the docID
time.Sleep(10 * time.Millisecond)
q.mu.Lock()
require.Len(t, q.docs, 1)
q.mu.Unlock()
q.done(testDocID)
q.mu.Lock()
require.Len(t, q.docs, 0)
q.mu.Unlock()
}
net/server.go (43 changes: 1 addition & 42 deletions)

@@ -51,11 +51,6 @@ type server struct {
pubSubEmitter event.Emitter
pushLogEmitter event.Emitter

Comment on lines 52 to 53:

Collaborator: suggestion: If you remove the docQueue, it opens the door for transaction conflicts on the dag sync process. One of the test matrix runs failed for this exact reason. I suggest you keep it in :)

Member Author: It's still there, but in the db package instead. I think it makes more sense to have the blocking occur there.

Collaborator: The problem is that if the conflict happens in exactly the wrong way, blocks will be missing from the blockstore. I wanted to remove it when I refactored the dag sync process and I found out that we have to keep it in. Alternatively, we need to change the dag sync logic to deal with conflicts.

Member Author: Oh right, I remember now. Would a simple fix be to ignore transaction conflict errors?

Collaborator: GetBlocks doesn't handle errors very well. Even GetBlock doesn't return the block on a put conflict, so we would need to retry. It would be an efficient retry because the block would be found locally on the next try. I'll let you decide what to do from here :)

Member Author: The updated sync process should now handle conflicts.
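To make the retry idea from this thread concrete, here is a minimal generic sketch of the pattern being discussed. It is a hypothetical helper, not code from this PR; the conflict check is passed in as a predicate so no particular error value or blockstore API is assumed.

```go
// getWithRetry retries fn up to maxRetries times while isConflict(err)
// reports a transaction conflict. A conflicting put means the value was
// written concurrently, so the retry is cheap: the next attempt should
// find it locally and return immediately.
func getWithRetry[T any](fn func() (T, error), isConflict func(error) bool, maxRetries int) (T, error) {
	var zero T
	var err error
	for i := 0; i < maxRetries; i++ {
		var v T
		v, err = fn()
		if err == nil {
			return v, nil
		}
		if !isConflict(err) {
			return zero, err
		}
	}
	return zero, err
}
```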

// docQueue is used to track which documents are currently being processed.
// This is used to prevent multiple concurrent processing of the same document and
// limit unecessary transaction conflicts.
docQueue *docQueue

pb.UnimplementedServiceServer
}

@@ -73,9 +68,6 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) {
peer: p,
conns: make(map[libpeer.ID]*grpc.ClientConn),
topics: make(map[string]pubsubTopic),
docQueue: &docQueue{
docs: make(map[string]chan struct{}),
},
}

cred := insecure.NewCredentials()
@@ -152,38 +144,6 @@ func (s *server) GetLog(ctx context.Context, req *pb.GetLogRequest) (*pb.GetLogR
return nil, nil
}

type docQueue struct {
docs map[string]chan struct{}
mu sync.Mutex
}

// add adds a docID to the queue. If the docID is already in the queue, it will
// wait for the docID to be removed from the queue. For every add call, done must
// be called to remove the docID from the queue. Otherwise, subsequent add calls will
// block forever.
func (dq *docQueue) add(docID string) {
dq.mu.Lock()
done, ok := dq.docs[docID]
if !ok {
dq.docs[docID] = make(chan struct{})
}
dq.mu.Unlock()
if ok {
<-done
dq.add(docID)
}
}

func (dq *docQueue) done(docID string) {
dq.mu.Lock()
defer dq.mu.Unlock()
done, ok := dq.docs[docID]
if ok {
delete(dq.docs, docID)
close(done)
}
}

// PushLog receives a push log request
func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushLogReply, error) {
pid, err := peerIDFromContext(ctx)
@@ -199,9 +159,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
return nil, err
}

s.docQueue.add(docID.String())
defer func() {
s.docQueue.done(docID.String())
if s.pushLogEmitter != nil {
byPeer, err := libpeer.Decode(req.Body.Creator)
if err != nil {
@@ -249,6 +207,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
wg := &sync.WaitGroup{}
wg.Add(1)
s.peer.db.Events().DAGMerges.Value().Publish(events.DAGMerge{
DocID: docID.String(),
Cid: cid,
SchemaRoot: string(req.Body.SchemaRoot),
Wg: wg,
net/server_test.go (25 changes: 0 additions & 25 deletions)

@@ -13,7 +13,6 @@ package net
import (
"context"
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore/query"
@@ -227,30 +226,6 @@ func TestGetHeadLog(t *testing.T) {
require.Nil(t, err)
}

func TestDocQueue(t *testing.T) {
q := docQueue{
docs: make(map[string]chan struct{}),
}

testDocID := "test"

q.add(testDocID)
go q.add(testDocID)
// give time for the goroutine to block
time.Sleep(10 * time.Millisecond)
require.Len(t, q.docs, 1)
q.done(testDocID)
// give time for the goroutine to add the docID
time.Sleep(10 * time.Millisecond)
q.mu.Lock()
require.Len(t, q.docs, 1)
q.mu.Unlock()
q.done(testDocID)
q.mu.Lock()
require.Len(t, q.docs, 0)
q.mu.Unlock()
}

func getHead(ctx context.Context, db client.DB, docID client.DocID) (cid.Cid, error) {
prefix := core.DataStoreKeyFromDocID(docID).ToHeadStoreKey().WithFieldId(core.COMPOSITE_NAMESPACE).ToString()
results, err := db.Headstore().Query(ctx, query.Query{Prefix: prefix})