From 89e8ccf27d494e202b9e6c53ee5d644e02c0ca56 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Thu, 14 Nov 2024 23:56:21 +0000 Subject: [PATCH] rac2,tracker: use CircularBuffer for tracker.Inflights The custom circular/ring buffer implementation in Raft's tracker.Inflights does not shrink and unnecessarily grows to the maximum configured size (unless the buffer becomes empty). With RAC2, the maximum size is no longer a maximum, in lazy replication mode. There was a bug in incorrectly trying to impose this maximum, which motivated this slightly bigger cleanup. In general, it is beneficial for the inflight buffer to also shrink, and by using the existing CircularBuffer, we can simplify the code in tracker.Inflights. As part of this change rac2.CircularBuffer is moved to util/container/ring and becomes ring.Buffer. There is another ring.Buffer in util/ring, which doesn't shrink and is not optimized to use bit arithmetic -- there is a todo added there to replace it with the implementation in util/container/ring. Fixes #135223 Epic: none Release note: None --- .../kvserver/kvflowcontrol/rac2/BUILD.bazel | 3 +- .../kvflowcontrol/rac2/log_tracker.go | 5 +- .../kvflowcontrol/rac2/token_tracker.go | 7 +- .../kvflowcontrol/replica_rac2/BUILD.bazel | 1 + .../kvflowcontrol/replica_rac2/admission.go | 4 +- pkg/raft/tracker/BUILD.bazel | 2 + pkg/raft/tracker/inflights.go | 68 +++------- pkg/raft/tracker/inflights_test.go | 127 +++++++----------- pkg/util/container/ring/BUILD.bazel | 17 ++- .../container/ring/buffer.go} | 28 ++-- .../container/ring/buffer_test.go} | 18 ++- .../container/ring/testdata/buffer} | 0 pkg/util/ring/ring_buffer.go | 6 +- 13 files changed, 125 insertions(+), 161 deletions(-) rename pkg/{kv/kvserver/kvflowcontrol/rac2/circular_buffer.go => util/container/ring/buffer.go} (89%) rename pkg/{kv/kvserver/kvflowcontrol/rac2/circular_buffer_test.go => util/container/ring/buffer_test.go} (87%) rename pkg/{kv/kvserver/kvflowcontrol/rac2/testdata/circular_buffer => util/container/ring/testdata/buffer} (100%) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel index 3cc9def99165..2c78422bb6ec 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/BUILD.bazel @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "rac2", srcs = [ - "circular_buffer.go", "log_tracker.go", "metrics.go", "priority.go", @@ -28,6 +27,7 @@ go_library( "//pkg/settings/cluster", "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", + "//pkg/util/container/ring", "//pkg/util/hlc", "//pkg/util/humanizeutil", "//pkg/util/log", @@ -45,7 +45,6 @@ go_library( go_test( name = "rac2_test", srcs = [ - "circular_buffer_test.go", "log_tracker_test.go", "priority_test.go", "range_controller_test.go", diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go index f61bed4de202..40c11181b408 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/log_tracker.go @@ -13,6 +13,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/raft" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/container/ring" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -109,7 +110,7 @@ type LogTracker struct { // - waiting[pri][i].Term <= last.Term // - waiting[pri][i].Index < waiting[pri][i+1].Index // - waiting[pri][i].Term <= waiting[pri][i+1].Term - waiting [raftpb.NumPriorities]CircularBuffer[LogMark] + waiting [raftpb.NumPriorities]ring.Buffer[LogMark] } // NewLogTracker returns a LogTracker initialized to the given log mark. The @@ -354,7 +355,7 @@ func (l *LogTracker) errorf(ctx context.Context, format string, args ...any) { // truncate updates the slice to be a prefix of the ordered log marks slice, // with all marks at index > after removed from it. -func truncate(marks *CircularBuffer[LogMark], after uint64) { +func truncate(marks *ring.Buffer[LogMark], after uint64) { n := marks.Length() for i := n; i > 0; i-- { if marks.At(i-1).Index <= after { diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go b/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go index 6697ca75f61e..96b5cae71316 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/token_tracker.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/util/container/ring" "github.com/cockroachdb/cockroach/pkg/util/log" ) @@ -31,7 +32,7 @@ type Tracker struct { term uint64 // tracked contains the per-priority tracked log entries ordered by log index. // All the tracked entries are in the term's leader log. - tracked [raftpb.NumPriorities]CircularBuffer[tracked] + tracked [raftpb.NumPriorities]ring.Buffer[tracked] stream kvflowcontrol.Stream // used for logging only } @@ -45,7 +46,7 @@ type tracked struct { func (t *Tracker) Init(term uint64, stream kvflowcontrol.Stream) { *t = Tracker{ term: term, - tracked: [raftpb.NumPriorities]CircularBuffer[tracked]{}, + tracked: [raftpb.NumPriorities]ring.Buffer[tracked]{}, stream: stream, } } @@ -136,7 +137,7 @@ func (t *Tracker) UntrackAll() (returned [raftpb.NumPriorities]kvflowcontrol.Tok returned[pri] += t.tracked[pri].At(i).tokens } } - t.tracked = [raftpb.NumPriorities]CircularBuffer[tracked]{} + t.tracked = [raftpb.NumPriorities]ring.Buffer[tracked]{} return returned } diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel index c10e7bde5082..9892e7cbaec3 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//pkg/roachpb", "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", + "//pkg/util/container/ring", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go index a14d248ceb9f..73b96c3e77a2 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/admission.go @@ -6,8 +6,8 @@ package replica_rac2 import ( - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/rac2" "github.com/cockroachdb/cockroach/pkg/raft/raftpb" + "github.com/cockroachdb/cockroach/pkg/util/container/ring" ) // lowPriOverrideState records which raft log entries have their priority @@ -69,7 +69,7 @@ type lowPriOverrideState struct { // // A call to getEffectivePriority for index i causes a prefix of indices <= // i to be discarded. - intervals rac2.CircularBuffer[interval] + intervals ring.Buffer[interval] // Highest term observed so far. leaderTerm uint64 } diff --git a/pkg/raft/tracker/BUILD.bazel b/pkg/raft/tracker/BUILD.bazel index d7c8e6bbd70d..4608fc6f2633 100644 --- a/pkg/raft/tracker/BUILD.bazel +++ b/pkg/raft/tracker/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//pkg/raft/raftlogger", "//pkg/raft/raftpb", "//pkg/raft/raftstoreliveness", + "//pkg/util/container/ring", "//pkg/util/hlc", "//pkg/util/syncutil", "@com_github_cockroachdb_redact//:redact", @@ -37,6 +38,7 @@ go_test( "//pkg/raft/raftlogger", "//pkg/raft/raftpb", "//pkg/raft/raftstoreliveness", + "//pkg/util/container/ring", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", diff --git a/pkg/raft/tracker/inflights.go b/pkg/raft/tracker/inflights.go index b9bb090ed9ed..f2f38bedf247 100644 --- a/pkg/raft/tracker/inflights.go +++ b/pkg/raft/tracker/inflights.go @@ -17,6 +17,8 @@ package tracker +import "github.com/cockroachdb/cockroach/pkg/util/container/ring" + // inflight describes an in-flight MsgApp message. type inflight struct { index uint64 // the index of the last entry inside the message @@ -29,10 +31,6 @@ type inflight struct { // they are sending a new append, and release "quota" via FreeLE() whenever an // ack is received. type Inflights struct { - // the starting index in the buffer - start int - - count int // number of inflight messages in the buffer bytes uint64 // number of inflight bytes // TODO(pav-kv): do not store the limits here, pass them to methods. For flow @@ -41,7 +39,7 @@ type Inflights struct { maxBytes uint64 // the max total byte size of inflight messages // buffer is a ring buffer containing info about all in-flight messages. - buffer []inflight + buffer ring.Buffer[inflight] } // NewInflights sets up an Inflights that allows up to size inflight messages, @@ -59,7 +57,7 @@ func NewInflights(size int, maxBytes uint64) *Inflights { // the receiver. func (in *Inflights) Clone() *Inflights { ins := *in - ins.buffer = append([]inflight(nil), in.buffer...) + ins.buffer = in.buffer.Clone() return &ins } @@ -73,78 +71,42 @@ func (in *Inflights) Clone() *Inflights { // is implemented at the higher app level. The tracker correctly tracks all the // in-flight entries. func (in *Inflights) Add(index, bytes uint64) { - next := in.start + in.count - size := in.size - if next >= size { - next -= size - } - if next >= len(in.buffer) { - in.grow() - } - in.buffer[next] = inflight{index: index, bytes: bytes} - in.count++ + in.buffer.Push(inflight{index: index, bytes: bytes}) in.bytes += bytes } -// grow the inflight buffer by doubling up to inflights.size. We grow on demand -// instead of preallocating to inflights.size to handle systems which have -// thousands of Raft groups per process. -func (in *Inflights) grow() { - newSize := len(in.buffer) * 2 - if newSize == 0 { - newSize = 1 - } else if newSize > in.size { - newSize = in.size - } - newBuffer := make([]inflight, newSize) - copy(newBuffer, in.buffer) - in.buffer = newBuffer -} - // FreeLE frees the inflights smaller or equal to the given `to` flight. func (in *Inflights) FreeLE(to uint64) { - if in.count == 0 || to < in.buffer[in.start].index { + n := in.buffer.Length() + if n == 0 || to < in.buffer.At(0).index { // out of the left side of the window return } - idx := in.start var i int var bytes uint64 - for i = 0; i < in.count; i++ { - if to < in.buffer[idx].index { // found the first large inflight + for i = 0; i < n; i++ { + e := in.buffer.At(i) + if to < e.index { // found the first large inflight break } - bytes += in.buffer[idx].bytes - - // increase index and maybe rotate - size := in.size - if idx++; idx >= size { - idx -= size - } + bytes += e.bytes } // free i inflights and set new start index - in.count -= i + in.buffer.Pop(i) in.bytes -= bytes - in.start = idx - if in.count == 0 { - // inflights is empty, reset the start index so that we don't grow the - // buffer unnecessarily. - in.start = 0 - } } // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { - return in.count >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) + return in.buffer.Length() >= in.size || (in.maxBytes != 0 && in.bytes >= in.maxBytes) } // Count returns the number of inflight messages. -func (in *Inflights) Count() int { return in.count } +func (in *Inflights) Count() int { return in.buffer.Length() } // reset frees all inflights. func (in *Inflights) reset() { - in.start = 0 - in.count = 0 + in.buffer.ShrinkToPrefix(0) in.bytes = 0 } diff --git a/pkg/raft/tracker/inflights_test.go b/pkg/raft/tracker/inflights_test.go index dafcd15d2a08..036b9971b2ab 100644 --- a/pkg/raft/tracker/inflights_test.go +++ b/pkg/raft/tracker/inflights_test.go @@ -20,14 +20,25 @@ package tracker import ( "testing" + "github.com/cockroachdb/cockroach/pkg/util/container/ring" "github.com/stretchr/testify/require" ) +func checkEquality(t *testing.T, expected Inflights, actual Inflights) { + expBuf := expected.buffer + actualBuf := actual.buffer + expected.buffer = ring.Buffer[inflight]{} + actual.buffer = ring.Buffer[inflight]{} + require.Equal(t, expected, actual) + require.Equal(t, expBuf.Length(), actualBuf.Length()) + for i := 0; i < expBuf.Length(); i++ { + require.Equal(t, expBuf.At(i), actualBuf.At(i)) + } +} + func TestInflightsAdd(t *testing.T) { - // no rotating case in := &Inflights{ - size: 10, - buffer: make([]inflight, 10), + size: 10, } for i := 0; i < 5; i++ { @@ -35,24 +46,20 @@ func TestInflightsAdd(t *testing.T) { } wantIn := &Inflights{ - start: 0, - count: 5, bytes: 510, size: 10, buffer: inflightsBuffer( // ↓------------ - []uint64{0, 1, 2, 3, 4, 0, 0, 0, 0, 0}, - []uint64{100, 101, 102, 103, 104, 0, 0, 0, 0, 0}), + []uint64{0, 1, 2, 3, 4}, + []uint64{100, 101, 102, 103, 104}), } - require.Equal(t, wantIn, in) + checkEquality(t, *wantIn, *in) for i := 5; i < 10; i++ { in.Add(uint64(i), uint64(100+i)) } wantIn2 := &Inflights{ - start: 0, - count: 10, bytes: 1045, size: 10, buffer: inflightsBuffer( @@ -60,50 +67,25 @@ func TestInflightsAdd(t *testing.T) { []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), } - require.Equal(t, wantIn2, in) - - // rotating case - in2 := &Inflights{ - start: 5, - size: 10, - buffer: make([]inflight, 10), - } - - for i := 0; i < 5; i++ { - in2.Add(uint64(i), uint64(100+i)) - } + checkEquality(t, *wantIn2, *in) - wantIn21 := &Inflights{ - start: 5, - count: 5, - bytes: 510, - size: 10, - buffer: inflightsBuffer( - // ↓------------ - []uint64{0, 0, 0, 0, 0, 0, 1, 2, 3, 4}, - []uint64{0, 0, 0, 0, 0, 100, 101, 102, 103, 104}), - } - require.Equal(t, wantIn21, in2) - - for i := 5; i < 10; i++ { - in2.Add(uint64(i), uint64(100+i)) + // Can grow beyond size. + for i := 10; i < 15; i++ { + in.Add(uint64(i), uint64(100+i)) } - wantIn22 := &Inflights{ - start: 5, - count: 10, - bytes: 1045, + wantIn3 := &Inflights{ + bytes: 1605, size: 10, buffer: inflightsBuffer( - // -------------- ↓------------ - []uint64{5, 6, 7, 8, 9, 0, 1, 2, 3, 4}, - []uint64{105, 106, 107, 108, 109, 100, 101, 102, 103, 104}), + // ↓--------------------------- + []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14}, + []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114}), } - require.Equal(t, wantIn22, in2) + checkEquality(t, *wantIn3, *in) } func TestInflightFreeTo(t *testing.T) { - // no rotating case in := NewInflights(10, 0) for i := 0; i < 10; i++ { in.Add(uint64(i), uint64(100+i)) @@ -112,46 +94,39 @@ func TestInflightFreeTo(t *testing.T) { in.FreeLE(0) wantIn0 := &Inflights{ - start: 1, - count: 9, bytes: 945, size: 10, buffer: inflightsBuffer( - // ↓------------------------ - []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), + // ↓------------------------ + []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9}, + []uint64{101, 102, 103, 104, 105, 106, 107, 108, 109}), } - require.Equal(t, wantIn0, in) + checkEquality(t, *wantIn0, *in) in.FreeLE(4) wantIn := &Inflights{ - start: 5, - count: 5, bytes: 535, size: 10, buffer: inflightsBuffer( - // ↓------------ - []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), + // ↓------------ + []uint64{5, 6, 7, 8, 9}, + []uint64{105, 106, 107, 108, 109}), } - require.Equal(t, wantIn, in) + checkEquality(t, *wantIn, *in) in.FreeLE(8) wantIn2 := &Inflights{ - start: 9, - count: 1, bytes: 109, size: 10, buffer: inflightsBuffer( // ↓ - []uint64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - []uint64{100, 101, 102, 103, 104, 105, 106, 107, 108, 109}), + []uint64{9}, + []uint64{109}), } - require.Equal(t, wantIn2, in) + checkEquality(t, *wantIn2, *in) - // rotating case for i := 10; i < 15; i++ { in.Add(uint64(i), uint64(100+i)) } @@ -159,29 +134,21 @@ func TestInflightFreeTo(t *testing.T) { in.FreeLE(12) wantIn3 := &Inflights{ - start: 3, - count: 2, bytes: 227, size: 10, buffer: inflightsBuffer( - // ↓----- - []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, - []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), + // ↓----- + []uint64{13, 14}, + []uint64{113, 114}), } - require.Equal(t, wantIn3, in) + checkEquality(t, *wantIn3, *in) in.FreeLE(14) wantIn4 := &Inflights{ - start: 0, - count: 0, - size: 10, - buffer: inflightsBuffer( - // ↓ - []uint64{10, 11, 12, 13, 14, 5, 6, 7, 8, 9}, - []uint64{110, 111, 112, 113, 114, 105, 106, 107, 108, 109}), + size: 10, } - require.Equal(t, wantIn4, in) + checkEquality(t, *wantIn4, *in) } func TestInflightsFull(t *testing.T) { @@ -244,13 +211,13 @@ func TestInflightsReset(t *testing.T) { require.Equal(t, 0, in.Count()) } -func inflightsBuffer(indices []uint64, sizes []uint64) []inflight { +func inflightsBuffer(indices []uint64, sizes []uint64) ring.Buffer[inflight] { if len(indices) != len(sizes) { panic("len(indices) != len(sizes)") } - buffer := make([]inflight, 0, len(indices)) + var buffer ring.Buffer[inflight] for i, idx := range indices { - buffer = append(buffer, inflight{index: idx, bytes: sizes[i]}) + buffer.Push(inflight{index: idx, bytes: sizes[i]}) } return buffer } diff --git a/pkg/util/container/ring/BUILD.bazel b/pkg/util/container/ring/BUILD.bazel index c60e80d197e9..6fbdfff10df9 100644 --- a/pkg/util/container/ring/BUILD.bazel +++ b/pkg/util/container/ring/BUILD.bazel @@ -2,16 +2,31 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ring", - srcs = ["ring.go"], + srcs = [ + "buffer.go", + "ring.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/util/container/ring", visibility = ["//visibility:public"], + deps = [ + "//pkg/util/buildutil", + "@com_github_cockroachdb_errors//:errors", + ], ) go_test( name = "ring_test", srcs = [ + "buffer_test.go", "example_test.go", "ring_test.go", ], + data = glob(["testdata/**"]), embed = [":ring"], + deps = [ + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_cockroachdb_datadriven//:datadriven", + "@com_github_stretchr_testify//require", + ], ) diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer.go b/pkg/util/container/ring/buffer.go similarity index 89% rename from pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer.go rename to pkg/util/container/ring/buffer.go index b9dc5daa0f47..a79816148193 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer.go +++ b/pkg/util/container/ring/buffer.go @@ -3,20 +3,20 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package rac2 +package ring import ( "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/errors" ) -// CircularBuffer provides functionality akin to a []T, for cases that usually +// Buffer provides functionality akin to a []T, for cases that usually // push to the back and pop from the front, and would like to reduce // allocations. The assumption made here is that the capacity needed for // holding the live entries is somewhat stable. The circular buffer grows as // needed, and over time will shrink. Liveness of shrinking depends on new // entries being pushed. -type CircularBuffer[T any] struct { +type Buffer[T any] struct { // len(buf) == cap(buf). These are powers of 2 and >= minCap. buf []T // first is in [0, len(buf)). @@ -41,7 +41,7 @@ const minCap = 32 const shrinkCheckInterval = 3 // Push adds an entry to the end of the buffer. -func (cb *CircularBuffer[T]) Push(a T) { +func (cb *Buffer[T]) Push(a T) { needed := cb.len + 1 cap := len(cb.buf) if needed > cap { @@ -78,7 +78,7 @@ func (cb *CircularBuffer[T]) Push(a T) { // Pop removes the first num entries. // // REQUIRES: num <= cb.len. -func (cb *CircularBuffer[T]) Pop(num int) { +func (cb *Buffer[T]) Pop(num int) { if buildutil.CrdbTestBuild && num > cb.len { panic(errors.AssertionFailedf("num %d > cb.len %d", num, cb.len)) } @@ -96,7 +96,7 @@ func (cb *CircularBuffer[T]) Pop(num int) { // ShrinkToPrefix shrinks the buffer to retain the first num entries. // // REQUIRES: num <= cb.len. -func (cb *CircularBuffer[T]) ShrinkToPrefix(num int) { +func (cb *Buffer[T]) ShrinkToPrefix(num int) { if buildutil.CrdbTestBuild && num > cb.len { panic(errors.AssertionFailedf("num %d > cb.len %d", num, cb.len)) } @@ -106,7 +106,7 @@ func (cb *CircularBuffer[T]) ShrinkToPrefix(num int) { // At returns the entry at index. // // REQUIRES: index < cb.len. -func (cb *CircularBuffer[T]) At(index int) T { +func (cb *Buffer[T]) At(index int) T { if buildutil.CrdbTestBuild && index >= cb.len { panic(errors.AssertionFailedf("index %d >= cb.len %d", index, cb.len)) } @@ -117,7 +117,7 @@ func (cb *CircularBuffer[T]) At(index int) T { // SetLast overwrites the last entry. // // REQUIRES: Length() > 0. -func (cb *CircularBuffer[T]) SetLast(a T) { +func (cb *Buffer[T]) SetLast(a T) { if buildutil.CrdbTestBuild && cb.len == 0 { panic(errors.AssertionFailedf("buffer is empty")) } @@ -128,7 +128,7 @@ func (cb *CircularBuffer[T]) SetLast(a T) { // SetFirst overwrites the first entry. // // REQUIRES: Length() > 0. -func (cb *CircularBuffer[T]) SetFirst(a T) { +func (cb *Buffer[T]) SetFirst(a T) { if buildutil.CrdbTestBuild && cb.len == 0 { panic(errors.AssertionFailedf("buffer is empty")) } @@ -136,11 +136,17 @@ func (cb *CircularBuffer[T]) SetFirst(a T) { } // Length returns the current length. -func (cb *CircularBuffer[T]) Length() int { +func (cb *Buffer[T]) Length() int { return cb.len } -func (cb *CircularBuffer[T]) reallocate(size int) { +func (cb *Buffer[T]) Clone() Buffer[T] { + b := *cb + b.buf = append([]T(nil), cb.buf...) + return b +} + +func (cb *Buffer[T]) reallocate(size int) { buf := make([]T, size) capacity := len(cb.buf) // cb.buf is split into a prefix and suffix, where the prefix is diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer_test.go b/pkg/util/container/ring/buffer_test.go similarity index 87% rename from pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer_test.go rename to pkg/util/container/ring/buffer_test.go index a31f09b8f89d..1dd6f34751e0 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/circular_buffer_test.go +++ b/pkg/util/container/ring/buffer_test.go @@ -3,7 +3,7 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package rac2 +package ring import ( "fmt" @@ -13,15 +13,23 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" ) -func TestCircularBuffer(t *testing.T) { +func TestBuffer(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - cb := CircularBuffer[int]{} + cb := Buffer[int]{} cbString := func() string { var b strings.Builder + // Clone cb and trash the clone's state. It should not affect cb. + cbClone := cb.Clone() + require.Equal(t, cbClone, cb) + for i := 0; i < len(cbClone.buf); i++ { + cbClone.buf[i] = -1 + } + cbClone = Buffer[int]{} printStats := func() { fmt.Fprintf(&b, "first: %d len: %d cap: %d pushes: %d, max-len: %d\n", cb.first, cb.len, len(cb.buf), cb.pushesSinceCheck, cb.maxObservedLen) @@ -68,11 +76,11 @@ func TestCircularBuffer(t *testing.T) { printStats() return b.String() } - datadriven.RunTest(t, "testdata/circular_buffer", + datadriven.RunTest(t, "testdata/buffer", func(t *testing.T, d *datadriven.TestData) string { switch d.Cmd { case "init": - cb = CircularBuffer[int]{} + cb = Buffer[int]{} return "" case "push": diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/circular_buffer b/pkg/util/container/ring/testdata/buffer similarity index 100% rename from pkg/kv/kvserver/kvflowcontrol/rac2/testdata/circular_buffer rename to pkg/util/container/ring/testdata/buffer diff --git a/pkg/util/ring/ring_buffer.go b/pkg/util/ring/ring_buffer.go index 50e3a35e31be..60b12b27606c 100644 --- a/pkg/util/ring/ring_buffer.go +++ b/pkg/util/ring/ring_buffer.go @@ -10,8 +10,10 @@ package ring // The zero value is ready to use. See MakeBuffer() for initializing a Buffer // with pre-allocated space. // -// Note: it is backed by a slice (unlike container/ring which is backed by a -// linked list). +// Note: it is backed by a slice (unlike container/ring/ring_buffer.go which +// is backed by a linked list). There is also a container/ring/buffer.go, that +// is backed by a slice and can both grow and shrink and uses bit arithmetic. +// We should replace this implementation with that one. type Buffer[T any] struct { buffer []T head int // the index of the front of the buffer