Fix deadlock during shutdown which prevented leader election cleanup #1688

Merged 2 commits on Sep 25, 2023
32 changes: 19 additions & 13 deletions internal/concierge/apiserver/apiserver.go
@@ -107,39 +107,45 @@ func (c completedConfig) New() (*PinnipedServer, error) {
return nil, fmt.Errorf("could not install API groups: %w", err)
}

- shutdown := &sync.WaitGroup{}
+ controllersShutdownWaitGroup := &sync.WaitGroup{}
+ controllersCtx, cancelControllerCtx := context.WithCancel(context.Background())

s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
func(postStartContext genericapiserver.PostStartHookContext) error {
plog.Debug("start-controllers post start hook starting")
defer plog.Debug("start-controllers post start hook completed")

- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- defer cancel()
-
- <-postStartContext.StopCh
- }()

@cfryanr (Member, Author) commented on Sep 21, 2023, on the removed line `<-postStartContext.StopCh`:

This is where the deadlock was happening. This StopCh channel is not going to be closed until our pre-shutdown hook finishes running. But our pre-shutdown hook calls shutdown.Wait(), which is effectively waiting for this goroutine to end (because this goroutine cancels the context, which allows the runControllers() call to stop blocking, which in turn ends the WaitGroup that the pre-shutdown hook is waiting for).

A reviewer (Member) replied: 🤯
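To make the cycle concrete, here is a minimal, self-contained sketch of the same deadlock shape (hypothetical names, not the actual Pinniped code): the pre-shutdown hook waits on the WaitGroup, the WaitGroup only finishes after the context is cancelled, the context is only cancelled after stopCh closes, yet stopCh only closes after the pre-shutdown hook returns.

```go
// Minimal sketch of the deadlock (hypothetical names, not the real code).
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	stopCh := make(chan struct{}) // the server closes this only AFTER pre-shutdown hooks return
	shutdown := &sync.WaitGroup{}

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		defer cancel()
		<-stopCh // never closes, because the hook below never returns
	}()

	shutdown.Add(1)
	go func() {
		defer shutdown.Done()
		<-ctx.Done() // stands in for runControllers(ctx) blocking until cancellation
	}()

	// The pre-shutdown hook: waits for the controllers goroutine, which waits
	// for cancel(), which waits for stopCh, which waits for this hook. A cycle.
	shutdown.Wait() // Go's runtime reports: all goroutines are asleep - deadlock!
	fmt.Println("never reached")
}
```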

- runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(ctx)
+ runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(controllersCtx)
if err != nil {
return fmt.Errorf("cannot create run controller func: %w", err)
}

- shutdown.Add(1)
+ controllersShutdownWaitGroup.Add(1)
go func() {
- defer shutdown.Done()
+ // When this goroutine ends, then also end the WaitGroup, allowing anyone who called Wait() to proceed.
+ defer controllersShutdownWaitGroup.Done()

- runControllers(ctx)
+ // Start the controllers and block until their context is cancelled and they have shut down.
+ runControllers(controllersCtx)
+ plog.Debug("start-controllers post start hook's background goroutine saw runControllers() finish")
}()

return nil
},
)

s.GenericAPIServer.AddPreShutdownHookOrDie("stop-controllers",
func() error {
plog.Debug("stop-controllers pre shutdown hook starting")
defer plog.Debug("stop-controllers pre shutdown hook completed")

- shutdown.Wait()
+ // The generic api server is telling us that it wants to shut down, so tell our controllers that we
+ // want them to shut down by cancelling their context.
+ cancelControllerCtx()
+
+ // Now wait for the controllers to finish shutting down. By blocking here, we prevent the generic api server's
+ // graceful shutdown process from continuing until we are finished shutting down our own controllers.
+ controllersShutdownWaitGroup.Wait()

return nil
},
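For contrast with the deadlock sketch above, here is the fixed shape in miniature (again with hypothetical names): the pre-shutdown hook now cancels the controllers' context itself and then waits, so nothing in the chain depends on the hook having already returned.

```go
// Minimal sketch of the fixed shutdown shape (hypothetical names).
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	controllersCtx, cancelControllerCtx := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}

	wg.Add(1)
	go func() {
		defer wg.Done()
		<-controllersCtx.Done() // stands in for runControllers(controllersCtx)
	}()

	preShutdownHook := func() {
		cancelControllerCtx() // tell the controllers to stop...
		wg.Wait()             // ...and block until they actually have
	}
	preShutdownHook()
	fmt.Println("pre-shutdown hook returned; server can finish its graceful shutdown")
}
```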
13 changes: 12 additions & 1 deletion internal/concierge/server/server.go
@@ -100,6 +100,7 @@ func addCommandlineFlagsToCommand(cmd *cobra.Command, app *App) {
}

// Boot the aggregated API server, which will in turn boot the controllers.
+ // In practice, the ctx passed in should be one which will be cancelled when the process receives SIGTERM or SIGINT.
func (a *App) runServer(ctx context.Context) error {
// Read the server config file.
cfg, err := concierge.FromPath(ctx, a.configPath)
@@ -186,7 +187,9 @@ func (a *App) runServer(ctx context.Context) error {
return fmt.Errorf("could not create aggregated API server: %w", err)
}

- // Run the server. Its post-start hook will start the controllers.
+ // Run the server. Its post-start hook will start the controllers. Its pre-shutdown hook will be called when ctx is
+ // cancelled, and that hook should gracefully stop the controllers and give up the leader election lease. See the
+ // code for these hooks in internal/concierge/apiserver.go.
return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}

@@ -276,8 +279,16 @@ func main() error {
"time-since-build", timeSinceCompile,
)

+ // This context will be cancelled upon the first SIGTERM or SIGINT, and will os.Exit() to kill the process
+ // upon the second SIGTERM or SIGINT.
ctx := genericapiserver.SetupSignalContext()
+
+ // Just for debugging purposes, log when the first signal is received.
+ go func() {
+ <-ctx.Done() // wait for the Done channel to be closed, indicating that ctx was cancelled by the signal handler
+ plog.Debug("concierge shutdown initiated due to process receiving SIGTERM or SIGINT")
+ }()

return New(ctx, os.Args[1:], os.Stdout, os.Stderr).Run()
}

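For background on the signal handling used above: genericapiserver.SetupSignalContext comes from k8s.io/apiserver, and the behavior the new comment describes (cancel on the first SIGTERM/SIGINT, exit on the second) can be sketched roughly as below. This is a simplification for illustration, not the upstream source.

```go
// Rough sketch of what genericapiserver.SetupSignalContext() does
// (simplified; not the verbatim k8s.io/apiserver code).
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func setupSignalContext() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	c := make(chan os.Signal, 2)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-c
		cancel() // first signal: cancel the context to start a graceful shutdown
		<-c
		os.Exit(1) // second signal: give up and kill the process immediately
	}()
	return ctx
}

func main() {
	ctx := setupSignalContext()
	<-ctx.Done() // blocks until the first SIGTERM or SIGINT arrives
	fmt.Println("shutdown initiated")
}
```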
5 changes: 4 additions & 1 deletion internal/controllerlib/controller.go
@@ -1,4 +1,4 @@
- // Copyright 2020 the Pinniped contributors. All Rights Reserved.
+ // Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package controllerlib
@@ -102,6 +102,7 @@ func (c *controller) Run(ctx context.Context, workers int) {
workerContext, workerContextCancel := context.WithCancel(context.Background())

defer func() {
plog.Debug("starting to shut down controller workers", "controller", c.Name(), "workers", workers)
c.queue.ShutDown() // shutdown the controller queue first
workerContextCancel() // cancel the worker context, which tell workers to initiate shutdown

@@ -126,7 +127,9 @@
}()
}

plog.Debug("controller started", "controller", c.Name(), "workers", workers)
<-ctx.Done() // wait for controller context to be cancelled
plog.Debug("controller context cancelled, next will terminate workers", "controller", c.Name(), "workers", workers)
}

func (c *controller) invokeAllRunOpts() {
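The surrounding Run method follows a common Go controller pattern: start N workers bound to an internal worker context, block on the controller's own context, then on the way out cancel the workers and wait for them. A rough self-contained sketch of that ordering (a hypothetical simplification; the real controllerlib also shuts down and drains a workqueue):

```go
// Simplified sketch of the controller Run/shutdown ordering shown above.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func run(ctx context.Context, workers int) {
	workerCtx, workerCancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}

	defer func() {
		workerCancel() // tell all workers to initiate shutdown
		wg.Wait()      // block until every worker goroutine has exited
		fmt.Println("all workers shut down")
	}()

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				select {
				case <-workerCtx.Done():
					return // worker context cancelled: stop processing
				case <-time.After(10 * time.Millisecond):
					// a real worker would process one queue item here
				}
			}
		}()
	}

	<-ctx.Done() // block until the controller's own context is cancelled
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(50 * time.Millisecond)
		cancel() // simulate shutdown
	}()
	run(ctx, 2)
}
```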
1 change: 1 addition & 0 deletions internal/leaderelection/leaderelection.go
@@ -98,6 +98,7 @@ func New(podInfo *downward.PodInfo, deployment *appsv1.Deployment, opts ...kubec

go func() {
controllers(ctx) // run the controllers with the global context, this blocks until the context is canceled
plog.Debug("leader election saw controllers have stopped")

if isLeader.stop() { // remove our in-memory leader status before we release the lock
plog.Debug("leader lost", "identity", identity, "reason", "controller stop")
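This one-line log makes the ordering visible: the leader election wrapper only releases its lease after controllers(ctx) returns, which is exactly why the shutdown deadlock prevented lease cleanup. A rough sketch of that ordering (hypothetical names, not the real leaderelection package):

```go
// Sketch of the run-then-release ordering: the lease is only given up
// AFTER the controllers have fully stopped, so if the controllers never
// see a cancelled context, the lease is never released.
package main

import (
	"context"
	"fmt"
)

func runWithLeaderElection(ctx context.Context, controllers func(context.Context)) {
	fmt.Println("acquired leader election lease")

	controllers(ctx) // blocks until ctx is cancelled and the controllers stop

	// Only reached once the controllers have returned; with the old
	// deadlock, cancellation never happened, so this cleanup never ran.
	fmt.Println("released leader election lease")
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate shutdown having been requested
	runWithLeaderElection(ctx, func(ctx context.Context) { <-ctx.Done() })
}
```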
32 changes: 19 additions & 13 deletions internal/supervisor/apiserver/apiserver.go
@@ -113,39 +113,45 @@ func (c completedConfig) New() (*PinnipedServer, error) {
return nil, fmt.Errorf("could not install API groups: %w", err)
}

- shutdown := &sync.WaitGroup{}
+ controllersShutdownWaitGroup := &sync.WaitGroup{}
+ controllersCtx, cancelControllerCtx := context.WithCancel(context.Background())

s.GenericAPIServer.AddPostStartHookOrDie("start-controllers",
func(postStartContext genericapiserver.PostStartHookContext) error {
plog.Debug("start-controllers post start hook starting")
defer plog.Debug("start-controllers post start hook completed")

- ctx, cancel := context.WithCancel(context.Background())
- go func() {
- defer cancel()
-
- <-postStartContext.StopCh
- }()

- runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(ctx)
+ runControllers, err := c.ExtraConfig.BuildControllersPostStartHook(controllersCtx)
if err != nil {
return fmt.Errorf("cannot create run controller func: %w", err)
}

- shutdown.Add(1)
+ controllersShutdownWaitGroup.Add(1)
go func() {
- defer shutdown.Done()
+ // When this goroutine ends, then also end the WaitGroup, allowing anyone who called Wait() to proceed.
+ defer controllersShutdownWaitGroup.Done()

- runControllers(ctx)
+ // Start the controllers and block until their context is cancelled and they have shut down.
+ runControllers(controllersCtx)
+ plog.Debug("start-controllers post start hook's background goroutine saw runControllers() finish")
}()

return nil
},
)

s.GenericAPIServer.AddPreShutdownHookOrDie("stop-controllers",
func() error {
plog.Debug("stop-controllers pre shutdown hook starting")
defer plog.Debug("stop-controllers pre shutdown hook completed")

- shutdown.Wait()
+ // The generic api server is telling us that it wants to shut down, so tell our controllers that we
+ // want them to shut down by cancelling their context.
+ cancelControllerCtx()
+
+ // Now wait for the controllers to finish shutting down. By blocking here, we prevent the generic api server's
+ // graceful shutdown process from continuing until we are finished shutting down our own controllers.
+ controllersShutdownWaitGroup.Wait()

return nil
},
10 changes: 7 additions & 3 deletions internal/supervisor/server/server.go
@@ -382,8 +382,10 @@ func prepareControllers(
return controllerinit.Prepare(controllerManager.Start, leaderElector, kubeInformers, pinnipedInformers)
}

- //nolint:funlen
- func runSupervisor(ctx context.Context, podInfo *downward.PodInfo, cfg *supervisor.Config) error {
+ // Boot the aggregated API server, which will in turn boot the controllers. Also open the appropriate network ports
+ // and start serving the health endpoint and the endpoints of the configured FederationDomains.
+ // In practice, the ctx passed in should be one which will be cancelled when the process receives SIGTERM or SIGINT.
+ func runSupervisor(ctx context.Context, podInfo *downward.PodInfo, cfg *supervisor.Config) error { //nolint:funlen
serverInstallationNamespace := podInfo.Namespace
clientSecretSupervisorGroupData := groupsuffix.SupervisorAggregatedGroups(*cfg.APIGroupSuffix)

@@ -575,7 +577,9 @@ func runSupervisor(ctx context.Context, podInfo *downward.PodInfo, cfg *supervis
plog.Debug("supervisor started")
defer plog.Debug("supervisor exiting")

- // Run the server. Its post-start hook will start the controllers.
+ // Run the server. Its post-start hook will start the controllers. Its pre-shutdown hook will be called when ctx is
+ // cancelled, and that hook should gracefully stop the controllers and give up the leader election lease. See the
+ // code for these hooks in internal/supervisor/apiserver.go.
err = server.GenericAPIServer.PrepareRun().Run(ctx.Done())
if err != nil {
return err
12 changes: 6 additions & 6 deletions test/integration/main_test.go
@@ -1,4 +1,4 @@
- // Copyright 2020-2021 the Pinniped contributors. All Rights Reserved.
+ // Copyright 2020-2023 the Pinniped contributors. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

package integration
@@ -65,8 +65,8 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
serialTest := testing.InternalTest{
Name: "TestIntegrationSerial",
F: func(t *testing.T) {
- _ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
- t.Parallel()                  // outer test always runs in parallel for this bucket
+ testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
+ t.Parallel()                     // outer test always runs in parallel for this bucket

for _, test := range serialTests {
test := test
@@ -80,8 +80,8 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
parallelTest := testing.InternalTest{
Name: "TestIntegrationParallel",
F: func(t *testing.T) {
- _ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
- t.Parallel()                  // outer test always runs in parallel for this bucket
+ testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
+ t.Parallel()                     // outer test always runs in parallel for this bucket

for _, test := range parallelTests {
test := test
@@ -97,7 +97,7 @@ func splitIntegrationTestsIntoBuckets(m *testing.M) {
disruptiveTest := testing.InternalTest{
Name: "TestIntegrationDisruptive",
F: func(t *testing.T) {
- _ = testlib.IntegrationEnv(t) // make sure these tests do not run during unit tests
+ testlib.SkipUnlessIntegration(t) // make sure these tests do not run during unit tests
// outer test never runs in parallel for this bucket

for _, test := range disruptiveTests {