From c67b2721a548bf5abce5c7cef24e7b548429c498 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Thu, 13 Jun 2024 16:07:53 -0700 Subject: [PATCH] fix merge retry logic --- events/dag_sync.go | 5 +- internal/db/merge.go | 110 +++++++++++++++++++++++--------------- internal/db/merge_test.go | 23 ++++++++ net/server.go | 43 +-------------- net/server_test.go | 25 --------- 5 files changed, 96 insertions(+), 110 deletions(-) diff --git a/events/dag_sync.go b/events/dag_sync.go index 4ab568b7d0..b8f0137ba2 100644 --- a/events/dag_sync.go +++ b/events/dag_sync.go @@ -14,7 +14,6 @@ import ( "sync" "github.com/ipfs/go-cid" - "github.com/sourcenetwork/immutable" ) @@ -23,6 +22,8 @@ type DAGMergeChannel = immutable.Option[Channel[DAGMerge]] // DAGMerge is a notification that a merge can be performed up to the provided CID. type DAGMerge struct { + // DocID is the unique identifier for the document being merged. + DocID string // Cid is the id of the composite commit that formed this update in the DAG. Cid cid.Cid // SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated. @@ -30,4 +31,6 @@ type DAGMerge struct { // Wg is a wait group that can be used to synchronize the merge, // allowing the caller to optionnaly block until the merge is complete. Wg *sync.WaitGroup + // RetryCount is the number of times this merge has been retried due to a conflict. + RetryCount int } diff --git a/internal/db/merge.go b/internal/db/merge.go index 323f7ae92c..63a728e38c 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -13,6 +13,7 @@ package db import ( "container/list" "context" + "sync" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime/linking" @@ -34,6 +35,7 @@ import ( ) func (db *db) handleMerges(ctx context.Context, merges events.Subscription[events.DAGMerge]) { + queue := newMergeQueue() for { select { case <-ctx.Done(): @@ -43,15 +45,23 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event return } go func() { - err := db.executeMerge(ctx, merge) - if err != nil { - log.ErrorContextE( - ctx, - "Failed to execute merge", - err, - corelog.String("CID", merge.Cid.String()), - corelog.String("Error", err.Error()), - ) + // ensure only one merge per docID + queue.add(merge.DocID) + defer queue.done(merge.DocID) + + // retry merge up to max txn retries + for i := 0; i < db.MaxTxnRetries(); i++ { + err := db.executeMerge(ctx, merge) + if errors.Is(err, badger.ErrTxnConflict) { + continue // retry merge + } + if err != nil { + log.ErrorContextE(ctx, "Failed to execute merge", err, corelog.Any("Merge", merge)) + } + if merge.Wg != nil { + merge.Wg.Done() + } + break // merge completed } }() } @@ -59,12 +69,6 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event } func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error { - defer func() { - // Notify the caller that the merge is complete. - if dagMerge.Wg != nil { - dagMerge.Wg.Done() - } - }() ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -100,35 +104,57 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error return err } - for retry := 0; retry < db.MaxTxnRetries(); retry++ { - err := mp.mergeComposites(ctx) - if err != nil { - return err - } - err = syncIndexedDoc(ctx, docID, col) - if err != nil { - return err - } - err = txn.Commit(ctx) - if err != nil { - if errors.Is(err, badger.ErrTxnConflict) { - txn, err = db.NewTxn(ctx, false) - if err != nil { - return err - } - ctx = SetContextTxn(ctx, txn) - mp.txn = txn - mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage()) - // Reset the CRDTs to avoid reusing the old transaction. - mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT) - continue - } - return err - } - break + err = mp.mergeComposites(ctx) + if err != nil { + return err } - return nil + err = syncIndexedDoc(ctx, docID, col) + if err != nil { + return err + } + + return txn.Commit(ctx) +} + +// mergeQueue is synchronization source to ensure that concurrent +// document merges do not cause transaction conflicts. +type mergeQueue struct { + docs map[string]chan struct{} + mu sync.Mutex +} + +func newMergeQueue() *mergeQueue { + return &mergeQueue{ + docs: make(map[string]chan struct{}), + } +} + +// add adds a docID to the queue. If the docID is already in the queue, it will +// wait for the docID to be removed from the queue. For every add call, done must +// be called to remove the docID from the queue. Otherwise, subsequent add calls will +// block forever. +func (dq *mergeQueue) add(docID string) { + dq.mu.Lock() + done, ok := dq.docs[docID] + if !ok { + dq.docs[docID] = make(chan struct{}) + } + dq.mu.Unlock() + if ok { + <-done + dq.add(docID) + } +} + +func (dq *mergeQueue) done(docID string) { + dq.mu.Lock() + defer dq.mu.Unlock() + done, ok := dq.docs[docID] + if ok { + delete(dq.docs, docID) + close(done) + } } type mergeProcessor struct { diff --git a/internal/db/merge_test.go b/internal/db/merge_test.go index b8671a5171..77a6848dbb 100644 --- a/internal/db/merge_test.go +++ b/internal/db/merge_test.go @@ -13,6 +13,7 @@ package db import ( "context" "testing" + "time" "github.com/fxamacker/cbor/v2" "github.com/ipld/go-ipld-prime" @@ -292,3 +293,25 @@ func encodeValue(val any) []byte { } return b } + +func TestMergeQueue(t *testing.T) { + q := newMergeQueue() + + testDocID := "test" + + q.add(testDocID) + go q.add(testDocID) + // give time for the goroutine to block + time.Sleep(10 * time.Millisecond) + require.Len(t, q.docs, 1) + q.done(testDocID) + // give time for the goroutine to add the docID + time.Sleep(10 * time.Millisecond) + q.mu.Lock() + require.Len(t, q.docs, 1) + q.mu.Unlock() + q.done(testDocID) + q.mu.Lock() + require.Len(t, q.docs, 0) + q.mu.Unlock() +} diff --git a/net/server.go b/net/server.go index 94d791854f..fe2a3ae943 100644 --- a/net/server.go +++ b/net/server.go @@ -51,11 +51,6 @@ type server struct { pubSubEmitter event.Emitter pushLogEmitter event.Emitter - // docQueue is used to track which documents are currently being processed. - // This is used to prevent multiple concurrent processing of the same document and - // limit unecessary transaction conflicts. - docQueue *docQueue - pb.UnimplementedServiceServer } @@ -73,9 +68,6 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) { peer: p, conns: make(map[libpeer.ID]*grpc.ClientConn), topics: make(map[string]pubsubTopic), - docQueue: &docQueue{ - docs: make(map[string]chan struct{}), - }, } cred := insecure.NewCredentials() @@ -152,38 +144,6 @@ func (s *server) GetLog(ctx context.Context, req *pb.GetLogRequest) (*pb.GetLogR return nil, nil } -type docQueue struct { - docs map[string]chan struct{} - mu sync.Mutex -} - -// add adds a docID to the queue. If the docID is already in the queue, it will -// wait for the docID to be removed from the queue. For every add call, done must -// be called to remove the docID from the queue. Otherwise, subsequent add calls will -// block forever. -func (dq *docQueue) add(docID string) { - dq.mu.Lock() - done, ok := dq.docs[docID] - if !ok { - dq.docs[docID] = make(chan struct{}) - } - dq.mu.Unlock() - if ok { - <-done - dq.add(docID) - } -} - -func (dq *docQueue) done(docID string) { - dq.mu.Lock() - defer dq.mu.Unlock() - done, ok := dq.docs[docID] - if ok { - delete(dq.docs, docID) - close(done) - } -} - // PushLog receives a push log request func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushLogReply, error) { pid, err := peerIDFromContext(ctx) @@ -199,9 +159,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL return nil, err } - s.docQueue.add(docID.String()) defer func() { - s.docQueue.done(docID.String()) if s.pushLogEmitter != nil { byPeer, err := libpeer.Decode(req.Body.Creator) if err != nil { @@ -249,6 +207,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL wg := &sync.WaitGroup{} wg.Add(1) s.peer.db.Events().DAGMerges.Value().Publish(events.DAGMerge{ + DocID: docID.String(), Cid: cid, SchemaRoot: string(req.Body.SchemaRoot), Wg: wg, diff --git a/net/server_test.go b/net/server_test.go index 47d6a68aa8..d17705b404 100644 --- a/net/server_test.go +++ b/net/server_test.go @@ -13,7 +13,6 @@ package net import ( "context" "testing" - "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore/query" @@ -227,30 +226,6 @@ func TestGetHeadLog(t *testing.T) { require.Nil(t, err) } -func TestDocQueue(t *testing.T) { - q := docQueue{ - docs: make(map[string]chan struct{}), - } - - testDocID := "test" - - q.add(testDocID) - go q.add(testDocID) - // give time for the goroutine to block - time.Sleep(10 * time.Millisecond) - require.Len(t, q.docs, 1) - q.done(testDocID) - // give time for the goroutine to add the docID - time.Sleep(10 * time.Millisecond) - q.mu.Lock() - require.Len(t, q.docs, 1) - q.mu.Unlock() - q.done(testDocID) - q.mu.Lock() - require.Len(t, q.docs, 0) - q.mu.Unlock() -} - func getHead(ctx context.Context, db client.DB, docID client.DocID) (cid.Cid, error) { prefix := core.DataStoreKeyFromDocID(docID).ToHeadStoreKey().WithFieldId(core.COMPOSITE_NAMESPACE).ToString() results, err := db.Headstore().Query(ctx, query.Query{Prefix: prefix})