From fbc91fb00695555fdc96485cd814985fc0383cce Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Fri, 4 Oct 2024 10:00:25 +1000 Subject: [PATCH] fix: issue with Kube rolling deployments fixes: #2897 --- backend/controller/controller.go | 5 ++++- .../scaling/k8sscaling/deployment_provisioner.go | 9 +++++---- .../controller/scaling/k8sscaling/k8s_scaling.go | 3 ++- .../scaling/kube_scaling_integration_test.go | 13 ++++++++++--- .../base/ftl-controller/deployment-config.yaml | 2 ++ deployment/base/ftl-controller/ftl-controller.yml | 2 ++ internal/log/plain.go | 2 +- 7 files changed, 26 insertions(+), 10 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 4c22aefb76..1475f6181d 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -984,6 +984,7 @@ func (s *Service) callWithRequest( parentKey optional.Option[model.RequestKey], sourceAddress string, ) (*connect.Response[ftlv1.CallResponse], error) { + logger := log.FromContext(ctx) start := time.Now() ctx, span := observability.Calls.BeginSpan(ctx, req.Msg.Verb) defer span.End() @@ -1002,6 +1003,7 @@ func (s *Service) callWithRequest( verbRef := schema.RefFromProto(req.Msg.Verb) verb := &schema.Verb{} + logger = logger.Module(verbRef.Module) if err := sch.ResolveToType(verbRef, verb); err != nil { if errors.Is(err, schema.ErrNotFound) { @@ -1084,6 +1086,7 @@ func (s *Service) callWithRequest( } else { callResponse = either.RightOf[*ftlv1.CallResponse](err) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) + logger.Errorf(err, "Call failed to verb %s for deployment %s", verbRef.String(), route.Deployment) } s.timeline.EnqueueEvent(ctx, &timeline.Call{ DeploymentKey: route.Deployment, @@ -1817,7 +1820,7 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e // as the deployments are in order // We have a new route ready to go, so we can just set the old one to 0 replicas // Do this in a TX so it doesn't happen until the route table is updated - deploymentLogger.Debugf("Setting %s to zero replicas", v.Key.String()) + deploymentLogger.Debugf("Setting %s to zero replicas", prev.Deployment) err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0) if err != nil { deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String()) diff --git a/backend/controller/scaling/k8sscaling/deployment_provisioner.go b/backend/controller/scaling/k8sscaling/deployment_provisioner.go index 3eb8c0af02..463862525c 100644 --- a/backend/controller/scaling/k8sscaling/deployment_provisioner.go +++ b/backend/controller/scaling/k8sscaling/deployment_provisioner.go @@ -23,6 +23,7 @@ import ( "time" "github.com/alecthomas/types/optional" + "github.com/puzpuzpuz/xsync/v3" istiosecmodel "istio.io/api/security/v1" "istio.io/api/type/v1beta1" istiosec "istio.io/client-go/pkg/apis/security/v1" @@ -53,7 +54,7 @@ type DeploymentProvisioner struct { MyDeploymentName string Namespace string // Map of known deployments - KnownDeployments map[string]bool + KnownDeployments *xsync.Map FTLEndpoint string IstioSecurity optional.Option[istioclient.Clientset] } @@ -130,7 +131,7 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl // Note that a change is now currently usually and add and a delete // As it should really be called a module changed, not a deployment changed // This will need to be fixed as part of the support for rolling deployments - r.KnownDeployments[msg.DeploymentKey] = true + r.KnownDeployments.Store(msg.DeploymentKey, true) if deploymentExists { logger.Debugf("Updating deployment %s", msg.DeploymentKey) return r.handleExistingDeployment(ctx, deployment, msg.Schema) @@ -138,13 +139,13 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl return r.handleNewDeployment(ctx, msg.Schema, msg.DeploymentKey) } case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED: - delete(r.KnownDeployments, msg.DeploymentKey) if deploymentExists { go func() { // Nasty hack, we want all the controllers to have updated their route tables before we kill the runner // so we add a slight delay here time.Sleep(time.Second * 10) + r.KnownDeployments.Delete(msg.DeploymentKey) logger.Debugf("Deleting service %s", msg.ModuleName) err = r.Client.CoreV1().Services(r.Namespace).Delete(ctx, msg.DeploymentKey, v1.DeleteOptions{}) if err != nil { @@ -418,7 +419,7 @@ func (r *DeploymentProvisioner) deleteMissingDeployments(ctx context.Context) { } for _, service := range list.Items { - if !r.KnownDeployments[service.Name] { + if _, ok := r.KnownDeployments.Load(service.Name); !ok { // With owner references the deployments should be deleted automatically // However this is in transition so delete both logger.Debugf("Deleting service %s as it is not a known module", service.Name) diff --git a/backend/controller/scaling/k8sscaling/k8s_scaling.go b/backend/controller/scaling/k8sscaling/k8s_scaling.go index 71cea0416e..1e0547a19f 100644 --- a/backend/controller/scaling/k8sscaling/k8s_scaling.go +++ b/backend/controller/scaling/k8sscaling/k8s_scaling.go @@ -7,6 +7,7 @@ import ( "os" "github.com/alecthomas/types/optional" + "github.com/puzpuzpuz/xsync/v3" istioclient "istio.io/client-go/pkg/clientset/versioned" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -60,7 +61,7 @@ func (k k8sScaling) Start(ctx context.Context, controller url.URL, leaser leases deploymentReconciler := &DeploymentProvisioner{ Client: clientset, Namespace: namespace, - KnownDeployments: map[string]bool{}, + KnownDeployments: xsync.NewMap(), FTLEndpoint: controller.String(), IstioSecurity: optional.Ptr(sec), } diff --git a/backend/controller/scaling/kube_scaling_integration_test.go b/backend/controller/scaling/kube_scaling_integration_test.go index fdf904e10a..f225caca03 100644 --- a/backend/controller/scaling/kube_scaling_integration_test.go +++ b/backend/controller/scaling/kube_scaling_integration_test.go @@ -49,9 +49,6 @@ func TestKubeScaling(t *testing.T) { // Istio should prevent this assert.Equal(t, strconv.FormatBool(false), response) }), - in.EditFile("echo", func(content []byte) []byte { - return []byte(strings.ReplaceAll(string(content), "Hello", "Bye")) - }, "echo.go"), func(t testing.TB, ic in.TestContext) { // Hit the verb constantly to test rolling updates. go func() { @@ -71,10 +68,20 @@ func TestKubeScaling(t *testing.T) { } }() }, + in.EditFile("echo", func(content []byte) []byte { + return []byte(strings.ReplaceAll(string(content), "Hello", "Bye")) + }, "echo.go"), in.Deploy("echo"), in.Call("echo", "echo", "Bob", func(t testing.TB, response string) { assert.Equal(t, "Bye, Bob!!!", response) }), + in.EditFile("echo", func(content []byte) []byte { + return []byte(strings.ReplaceAll(string(content), "Bye", "Bonjour")) + }, "echo.go"), + in.Deploy("echo"), + in.Call("echo", "echo", "Bob", func(t testing.TB, response string) { + assert.Equal(t, "Bonjour, Bob!!!", response) + }), func(t testing.TB, ic in.TestContext) { done.Store(true) routineStopped.Wait() diff --git a/deployment/base/ftl-controller/deployment-config.yaml b/deployment/base/ftl-controller/deployment-config.yaml index e9d7959d2d..0379f7b6f8 100644 --- a/deployment/base/ftl-controller/deployment-config.yaml +++ b/deployment/base/ftl-controller/deployment-config.yaml @@ -49,6 +49,8 @@ data: value: "http://$(MY_POD_IP):8893" - name: FTL_LANGUAGE value: "go,kotlin,java" + - name: LOG_TIMESTAMPS + value: "true" ports: - containerPort: 8893 readinessProbe: diff --git a/deployment/base/ftl-controller/ftl-controller.yml b/deployment/base/ftl-controller/ftl-controller.yml index 95079bbbad..7eead27142 100644 --- a/deployment/base/ftl-controller/ftl-controller.yml +++ b/deployment/base/ftl-controller/ftl-controller.yml @@ -43,6 +43,8 @@ spec: value: "test" - name: AWS_ENDPOINT_URL value: "http://localstack:4566" + - name: LOG_TIMESTAMPS + value: "true" ports: - containerPort: 8891 - containerPort: 8892 diff --git a/internal/log/plain.go b/internal/log/plain.go index 55ad441871..392ec4c002 100644 --- a/internal/log/plain.go +++ b/internal/log/plain.go @@ -92,7 +92,7 @@ func (t *plainSink) Log(entry Entry) error { // Add timestamp if required if t.logTime { - prefix += entry.Time.Format(time.TimeOnly) + " " + prefix += entry.Time.Format(time.StampMilli) + " " } // Add scope if required