diff --git a/backend/controller/scaling/localscaling/local_scaling.go b/backend/controller/scaling/localscaling/local_scaling.go
index 3ace46aa70..174d17c164 100644
--- a/backend/controller/scaling/localscaling/local_scaling.go
+++ b/backend/controller/scaling/localscaling/local_scaling.go
@@ -158,6 +158,13 @@ func (l *localScaling) reconcileRunners(ctx context.Context, deploymentRunners *
 }
 
 func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, info *deploymentInfo) error {
+	select {
+	case <-ctx.Done():
+		// In some cases this gets called with an expired context, generally after the lease is released
+		// We don't want to start a runner in that case
+		return nil
+	default:
+	}
 	controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]
 	logger := log.FromContext(ctx)
 
@@ -210,12 +217,15 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey string, in
 	go func() {
 		logger.Debugf("Starting runner: %s", config.Key)
 		err := runner.Start(runnerCtx, config)
+		l.lock.Lock()
+		defer l.lock.Unlock()
 		if err != nil && !errors.Is(err, context.Canceled) {
 			logger.Errorf(err, "Runner failed: %s", err)
 		}
-		l.lock.Lock()
-		defer l.lock.Unlock()
-		info.exits++
+		if !errors.Is(err, context.Canceled) {
+			// Don't count context.Canceled as a restart error
+			info.exits++
+		}
 		if info.exits >= maxExits {
 			logger.Errorf(fmt.Errorf("too many restarts"), "Runner failed too many times, not restarting")
 		}
diff --git a/backend/controller/scaling/scaling.go b/backend/controller/scaling/scaling.go
index 31e10f6ff7..b565018a58 100644
--- a/backend/controller/scaling/scaling.go
+++ b/backend/controller/scaling/scaling.go
@@ -28,13 +28,6 @@ func BeginGrpcScaling(ctx context.Context, url url.URL, leaser leases.Leaser, ha
 		// Grab a lease to take control of runner scaling
 		lease, leaseContext, err := leaser.AcquireLease(ctx, leases.SystemKey("ftl-scaling", "runner-creation"), leaseTimeout, optional.None[any]())
 		if err == nil {
-			defer func(lease leases.Lease) {
-				err := lease.Release()
-				if err != nil {
-					logger := log.FromContext(ctx)
-					logger.Errorf(err, "Failed to release lease")
-				}
-			}(lease)
 			// If we get it then we take over runner scaling
 			runGrpcScaling(leaseContext, url, handler)
 		} else if !errors.Is(err, leases.ErrConflict) {
@@ -43,8 +36,15 @@
 		}
 		select {
 		case <-ctx.Done():
+			if lease != nil {
+				err := lease.Release()
+				if err != nil {
+					logger := log.FromContext(ctx)
+					logger.Errorf(err, "Failed to release lease")
+				}
+			}
 			return
-		case <-time.After(leaseTimeout):
+		case <-leaseContext.Done():
 		}
 	}
 }
diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go
index b6643b2152..f1af32ffa5 100644
--- a/internal/rpc/rpc.go
+++ b/internal/rpc/rpc.go
@@ -280,12 +280,6 @@ func RetryStreamingServerStream[Req, Resp any](
 	for {
 		stream, err := rpc(ctx, connect.NewRequest(req))
 		if err == nil {
-			defer func(stream *connect.ServerStreamForClient[Resp]) {
-				err := stream.Close()
-				if err != nil {
-					logger.Debugf("Failed to close stream: %s", err)
-				}
-			}(stream)
 			for {
 				if stream.Receive() {
 					resp := stream.Msg()
@@ -300,6 +294,10 @@
 					}
 					select {
 					case <-ctx.Done():
+						err := stream.Close()
+						if err != nil {
+							logger.Debugf("Failed to close stream: %s", err)
+						}
 						return
 					default:
 					}
@@ -312,6 +310,10 @@
 					break
 				}
 			}
+			err := stream.Close()
+			if err != nil {
+				logger.Debugf("Failed to close stream: %s", err)
+			}
 		}
 		errored = true
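
A note on the `startRunner` guard: a `select` with an empty `default` is Go's idiomatic non-blocking check for an already-cancelled context. A minimal, self-contained sketch of the same pattern (the `startWorker` name and surrounding scaffolding are illustrative, not from the FTL codebase):

```go
package main

import (
	"context"
	"fmt"
)

// startWorker declines to do any work when its context is already
// cancelled, e.g. because the lease that scheduled it was released.
func startWorker(ctx context.Context) error {
	select {
	case <-ctx.Done():
		// Already cancelled: treat the call as a benign no-op,
		// mirroring the guard added to startRunner above.
		return nil
	default:
	}
	fmt.Println("starting worker")
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()             // simulate an expired lease context
	_ = startWorker(ctx) // the guard short-circuits; nothing starts
}
```

Returning nil rather than ctx.Err() matches the patch's choice to treat a start attempt after lease release as expected shutdown noise rather than an error.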
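
The scaling.go change replaces a defer that would pile up one pending Release per loop iteration (defers only run when BeginGrpcScaling itself returns) with a single explicit Release on the shutdown path, and it wakes the loop when the lease context ends instead of after a fixed leaseTimeout. A rough sketch of that shape, where Lease, Leaser, ErrConflict, and run are simplified hypothetical stand-ins for FTL's leases package and BeginGrpcScaling:

```go
package scaling

import (
	"context"
	"errors"
	"log"
	"time"
)

// Lease and Leaser are simplified stand-ins for FTL's leases package.
type Lease interface{ Release() error }

type Leaser interface {
	// AcquireLease returns the lease plus a context that is cancelled
	// when the lease is lost.
	AcquireLease(ctx context.Context) (Lease, context.Context, error)
}

var ErrConflict = errors.New("lease is held elsewhere")

// run loops forever, trying to hold the lease and do work under it.
func run(ctx context.Context, leaser Leaser, work func(context.Context)) {
	for {
		lease, leaseCtx, err := leaser.AcquireLease(ctx)
		if err != nil {
			if !errors.Is(err, ErrConflict) {
				log.Printf("failed to acquire lease: %v", err)
			}
			// Someone else holds the lease; back off and retry.
			select {
			case <-ctx.Done():
				return
			case <-time.After(time.Second):
			}
			continue
		}
		work(leaseCtx)
		select {
		case <-ctx.Done():
			// Shutting down while holding the lease: release it
			// exactly once, here, instead of deferring in a loop.
			if err := lease.Release(); err != nil {
				log.Printf("failed to release lease: %v", err)
			}
			return
		case <-leaseCtx.Done():
			// Lost the lease; loop and try to reacquire immediately.
		}
	}
}
```

Waking on leaseCtx.Done() rather than a timer means a lost lease is reacquired promptly instead of after up to a full leaseTimeout.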
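
In rpc.go, the deferred Close inside the retry loop would accumulate one pending call per reconnect and hold every stream open until RetryStreamingServerStream returned; the patch instead closes each stream explicitly on both ways out of the receive loop. A compressed sketch of the pattern, where Stream, retryStream, and closeStream are hypothetical stand-ins for connect.ServerStreamForClient and the real function:

```go
package rpc

import (
	"context"
	"log"
)

// Stream is a stand-in for the receive/close surface of
// *connect.ServerStreamForClient[Resp].
type Stream interface {
	Receive() bool
	Close() error
}

// retryStream reconnects forever, closing each stream as soon as it is
// done with it rather than deferring all closes to function exit.
func retryStream(ctx context.Context, open func(context.Context) (Stream, error), handle func()) {
	for {
		stream, err := open(ctx)
		if err == nil {
			for stream.Receive() {
				handle()
				select {
				case <-ctx.Done():
					closeStream(stream) // leaving for good: close first
					return
				default:
				}
			}
			// Receive returned false: the stream ended or failed.
			closeStream(stream) // close before the next reconnect
		}
		select {
		case <-ctx.Done():
			return
		default:
			// Loop and reconnect (a real implementation would back off).
		}
	}
}

func closeStream(s Stream) {
	if err := s.Close(); err != nil {
		log.Printf("failed to close stream: %v", err)
	}
}
```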