Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.1: kvserver: use ClearRawRange to truncate very large Raft logs #75980

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 42 additions & 14 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,27 @@ import (
"go.etcd.io/etcd/raft/v3/tracker"
)

// raftLogTruncationClearRangeThreshold is the number of truncated entries at
// or above which Raft log truncation issues a single Pebble range tombstone
// instead of one point delete per entry.
//
// The value is a trade-off: it must be high enough that we do not write too
// many range tombstones to Pebble, yet low enough that we do not issue too
// many point deletes either (in particular, the Pebble write batch must not
// overflow).
//
// Steady-state truncations are generally small, since they fire once
// RaftLogQueueStaleSize (64 KB) or RaftLogQueueStaleThreshold (100 entries)
// is exceeded. When followers lag, the log is allowed to grow up to
// RaftLogTruncationThreshold (16 MB) before it is truncated.
//
// The default of 100k is unlikely to be reached in most common cases, which
// keeps the number of range tombstones low, but it does kick in once a Raft
// log has grown abnormally large. RaftLogTruncationThreshold typically will
// not reach it unless the average log entry is <= 160 bytes. With keys of
// ~16 bytes, Pebble point deletion batches are bounded at ~1.6MB.
var raftLogTruncationClearRangeThreshold = uint64(util.ConstantWithMetamorphicTestRange(
	"raft-log-truncation-clearrange-threshold",
	100000, /* default */
	1,      /* min */
	1e6,    /* max */
))

func makeIDKey() kvserverbase.CmdIDKey {
idKeyBuf := make([]byte, 0, raftCommandIDLen)
idKeyBuf = encoding.EncodeUint64Ascending(idKeyBuf, uint64(rand.Int63()))
Expand Down Expand Up @@ -1782,21 +1803,28 @@ func handleTruncatedStateBelowRaft(
// truncation index to the new truncation index. This is performed
// atomically with the raft command application so that the
// TruncatedState index is always consistent with the state of the
// Raft log itself. We can use the distinct writer because we know
// all writes will be to distinct keys.
//
// Intentionally don't use range deletion tombstones (ClearRange())
// due to performance concerns connected to having many range
// deletion tombstones. There is a chance that ClearRange will
// perform well here because the tombstones could be "collapsed",
// but it is hardly worth the risk at this point.
// Raft log itself.
var numTruncatedEntries uint64
if newTruncatedState.Index > oldTruncatedState.Index {
numTruncatedEntries = newTruncatedState.Index - oldTruncatedState.Index
}
prefixBuf := &loader.RangeIDPrefixBuf
for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ {
// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
// avoid allocating when constructing Raft log keys (16 bytes).
unsafeKey := prefixBuf.RaftLogKey(idx)
if err := readWriter.ClearUnversioned(unsafeKey); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState)
if numTruncatedEntries >= raftLogTruncationClearRangeThreshold {
start := prefixBuf.RaftLogKey(oldTruncatedState.Index + 1).Clone()
end := prefixBuf.RaftLogKey(newTruncatedState.Index + 1).Clone() // end is exclusive
if err := readWriter.ClearRawRange(start, end); err != nil {
return false, errors.Wrapf(err,
"unable to clear truncated Raft entries for %+v between indexes %d-%d",
oldTruncatedState, oldTruncatedState.Index+1, newTruncatedState.Index+1)
}
} else {
for idx := oldTruncatedState.Index + 1; idx <= newTruncatedState.Index; idx++ {
// NB: RangeIDPrefixBufs have sufficient capacity (32 bytes) to
// avoid allocating when constructing Raft log keys (16 bytes).
unsafeKey := prefixBuf.RaftLogKey(idx)
if err := readWriter.ClearUnversioned(unsafeKey); err != nil {
return false, errors.Wrapf(err, "unable to clear truncated Raft entries for %+v", newTruncatedState)
}
}
}

Expand Down
65 changes: 49 additions & 16 deletions pkg/kv/kvserver/replica_raft_truncation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,21 @@ package kvserver
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/datadriven"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Expand All @@ -46,6 +49,7 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
datadriven.Walk(t, "testdata/truncated_state_migration", func(t *testing.T, path string) {
const rangeID = 12
loader := stateloader.Make(rangeID)
prefixBuf := &loader.RangeIDPrefixBuf
eng := storage.NewDefaultInMemForTesting()
defer eng.Close()

Expand All @@ -55,9 +59,9 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
d.ScanArgs(t, "index", &prevTruncatedState.Index)
d.ScanArgs(t, "term", &prevTruncatedState.Term)
return ""

case "put":
var index uint64
var term uint64
var index, term uint64
var legacy bool
d.ScanArgs(t, "index", &index)
d.ScanArgs(t, "term", &term)
Expand All @@ -69,16 +73,15 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
}

if legacy {
assert.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState))
require.NoError(t, loader.SetLegacyRaftTruncatedState(ctx, eng, nil, truncState))
} else {
assert.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState))
require.NoError(t, loader.SetRaftTruncatedState(ctx, eng, truncState))
}
return ""

