From a70e1e6521a40c058a254e0297c1e053b05081bf Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Fri, 6 Dec 2024 00:36:47 -0300 Subject: [PATCH 1/2] kgo-repeater: listen for SIGTERM too We listen for SIGINT and do a graceful shutdown, but we should do the same thing on SIGTERM too, as that's what systemd will send to a service, and we run kgo as a service (and anyway it just makes sense to do graceful shutdown on SIGTERM; that's kind of what it's there for). The main reason to do graceful shutdown is to commit any cached offsets and (especially) to leave the consumer group explicitly; otherwise the CG will stay in the Stable state until the heartbeat timeout, which causes problems for re-joining members. --- cmd/kgo-repeater/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/kgo-repeater/main.go b/cmd/kgo-repeater/main.go index d3e750b..47f632f 100644 --- a/cmd/kgo-repeater/main.go +++ b/cmd/kgo-repeater/main.go @@ -13,6 +13,7 @@ import ( "os/signal" "runtime/pprof" "strings" + "syscall" "time" log "github.com/sirupsen/logrus" @@ -127,7 +128,7 @@ func main() { } c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) var verifiers []*repeater.Worker From 2c38d85d3f70989c519bfb5264988a4f6fbf36dc Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Fri, 6 Dec 2024 00:46:43 -0300 Subject: [PATCH 2/2] kgo-repeater: call Shutdown() The verifiers expose a Shutdown() method but we never call it. This means that the clients are never closed, so consumers never explicitly leave their CG, which causes problems. Fixes CORE-8470. 
--- cmd/kgo-repeater/main.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cmd/kgo-repeater/main.go b/cmd/kgo-repeater/main.go index 47f632f..e7e1b07 100644 --- a/cmd/kgo-repeater/main.go +++ b/cmd/kgo-repeater/main.go @@ -172,6 +172,12 @@ func main() { log.Infof("Waiting for worker %d complete", i) log.Infof("Verifier %d result: %s", i, result.String()) } + + for i, v := range verifiers { + log.Infof("Shutting down worker %d...", i) + (*v).Shutdown() + log.Infof("Worker %d shutdown complete", i) + } } // Even if we're not in remote mode, start the HTTP listener so