Skip to content

Commit

Permalink
Merge pull request #135279 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.3.0-rc-135237

release-24.3.0-rc: rac2,tracker: use CircularBuffer for tracker.Inflights
  • Loading branch information
sumeerbhola authored Nov 15, 2024
2 parents e77da9e + 89e8ccf commit f698fa7
Show file tree
Hide file tree
Showing 13 changed files with 125 additions and 161 deletions.
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "rac2",
srcs = [
"circular_buffer.go",
"log_tracker.go",
"metrics.go",
"priority.go",
Expand All @@ -28,6 +27,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/humanizeutil",
"//pkg/util/log",
Expand All @@ -45,7 +45,6 @@ go_library(
go_test(
name = "rac2_test",
srcs = [
"circular_buffer_test.go",
"log_tracker_test.go",
"priority_test.go",
"range_controller_test.go",
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/raft"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
Expand Down Expand Up @@ -109,7 +110,7 @@ type LogTracker struct {
// - waiting[pri][i].Term <= last.Term
// - waiting[pri][i].Index < waiting[pri][i+1].Index
// - waiting[pri][i].Term <= waiting[pri][i+1].Term
waiting [raftpb.NumPriorities]CircularBuffer[LogMark]
waiting [raftpb.NumPriorities]ring.Buffer[LogMark]
}

// NewLogTracker returns a LogTracker initialized to the given log mark. The
Expand Down Expand Up @@ -354,7 +355,7 @@ func (l *LogTracker) errorf(ctx context.Context, format string, args ...any) {

// truncate updates the slice to be a prefix of the ordered log marks slice,
// with all marks at index > after removed from it.
func truncate(marks *CircularBuffer[LogMark], after uint64) {
func truncate(marks *ring.Buffer[LogMark], after uint64) {
n := marks.Length()
for i := n; i > 0; i-- {
if marks.At(i-1).Index <= after {
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

Expand All @@ -31,7 +32,7 @@ type Tracker struct {
term uint64
// tracked contains the per-priority tracked log entries ordered by log index.
// All the tracked entries are in the term's leader log.
tracked [raftpb.NumPriorities]CircularBuffer[tracked]
tracked [raftpb.NumPriorities]ring.Buffer[tracked]

stream kvflowcontrol.Stream // used for logging only
}
Expand All @@ -45,7 +46,7 @@ type tracked struct {
func (t *Tracker) Init(term uint64, stream kvflowcontrol.Stream) {
*t = Tracker{
term: term,
tracked: [raftpb.NumPriorities]CircularBuffer[tracked]{},
tracked: [raftpb.NumPriorities]ring.Buffer[tracked]{},
stream: stream,
}
}
Expand Down Expand Up @@ -136,7 +137,7 @@ func (t *Tracker) UntrackAll() (returned [raftpb.NumPriorities]kvflowcontrol.Tok
returned[pri] += t.tracked[pri].At(i).tokens
}
}
t.tracked = [raftpb.NumPriorities]CircularBuffer[tracked]{}
t.tracked = [raftpb.NumPriorities]ring.Buffer[tracked]{}
return returned
}

Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/roachpb",
"//pkg/util/admission/admissionpb",
"//pkg/util/buildutil",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/stop",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
package replica_rac2

import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/util/container/ring"
)

// lowPriOverrideState records which raft log entries have their priority
Expand Down Expand Up @@ -69,7 +69,7 @@ type lowPriOverrideState struct {
//
// A call to getEffectivePriority for index i causes a prefix of indices <=
// i to be discarded.
intervals rac2.CircularBuffer[interval]
intervals ring.Buffer[interval]
// Highest term observed so far.
leaderTerm uint64
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/raft/tracker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/syncutil",
"@com_github_cockroachdb_redact//:redact",
Expand All @@ -37,6 +38,7 @@ go_test(
"//pkg/raft/raftlogger",
"//pkg/raft/raftpb",
"//pkg/raft/raftstoreliveness",
"//pkg/util/container/ring",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
68 changes: 15 additions & 53 deletions pkg/raft/tracker/inflights.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package tracker

import "github.com/cockroachdb/cockroach/pkg/util/container/ring"

// inflight describes an in-flight MsgApp message.
type inflight struct {
index uint64 // the index of the last entry inside the message
Expand All @@ -29,10 +31,6 @@ type inflight struct {
// they are sending a new append, and release "quota" via FreeLE() whenever an
// ack is received.
type Inflights struct {
// the starting index in the buffer
start int

count int // number of inflight messages in the buffer
bytes uint64 // number of inflight bytes

// TODO(pav-kv): do not store the limits here, pass them to methods. For flow
Expand All @@ -41,7 +39,7 @@ type Inflights struct {
maxBytes uint64 // the max total byte size of inflight messages

// buffer is a ring buffer containing info about all in-flight messages.
buffer []inflight
buffer ring.Buffer[inflight]
}

// NewInflights sets up an Inflights that allows up to size inflight messages,
Expand All @@ -59,7 +57,7 @@ func NewInflights(size int, maxBytes uint64) *Inflights {
// the receiver.
func (in *Inflights) Clone() *Inflights {
ins := *in
ins.buffer = append([]inflight(nil), in.buffer...)
ins.buffer = in.buffer.Clone()
return &ins
}

Expand All @@ -73,78 +71,42 @@ func (in *Inflights) Clone() *Inflights {
// is implemented at the higher app level. The tracker correctly tracks all the
// in-flight entries.
func (in *Inflights) Add(index, bytes uint64) {
next := in.start + in.count
size := in.size
if next >= size {
next -= size
}
if next >= len(in.buffer) {
in.grow()
}
in.buffer[next] = inflight{index: index, bytes: bytes}
in.count++
in.buffer.Push(inflight{index: index, bytes: bytes})
in.bytes += bytes
}

// grow the inflight buffer by doubling up to inflights.size. We grow on demand
// instead of preallocating to inflights.size to handle systems which have
// thousands of Raft groups per process.
func (in *Inflights) grow() {
newSize := len(in.buffer) * 2
if newSize == 0 {
newSize = 1
} else if newSize > in.size {
newSize = in.size
}
newBuffer := make([]inflight, newSize)
copy(newBuffer, in.buffer)
in.buffer = newBuffer
}

// FreeLE frees the inflights smaller or equal to the given `to` flight.
func (in *Inflights) FreeLE(to uint64) {
if in.count == 0 || to < in.buffer[in.start].index {
n := in.buffer.Length()
if n == 0 || to < in.buffer.At(0).index {
// out of the left side of the window
return
}

idx := in.start
var i int
var bytes uint64
for i = 0; i < in.count; i++ {
if to < in.buffer[idx].index { // found the first large inflight
for i = 0; i < n; i++ {
e := in.buffer.At(i)
if to < e.index { // found the first large inflight
break
}
bytes += in.buffer[idx].bytes

// increase index and maybe rotate
size := in.size
if idx++; idx >= size {
idx -= size
}
bytes += e.bytes
}
// free i inflights and set new start index
in.count -= i
in.buffer.Pop(i)
in.bytes -= bytes
in.start = idx
if in.count == 0 {
// inflights is empty, reset the start index so that we don't grow the
// buffer unnecessarily.
in.start = 0
}
}

// Full returns true if no more messages can be sent at the moment.
func (in *Inflights) Full() bool {
return in.count >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
return in.buffer.Length() >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes)
}

// Count returns the number of inflight messages.
func (in *Inflights) Count() int { return in.count }
func (in *Inflights) Count() int { return in.buffer.Length() }

// reset frees all inflights.
func (in *Inflights) reset() {
in.start = 0
in.count = 0
in.buffer.ShrinkToPrefix(0)
in.bytes = 0
}
Loading

0 comments on commit f698fa7

Please sign in to comment.