Merge pull request #137700 from cockroachdb/blathers/backport-release-24.2-135945

release-24.2: kvserver: add setting to reject overly large transactions
andrewbaptist authored Dec 19, 2024
2 parents 1c10546 + b971391 commit 2bfb782
Showing 6 changed files with 173 additions and 15 deletions.
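
The new cluster setting, kv.transaction.max_intents_and_locks, caps how many replicated intents and durable locks a single transaction may write. The default of 0 disables the limit, and because the setting is registered as public at the application level, operators would typically enable it with SET CLUSTER SETTING kv.transaction.max_intents_and_locks = <n>. The snippet below is a minimal, hypothetical sketch of the rule the interceptor enforces before sending a batch; the standalone helper and its names are illustrative and not part of this change.

package main

import "fmt"

// wouldExceedMaxCount is an illustrative restatement of the new check:
// writeCount is the number of replicated intents/locks the transaction has
// already written (including in-flight writes), reqEstimate is the estimated
// count added by the batch about to be sent, and maxCount is the value of
// kv.transaction.max_intents_and_locks (0 disables the limit).
func wouldExceedMaxCount(writeCount, reqEstimate, maxCount int64) bool {
	if maxCount <= 0 {
		return false
	}
	return writeCount+reqEstimate > maxCount
}

func main() {
	// With a limit of 1000, a transaction that has already written 990
	// intents and sends a batch estimated to add 20 more is rejected.
	fmt.Println(wouldExceedMaxCount(990, 20, 1000)) // true
	fmt.Println(wouldExceedMaxCount(990, 20, 0))    // false: limit disabled
}
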
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -1691,6 +1691,8 @@
<tr><td>APPLICATION</td><td>txn.condensed_intent_spans</td><td>KV transactions that have exceeded their intent tracking memory budget (kv.transaction.max_intents_bytes). See also txn.condensed_intent_spans_gauge for a gauge of such transactions currently running.</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.condensed_intent_spans_gauge</td><td>KV transactions currently running that have exceeded their intent tracking memory budget (kv.transaction.max_intents_bytes). See also txn.condensed_intent_spans for a perpetual counter/rate.</td><td>KV Transactions</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>txn.condensed_intent_spans_rejected</td><td>KV transactions that have been aborted because they exceeded their intent tracking memory budget (kv.transaction.max_intents_bytes). Rejection is caused by kv.transaction.reject_over_max_intents_budget.</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.count_limit_on_response</td><td>KV transactions that have exceeded the count limit on a response</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.count_limit_rejected</td><td>KV transactions that have been aborted because they exceeded the max number of writes and locking reads allowed</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.durations</td><td>KV transaction durations</td><td>KV Txn Duration</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>txn.inflight_locks_over_tracking_budget</td><td>KV transactions whose in-flight writes and locking reads have exceeded the intent tracking memory budget (kv.transaction.max_intents_bytes).</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>txn.parallelcommits</td><td>Number of KV transaction parallel commits</td><td>KV Transactions</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1 change: 1 addition & 0 deletions docs/generated/settings/settings-for-tenants.txt
@@ -63,6 +63,7 @@ kv.protectedts.reconciliation.interval duration 5m0s the frequency for reconcili
kv.rangefeed.client.stream_startup_rate integer 100 controls the rate per second the client will initiate new rangefeed stream for a single range; 0 implies unlimited application
kv.rangefeed.closed_timestamp_refresh_interval duration 3s the interval at which closed-timestamp updates are delivered to rangefeeds; set to 0 to use kv.closed_timestamp.side_transport_interval system-visible
kv.rangefeed.enabled boolean false if set, rangefeed registration is enabled system-visible
kv.transaction.max_intents_and_locks integer 0 maximum count of inserts or durable locks for a single transaction, 0 to disable application
kv.transaction.max_intents_bytes integer 4194304 maximum number of bytes used to track locks in transactions application
kv.transaction.max_refresh_spans_bytes integer 4194304 maximum number of bytes used to track refresh spans in serializable transactions application
kv.transaction.randomized_anchor_key.enabled boolean false dictates whether a transaction's anchor key is randomized or not application
1 change: 1 addition & 0 deletions docs/generated/settings/settings.html
@@ -89,6 +89,7 @@
<tr><td><div id="setting-kv-replication-reports-interval" class="anchored"><code>kv.replication_reports.interval</code></div></td><td>duration</td><td><code>1m0s</code></td><td>the frequency for generating the replication_constraint_stats, replication_stats_report and replication_critical_localities reports (set to 0 to disable)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-snapshot-rebalance-max-rate" class="anchored"><code>kv.snapshot_rebalance.max_rate</code></div></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-snapshot-receiver-excise-enabled" class="anchored"><code>kv.snapshot_receiver.excise.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to false to disable excises in place of range deletions for KV snapshots</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-intents-and-locks" class="anchored"><code>kv.transaction.max_intents_and_locks</code></div></td><td>integer</td><td><code>0</code></td><td>maximum count of inserts or durable locks for a single transactions, 0 to disable</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-intents-bytes" class="anchored"><code>kv.transaction.max_intents_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-max-refresh-spans-bytes" class="anchored"><code>kv.transaction.max_refresh_spans_bytes</code></div></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-kv-transaction-randomized-anchor-key-enabled" class="anchored"><code>kv.transaction.randomized_anchor_key.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>dictates whether a transactions anchor key is randomized or not</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
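
Both generated docs files above are derived from the setting registration added to txn_interceptor_pipeliner.go below. As a point of reference, the following sketch shows how an integer cluster setting of this shape is registered, read, and overridden through the settings API; the setting name here is a hypothetical stand-in, and the calls mirror the registration in the diff and the Override calls in its tests.

package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/settings"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// exampleMaxCount is a hypothetical setting registered the same way as
// kv.transaction.max_intents_and_locks: an application-level, public integer
// setting that defaults to 0 (disabled).
var exampleMaxCount = settings.RegisterIntSetting(
	settings.ApplicationLevel,
	"example.transaction.max_intents_and_locks",
	"maximum count of inserts or durable locks for a single transaction, 0 to disable",
	0,
	settings.WithPublic)

func main() {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()
	fmt.Println(exampleMaxCount.Get(&st.SV)) // 0: the limit starts out disabled
	// Tests (and internal code) can override the value directly, as the new
	// test cases below do with rejectTxnMaxCount.Override.
	exampleMaxCount.Override(ctx, &st.SV, 10000)
	fmt.Println(exampleMaxCount.Get(&st.SV)) // 10000
}
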
79 changes: 68 additions & 11 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go
@@ -112,6 +112,16 @@ var rejectTxnOverTrackedWritesBudget = settings.RegisterBoolSetting(
false,
settings.WithPublic)

// rejectTxnMaxCount will reject transactions if the number of inserts or locks
// exceeds this value. It is preferable to use this setting instead of
// kv.transaction.reject_over_max_intents_budget.enabled.
var rejectTxnMaxCount = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.transaction.max_intents_and_locks",
"maximum count of inserts or durable locks for a single transactions, 0 to disable",
0,
settings.WithPublic)

// txnPipeliner is a txnInterceptor that pipelines transactional writes by using
// asynchronous consensus. The interceptor then tracks all writes that have been
// asynchronously proposed through Raft and ensures that all interfering
@@ -253,6 +263,11 @@ type txnPipeliner struct {
// contains all keys spans that the transaction will need to eventually
// clean up upon its completion.
lockFootprint condensableSpanSet

// writeCount counts the number of replicated lock acquisitions and intents
// written by this txnPipeliner. This includes both in-flight and successful
// operations.
writeCount int64
}

// condensableSpanSetRangeIterator describes the interface of RangeIterator
@@ -298,18 +313,17 @@ func (tp *txnPipeliner) SendLocked(
return nil, pErr
}

// If we're configured to reject txns over budget, we pre-emptively check
// If we're configured to reject txns over budget, we preemptively check
// whether this current batch is likely to push us over the edge and, if it
// does, we reject it. Note that this check is not precise because generally
// we can't know exactly the size of the locks that will be taken by a
// request (think ResumeSpan); even if the check passes, we might end up over
// budget.
rejectOverBudget := rejectTxnOverTrackedWritesBudget.Get(&tp.st.SV)
maxBytes := TrackedWritesMaxSize.Get(&tp.st.SV)
if rejectOverBudget {
if err := tp.maybeRejectOverBudget(ba, maxBytes); err != nil {
return nil, kvpb.NewError(err)
}
rejectTxnMaxCount := rejectTxnMaxCount.Get(&tp.st.SV)
if err := tp.maybeRejectOverBudget(ba, maxBytes, rejectOverBudget, rejectTxnMaxCount); err != nil {
return nil, kvpb.NewError(err)
}

ba.AsyncConsensus = tp.canUseAsyncConsensus(ctx, ba)
@@ -331,7 +345,7 @@
// budget. Further requests will be rejected if they attempt to take more
// locks.
if err := tp.updateLockTracking(
ctx, ba, br, pErr, maxBytes, !rejectOverBudget, /* condenseLocksIfOverBudget */
ctx, ba, br, pErr, maxBytes, !rejectOverBudget /* condenseLocksIfOverBudget */, rejectTxnMaxCount,
); err != nil {
return nil, kvpb.NewError(err)
}
@@ -356,7 +370,9 @@
// the transaction commits. If it fails, then we'd add the lock spans to our
// tracking and exceed the budget. It's easier for this code and more
// predictable for the user if we just reject this batch, though.
func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes int64) error {
func (tp *txnPipeliner) maybeRejectOverBudget(
ba *kvpb.BatchRequest, maxBytes int64, rejectIfWouldCondense bool, rejectTxnMaxCount int64,
) error {
// Bail early if the current request is not locking, even if we are already
// over budget. In particular, we definitely want to permit rollbacks. We also
// want to permit lone commits, since the damage in taking too much memory has
@@ -365,9 +381,20 @@ func (tp *txnPipeliner) maybeRejectOverBudget(ba *kvpb.BatchRequest, maxBytes in
return nil
}

// NB: The reqEstimate counts the number of spans in this request with
// replicated durability. This is an estimate since accurate accounting
// requires the response as well. For point requests this will be accurate,
// but for scans, we will count 1 for every span. In reality for scans, it
// could be 0 or many replicated locks. When we receive the response we will
// get the actual counts in `updateLockTracking` and update
// `txnPipeliner.writeCount`.
var reqEstimate int64
var spans []roachpb.Span
if err := ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, _ lock.Durability) {
if err := ba.LockSpanIterate(nil /* br */, func(sp roachpb.Span, durability lock.Durability) {
spans = append(spans, sp)
if durability == lock.Replicated {
reqEstimate++
}
}); err != nil {
return errors.Wrap(err, "iterating lock spans")
}
@@ -378,11 +405,23 @@
locksBudget := maxBytes - tp.ifWrites.byteSize()

estimate := tp.lockFootprint.estimateSize(spans, locksBudget)
if estimate > locksBudget {
if rejectIfWouldCondense && estimate > locksBudget {
tp.txnMetrics.TxnsRejectedByLockSpanBudget.Inc(1)
bErr := newLockSpansOverBudgetError(estimate+tp.ifWrites.byteSize(), maxBytes, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}

// The estimate combines three sources: in-flight writes and already-written
// intents/locks (both included in tp.writeCount), plus the estimate for the
// current request.
estimateCount := tp.writeCount + reqEstimate
// TODO(baptist): We use the same error message as the one above, to avoid
// adding additional encoding and decoding for a backport. We could consider
// splitting this error message in the future.
if rejectTxnMaxCount > 0 && estimateCount > rejectTxnMaxCount {
tp.txnMetrics.TxnsRejectedByCountLimit.Inc(1)
bErr := newLockSpansOverBudgetError(estimateCount, rejectTxnMaxCount, ba)
return pgerror.WithCandidateCode(bErr, pgcode.ConfigurationLimitExceeded)
}
return nil
}

@@ -677,6 +716,7 @@ func (tp *txnPipeliner) updateLockTracking(
pErr *kvpb.Error,
maxBytes int64,
condenseLocksIfOverBudget bool,
rejectTxnMaxCount int64,
) error {
if err := tp.updateLockTrackingInner(ctx, ba, br, pErr); err != nil {
return err
@@ -695,6 +735,17 @@
}
tp.txnMetrics.TxnsInFlightLocksOverTrackingBudget.Inc(1)
}
// Similar to the in-flight writes case above, we may have gone over the
// rejectTxnMaxCount threshold because we don't accurately estimate the
// number of ranged locking reads before sending the request.
if rejectTxnMaxCount > 0 && tp.writeCount > rejectTxnMaxCount {
if tp.inflightOverBudgetEveryN.ShouldLog() || log.ExpensiveLogEnabled(ctx, 2) {
log.Warningf(ctx, "a transaction has exceeded the maximum number of writes "+
"allowed by kv.transaction.max_intents_and_locks: "+
"count: %d, txn: %s, ba: %s", tp.writeCount, ba.Txn, ba.Summary())
}
tp.txnMetrics.TxnsResponseOverCountLimit.Inc(1)
}

// Deal with compacting the lock spans.

@@ -818,7 +869,7 @@ func (tp *txnPipeliner) updateLockTrackingInner(
if readOnlyReq, ok := req.(kvpb.LockingReadRequest); ok {
str, _ = readOnlyReq.KeyLocking()
}
trackLocks := func(span roachpb.Span, _ lock.Durability) {
trackLocks := func(span roachpb.Span, durability lock.Durability) {
if ba.AsyncConsensus {
// Record any writes that were performed asynchronously. We'll
// need to prove that these succeeded sometime before we commit.
@@ -831,6 +882,9 @@
// then add them directly to our lock footprint.
tp.lockFootprint.insert(span)
}
if durability == lock.Replicated {
tp.writeCount++
}
}
if err := kvpb.LockSpanIterate(req, resp, trackLocks); err != nil {
return errors.Wrap(err, "iterating lock spans")
@@ -840,8 +894,11 @@
return nil
}

func (tp *txnPipeliner) trackLocks(s roachpb.Span, _ lock.Durability) {
func (tp *txnPipeliner) trackLocks(s roachpb.Span, durability lock.Durability) {
tp.lockFootprint.insert(s)
if durability == lock.Replicated {
tp.writeCount++
}
}

// stripQueryIntents adjusts the BatchResponse to hide the fact that this
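
A subtle point in the interceptor change above is its two-phase accounting: before a batch is sent, maybeRejectOverBudget counts one per replicated lock span as an estimate (a ranged request may in fact acquire zero or many replicated locks), and only when the response arrives does updateLockTracking fold the actual count into writeCount. A transaction can therefore end up over the limit after a response; it is not aborted retroactively, but the overage is logged and reflected in txn.count_limit_on_response, and its next locking batch is rejected. The sketch below is an illustrative model of that behavior, with hypothetical names.

package main

import "fmt"

// txnWriteCounter is a hypothetical, simplified model of the accounting the
// txnPipeliner performs for kv.transaction.max_intents_and_locks.
type txnWriteCounter struct {
	writeCount int64 // replicated intents/locks tracked so far (incl. in-flight)
}

// checkBeforeSend mirrors the pre-send estimate: each replicated lock span in
// the batch counts as one. It reports whether the batch should be rejected
// (maxCount == 0 disables the limit).
func (c *txnWriteCounter) checkBeforeSend(replicatedSpans, maxCount int64) bool {
	return maxCount > 0 && c.writeCount+replicatedSpans > maxCount
}

// recordResponse folds in the number of replicated locks actually acquired,
// which for ranged requests may differ from the estimate. It reports whether
// the transaction is now over the limit; in that case the real code only logs
// and increments txn.count_limit_on_response rather than aborting.
func (c *txnWriteCounter) recordResponse(actualReplicatedLocks, maxCount int64) bool {
	c.writeCount += actualReplicatedLocks
	return maxCount > 0 && c.writeCount > maxCount
}

func main() {
	c := &txnWriteCounter{}
	const maxCount = 3
	// A locking scan is estimated at one replicated span...
	fmt.Println(c.checkBeforeSend(1, maxCount)) // false: estimate is under the limit
	// ...but its response reveals it locked five keys, pushing the txn over.
	fmt.Println(c.recordResponse(5, maxCount)) // true: logged, not aborted
	// The next locking batch is rejected before it is sent.
	fmt.Println(c.checkBeforeSend(1, maxCount)) // true
}
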
89 changes: 85 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go
@@ -2590,11 +2590,13 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) {
// request is expected to be rejected.
expRejectIdx int
maxSize int64
maxCount int64
}{
{name: "large request",
reqs: []*kvpb.BatchRequest{largeWrite},
expRejectIdx: 0,
maxSize: int64(len(largeAs)) - 1 + roachpb.SpanOverhead,
maxCount: 0,
},
{name: "requests that add up",
reqs: []*kvpb.BatchRequest{
@@ -2604,7 +2606,17 @@
expRejectIdx: 2,
// maxSize is such that first two requests fit and the third one
// goes above the limit.
maxSize: 9 + 2*roachpb.SpanOverhead,
maxSize: 9 + 2*roachpb.SpanOverhead,
maxCount: 0,
},
{name: "requests that count up",
reqs: []*kvpb.BatchRequest{
putBatchNoAsyncConsensus(roachpb.Key("aaaa"), nil),
putBatchNoAsyncConsensus(roachpb.Key("bbbb"), nil),
putBatchNoAsyncConsensus(roachpb.Key("cccc"), nil)},
expRejectIdx: 2,
maxSize: 0,
maxCount: 2,
},
{name: "async requests that add up",
// Like the previous test, but this time the requests run with async
@@ -2616,6 +2628,19 @@
putBatch(roachpb.Key("cccc"), nil)},
expRejectIdx: 2,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{name: "async requests that count up",
// Like the previous test, but this time the requests run with async
// consensus. Being tracked as in-flight writes, this test shows that
// in-flight writes count towards the budget.
reqs: []*kvpb.BatchRequest{
putBatch(roachpb.Key("aaaa"), nil),
putBatch(roachpb.Key("bbbb"), nil),
putBatch(roachpb.Key("cccc"), nil)},
expRejectIdx: 2,
maxSize: 0,
maxCount: 2,
},
{
name: "scan response goes over budget, next request rejected",
@@ -2625,6 +2650,17 @@
resp: []*kvpb.BatchResponse{lockingScanResp},
expRejectIdx: 1,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{
name: "scan response goes over count, next request rejected",
// A request returns a response with many locked keys. Then the next
// request will be rejected.
reqs: []*kvpb.BatchRequest{lockingScanRequest, putBatch(roachpb.Key("a"), nil)},
resp: []*kvpb.BatchResponse{lockingScanResp},
expRejectIdx: 1,
maxSize: 0,
maxCount: 1,
},
{
name: "scan response goes over budget",
@@ -2635,6 +2671,18 @@
resp: []*kvpb.BatchResponse{lockingScanResp},
expRejectIdx: -1,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{
name: "scan response goes over count",
// Like the previous test, except here we don't have a followup request
// once we're above budget. The test runner will commit the txn, and this
// test checks that committing is allowed.
reqs: []*kvpb.BatchRequest{lockingScanRequest},
resp: []*kvpb.BatchResponse{lockingScanResp},
expRejectIdx: -1,
maxSize: 0,
maxCount: 1,
},
{
name: "del range response goes over budget, next request rejected",
@@ -2644,6 +2692,17 @@
resp: []*kvpb.BatchResponse{delRangeResp},
expRejectIdx: 1,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{
name: "del range response goes over count, next request rejected",
// A request returns a response with a large set of locked keys, which
// takes up the budget. Then the next request will be rejected.
reqs: []*kvpb.BatchRequest{delRange, putBatch(roachpb.Key("a"), nil)},
resp: []*kvpb.BatchResponse{delRangeResp},
expRejectIdx: 1,
maxSize: 0,
maxCount: 1,
},
{
name: "del range response goes over budget",
@@ -2654,6 +2713,18 @@
resp: []*kvpb.BatchResponse{delRangeResp},
expRejectIdx: -1,
maxSize: 10 + roachpb.SpanOverhead,
maxCount: 0,
},
{
name: "del range response goes over count",
// Like the previous test, except here we don't have a followup request
// once we're above budget. The test runner will commit the txn, and this
// test checks that committing is allowed.
reqs: []*kvpb.BatchRequest{delRange},
resp: []*kvpb.BatchResponse{delRangeResp},
expRejectIdx: -1,
maxSize: 0,
maxCount: 1,
},
}
for _, tc := range testCases {
@@ -2663,8 +2734,13 @@
}

tp, mockSender := makeMockTxnPipeliner(nil /* iter */)
TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
if tc.maxCount > 0 {
rejectTxnMaxCount.Override(ctx, &tp.st.SV, tc.maxCount)
}
if tc.maxSize > 0 {
TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize)
rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true)
}

txn := makeTxnProto()

@@ -2703,7 +2779,12 @@
t.Fatalf("expected lockSpansOverBudgetError, got %+v", pErr.GoError())
}
require.Equal(t, pgcode.ConfigurationLimitExceeded, pgerror.GetPGCode(pErr.GoError()))
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())
if tc.maxSize > 0 {
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByLockSpanBudget.Count())
}
if tc.maxCount > 0 {
require.Equal(t, int64(1), tp.txnMetrics.TxnsRejectedByCountLimit.Count())
}

// Make sure rolling back the txn works.
rollback := &kvpb.BatchRequest{}
