Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reapply #8644 #9242

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
17 changes: 15 additions & 2 deletions batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/lightningnetwork/lnd/kvdb"
"github.com/lightningnetwork/lnd/sqldb"
)

// errSolo is a sentinel error indicating that the requester should re-run the
Expand Down Expand Up @@ -55,8 +56,20 @@ func (b *batch) run() {
for i, req := range b.reqs {
err := req.Update(tx)
if err != nil {
failIdx = i
return err
// If we get a serialization error, we
// want the underlying SQL retry
// mechanism to retry the entire batch.
// Otherwise, we can succeed in an
// sqldb retry and still re-execute the
// failing request individually.
dbErr := sqldb.MapSQLError(err)
if !sqldb.IsSerializationError(dbErr) {
failIdx = i

return err
}

return dbErr
aakselrod marked this conversation as resolved.
Show resolved Hide resolved
}
}
return nil
Expand Down
74 changes: 74 additions & 0 deletions batch/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package batch

import (
"errors"
"path/filepath"
"sync"
"testing"
"time"

"github.com/btcsuite/btcwallet/walletdb"
"github.com/lightningnetwork/lnd/kvdb"
"github.com/stretchr/testify/require"
)

func TestRetry(t *testing.T) {
dbDir := t.TempDir()

dbName := filepath.Join(dbDir, "weks.db")
db, err := walletdb.Create(
"bdb", dbName, true, kvdb.DefaultDBTimeout,
)
if err != nil {
t.Fatalf("unable to create walletdb: %v", err)
}
t.Cleanup(func() {
db.Close()
})

var (
mu sync.Mutex
called int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could use atmoic.Uint instead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used a mutex because the underlying TimeScheduler already provides a facility to do so, but I can pass a nil in its place and use an atomic if that's what people prefer? I don't have a strong opinion.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a nit-picking, so nvm

)
sched := NewTimeScheduler(db, &mu, time.Second)

// First, we construct a request that should retry individually and
// execute it non-lazily. It should still return the error the second
// time.
req := &Request{
Update: func(tx kvdb.RwTx) error {
called++

return errors.New("test")
},
}
err = sched.Execute(req)

// Check and reset the called counter.
mu.Lock()
require.Equal(t, 2, called)
called = 0
mu.Unlock()

require.ErrorContains(t, err, "test")

// Now, we construct a request that should NOT retry because it returns
// a serialization error, which should cause the underlying postgres
// transaction to retry. Since we aren't using postgres, this will
// cause the transaction to not be retried at all.
req = &Request{
Update: func(tx kvdb.RwTx) error {
called++

return errors.New("could not serialize access")
},
}
err = sched.Execute(req)

// Check the called counter.
mu.Lock()
require.Equal(t, 1, called)
mu.Unlock()

require.ErrorContains(t, err, "could not serialize access")
}