From b980c86e7e14b9a12981493bd50d6075b716ad4d Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Thu, 26 Sep 2024 19:01:12 +1000 Subject: [PATCH] fix: schema sync is broken (#2843) As the sync works on module name rather than deployment when a deployment is removed it will remove any newer version of the module. This PR changes to build the schema in the same function as the route table, so they stay in sync. At persent this is not an atomic operation however, so there is a slight possibility for a request to see a new schema for an old route table, however this is still way better than the current situation. --- backend/controller/controller.go | 108 ++++++++++++------------------- 1 file changed, 40 insertions(+), 68 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 2fb1ee7149..6c0dc85a7b 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -64,7 +64,6 @@ import ( "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/modulecontext" internalobservability "github.com/TBD54566975/ftl/internal/observability" - ftlreflect "github.com/TBD54566975/ftl/internal/reflect" "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/rpc/headers" "github.com/TBD54566975/ftl/internal/sha256" @@ -229,9 +228,8 @@ type Service struct { clients *ttlcache.Cache[string, clients] // Complete schema synchronised from the database. - schema atomic.Value[*schema.Schema] + schemaState atomic.Value[schemaState] - routes atomic.Value[map[string]Route] config Config increaseReplicaFailures map[string]int @@ -270,9 +268,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca increaseReplicaFailures: map[string]int{}, runnerScaling: runnerScaling, } - svc.routes.Store(map[string]Route{}) - svc.schema.Store(&schema.Schema{}) - + svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}}) cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn) svc.cronJobs = cronSvc @@ -286,8 +282,6 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca svc.deploymentLogsSink = newDeploymentLogsSink(ctx, timelineSvc) - go svc.syncSchema(ctx) - // Use min, max backoff if we are running in production, otherwise use // (1s, 1s) (or develBackoff). Will also wrap the job such that it its next // runtime is capped at 1s. @@ -330,7 +324,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca } // Parallel tasks. - parallelTask(svc.syncRoutes, "sync-routes", time.Second, time.Second, time.Second*5) + parallelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5) parallelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5) parallelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5) parallelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10) @@ -362,7 +356,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path)) - ingress.Handle(start, s.schema.Load(), requestKey, routes, w, r, s.timeline, s.callWithRequest) + ingress.Handle(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.timeline, s.callWithRequest) } func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) { @@ -405,7 +399,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR if err != nil { return nil, fmt.Errorf("could not get status: %w", err) } - sroutes := s.routes.Load() + sroutes := s.schemaState.Load().routes routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) { return &ftlv1.StatusResponse_Route{ Module: route.Module, @@ -634,7 +628,7 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre }() deferredDeregistration = true } - _, err = s.syncRoutes(ctx) + _, err = s.syncRoutesAndSchema(ctx) if err != nil { return nil, fmt.Errorf("could not sync routes: %w", err) } @@ -704,7 +698,7 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque } // It's not actually ready until it is in the routes table - routes := s.routes.Load() + routes := s.schemaState.Load().routes var missing []string for _, module := range s.config.WaitFor { if _, ok := routes[module]; !ok { @@ -874,7 +868,7 @@ func (s *Service) sendFSMEventInTx(ctx context.Context, tx *dal.DAL, instance *d var candidates []string - sch := s.schema.Load() + sch := s.schemaState.Load().schema updateCandidates := func(ref *schema.Ref) (brk bool, err error) { verb := &schema.Verb{} @@ -941,7 +935,7 @@ func (s *Service) SetNextFSMEvent(ctx context.Context, req *connect.Request[ftlv return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("could not start transaction: %w", err)) } defer tx.CommitOrRollback(ctx, &err) - sch := s.schema.Load() + sch := s.schemaState.Load().schema msg := req.Msg fsm, eventType, fsmKey, err := s.resolveFSMEvent(msg) if err != nil { @@ -1003,7 +997,8 @@ func (s *Service) callWithRequest( return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required")) } - sch := s.schema.Load() + sstate := s.schemaState.Load() + sch := sstate.schema verbRef := schema.RefFromProto(req.Msg.Verb) verb := &schema.Verb{} @@ -1024,7 +1019,7 @@ func (s *Service) callWithRequest( } module := verbRef.Module - route, ok := s.routes.Load()[module] + route, ok := sstate.routes[module] if !ok { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module")) return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module)) @@ -1374,7 +1369,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call * } logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb) - sch := s.schema.Load() + sch := s.schemaState.Load().schema verb := &schema.Verb{} if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil { @@ -1552,7 +1547,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori return nil } - sch := s.schema.Load() + sch := s.schemaState.Load().schema fsm := &schema.FSM{} err = sch.ResolveToType(origin.FSM.ToRef(), fsm) @@ -1587,7 +1582,7 @@ func (s *Service) onAsyncFSMCallCompletion(ctx context.Context, tx *dal.DAL, ori } func (s *Service) resolveFSMEvent(msg *ftlv1.SendFSMEventRequest) (fsm *schema.FSM, eventType schema.Type, fsmKey schema.RefKey, err error) { - sch := s.schema.Load() + sch := s.schemaState.Load().schema fsm = &schema.FSM{} if err := sch.ResolveToType(schema.RefFromProto(msg.Fsm), fsm); err != nil { @@ -1766,8 +1761,10 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D return log.FromContext(ctx).AddSink(s.deploymentLogsSink).Attrs(attrs) } -// Periodically sync the routing table from the DB. -func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) { +// Periodically sync the routing table and schema from the DB. +// We do this in a single function so the routing table and schema are always consistent +// And they both need the same info from the DB +func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, err error) { deployments, err := s.dal.GetActiveDeployments(ctx) if errors.Is(err, libdal.ErrNotFound) { deployments = []dalmodel.Deployment{} @@ -1780,8 +1777,15 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) } defer tx.CommitOrRollback(ctx, &err) - old := s.routes.Load() + old := s.schemaState.Load().routes newRoutes := map[string]Route{} + modulesByName := map[string]*schema.Module{} + + builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert + modulesByName[builtins.Name], err = schema.ModuleFromProto(builtins) + if err != nil { + return 0, fmt.Errorf("failed to convert builtins to schema: %w", err) + } for _, v := range deployments { deploymentLogger := s.getDeploymentLogger(ctx, v.Key) deploymentLogger.Tracef("processing deployment %s for route table", v.Key.String()) @@ -1820,54 +1824,17 @@ func (s *Service) syncRoutes(ctx context.Context) (ret time.Duration, err error) } } newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint} + modulesByName[v.Module] = v.Schema } } - s.routes.Store(newRoutes) - return time.Second, nil -} - -// Synchronises Service.schema from the database. -func (s *Service) syncSchema(ctx context.Context) { - logger := log.FromContext(ctx) - modulesByName := map[string]*schema.Module{} - retry := backoff.Backoff{Max: time.Second * 5} - for { - err := s.watchModuleChanges(ctx, func(response *ftlv1.PullSchemaResponse) error { - switch response.ChangeType { - case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED: - moduleSchema, err := schema.ModuleFromProto(response.Schema) - if err != nil { - return err - } - modulesByName[moduleSchema.Name] = moduleSchema - - case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED: - delete(modulesByName, response.ModuleName) - } - orderedModules := maps.Values(modulesByName) - sort.SliceStable(orderedModules, func(i, j int) bool { - return orderedModules[i].Name < orderedModules[j].Name - }) - combined := &schema.Schema{Modules: orderedModules} - s.schema.Store(ftlreflect.DeepCopy(combined)) - return nil - }) - if err != nil { - next := retry.Duration() - if ctx.Err() == nil { - // Don't log when the context is done - logger.Warnf("Failed to watch module changes, retrying in %s: %s", next, err) - } - select { - case <-time.After(next): - case <-ctx.Done(): - return - } - } else { - retry.Reset() - } - } + orderedModules := maps.Values(modulesByName) + sort.SliceStable(orderedModules, func(i, j int) bool { + return orderedModules[i].Name < orderedModules[j].Name + }) + combined := &schema.Schema{Modules: orderedModules} + s.schemaState.Store(schemaState{schema: combined, routes: newRoutes}) + return time.Second, nil } func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) { @@ -1936,6 +1903,11 @@ type Route struct { Endpoint string } +type schemaState struct { + schema *schema.Schema + routes map[string]Route +} + func (r Route) String() string { return fmt.Sprintf("%s -> %s", r.Deployment, r.Endpoint) }