diff --git a/backend/controller/controller.go b/backend/controller/controller.go index bfeb45611..cd19c5947 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -19,7 +19,6 @@ import ( "time" "connectrpc.com/connect" - "github.com/alecthomas/atomic" "github.com/alecthomas/kong" "github.com/alecthomas/types/either" "github.com/alecthomas/types/optional" @@ -64,9 +63,11 @@ import ( ftlmaps "github.com/TBD54566975/ftl/internal/maps" "github.com/TBD54566975/ftl/internal/model" internalobservability "github.com/TBD54566975/ftl/internal/observability" + "github.com/TBD54566975/ftl/internal/routing" "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/rpc/headers" "github.com/TBD54566975/ftl/internal/schema" + "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" "github.com/TBD54566975/ftl/internal/sha256" "github.com/TBD54566975/ftl/internal/slices" status "github.com/TBD54566975/ftl/internal/terminal" @@ -212,8 +213,6 @@ type Service struct { // Map from runnerKey.String() to client. clients *ttlcache.Cache[string, clients] - // Complete schema synchronised from the database. 
- schemaState atomic.Value[schemaState] schemaSyncLock sync.Mutex config Config @@ -223,6 +222,7 @@ type Service struct { clientLock sync.Mutex routeTableUpdated *subscriptions.Topic[struct{}] + routeTable *routing.RouteTable } func New( @@ -253,6 +253,8 @@ func New( ldb := dbleaser.NewDatabaseLeaser(conn) scheduler := scheduledtask.New(ctx, key, ldb) + routingTable := routing.New(ctx, schemaeventsource.New(ctx, rpc.Dial[ftlv1connect.SchemaServiceClient](ftlv1connect.NewSchemaServiceClient, config.Bind.String(), log.Error))) + svc := &Service{ cm: cm, sm: sm, @@ -263,9 +265,8 @@ func New( clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)), config: config, increaseReplicaFailures: map[string]int{}, - routeTableUpdated: subscriptions.New[struct{}](), + routeTable: routingTable, } - svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}}) storage, err := artefacts.NewOCIRegistryStorage(config.Registry) if err != nil { @@ -323,7 +324,6 @@ func New( } // Parallel tasks. 
- parallelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5) parallelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5) parallelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5) parallelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10) @@ -381,12 +381,12 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR if err != nil { return nil, fmt.Errorf("could not get status: %w", err) } - sroutes := s.schemaState.Load().routes - routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) { + allModules := s.routeTable.Current() + routes := slices.Map(allModules.Schema().Modules, func(module *schema.Module) (out *ftlv1.StatusResponse_Route) { return &ftlv1.StatusResponse_Route{ - Module: route.Module, - Deployment: route.Deployment.String(), - Endpoint: route.Endpoint, + Module: module.Name, + Deployment: module.Runtime.Deployment.DeploymentKey, + Endpoint: module.Runtime.Deployment.Endpoint, } }) replicas := map[string]int32{} @@ -620,7 +620,6 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre }() deferredDeregistration = true } - _, err = s.syncRoutesAndSchema(ctx) if err != nil { return nil, fmt.Errorf("could not sync routes: %w", err) } @@ -694,11 +693,11 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque return connect.NewResponse(&ftlv1.PingResponse{}), nil } + routeView := s.routeTable.Current() // It's not actually ready until it is in the routes table - routes := s.schemaState.Load().routes var missing []string for _, module := range s.config.WaitFor { - if _, ok := routes[module]; !ok { + if _, ok := routeView.GetForModule(module).Get(); !ok { missing = append(missing, module) } } @@ -766,12 +765,12 @@ func (s *Service) GetDeploymentContext(ctx 
context.Context, req *connect.Request logger.Debugf("Checking for updated deployment context for: %s", key.String()) h := sha.New() + routeView := s.routeTable.Current() configs, err := s.cm.MapForModule(ctx, module) routeTable := map[string]string{} - routes := s.schemaState.Load().routes for _, module := range callableModuleNames { - if route, ok := routes[module]; ok { - routeTable[module] = route.Endpoint + if route, ok := routeView.GetForModule(module).Get(); ok { + routeTable[module] = route.String() } } @@ -895,12 +894,12 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldepl } // Add to timeline. - sstate := s.schemaState.Load() module := req.Msg.Topic.Module - route, ok := sstate.routes[module] + routes := s.routeTable.Current() + route, ok := routes.GetDeployment(module).Get() if ok { s.timeline.EnqueueEvent(ctx, &timeline.PubSubPublish{ - DeploymentKey: route.Deployment, + DeploymentKey: route, RequestKey: requestKey, Time: now, SourceVerb: schema.Ref{Name: req.Msg.Caller, Module: req.Msg.Topic.Module}, @@ -937,8 +936,8 @@ func (s *Service) callWithRequest( return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required")) } - sstate := s.schemaState.Load() - sch := sstate.schema + routes := s.routeTable.Current() + sch := routes.Schema() verbRef := schema.RefFromProto(req.Msg.Verb) verb := &schema.Verb{} @@ -965,7 +964,7 @@ func (s *Service) callWithRequest( } module := verbRef.Module - route, ok := sstate.routes[module] + route, ok := routes.GetForModule(module).Get() if !ok { observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module")) return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module)) @@ -999,7 +998,7 @@ func (s *Service) callWithRequest( } callEvent := &timeline.Call{ - DeploymentKey: route.Deployment, + DeploymentKey: routes.GetDeployment(module).Default(model.NewDeploymentKey("unkown")), RequestKey: requestKey, 
ParentRequestKey: parentKey, StartTime: start, @@ -1024,7 +1023,7 @@ func (s *Service) callWithRequest( return nil, err } - client := s.clientsForEndpoint(route.Endpoint) + client := s.clientsForEndpoint(route.String()) if pk, ok := parentKey.Get(); ok { ctx = rpc.WithParentRequestKey(ctx, pk) @@ -1042,7 +1041,7 @@ func (s *Service) callWithRequest( } else { callEvent.Response = either.RightOf[*ftlv1.CallResponse](err) observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) - logger.Errorf(err, "Call failed to verb %s for deployment %s", verbRef.String(), route.Deployment) + logger.Errorf(err, "Call failed to verb %s for module %s", verbRef.String(), module) } s.timeline.EnqueueEvent(ctx, callEvent) @@ -1231,11 +1230,11 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration logger.Tracef("Acquiring async call") now := time.Now().UTC() - sstate := s.schemaState.Load() + sstate := s.routeTable.Current() enqueueTimelineEvent := func(call *dal.AsyncCall, err optional.Option[error]) { module := call.Verb.Module - route, ok := sstate.routes[module] + deployment, ok := sstate.GetDeployment(module).Get() if ok { eventType := timeline.AsyncExecuteEventTypeUnkown switch call.Origin.(type) { @@ -1253,7 +1252,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration errStr = optional.Some(e.Error()) } s.timeline.EnqueueEvent(ctx, &timeline.AsyncExecute{ - DeploymentKey: route.Deployment, + DeploymentKey: deployment, RequestKey: call.ParentRequestKey, EventType: eventType, Verb: *call.Verb.ToRef(), @@ -1375,7 +1374,8 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call * } logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb) - sch := s.schemaState.Load().schema + routeView := s.routeTable.Current() + sch := routeView.Schema() verb := &schema.Verb{} if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil { @@ -1685,92 +1685,6 @@ func (s 
*Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D return log.FromContext(ctx).AddSink(s.deploymentLogsSink).Attrs(attrs) } -// Periodically sync the routing table and schema from the DB. -// We do this in a single function so the routing table and schema are always consistent -// And they both need the same info from the DB -func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, err error) { - s.schemaSyncLock.Lock() // This can result in confusing log messages if it is called concurrently - defer s.schemaSyncLock.Unlock() - deployments, err := s.dal.GetActiveDeployments(ctx) - if errors.Is(err, libdal.ErrNotFound) { - deployments = []dalmodel.Deployment{} - } else if err != nil { - return 0, err - } - tx, err := s.dal.Begin(ctx) - if err != nil { - return 0, fmt.Errorf("failed to start transaction %w", err) - } - defer tx.CommitOrRollback(ctx, &err) - - old := s.schemaState.Load().routes - newRoutes := map[string]Route{} - modulesByName := map[string]*schema.Module{} - changed := false - - builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert - modulesByName[builtins.Name], err = schema.ModuleFromProto(builtins) - if err != nil { - return 0, fmt.Errorf("failed to convert builtins to schema: %w", err) - } - for _, v := range deployments { - deploymentLogger := s.getDeploymentLogger(ctx, v.Key) - deploymentLogger.Tracef("processing deployment %s for route table", v.Key.String()) - // Deployments are in order, oldest to newest - // If we see a newer one overwrite an old one that means the new one is read - // And we set its replicas to zero - // It may seem a bit odd to do this here but this is where we are actually updating the routing table - // Which is what makes as a deployment 'live' from a clients POV - if v.Schema.Runtime == nil || v.Schema.Runtime.Deployment == nil { - deploymentLogger.Debugf("Deployment %s has no runtime metadata", v.Key.String()) - continue - } - targetEndpoint := 
v.Schema.Runtime.Deployment.Endpoint - if targetEndpoint == "" { - deploymentLogger.Debugf("Failed to get updated endpoint for deployment %s", v.Key.String()) - continue - } - // Check if this is a new route - if oldRoute, oldRouteExists := old[v.Module]; !oldRouteExists || oldRoute.Deployment.String() != v.Key.String() { - // If it is a new route we only add it if we can ping it - // Kube deployments can take a while to come up, so we don't want to add them to the routing table until they are ready. - _, err := s.clientsForEndpoint(targetEndpoint).verb.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{})) - if err != nil { - deploymentLogger.Tracef("Unable to ping %s, not adding to route table", v.Key.String()) - continue - } - deploymentLogger.Infof("Deployed %s", v.Key.String()) - changed = true - status.UpdateModuleState(ctx, v.Module, status.BuildStateDeployed) - } - if prev, ok := newRoutes[v.Module]; ok { - // We have already seen a route for this module, the existing route must be an old one - // as the deployments are in order - // We have a new route ready to go, so we can just set the old one to 0 replicas - // Do this in a TX so it doesn't happen until the route table is updated - deploymentLogger.Debugf("Setting %s to zero replicas", prev.Deployment) - err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0) - if err != nil { - deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String()) - } - changed = true - } - newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint} - modulesByName[v.Module] = v.Schema - } - - orderedModules := maps.Values(modulesByName) - sort.SliceStable(orderedModules, func(i, j int) bool { - return orderedModules[i].Name < orderedModules[j].Name - }) - combined := &schema.Schema{Modules: orderedModules} - s.schemaState.Store(schemaState{schema: combined, routes: newRoutes}) - if changed { - s.routeTableUpdated.Publish(struct{}{}) - } - return 
time.Second, nil -} - func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) { logger := log.FromContext(ctx) @@ -1817,18 +1731,3 @@ func validateCallBody(body []byte, verb *schema.Verb, sch *schema.Schema) error } return nil } - -type Route struct { - Module string - Deployment model.DeploymentKey - Endpoint string -} - -type schemaState struct { - schema *schema.Schema - routes map[string]Route -} - -func (r Route) String() string { - return fmt.Sprintf("%s -> %s", r.Deployment, r.Endpoint) -} diff --git a/backend/provisioner/runner_scaling_provisioner.go b/backend/provisioner/runner_scaling_provisioner.go index 611483d8c..12ea57828 100644 --- a/backend/provisioner/runner_scaling_provisioner.go +++ b/backend/provisioner/runner_scaling_provisioner.go @@ -3,6 +3,7 @@ package provisioner import ( "context" "fmt" + "time" "connectrpc.com/connect" _ "github.com/go-sql-driver/mysql" @@ -60,6 +61,24 @@ func provisionRunner(scaling scaling.RunnerScaling) InMemResourceProvisionerFn { } ep := endpoint.MustGet() endpointURI := ep.String() + + runnerClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, endpointURI, log.Error) + // TODO: a proper timeout + timeout := time.After(1 * time.Minute) + for { + _, err := runnerClient.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{})) + if err == nil { + break + } + select { + case <-ctx.Done(): + return nil, fmt.Errorf("context cancelled %w", ctx.Err()) + case <-timeout: + return nil, fmt.Errorf("timed out waiting for runner to be ready") + case <-time.After(time.Millisecond * 100): + } + } + runner.Runner.Output = &provisioner.RunnerResource_RunnerResourceOutput{ RunnerUri: endpointURI, DeploymentKey: deployment, diff --git a/internal/modulecontext/to_proto.go b/internal/modulecontext/to_proto.go deleted file mode 100644 index 0e3da8fb2..000000000 --- a/internal/modulecontext/to_proto.go +++ /dev/null @@ -1,47 +0,0 @@ -package modulecontext - -import ( - "fmt" - "strconv" - - ftlv1 
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" -) - -// ToProto converts a ModuleContext to a proto response. -func (m ModuleContext) ToProto() *ftlv1.GetModuleContextResponse { - databases := make([]*ftlv1.GetModuleContextResponse_DSN, 0, len(m.databases)) - for name, entry := range m.databases { - databases = append(databases, &ftlv1.GetModuleContextResponse_DSN{ - Name: name, - Type: entry.DBType.ToProto(), - Dsn: entry.DSN, - }) - } - routes := make([]*ftlv1.GetModuleContextResponse_Route, 0, len(m.routes)) - for name, entry := range m.routes { - routes = append(routes, &ftlv1.GetModuleContextResponse_Route{ - Module: name, - Uri: entry, - }) - } - return &ftlv1.GetModuleContextResponse{ - Module: m.module, - Configs: m.configs, - Secrets: m.secrets, - Routes: routes, - Databases: databases, - } -} - -func (x DBType) ToProto() ftlv1.GetModuleContextResponse_DbType { - switch x { - case DBTypeUnspecified: - return ftlv1.GetModuleContextResponse_DB_TYPE_UNSPECIFIED - case DBTypePostgres: - return ftlv1.GetModuleContextResponse_DB_TYPE_POSTGRES - case DBTypeMySQL: - return ftlv1.GetModuleContextResponse_DB_TYPE_MYSQL - default: - panic(fmt.Sprintf("unknown DB type: %s", strconv.Itoa(int(x)))) - } -} diff --git a/internal/routing/routing.go b/internal/routing/routing.go index 8374239d5..80b2710ed 100644 --- a/internal/routing/routing.go +++ b/internal/routing/routing.go @@ -6,6 +6,7 @@ import ( "github.com/alecthomas/atomic" "github.com/alecthomas/types/optional" + "github.com/alecthomas/types/pubsub" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" @@ -13,14 +14,22 @@ import ( "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) +type RouteView struct { + byDeployment map[string]*url.URL + moduleToDeployment map[string]model.DeploymentKey + schema *schema.Schema +} + type RouteTable struct { - // Routes keyed by module name. TODO: this should be keyed by deployment key. 
- routes *atomic.Value[map[string]*url.URL] + routes *atomic.Value[RouteView] + // When the routes for a module change they are published here. + changeNotification *pubsub.Topic[string] } func New(ctx context.Context, changes schemaeventsource.EventSource) *RouteTable { r := &RouteTable{ - routes: atomic.New(extractRoutes(ctx, changes.View())), + routes: atomic.New(extractRoutes(ctx, changes.View())), + changeNotification: pubsub.New[string](), } go r.run(ctx, changes) return r @@ -33,38 +42,80 @@ func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSou return case event := <-changes.Events(): + old := r.routes.Load() routes := extractRoutes(ctx, event.Schema()) + //TODO: removal notifications + for module, rd := range old.moduleToDeployment { + if old.byDeployment[rd.String()] != routes.byDeployment[rd.String()] { + r.changeNotification.Publish(module) + } + } r.routes.Store(routes) } } } +// Current returns the current routes. +func (r *RouteTable) Current() RouteView { + return r.routes.Load() +} + // Get returns the URL for the given deployment or None if it doesn't exist. -func (r *RouteTable) Get(deployment model.DeploymentKey) optional.Option[*url.URL] { - routes := r.routes.Load() - return optional.Zero(routes[deployment.Payload.Module]) +func (r RouteView) Get(deployment model.DeploymentKey) optional.Option[url.URL] { + mod := r.byDeployment[deployment.String()] + if mod == nil { + return optional.None[url.URL]() + } + return optional.Some(*mod) } // GetForModule returns the URL for the given module or None if it doesn't exist. 
-func (r *RouteTable) GetForModule(module string) optional.Option[*url.URL] {
-	routes := r.routes.Load()
-	return optional.Zero(routes[module])
+func (r RouteView) GetForModule(module string) optional.Option[url.URL] {
+	dep, ok := r.moduleToDeployment[module]
+	if !ok {
+		return optional.None[url.URL]()
+	}
+	return r.Get(dep)
+}
+
+// GetDeployment returns the deployment key for the given module or None if it doesn't exist.
+func (r RouteView) GetDeployment(module string) optional.Option[model.DeploymentKey] {
+	return optional.Zero(r.moduleToDeployment[module])
 }
 
-func extractRoutes(ctx context.Context, schema *schema.Schema) map[string]*url.URL {
+// Schema returns the current schema that the routes are based on.
+func (r RouteView) Schema() *schema.Schema {
+	return r.schema
+}
+
+func (r *RouteTable) Subscribe() chan string {
+	return r.changeNotification.Subscribe(nil)
+}
+
+func extractRoutes(ctx context.Context, sch *schema.Schema) RouteView {
+	if sch == nil {
+		return RouteView{moduleToDeployment: map[string]model.DeploymentKey{}, byDeployment: map[string]*url.URL{}, schema: &schema.Schema{}}
+	}
 	logger := log.FromContext(ctx)
-	out := make(map[string]*url.URL, len(schema.Modules))
-	for _, module := range schema.Modules {
+	moduleToDeployment := make(map[string]model.DeploymentKey, len(sch.Modules))
+	byDeployment := make(map[string]*url.URL, len(sch.Modules))
+	for _, module := range sch.Modules {
 		if module.Runtime == nil || module.Runtime.Deployment == nil {
 			continue
 		}
 		rt := module.Runtime.Deployment
+		key, err := model.ParseDeploymentKey(rt.DeploymentKey)
+		if err != nil {
+			logger.Warnf("Failed to parse deployment key for module %q: %v", module.Name, err)
+			continue
+		}
 		u, err := url.Parse(rt.Endpoint)
 		if err != nil {
 			logger.Warnf("Failed to parse endpoint URL for module %q: %v", module.Name, err)
 			continue
 		}
-		out[module.Name] = u
+		moduleToDeployment[module.Name] = key
+		byDeployment[rt.DeploymentKey] = u
 	}
-	return out
+	return RouteView{moduleToDeployment: moduleToDeployment, byDeployment: byDeployment, schema: sch} // keep the schema so Schema() is never nil
 }
diff --git a/internal/routing/routing_test.go b/internal/routing/routing_test.go
index 994cce357..3f6098ec4 100644
--- a/internal/routing/routing_test.go
+++ b/internal/routing/routing_test.go
@@ -22,27 +22,31 @@ func TestRouting(t *testing.T) {
 			Name: "time",
 			Runtime: &schema.ModuleRuntime{
 				Deployment: &schema.ModuleRuntimeDeployment{
-					Endpoint: "http://time.ftl",
+					Endpoint:      "http://time.ftl",
+					DeploymentKey: "dpl-time-sjkfislfjslfas",
 				},
 			},
 		},
 	})
 
 	rt := New(log.ContextWithNewDefaultLogger(context.TODO()), events)
-	assert.Equal(t, optional.Some(must.Get(url.Parse("http://time.ftl"))), rt.GetForModule("time"))
-	assert.Equal(t, optional.None[*url.URL](), rt.GetForModule("echo"))
+	current := rt.Current()
+	assert.Equal(t, optional.Ptr(must.Get(url.Parse("http://time.ftl"))), current.GetForModule("time"))
+	assert.Equal(t, optional.None[url.URL](), current.GetForModule("echo"))
 
 	events.Publish(schemaeventsource.EventUpsert{
 		Module: &schema.Module{
 			Name: "echo",
 			Runtime: &schema.ModuleRuntime{
 				Deployment: &schema.ModuleRuntimeDeployment{
-					Endpoint: "http://echo.ftl",
+					Endpoint:      "http://echo.ftl",
+					DeploymentKey: "dpl-echo-sjkfiaslfjslfs",
 				},
 			},
 		},
 	})
 
 	time.Sleep(time.Millisecond * 250)
-	assert.Equal(t, optional.Some(must.Get(url.Parse("http://echo.ftl"))), rt.GetForModule("echo"))
+	current = rt.Current()
+	assert.Equal(t, optional.Ptr(must.Get(url.Parse("http://echo.ftl"))), current.GetForModule("echo"))
 }