Skip to content

Commit

Permalink
Revert "rangefeed: remove LegacyProcessor"
Browse files Browse the repository at this point in the history
This reverts commit a2f915a.
  • Loading branch information
erikgrinaker committed Mar 27, 2024
1 parent 40f4dc3 commit 8eb7ca9
Show file tree
Hide file tree
Showing 7 changed files with 1,734 additions and 951 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"scheduled_processor.go",
"scheduler.go",
"task.go",
"testutil.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed",
visibility = ["//visibility:public"],
Expand Down
24 changes: 14 additions & 10 deletions pkg/kv/kvserver/rangefeed/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
)

type benchmarkRangefeedOpts struct {
procType procType
opType opType
numRegistrations int
budget int64
Expand All @@ -45,16 +46,19 @@ const (
// BenchmarkRangefeed benchmarks the processor and registrations, by submitting
// a set of events and waiting until they are all emitted.
func BenchmarkRangefeed(b *testing.B) {
for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
for _, numRegistrations := range []int{1, 10, 100} {
name := fmt.Sprintf("procType=scheduler/opType=%s/numRegs=%d", opType, numRegistrations)
b.Run(name, func(b *testing.B) {
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
for _, procType := range testTypes {
for _, opType := range []opType{writeOpType, commitOpType, closedTSOpType} {
for _, numRegistrations := range []int{1, 10, 100} {
name := fmt.Sprintf("procType=%s/opType=%s/numRegs=%d", procType, opType, numRegistrations)
b.Run(name, func(b *testing.B) {
runBenchmarkRangefeed(b, benchmarkRangefeedOpts{
procType: procType,
opType: opType,
numRegistrations: numRegistrations,
budget: math.MaxInt64,
})
})
})
}
}
}
}
Expand Down Expand Up @@ -91,7 +95,7 @@ func runBenchmarkRangefeed(b *testing.B, opts benchmarkRangefeedOpts) {
span := roachpb.RSpan{Key: roachpb.RKey("a"), EndKey: roachpb.RKey("z")}

p, h, stopper := newTestProcessor(b, withSpan(span), withBudget(budget), withChanCap(b.N),
withEventTimeout(time.Hour))
withEventTimeout(time.Hour), withProcType(opts.procType))
defer stopper.Stop(ctx)

// Add registrations.
Expand Down
Loading

0 comments on commit 8eb7ca9

Please sign in to comment.