Skip to content

Commit

Permalink
fix: issue with Kube rolling deployments
Browse files Browse the repository at this point in the history
fixes: #2897
  • Loading branch information
stuartwdouglas committed Oct 4, 2024
1 parent 4f2606e commit fbc91fb
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 10 deletions.
5 changes: 4 additions & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,7 @@ func (s *Service) callWithRequest(
parentKey optional.Option[model.RequestKey],
sourceAddress string,
) (*connect.Response[ftlv1.CallResponse], error) {
logger := log.FromContext(ctx)
start := time.Now()
ctx, span := observability.Calls.BeginSpan(ctx, req.Msg.Verb)
defer span.End()
Expand All @@ -1002,6 +1003,7 @@ func (s *Service) callWithRequest(

verbRef := schema.RefFromProto(req.Msg.Verb)
verb := &schema.Verb{}
logger = logger.Module(verbRef.Module)

if err := sch.ResolveToType(verbRef, verb); err != nil {
if errors.Is(err, schema.ErrNotFound) {
Expand Down Expand Up @@ -1084,6 +1086,7 @@ func (s *Service) callWithRequest(
} else {
callResponse = either.RightOf[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
logger.Errorf(err, "Call failed to verb %s for deployment %s", verbRef.String(), route.Deployment)
}
s.timeline.EnqueueEvent(ctx, &timeline.Call{
DeploymentKey: route.Deployment,
Expand Down Expand Up @@ -1817,7 +1820,7 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e
// as the deployments are in order
// We have a new route ready to go, so we can just set the old one to 0 replicas
// Do this in a TX so it doesn't happen until the route table is updated
deploymentLogger.Debugf("Setting %s to zero replicas", v.Key.String())
deploymentLogger.Debugf("Setting %s to zero replicas", prev.Deployment)
err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0)
if err != nil {
deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/alecthomas/types/optional"
"github.com/puzpuzpuz/xsync/v3"
istiosecmodel "istio.io/api/security/v1"
"istio.io/api/type/v1beta1"
istiosec "istio.io/client-go/pkg/apis/security/v1"
Expand Down Expand Up @@ -53,7 +54,7 @@ type DeploymentProvisioner struct {
MyDeploymentName string
Namespace string
// Map of known deployments
KnownDeployments map[string]bool
KnownDeployments *xsync.Map
FTLEndpoint string
IstioSecurity optional.Option[istioclient.Clientset]
}
Expand Down Expand Up @@ -130,21 +131,21 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
// Note that a change is currently usually an add and a delete,
// as it should really be called "module changed", not "deployment changed".
// This will need to be fixed as part of the support for rolling deployments.
r.KnownDeployments[msg.DeploymentKey] = true
r.KnownDeployments.Store(msg.DeploymentKey, true)
if deploymentExists {
logger.Debugf("Updating deployment %s", msg.DeploymentKey)
return r.handleExistingDeployment(ctx, deployment, msg.Schema)
} else {
return r.handleNewDeployment(ctx, msg.Schema, msg.DeploymentKey)
}
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
delete(r.KnownDeployments, msg.DeploymentKey)
if deploymentExists {
go func() {

// Nasty hack: we want all the controllers to have updated their route tables before we kill the runner,
// so we add a slight delay here.
time.Sleep(time.Second * 10)
r.KnownDeployments.Delete(msg.DeploymentKey)
logger.Debugf("Deleting service %s", msg.ModuleName)
err = r.Client.CoreV1().Services(r.Namespace).Delete(ctx, msg.DeploymentKey, v1.DeleteOptions{})
if err != nil {
Expand Down Expand Up @@ -418,7 +419,7 @@ func (r *DeploymentProvisioner) deleteMissingDeployments(ctx context.Context) {
}

for _, service := range list.Items {
if !r.KnownDeployments[service.Name] {
if _, ok := r.KnownDeployments.Load(service.Name); !ok {
// With owner references the deployments should be deleted automatically
// However this is in transition so delete both
logger.Debugf("Deleting service %s as it is not a known module", service.Name)
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/scaling/k8sscaling/k8s_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"

"github.com/alecthomas/types/optional"
"github.com/puzpuzpuz/xsync/v3"
istioclient "istio.io/client-go/pkg/clientset/versioned"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand Down Expand Up @@ -60,7 +61,7 @@ func (k k8sScaling) Start(ctx context.Context, controller url.URL, leaser leases
deploymentReconciler := &DeploymentProvisioner{
Client: clientset,
Namespace: namespace,
KnownDeployments: map[string]bool{},
KnownDeployments: xsync.NewMap(),
FTLEndpoint: controller.String(),
IstioSecurity: optional.Ptr(sec),
}
Expand Down
13 changes: 10 additions & 3 deletions backend/controller/scaling/kube_scaling_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ func TestKubeScaling(t *testing.T) {
// Istio should prevent this
assert.Equal(t, strconv.FormatBool(false), response)
}),
in.EditFile("echo", func(content []byte) []byte {
return []byte(strings.ReplaceAll(string(content), "Hello", "Bye"))
}, "echo.go"),
func(t testing.TB, ic in.TestContext) {
// Hit the verb constantly to test rolling updates.
go func() {
Expand All @@ -71,10 +68,20 @@ func TestKubeScaling(t *testing.T) {
}
}()
},
in.EditFile("echo", func(content []byte) []byte {
return []byte(strings.ReplaceAll(string(content), "Hello", "Bye"))
}, "echo.go"),
in.Deploy("echo"),
in.Call("echo", "echo", "Bob", func(t testing.TB, response string) {
assert.Equal(t, "Bye, Bob!!!", response)
}),
in.EditFile("echo", func(content []byte) []byte {
return []byte(strings.ReplaceAll(string(content), "Bye", "Bonjour"))
}, "echo.go"),
in.Deploy("echo"),
in.Call("echo", "echo", "Bob", func(t testing.TB, response string) {
assert.Equal(t, "Bonjour, Bob!!!", response)
}),
func(t testing.TB, ic in.TestContext) {
done.Store(true)
routineStopped.Wait()
Expand Down
2 changes: 2 additions & 0 deletions deployment/base/ftl-controller/deployment-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ data:
value: "http://$(MY_POD_IP):8893"
- name: FTL_LANGUAGE
value: "go,kotlin,java"
- name: LOG_TIMESTAMPS
value: "true"
ports:
- containerPort: 8893
readinessProbe:
Expand Down
2 changes: 2 additions & 0 deletions deployment/base/ftl-controller/ftl-controller.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ spec:
value: "test"
- name: AWS_ENDPOINT_URL
value: "http://localstack:4566"
- name: LOG_TIMESTAMPS
value: "true"
ports:
- containerPort: 8891
- containerPort: 8892
Expand Down
2 changes: 1 addition & 1 deletion internal/log/plain.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (t *plainSink) Log(entry Entry) error {

// Add timestamp if required
if t.logTime {
prefix += entry.Time.Format(time.TimeOnly) + " "
prefix += entry.Time.Format(time.StampMilli) + " "
}

// Add scope if required
Expand Down

0 comments on commit fbc91fb

Please sign in to comment.