verifier: tolerate data loss #52

Merged · 10 commits · Mar 29, 2024
26 changes: 26 additions & 0 deletions .github/workflows/go.yml
@@ -0,0 +1,26 @@
name: Go

on:
  push:
    branches:
      - main
  pull_request: {}

jobs:
  build:

    runs-on: ubuntu-latest
    strategy:
      matrix:
        go-version: [ 'stable' ]

    steps:
      - uses: actions/checkout@v4
      - name: Setup Go ${{ matrix.go-version }}
        uses: actions/setup-go@v4
        with:
          go-version: ${{ matrix.go-version }}
      - name: Display Go version
        run: go version
      - name: Test & Build
        run: make
3 changes: 2 additions & 1 deletion .gitignore
@@ -2,4 +2,5 @@ kgo-repeater
kgo-verifier
valid_offsets*.json

.idea
.idea
.vscode
5 changes: 4 additions & 1 deletion Makefile
@@ -1,8 +1,11 @@

all: build
all: test build

build: build-verifier build-repeater

test:
	go test -v ./...

build-verifier:
	go build -o kgo-verifier cmd/kgo-verifier/main.go

82 changes: 44 additions & 38 deletions cmd/kgo-verifier/main.go
@@ -48,11 +48,13 @@ var (
seqConsumeCount = flag.Int("seq_read_msgs", -1, "Seq/group consumer: set max number of records to consume")
batchMaxBytes = flag.Int("batch_max_bytes", 1048576, "the maximum batch size to allow per-partition (must be less than Kafka's max.message.bytes, producing)")
cgReaders = flag.Int("consumer_group_readers", 0, "Number of parallel readers in the consumer group")
cgName = flag.String("consumer_group_name", "", "The name of the consumer group. Generated randomly if not set.")
linger = flag.Duration("linger", 0, "if non-zero, linger to use when producing")
maxBufferedRecords = flag.Uint("max-buffered-records", 1024, "Producer buffer size: the default of 1 makes roughly one event per batch, useful for measurement. Set to something higher to make it easier to max out bandwidth.")
remote = flag.Bool("remote", false, "Remote control mode, driven by HTTP calls, for use in automated tests")
remotePort = flag.Uint("remote-port", 7884, "HTTP listen port for remote control/query")
loop = flag.Bool("loop", false, "For readers, run indefinitely until stopped via signal or HTTP call")
loop = flag.Bool("loop", false, "For readers, repeatedly consume from the beginning, looping to the beginning after hitting the end of the topic until stopped via signal")
continuous = flag.Bool("continuous", false, "For readers, wait for new messages to arrive after hitting the end of the topic until stopped via signal or HTTP call")
name = flag.String("client-name", "kgo", "Name of kafka client")
fakeTimestampMs = flag.Int64("fake-timestamp-ms", -1, "Producer: set artificial batch timestamps on an incrementing basis, starting from this number")
fakeTimestampStepMs = flag.Int64("fake-timestamp-step-ms", 1, "Producer: step size used to increment fake timestamp")
@@ -67,6 +69,8 @@ var (

compressionType = flag.String("compression-type", "", "One of none, gzip, snappy, lz4, zstd, or 'mixed' to pick a random codec for each producer")
compressiblePayload = flag.Bool("compressible-payload", false, "If true, use a highly compressible payload instead of the default random payload")

tolerateDataLoss = flag.Bool("tolerate-data-loss", false, "If true, tolerate data-loss events")
)

func makeWorkerConfig() worker.WorkerConfig {
@@ -84,6 +88,8 @@ func makeWorkerConfig() worker.WorkerConfig {
Transactions: *useTransactions,
CompressionType: *compressionType,
CompressiblePayload: *compressiblePayload,
TolerateDataLoss: *tolerateDataLoss,
Continuous: *continuous,
}

return c
@@ -179,7 +185,11 @@ func main()

mux.HandleFunc("/shutdown", func(w http.ResponseWriter, r *http.Request) {
log.Info("Remote request /shutdown")
shutdownChan <- 1
select {
case shutdownChan <- 1:
default:
log.Warn("shutdown channel is full, skipping")
}
})

mux.HandleFunc("/last_pass", func(w http.ResponseWriter, r *http.Request) {
@@ -197,7 +207,11 @@
log.Warn("unable to parse timeout query param, skipping printing stack trace logs")
}
}
lastPassChan <- 1
select {
case lastPassChan <- 1:
default:
log.Warn("last_pass channel is full, skipping")
}
})

mux.HandleFunc("/print_stack", func(w http.ResponseWriter, r *http.Request) {
@@ -212,6 +226,21 @@
}
}()

if *loop && *continuous {
util.Die("Cannot use -loop and -continuous together")
}

ctx, cancel := context.WithCancel(context.Background())
loopState := util.NewLoopState(*loop)
go func() {
<-lastPassChan
if *continuous {
cancel()
} else {
loopState.RequestLastPass()
}
}()

if *pCount > 0 {
log.Info("Starting producer...")
pwc := verifier.NewProducerConfig(makeWorkerConfig(), "producer", nPartitions, *mSize, *pCount, *fakeTimestampMs, *fakeTimestampStepMs, (*produceRateLimitBps), *keySetCardinality, *msgsPerProducerId)
@@ -226,34 +255,23 @@
waitErr := pw.Wait()
util.Chk(waitErr, "Producer error: %v", waitErr)
log.Info("Finished producer.")
}

if *seqRead {
} else if *seqRead {
srw := verifier.NewSeqReadWorker(verifier.NewSeqReadConfig(
makeWorkerConfig(), "sequential", nPartitions, *seqConsumeCount,
(*consumeTputMb)*1024*1024,
))
workers = append(workers, &srw)

firstPass := true
lastPass := false
for firstPass || (!lastPass && *loop) {
for loopState.Next() {
log.Info("Starting sequential read pass")
firstPass = false
lastPass = len(lastPassChan) != 0
if lastPass {
log.Info("This will be the last pass")
}
waitErr := srw.Wait()
waitErr := srw.Wait(ctx)
if waitErr != nil {
// Proceed around the loop, to be tolerant of e.g. kafka client
// construct failures on unavailable cluster
log.Warnf("Error from sequeqntial read worker: %v", err)
}
}
}

if *cCount > 0 {
} else if *cCount > 0 {
var wg sync.WaitGroup
var randomWorkers []*verifier.RandomReadWorker
for i := 0; i < *parallelRead; i++ {
@@ -265,14 +283,7 @@
workers = append(workers, &worker)
}

firstPass := true
lastPass := false
for firstPass || (!lastPass && *loop) {
lastPass = len(lastPassChan) != 0
if lastPass {
log.Info("This will be the last pass")
}
firstPass = false
for loopState.Next() {
for _, w := range randomWorkers {
wg.Add(1)
go func(worker *verifier.RandomReadWorker) {
@@ -288,25 +299,20 @@
}
wg.Wait()
}
}
} else if *cgReaders > 0 {
if *loop && *cgName != "" {
util.Die("Cannot use -loop and -consumer_group_name together")
}

if *cgReaders > 0 {
grw := verifier.NewGroupReadWorker(
verifier.NewGroupReadConfig(
makeWorkerConfig(), "groupReader", nPartitions, *cgReaders,
makeWorkerConfig(), *cgName, nPartitions, *cgReaders,
*seqConsumeCount, (*consumeTputMb)*1024*1024))
workers = append(workers, &grw)

firstPass := true
lastPass := false
for firstPass || (!lastPass && *loop) {
for loopState.Next() {
log.Info("Starting group read pass")
lastPass = len(lastPassChan) != 0
if lastPass {
log.Info("This will be the last pass")
}
firstPass = false
waitErr := grw.Wait()
waitErr := grw.Wait(ctx)
util.Chk(waitErr, "Consumer error: %v", waitErr)
}
}
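Note: the read-worker side of this change is not shown in the hunks above. srw.Wait(ctx) and grw.Wait(ctx) now receive a context that the lastPassChan goroutine cancels when -continuous is set. A minimal sketch of a consume loop honouring that contract, assuming a franz-go client and placeholder broker/topic names, might look like this:

// Illustrative only: the SeqReadWorker / GroupReadWorker implementations are not
// part of this diff. This sketch shows how a consume loop can honour the ctx that
// main.go now cancels when /last_pass is hit in -continuous mode.
package main

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// consumeUntilCancelled polls records until ctx is cancelled; the broker and
// topic names are placeholders, not values taken from kgo-verifier.
func consumeUntilCancelled(ctx context.Context) error {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumeTopics("test-topic"),
	)
	if err != nil {
		return err
	}
	defer cl.Close()

	for {
		fetches := cl.PollFetches(ctx)
		if ctx.Err() != nil {
			// /last_pass was requested while running with -continuous:
			// stop waiting for new records and let the worker finish.
			return nil
		}
		fetches.EachRecord(func(r *kgo.Record) {
			// validate / count the record here
		})
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	_ = consumeUntilCancelled(ctx)
}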
56 changes: 53 additions & 3 deletions pkg/util/util.go
@@ -1,14 +1,13 @@

package util

import (
"fmt"
"os"
"sync"

log "github.com/sirupsen/logrus"
)


func Die(msg string, args ...interface{}) {
formatted := fmt.Sprintf(msg, args...)
log.Error(formatted)
@@ -19,4 +18,55 @@ func Chk(err error, msg string, args ...interface{}) {
if err != nil {
Die(msg, args...)
}
}
}

// loopState is a helper struct holding common state for managing consumer loops.
type loopState struct {
mu sync.RWMutex

// `loop` is set to false when looping should stop. Even if it is false
// initially, `Next()` must still return true at least once.
// To achieve that, `lastPass` is set to true once `Next()` has returned
// true while `loop` was false, so the following call knows the loop is
// done and needs to fuse the state.
loop bool
lastPass bool

// fused is set to true after Next returns false. It is used to enforce
// the invariant that Next must not be called after it returned false
// previously.
fused bool
}

// NewLoopState creates a state object for managing a consumer loop.
func NewLoopState(loop bool) *loopState {
return &loopState{loop: loop}
}

// RequestLastPass requests that the loop run one more time before exiting.
func (ls *loopState) RequestLastPass() {
ls.mu.Lock()
defer ls.mu.Unlock()

ls.loop = false
}

// Next returns true if the current loop iteration should run.
func (ls *loopState) Next() bool {
ls.mu.Lock()
defer ls.mu.Unlock()

if ls.fused {
panic("invariant: Next must not be called after it returned false previously")
}

if ls.lastPass {
ls.fused = true
return false
} else if !ls.loop {
log.Info("This is the last pass.")
ls.lastPass = true
}

return true
}
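For reference, a standalone sketch of how this helper is meant to be driven, mirroring the for loopState.Next() loops in cmd/kgo-verifier/main.go; the goroutine stands in for the /last_pass HTTP handler, and the sleeps and pass counter are illustrative only:

package main

import (
	"fmt"
	"time"

	"github.com/redpanda-data/kgo-verifier/pkg/util"
)

func main() {
	// loop=true corresponds to the -loop flag: keep running passes until a
	// last pass is requested; loop=false runs exactly one pass.
	ls := util.NewLoopState(true)

	// Stand-in for the /last_pass HTTP handler in cmd/kgo-verifier.
	go func() {
		time.Sleep(100 * time.Millisecond)
		ls.RequestLastPass()
	}()

	pass := 0
	for ls.Next() {
		pass++
		fmt.Printf("starting read pass %d\n", pass)
		// ... perform one full read of the topic here ...
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Printf("done after %d passes\n", pass)
}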
62 changes: 62 additions & 0 deletions pkg/util/util_test.go
@@ -0,0 +1,62 @@
package util_test

import (
"testing"

"github.com/redpanda-data/kgo-verifier/pkg/util"
)

func TestLoopStateDefault(t *testing.T) {
s := util.NewLoopState(false)
if !s.Next() {
t.Error("Next must returns true on first pass")
}
if s.Next() {
t.Error("Next must return false after first pass")
}
}

func TestLoopStateDoLoop(t *testing.T) {
s := util.NewLoopState(true)
for i := 0; i < 3; i++ {
if !s.Next() {
t.Error("Next must return true as long as Loop is set to true")
}
}
s.RequestLastPass()
if !s.Next() {
t.Error("Next must return true after RequestLastPass")
}
if s.Next() {
t.Error("Next must return false after RequestLastPass")
}

func() {
defer func() {
if r := recover(); r == nil {
t.Error("Next must panic after RequestLastPass and third Next call")
}
}()
s.Next()
}()
}

func TestLoopStateDoLoopStopImmediately(t *testing.T) {
s := util.NewLoopState(true)
s.RequestLastPass()
if !s.Next() {
t.Error("Next must return true after RequestLastPass")
}
if s.Next() {
t.Error("Next must return false after RequestLastPass")
}

func() {
defer func() {
if r := recover(); r == nil {
t.Error("Next must panic after RequestLastPass and third Next call")
}
}()
s.Next()
}()
}
4 changes: 2 additions & 2 deletions pkg/worker/repeater/repeater_worker.go
@@ -260,7 +260,7 @@ func (v *Worker) ConsumeRecord(r *kgo.Record) {

v.globalStats.E2e_latency.Update(e2e_latency)

log.Debugf("Consume %s token %06d, total latency %s", v.config.workerCfg.Name, token, e2e_latency)
log.Debugf("Consume %s token %06d, total latency %v", v.config.workerCfg.Name, token, e2e_latency)
v.pending <- int64(token)
}

@@ -495,7 +495,7 @@ loop:
})

}
log.Debug("Consume %s dropping out", v.config.workerCfg.Name)
log.Debugf("Consume %s dropping out", v.config.workerCfg.Name)

sync_ctx, _ := context.WithTimeout(context.Background(), 500*time.Millisecond)
v.client.CommitUncommittedOffsets(sync_ctx)