Skip to content

Commit

Permalink
Merge pull request #119560 from cockroachdb/blathers/backport-staging…
Browse files Browse the repository at this point in the history
…-v22.2.19-119545

staging-v22.2.19: release-22.2: kvserver: refresh range cache on rangefeed barrier failure
  • Loading branch information
erikgrinaker authored Feb 22, 2024
2 parents a4ba2c3 + 1397daf commit c207600
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 3 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/raftutil",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
"//pkg/kv/kvserver/readsummary/rspb",
"//pkg/kv/kvserver/replicastats",
Expand Down
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/split"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -550,3 +552,13 @@ func WatchForDisappearingReplicas(t testing.TB, store *Store) {
}
}
}

func NewRangefeedTxnPusher(
ir *intentresolver.IntentResolver, r *Replica, span roachpb.RSpan,
) rangefeed.TxnPusher {
return &rangefeedTxnPusher{
ir: ir,
r: r,
span: span,
}
}
15 changes: 12 additions & 3 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,19 @@ func (tp *rangefeedTxnPusher) Barrier(ctx context.Context) error {
// Execute a Barrier on the leaseholder, and obtain its LAI. Error out on any
// range changes (e.g. splits/merges) that we haven't applied yet.
lai, desc, err := tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey)
if err != nil {
if errors.HasType(err, &roachpb.RangeKeyMismatchError{}) {
return errors.Wrap(err, "range barrier failed, range split")
if err != nil && errors.HasType(err, &roachpb.RangeKeyMismatchError{}) {
// The DistSender may have a stale range descriptor, e.g. following a merge.
// Failed unsplittable requests don't trigger a refresh, so we have to
// attempt to refresh it by sending a Get request to the start key.
//
// TODO(erikgrinaker): the DistSender should refresh its cache instead.
if _, err := tp.r.store.db.Get(ctx, tp.span.Key); err != nil {
return errors.Wrap(err, "range barrier failed: range descriptor refresh failed")
}
// Retry the Barrier.
lai, desc, err = tp.r.store.db.BarrierWithLAI(ctx, tp.span.Key, tp.span.EndKey)
}
if err != nil {
return errors.Wrap(err, "range barrier failed")
}
if lai == 0 {
Expand Down
100 changes: 100 additions & 0 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
Expand Down Expand Up @@ -1600,3 +1601,102 @@ func TestNewRangefeedForceLeaseRetry(t *testing.T) {
rangeFeedCancel()

}

// TestRangefeedTxnPusherBarrierRangeKeyMismatch is a regression test for
// https://github.com/cockroachdb/cockroach/issues/119333
//
// Specifically, it tests that a Barrier call that encounters a
// RangeKeyMismatchError will eagerly attempt to refresh the DistSender range
// cache. The DistSender does not do this itself for unsplittable requests, so
// it might otherwise continually fail.
func TestRangefeedTxnPusherBarrierRangeKeyMismatch(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderRace(t) // too slow, times out
skip.UnderDeadlock(t)

// Use a timeout, to prevent a hung test.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Start a cluster with 3 nodes.
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{},
},
},
})
defer tc.Stopper().Stop(ctx)
defer cancel()

n1 := tc.Server(0)
n3 := tc.Server(2)
db1 := n1.DB()
db3 := n3.DB()

// Split off a range and upreplicate it, with leaseholder on n1. This is the
// range we'll run the barrier across.
prefix := keys.ScratchRangeMin.Clone()
_, _, err := n1.SplitRange(prefix)
require.NoError(t, err)
desc := tc.AddVotersOrFatal(t, prefix, tc.Targets(1, 2)...)
t.Logf("split off range %s", desc)

rspan := desc.RSpan()
span := rspan.AsRawSpanWithNoLocals()

// Split off three other ranges.
splitKeys := []roachpb.Key{
append(prefix.Clone(), roachpb.Key("/a")...),
append(prefix.Clone(), roachpb.Key("/b")...),
append(prefix.Clone(), roachpb.Key("/c")...),
}
for _, key := range splitKeys {
_, desc, err = n1.SplitRange(key)
require.NoError(t, err)
t.Logf("split off range %s", desc)
}

// Scan the ranges on n3 to update the range caches, then run a barrier
// request which should fail with RangeKeyMismatchError.
_, err = db3.Scan(ctx, span.Key, span.EndKey, 0)
require.NoError(t, err)

_, _, err = db3.BarrierWithLAI(ctx, span.Key, span.EndKey)
require.Error(t, err)
require.IsType(t, &roachpb.RangeKeyMismatchError{}, err)
t.Logf("n3 barrier returned %s", err)

// Merge the ranges on n1.
for range splitKeys {
desc, err = n1.MergeRanges(span.Key)
require.NoError(t, err)
t.Logf("merged range %s", desc)
}

// Barriers should now succeed on n1, which have an updated range cache, but
// fail on n3 which doesn't.
lai, _, err := db1.BarrierWithLAI(ctx, span.Key, span.EndKey)
require.NoError(t, err)
t.Logf("n1 barrier returned LAI %d", lai)

// NB: this could potentially flake if n3 somehow updates its range cache. If
// it does, we can remove this assertion, but it's nice to make sure we're
// actually testing what we think we're testing.
_, _, err = db3.BarrierWithLAI(ctx, span.Key, span.EndKey)
require.Error(t, err)
require.IsType(t, &roachpb.RangeKeyMismatchError{}, err)
t.Logf("n3 barrier returned %s", err)

// However, rangefeedTxnPusher.Barrier() will refresh the cache and
// successfully apply the barrier.
s3 := tc.GetFirstStoreFromServer(t, 2)
repl3 := s3.LookupReplica(roachpb.RKey(span.Key))
t.Logf("repl3 desc: %s", repl3.Desc())
txnPusher := kvserver.NewRangefeedTxnPusher(nil, repl3, rspan)
require.NoError(t, txnPusher.Barrier(ctx))
t.Logf("n3 txnPusher barrier succeeded")
}

0 comments on commit c207600

Please sign in to comment.