From fedc79859bf6e59ec3308e7e1c75b1dc5affb603 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Wed, 4 Dec 2024 16:54:26 +1100 Subject: [PATCH] don't route through controller --- backend/controller/controller.go | 15 ++++------ backend/cron/service.go | 15 ++++++++-- backend/ingress/handler.go | 12 +++++++- backend/ingress/service.go | 15 ++++++---- cmd/ftl-cron/main.go | 7 +++-- cmd/ftl-http-ingress/main.go | 19 ++++++------ frontend/cli/cmd_serve.go | 6 ++-- internal/routing/routing.go | 11 +++++-- internal/routing/routingclient.go | 49 +++++++++++++++++++++++++++++++ 9 files changed, 112 insertions(+), 37 deletions(-) create mode 100644 internal/routing/routingclient.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 8dec9789c..29fa85020 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -162,7 +162,6 @@ func Start( g.Go(func() error { return rpc.Serve(ctx, config.Bind, - rpc.GRPC(ftlv1connect.NewVerbServiceHandler, svc), rpc.GRPC(deploymentconnect.NewDeploymentServiceHandler, svc), rpc.GRPC(leaseconnect.NewLeaseServiceHandler, svc), rpc.GRPC(ftlv1connect.NewControllerServiceHandler, svc), @@ -180,7 +179,6 @@ func Start( } var _ ftlv1connect.ControllerServiceHandler = (*Service)(nil) -var _ ftlv1connect.VerbServiceHandler = (*Service)(nil) var _ ftlv1connect.SchemaServiceHandler = (*Service)(nil) type clients struct { @@ -746,7 +744,6 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request // Initialize checksum to -1; a zero checksum does occur when the context contains no settings lastChecksum := int64(-1) - callableModuleNames := []string{} callableModules := map[string]bool{} for _, decl := range deployment.Schema.Decls { switch entry := decl.(type) { @@ -762,7 +759,7 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request } } - callableModuleNames = maps.Keys(callableModules) + callableModuleNames := 
maps.Keys(callableModules) callableModuleNames = slices.Sort(callableModuleNames) logger.Debugf("Modules %s can call %v", module, callableModuleNames) for { @@ -877,10 +874,6 @@ func (s *Service) AcquireLease(ctx context.Context, stream *connect.BidiStream[f } } -func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { - return s.callWithRequest(ctx, headers.CopyRequestForForwarding(req), optional.None[model.RequestKey](), optional.None[model.RequestKey](), "") -} - func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldeployment.PublishEventRequest]) (*connect.Response[ftldeployment.PublishEventResponse], error) { // Publish the event. now := time.Now().UTC() @@ -1621,7 +1614,11 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon moduleSchema := message.Schema.ToProto().(*schemapb.Module) //nolint:forcetypeassert hasher := sha.New() - data := []byte(moduleSchema.String()) + data, err := schema.ModuleToBytes(message.Schema) + if err != nil { + logger.Errorf(err, "Could not serialize module schema") + return fmt.Errorf("could not serialize module schema: %w", err) + } if _, err := hasher.Write(data); err != nil { return err } diff --git a/backend/cron/service.go b/backend/cron/service.go index f8dc02e0d..03ad3c15c 100644 --- a/backend/cron/service.go +++ b/backend/cron/service.go @@ -12,9 +12,11 @@ import ( "github.com/TBD54566975/ftl/backend/cron/observability" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/TBD54566975/ftl/internal/cron" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/routing" "github.com/TBD54566975/ftl/internal/schema" 
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource" "github.com/TBD54566975/ftl/internal/slices" @@ -33,7 +35,7 @@ type cronJob struct { } type Config struct { - ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"` + SchemaServiceEndpoint *url.URL `name:"ftl-endpoint" help:"Schema Service endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"` } func (c cronJob) String() string { @@ -46,12 +48,13 @@ func (c cronJob) String() string { } // Start the cron service. Blocks until the context is cancelled. -func Start(ctx context.Context, eventSource schemaeventsource.EventSource, verbClient CallClient) error { +func Start(ctx context.Context, eventSource schemaeventsource.EventSource, table *routing.RouteTable) error { logger := log.FromContext(ctx).Scope("cron") // Map of cron jobs for each module. cronJobs := map[string][]cronJob{} // Cron jobs ordered by next execution. cronQueue := []cronJob{} + routeManager := routing.NewClientManager(ctx, table, ftlv1connect.NewVerbServiceClient) logger.Debugf("Starting cron service") @@ -98,7 +101,13 @@ func Start(ctx context.Context, eventSource schemaeventsource.EventSource, verbC NextExecution: job.next, } observability.Cron.JobStarted(ctx, cronModel) - if err := callCronJob(ctx, verbClient, job); err != nil { + client, ok := routeManager.LookupClient(job.module).Get() + if !ok { + logger.Debugf("Service not ready") + observability.Cron.JobFailed(ctx, cronModel) + continue + } + if err := callCronJob(ctx, client, job); err != nil { observability.Cron.JobFailed(ctx, cronModel) logger.Errorf(err, "Failed to execute cron job") } else { diff --git a/backend/ingress/handler.go b/backend/ingress/handler.go index 06d9b3cea..486fddf90 100644 --- a/backend/ingress/handler.go +++ b/backend/ingress/handler.go @@ -16,13 +16,14 @@ import ( "github.com/TBD54566975/ftl/backend/libdal" schemapb 
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/schema" ) // handleHTTP HTTP ingress routes. -func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.RequestKey, routesForMethod []ingressRoute, w http.ResponseWriter, r *http.Request, verbClient CallClient) { +func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.RequestKey, routesForMethod []ingressRoute, w http.ResponseWriter, r *http.Request, verbClientLookup func(module string) optional.Option[ftlv1connect.VerbServiceClient]) { logger := log.FromContext(r.Context()).Scope(fmt.Sprintf("ingress:%s:%s", r.Method, r.URL.Path)) logger.Debugf("Start ingress request") @@ -68,6 +69,15 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques Verb: verbRef, Body: body, }) + verbClient, ok := verbClientLookup(route.module).Get() + if !ok { + // We have a route in the schema, but the service is not ready. The lookup reports this via ok only and sets no error, so a fixed message is used rather than err. + logger.Debugf("service not ready") + http.Error(w, "service not ready", http.StatusBadGateway) + observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("service not ready")) + recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusBadGateway, "service not ready") + return + } resp, err := verbClient.Call(r.Context(), creq) if err != nil { diff --git a/backend/ingress/service.go b/backend/ingress/service.go index 69807b11f..bdc006acf 100644 --- a/backend/ingress/service.go +++ b/backend/ingress/service.go @@ -16,10 +16,12 @@ import ( "github.com/TBD54566975/ftl/backend/controller/observability" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1" ftlv1
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/TBD54566975/ftl/internal/cors" ftlhttp "github.com/TBD54566975/ftl/internal/http" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/routing" "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" "github.com/TBD54566975/ftl/internal/slices" ) @@ -43,16 +45,17 @@ func (c *Config) Validate() error { type service struct { // Complete schema synchronised from the database. - view *atomic.Value[materialisedView] - callClient CallClient + view *atomic.Value[materialisedView] + routingTable *routing.RouteClientManager[ftlv1connect.VerbServiceClient] } // Start the HTTP ingress service. Blocks until the context is cancelled. -func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource, verbClient CallClient) error { +func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource) error { logger := log.FromContext(ctx).Scope("http-ingress") + routingTable := routing.New(ctx, schemaEventSource) svc := &service{ - view: syncView(ctx, schemaEventSource), - callClient: verbClient, + view: syncView(ctx, schemaEventSource), + routingTable: routing.NewClientManager(ctx, routingTable, ftlv1connect.NewVerbServiceClient), } ingressHandler := otelhttp.NewHandler(http.Handler(svc), "ftl.ingress") @@ -89,5 +92,5 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) { observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal")) return } - handleHTTP(start, state.schema, requestKey, routes, w, r, s.callClient) + handleHTTP(start, state.schema, requestKey, routes, w, r, s.routingTable.LookupClient) } diff --git a/cmd/ftl-cron/main.go b/cmd/ftl-cron/main.go index 19b29816a..aa9522a0e 100644 --- 
a/cmd/ftl-cron/main.go +++ b/cmd/ftl-cron/main.go @@ -15,6 +15,7 @@ import ( _ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota. "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/observability" + "github.com/TBD54566975/ftl/internal/routing" "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" ) @@ -42,10 +43,10 @@ func main() { err = observability.Init(ctx, false, "", "ftl-cron", ftl.Version, cli.ObservabilityConfig) kctx.FatalIfErrorf(err, "failed to initialize observability") - verbClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, cli.CronConfig.ControllerEndpoint.String(), log.Error) - schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.CronConfig.ControllerEndpoint.String(), log.Error) + schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.CronConfig.SchemaServiceEndpoint.String(), log.Error) eventSource := schemaeventsource.New(ctx, schemaClient) - err = cron.Start(ctx, eventSource, verbClient) + rt := routing.New(ctx, schemaeventsource.New(ctx, schemaClient)) + err = cron.Start(ctx, eventSource, rt) kctx.FatalIfErrorf(err, "failed to start cron") } diff --git a/cmd/ftl-http-ingress/main.go b/cmd/ftl-http-ingress/main.go index 0329e281c..d2ddf2269 100644 --- a/cmd/ftl-http-ingress/main.go +++ b/cmd/ftl-http-ingress/main.go @@ -21,12 +21,12 @@ import ( ) var cli struct { - Version kong.VersionFlag `help:"Show version."` - ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"` - LogConfig log.Config `embed:"" prefix:"log-"` - HTTPIngressConfig ingress.Config `embed:""` - ConfigFlag string `name:"config" short:"C" help:"Path to FTL project cf file." env:"FTL_CONFIG" placeholder:"FILE"` - ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." 
env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"` + Version kong.VersionFlag `help:"Show version."` + ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"` + LogConfig log.Config `embed:"" prefix:"log-"` + HTTPIngressConfig ingress.Config `embed:""` + ConfigFlag string `name:"config" short:"C" help:"Path to FTL project cf file." env:"FTL_CONFIG" placeholder:"FILE"` + SchemaServerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"` } func main() { @@ -44,10 +44,9 @@ func main() { err = observability.Init(ctx, false, "", "ftl-cron", ftl.Version, cli.ObservabilityConfig) kctx.FatalIfErrorf(err, "failed to initialize observability") - verbClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, cli.ControllerEndpoint.String(), log.Error) - schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.ControllerEndpoint.String(), log.Error) - schemaEventSource := schemaeventsource.New(ctx, schemaClient) + schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.SchemaServerEndpoint.String(), log.Error) + eventSource := schemaeventsource.New(ctx, schemaClient) - err = ingress.Start(ctx, cli.HTTPIngressConfig, schemaEventSource, verbClient) + err = ingress.Start(ctx, cli.HTTPIngressConfig, eventSource) kctx.FatalIfErrorf(err, "failed to start HTTP ingress") } diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index 9b4c12eb9..04d5cebf8 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -37,6 +37,7 @@ import ( "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/observability" "github.com/TBD54566975/ftl/internal/projectconfig" + "github.com/TBD54566975/ftl/internal/routing" "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/schema" "github.com/TBD54566975/ftl/internal/schema/schemaeventsource" @@ -304,7 +305,8 @@ func (s *serveCommonConfig) run( // Start Cron wg.Go(func() error { - err := 
cron.Start(ctx, schemaEventSourceFactory(), verbClient) + rt := routing.New(ctx, schemaEventSourceFactory()) + err := cron.Start(ctx, schemaEventSourceFactory(), rt) if err != nil { return fmt.Errorf("cron failed: %w", err) } @@ -312,7 +314,7 @@ func (s *serveCommonConfig) run( }) // Start Ingress wg.Go(func() error { - err := ingress.Start(ctx, s.Ingress, schemaEventSourceFactory(), verbClient) + err := ingress.Start(ctx, s.Ingress, schemaEventSourceFactory()) if err != nil { return fmt.Errorf("ingress failed: %w", err) } diff --git a/internal/routing/routing.go b/internal/routing/routing.go index c0365b66e..a8e288b65 100644 --- a/internal/routing/routing.go +++ b/internal/routing/routing.go @@ -41,15 +41,20 @@ func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSou case <-ctx.Done(): return - case event := <-changes.Events(): + case <-changes.Events(): old := r.routes.Load() - routes := extractRoutes(ctx, event.Schema()) - //TODO: removal notifications + routes := extractRoutes(ctx, changes.View()) for module, rd := range old.moduleToDeployment { if old.byDeployment[rd.String()] != routes.byDeployment[rd.String()] { r.changeNotification.Publish(module) } } + for module, rd := range routes.moduleToDeployment { + // Check for new modules + if old.byDeployment[rd.String()] == nil { + r.changeNotification.Publish(module) + } + } r.routes.Store(routes) } } diff --git a/internal/routing/routingclient.go b/internal/routing/routingclient.go new file mode 100644 index 000000000..9fc36c95e --- /dev/null +++ b/internal/routing/routingclient.go @@ -0,0 +1,49 @@ +package routing + +import ( + "context" + + "github.com/alecthomas/types/optional" + "github.com/puzpuzpuz/xsync/v3" + + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/rpc" +) + +// RouteClientManager managed clients for the routing service, so calls to a given module can be routed to the correct instance. 
+type RouteClientManager[T rpc.Pingable] struct { + routingTable *RouteTable // consulted for the current route of a module and subscribed to for change notifications + moduleClients *xsync.MapOf[string, optional.Option[T]] // dialled clients keyed by module name + factory rpc.ClientFactory[T] // constructor handed to rpc.Dial for each new client +} +// NewClientManager returns a manager that dials clients lazily and drops a module's cached client whenever the routing table publishes a change for that module. The invalidation goroutine exits when ctx is done. +func NewClientManager[T rpc.Pingable](ctx context.Context, routingTable *RouteTable, factory rpc.ClientFactory[T]) *RouteClientManager[T] { + svc := &RouteClientManager[T]{ + routingTable: routingTable, + moduleClients: xsync.NewMapOf[string, optional.Option[T]](), + factory: factory, + } + routeUpdates := svc.routingTable.Subscribe() + go func() { + for { + select { + case <-ctx.Done(): + return + case module := <-routeUpdates: + svc.moduleClients.Delete(module) + } + } + }() + return svc +} +// LookupClient returns the cached client for module, dialling one from the current route on first use. It returns None while the module has no route; a miss is not cached, so the next call consults the routing table again instead of waiting for a change notification. +func (s *RouteClientManager[T]) LookupClient(module string) optional.Option[T] { + res, _ := s.moduleClients.LoadOrTryCompute(module, func() (optional.Option[T], bool) { + route, ok := s.routingTable.Current().GetForModule(module).Get() + if !ok { + return optional.None[T](), true + } + return optional.Some[T](rpc.Dial(s.factory, route.String(), log.Error)), false + }) + return res +}