Skip to content

Commit

Permalink
add retry for raw kv client put
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Jan 17, 2025
1 parent 20e6bd2 commit 563cabd
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 6 deletions.
2 changes: 2 additions & 0 deletions br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ go_test(
"//br/pkg/mock",
"//br/pkg/restore",
"//br/pkg/restore/internal/import_client",
"//br/pkg/restore/internal/rawkv",
"//br/pkg/restore/split",
"//br/pkg/restore/utils",
"//br/pkg/storage",
Expand Down Expand Up @@ -132,6 +133,7 @@ go_test(
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_log//:log",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//rawkv",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//keepalive",
"@org_golang_google_grpc//status",
Expand Down
28 changes: 22 additions & 6 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,17 @@ import (
"google.golang.org/grpc/keepalive"
)

const MetaKVBatchSize = 64 * 1024 * 1024
const maxSplitKeysOnce = 10240

// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
const rawKVBatchCount = 64
const (
MetaKVBatchSize = 64 * 1024 * 1024
maxSplitKeysOnce = 10240
// rawKVBatchCount specifies the count of entries that the rawkv client puts into TiKV.
rawKVBatchCount = 64

// raw KV retry constants
rawKVMaxRetries = 5
rawKVInitialRetryInterval = 500 * time.Millisecond
rawKVMaxRetryInterval = 5 * time.Second
)

// LogRestoreManager is a comprehensive wrapper that encapsulates all logic related to log restoration,
// including concurrency management, checkpoint handling, and file importing for efficient log processing.
Expand Down Expand Up @@ -1493,7 +1499,7 @@ func (rc *LogClient) restoreMetaKvEntries(
failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
})
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.Ts); err != nil {
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.Ts); err != nil {
return 0, 0, errors.Trace(err)
}
// for failpoint, we need to flush the cache in rawKVClient every time
Expand Down Expand Up @@ -2053,3 +2059,13 @@ func (rc *LogClient) FailpointDoChecksumForLogRestore(

return eg.Wait()
}

func PutRawKvWithRetry(ctx context.Context, client *rawkv.RawKVBatchClient, key, value []byte, originTs uint64) error {
err := utils.WithRetry(ctx, func() error {
return client.Put(ctx, key, value, originTs)
}, utils.NewRawClientBackoffStrategy())
if err != nil {
return errors.Errorf("failed to put raw kv after retry")
}
return nil
}
69 changes: 69 additions & 0 deletions br/pkg/restore/log_client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/br/pkg/gluetidb"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
rawclient "github.com/pingcap/tidb/br/pkg/restore/internal/rawkv"
logclient "github.com/pingcap/tidb/br/pkg/restore/log_client"
"github.com/pingcap/tidb/br/pkg/restore/split"
"github.com/pingcap/tidb/br/pkg/restore/utils"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/pkg/util/sqlexec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -1986,3 +1988,70 @@ func fakeRowKey(tableID, rowID int64) kv.Key {
func fakeRowRawKey(tableID, rowID int64) kv.Key {
return tablecodec.EncodeRecordKey(tablecodec.GenTableRecordPrefix(tableID), kv.IntHandle(rowID))
}

type mockRawKVClient struct {
rawkv.Client
putCount int
errThreshold int
}

func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
m.putCount += 1
if m.errThreshold >= m.putCount {
return errors.New("rpcClient is idle")
}
return nil
}

func TestPutRawKvWithRetry(t *testing.T) {
tests := []struct {
name string
errThreshold int
cancelAfter time.Duration
wantErr string
wantPuts int
}{
{
name: "success on first try",
errThreshold: 0,
wantPuts: 1,
},
{
name: "success on after failure",
errThreshold: 2,
wantPuts: 3,
},
{
name: "fails all retries",
errThreshold: 5,
wantErr: "failed to put raw kv after retry",
wantPuts: 5,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mockRawClient := &mockRawKVClient{
errThreshold: tt.errThreshold,
}
client := rawclient.NewRawKVBatchClient(mockRawClient, 1)

ctx := context.Background()
if tt.cancelAfter > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter)
defer cancel()
}

err := logclient.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1)

if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.wantPuts, mockRawClient.putCount)
})
}
}
19 changes: 19 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
recoveryMaxAttempts = 16
recoveryDelayTime = 30 * time.Second
recoveryMaxDelayTime = 4 * time.Minute

rawClientMaxAttempts = 5
rawClientDelayTime = 500 * time.Millisecond
rawClientMaxDelayTime = 5 * time.Second
)

// BackoffStrategy implements a backoff strategy for retry operations.
Expand Down Expand Up @@ -343,6 +347,7 @@ func NewDiskCheckBackoffStrategy() BackoffStrategy {
return NewBackoffStrategy(
WithRemainingAttempts(resetTSRetryTime),
WithDelayTime(resetTSWaitInterval),
WithMaxDelayTime(resetTSMaxWaitInterval),
WithErrorContext(NewZeroRetryContext("disk check")),
WithRetryErrorFunc(isRetryErrFunc),
WithNonRetryErrorFunc(alwaysFalseFunc()),
Expand All @@ -353,6 +358,7 @@ func NewRecoveryBackoffStrategy(isRetryErrFunc func(error) bool) BackoffStrategy
return NewBackoffStrategy(
WithRemainingAttempts(recoveryMaxAttempts),
WithDelayTime(recoveryDelayTime),
WithMaxDelayTime(recoveryMaxDelayTime),
WithErrorContext(NewZeroRetryContext("recovery")),
WithRetryErrorFunc(isRetryErrFunc),
WithNonRetryErrorFunc(alwaysFalseFunc()),
Expand All @@ -363,6 +369,7 @@ func NewFlashBackBackoffStrategy() BackoffStrategy {
return NewBackoffStrategy(
WithRemainingAttempts(FlashbackRetryTime),
WithDelayTime(FlashbackWaitInterval),
WithMaxDelayTime(FlashbackMaxWaitInterval),
WithErrorContext(NewZeroRetryContext("flashback")),
WithRetryErrorFunc(alwaysTrueFunc()),
WithNonRetryErrorFunc(alwaysFalseFunc()),
Expand All @@ -373,12 +380,24 @@ func NewChecksumBackoffStrategy() BackoffStrategy {
return NewBackoffStrategy(
WithRemainingAttempts(ChecksumRetryTime),
WithDelayTime(ChecksumWaitInterval),
WithMaxDelayTime(ChecksumMaxWaitInterval),
WithErrorContext(NewZeroRetryContext("checksum")),
WithRetryErrorFunc(alwaysTrueFunc()),
WithNonRetryErrorFunc(alwaysFalseFunc()),
)
}

func NewRawClientBackoffStrategy() BackoffStrategy {
return NewBackoffStrategy(
WithRemainingAttempts(rawClientMaxAttempts),
WithDelayTime(rawClientDelayTime),
WithMaxDelayTime(rawClientMaxDelayTime),
WithErrorContext(NewZeroRetryContext("raw client")),
WithRetryErrorFunc(alwaysTrueFunc()),
WithNonRetryErrorFunc(alwaysFalseFunc()),
)
}

func (bo *backoffStrategyImpl) NextBackoff(err error) time.Duration {
errs := multierr.Errors(err)
lastErr := errs[len(errs)-1]
Expand Down

0 comments on commit 563cabd

Please sign in to comment.