case "handle":
var buf bytes.Buffer

var index uint64
var term uint64
var index, term uint64
d.ScanArgs(t, "index", &index)
d.ScanArgs(t, "term", &term)

Expand All @@ -87,30 +90,60 @@ func TestHandleTruncatedStateBelowRaft(t *testing.T) {
Term: term,
}

apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false)
if err != nil {
return err.Error()
// Write log entries at start, middle, end, and above the truncated interval.
if newTruncatedState.Index > prevTruncatedState.Index {
indexes := []uint64{
prevTruncatedState.Index + 1, // start
(newTruncatedState.Index + prevTruncatedState.Index + 1) / 2, // middle
newTruncatedState.Index, // end
newTruncatedState.Index + 1, // new head
}
for _, idx := range indexes {
meta := enginepb.MVCCMetadata{RawBytes: make([]byte, 8)}
binary.BigEndian.PutUint64(meta.RawBytes, idx)
value, err := protoutil.Marshal(&meta)
require.NoError(t, err)
require.NoError(t, eng.PutUnversioned(prefixBuf.RaftLogKey(idx), value))
}
}

// Apply truncation.
apply, err := handleTruncatedStateBelowRaft(ctx, &prevTruncatedState, newTruncatedState, loader, eng, false)
require.NoError(t, err)
fmt.Fprintf(&buf, "apply: %t\n", apply)

// Check the truncated state.
for _, key := range []roachpb.Key{
keys.RaftTruncatedStateLegacyKey(rangeID),
keys.RaftTruncatedStateKey(rangeID),
} {
var truncatedState roachpb.RaftTruncatedState
ok, err := storage.MVCCGetProto(ctx, eng, key, hlc.Timestamp{}, &truncatedState, storage.MVCCGetOptions{})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
if !ok {
continue
}
fmt.Fprintf(&buf, "%s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term)
fmt.Fprintf(&buf, "state: %s -> index=%d term=%d\n", key, truncatedState.Index, truncatedState.Term)
}

// Find the first untruncated log entry (the log head).
res, err := storage.MVCCScan(ctx, eng,
prefixBuf.RaftLogPrefix().Clone(),
prefixBuf.RaftLogPrefix().PrefixEnd(),
hlc.Timestamp{},
storage.MVCCScanOptions{MaxKeys: 1})
require.NoError(t, err)
var head roachpb.Key
if len(res.KVs) > 0 {
head = res.KVs[0].Key
}
fmt.Fprintf(&buf, "head: %s\n", head)

return buf.String()

default:
return fmt.Sprintf("unsupported: %s", d.Cmd)
}
return fmt.Sprintf("unsupported: %s", d.Cmd)
})
})
}
16 changes: 13 additions & 3 deletions pkg/kv/kvserver/testdata/truncated_state_migration/migration
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,28 @@ prev index=100 term=9
handle index=150 term=9
----
apply: true
/Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=150 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# Simulate another truncation that moves forward.

handle index=170 term=9
----
apply: true
/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:171

# ... and one that moves backwards and should not take effect.

handle index=150 term=9
----
apply: false
/Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
state: /Local/RangeID/12/u/RaftTruncatedState -> index=170 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works.
handle index=12345678901234567890 term=9
----
apply: true
state: /Local/RangeID/12/u/RaftTruncatedState -> index=12345678901234567890 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,27 @@ put legacy=true index=100 term=9
handle index=100 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Min

# Note that the below aren't actually possible in practice
# as a divergence won't happen before the migration.

handle index=150 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

handle index=60 term=9
----
apply: true
/Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:151

# A huge truncation (beyond raftLogTruncationClearRangeThreshold) also works.
handle index=12345678901234567890 term=9
----
apply: true
state: /Local/RangeID/12/r/RaftTruncatedState -> index=100 term=9
head: /Local/RangeID/12/u/RaftLog/logIndex:12345678901234567891
10 changes: 10 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ func bytesPrefixEnd(b []byte) []byte {
return b
}

// Clone returns a copy of the key that shares no backing storage with the
// receiver, so later writes to either one are not visible through the other.
// A nil key clones to nil; an empty non-nil key clones to an empty non-nil key.
func (k Key) Clone() Key {
	if k == nil {
		return nil
	}
	// Allocate exactly len(k) bytes up front and fill them in one append.
	return append(make(Key, 0, len(k)), k...)
}

// Next returns the next key in lexicographic sort order. The method may only
// take a shallow copy of the Key, so both the receiver and the return
// value should be treated as immutable after.
Expand Down
11 changes: 11 additions & 0 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func makeSynTS(walltime int64, logical int32) hlc.Timestamp {
}
}

func TestKeyClone(t *testing.T) {
k := Key{0x01, 0x02, 0x03}
c := k.Clone()
require.Equal(t, k, c)

k[0] = 0xff
require.NotEqual(t, k, c)

require.Nil(t, Key(nil).Clone())
}

// TestKeyNext tests that the method for creating lexicographic
// successors to byte slices works as expected.
func TestKeyNext(t *testing.T) {
Expand Down