Skip to content

Commit

Permalink
don't route through controller
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Dec 4, 2024
1 parent 1a3ce04 commit fedc798
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 37 deletions.
15 changes: 6 additions & 9 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ func Start(

g.Go(func() error {
return rpc.Serve(ctx, config.Bind,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, svc),
rpc.GRPC(deploymentconnect.NewDeploymentServiceHandler, svc),
rpc.GRPC(leaseconnect.NewLeaseServiceHandler, svc),
rpc.GRPC(ftlv1connect.NewControllerServiceHandler, svc),
Expand All @@ -180,7 +179,6 @@ func Start(
}

var _ ftlv1connect.ControllerServiceHandler = (*Service)(nil)
var _ ftlv1connect.VerbServiceHandler = (*Service)(nil)
var _ ftlv1connect.SchemaServiceHandler = (*Service)(nil)

type clients struct {
Expand Down Expand Up @@ -746,7 +744,6 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
// Initialize checksum to -1; a zero checksum does occur when the context contains no settings
lastChecksum := int64(-1)

callableModuleNames := []string{}
callableModules := map[string]bool{}
for _, decl := range deployment.Schema.Decls {
switch entry := decl.(type) {
Expand All @@ -762,7 +759,7 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request

}
}
callableModuleNames = maps.Keys(callableModules)
callableModuleNames := maps.Keys(callableModules)
callableModuleNames = slices.Sort(callableModuleNames)
logger.Debugf("Modules %s can call %v", module, callableModuleNames)
for {
Expand Down Expand Up @@ -877,10 +874,6 @@ func (s *Service) AcquireLease(ctx context.Context, stream *connect.BidiStream[f
}
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
return s.callWithRequest(ctx, headers.CopyRequestForForwarding(req), optional.None[model.RequestKey](), optional.None[model.RequestKey](), "")
}

func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldeployment.PublishEventRequest]) (*connect.Response[ftldeployment.PublishEventResponse], error) {
// Publish the event.
now := time.Now().UTC()
Expand Down Expand Up @@ -1621,7 +1614,11 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon

moduleSchema := message.Schema.ToProto().(*schemapb.Module) //nolint:forcetypeassert
hasher := sha.New()
data := []byte(moduleSchema.String())
data, err := schema.ModuleToBytes(message.Schema)
if err != nil {
logger.Errorf(err, "Could not serialize module schema")
return fmt.Errorf("could not serialize module schema: %w", err)
}
if _, err := hasher.Write(data); err != nil {
return err
}
Expand Down
15 changes: 12 additions & 3 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ import (
"github.com/TBD54566975/ftl/backend/cron/observability"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/routing"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
"github.com/TBD54566975/ftl/internal/slices"
Expand All @@ -33,7 +35,7 @@ type cronJob struct {
}

type Config struct {
ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
SchemaServiceEndpoint *url.URL `name:"ftl-endpoint" help:"Schema Service endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
}

func (c cronJob) String() string {
Expand All @@ -46,12 +48,13 @@ func (c cronJob) String() string {
}

// Start the cron service. Blocks until the context is cancelled.
func Start(ctx context.Context, eventSource schemaeventsource.EventSource, verbClient CallClient) error {
func Start(ctx context.Context, eventSource schemaeventsource.EventSource, table *routing.RouteTable) error {
logger := log.FromContext(ctx).Scope("cron")
// Map of cron jobs for each module.
cronJobs := map[string][]cronJob{}
// Cron jobs ordered by next execution.
cronQueue := []cronJob{}
routeManager := routing.NewClientManager(ctx, table, ftlv1connect.NewVerbServiceClient)

logger.Debugf("Starting cron service")

Expand Down Expand Up @@ -98,7 +101,13 @@ func Start(ctx context.Context, eventSource schemaeventsource.EventSource, verbC
NextExecution: job.next,
}
observability.Cron.JobStarted(ctx, cronModel)
if err := callCronJob(ctx, verbClient, job); err != nil {
client, ok := routeManager.LookupClient(job.module).Get()
if !ok {
logger.Debugf("Service not ready")
observability.Cron.JobFailed(ctx, cronModel)
continue
}
if err := callCronJob(ctx, client, job); err != nil {
observability.Cron.JobFailed(ctx, cronModel)
logger.Errorf(err, "Failed to execute cron job")
} else {
Expand Down
12 changes: 11 additions & 1 deletion backend/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ import (
"github.com/TBD54566975/ftl/backend/libdal"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
)

// handleHTTP HTTP ingress routes.
func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.RequestKey, routesForMethod []ingressRoute, w http.ResponseWriter, r *http.Request, verbClient CallClient) {
func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.RequestKey, routesForMethod []ingressRoute, w http.ResponseWriter, r *http.Request, verbClientLookup func(module string) optional.Option[ftlv1connect.VerbServiceClient]) {
logger := log.FromContext(r.Context()).Scope(fmt.Sprintf("ingress:%s:%s", r.Method, r.URL.Path))
logger.Debugf("Start ingress request")

Expand Down Expand Up @@ -68,6 +69,15 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
Verb: verbRef,
Body: body,
})
verbClient, ok := verbClientLookup(route.module).Get()
if !ok {
// we have a route in the schema, but the service is not ready
logger.Debugf("service not ready")
http.Error(w, err.Error(), http.StatusBadGateway)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("bad request"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusBadGateway, err.Error())
return
}

resp, err := verbClient.Call(r.Context(), creq)
if err != nil {
Expand Down
15 changes: 9 additions & 6 deletions backend/ingress/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (
"github.com/TBD54566975/ftl/backend/controller/observability"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/schema/v1"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/cors"
ftlhttp "github.com/TBD54566975/ftl/internal/http"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/routing"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
"github.com/TBD54566975/ftl/internal/slices"
)
Expand All @@ -43,16 +45,17 @@ func (c *Config) Validate() error {

type service struct {
// Complete schema synchronised from the database.
view *atomic.Value[materialisedView]
callClient CallClient
view *atomic.Value[materialisedView]
routingTable *routing.RouteClientManager[ftlv1connect.VerbServiceClient]
}

// Start the HTTP ingress service. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource, verbClient CallClient) error {
func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource) error {
logger := log.FromContext(ctx).Scope("http-ingress")
routingTable := routing.New(ctx, schemaEventSource)
svc := &service{
view: syncView(ctx, schemaEventSource),
callClient: verbClient,
view: syncView(ctx, schemaEventSource),
routingTable: routing.NewClientManager(ctx, routingTable, ftlv1connect.NewVerbServiceClient),
}

ingressHandler := otelhttp.NewHandler(http.Handler(svc), "ftl.ingress")
Expand Down Expand Up @@ -89,5 +92,5 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal"))
return
}
handleHTTP(start, state.schema, requestKey, routes, w, r, s.callClient)
handleHTTP(start, state.schema, requestKey, routes, w, r, s.routingTable.LookupClient)
}
7 changes: 4 additions & 3 deletions cmd/ftl-cron/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
_ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/observability"
"github.com/TBD54566975/ftl/internal/routing"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)
Expand Down Expand Up @@ -42,10 +43,10 @@ func main() {
err = observability.Init(ctx, false, "", "ftl-cron", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")

verbClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, cli.CronConfig.ControllerEndpoint.String(), log.Error)
schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.CronConfig.ControllerEndpoint.String(), log.Error)
schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.CronConfig.SchemaServiceEndpoint.String(), log.Error)
eventSource := schemaeventsource.New(ctx, schemaClient)

err = cron.Start(ctx, eventSource, verbClient)
rt := routing.New(ctx, schemaeventsource.New(ctx, schemaClient))
err = cron.Start(ctx, eventSource, rt)
kctx.FatalIfErrorf(err, "failed to start cron")
}
19 changes: 9 additions & 10 deletions cmd/ftl-http-ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
)

var cli struct {
Version kong.VersionFlag `help:"Show version."`
ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"`
LogConfig log.Config `embed:"" prefix:"log-"`
HTTPIngressConfig ingress.Config `embed:""`
ConfigFlag string `name:"config" short:"C" help:"Path to FTL project cf file." env:"FTL_CONFIG" placeholder:"FILE"`
ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
Version kong.VersionFlag `help:"Show version."`
ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"`
LogConfig log.Config `embed:"" prefix:"log-"`
HTTPIngressConfig ingress.Config `embed:""`
ConfigFlag string `name:"config" short:"C" help:"Path to FTL project cf file." env:"FTL_CONFIG" placeholder:"FILE"`
SchemaServerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
}

func main() {
Expand All @@ -44,10 +44,9 @@ func main() {
err = observability.Init(ctx, false, "", "ftl-cron", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")

verbClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, cli.ControllerEndpoint.String(), log.Error)
schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.ControllerEndpoint.String(), log.Error)
schemaEventSource := schemaeventsource.New(ctx, schemaClient)
schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.SchemaServerEndpoint.String(), log.Error)
eventSource := schemaeventsource.New(ctx, schemaClient)

err = ingress.Start(ctx, cli.HTTPIngressConfig, schemaEventSource, verbClient)
err = ingress.Start(ctx, cli.HTTPIngressConfig, eventSource)
kctx.FatalIfErrorf(err, "failed to start HTTP ingress")
}
6 changes: 4 additions & 2 deletions frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/observability"
"github.com/TBD54566975/ftl/internal/projectconfig"
"github.com/TBD54566975/ftl/internal/routing"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
Expand Down Expand Up @@ -304,15 +305,16 @@ func (s *serveCommonConfig) run(

// Start Cron
wg.Go(func() error {
err := cron.Start(ctx, schemaEventSourceFactory(), verbClient)
rt := routing.New(ctx, schemaEventSourceFactory())
err := cron.Start(ctx, schemaEventSourceFactory(), rt)
if err != nil {
return fmt.Errorf("cron failed: %w", err)
}
return nil
})
// Start Ingress
wg.Go(func() error {
err := ingress.Start(ctx, s.Ingress, schemaEventSourceFactory(), verbClient)
err := ingress.Start(ctx, s.Ingress, schemaEventSourceFactory())
if err != nil {
return fmt.Errorf("ingress failed: %w", err)
}
Expand Down
11 changes: 8 additions & 3 deletions internal/routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,20 @@ func (r *RouteTable) run(ctx context.Context, changes schemaeventsource.EventSou
case <-ctx.Done():
return

case event := <-changes.Events():
case <-changes.Events():
old := r.routes.Load()
routes := extractRoutes(ctx, event.Schema())
//TODO: removal notifications
routes := extractRoutes(ctx, changes.View())
for module, rd := range old.moduleToDeployment {
if old.byDeployment[rd.String()] != routes.byDeployment[rd.String()] {
r.changeNotification.Publish(module)
}
}
for module, rd := range routes.moduleToDeployment {
// Check for new modules
if old.byDeployment[rd.String()] == nil {
r.changeNotification.Publish(module)
}
}
r.routes.Store(routes)
}
}
Expand Down
49 changes: 49 additions & 0 deletions internal/routing/routingclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package routing

import (
"context"

"github.com/alecthomas/types/optional"
"github.com/puzpuzpuz/xsync/v3"

"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/rpc"
)

// RouteClientManager managed clients for the routing service, so calls to a given module can be routed to the correct instance.
type RouteClientManager[T rpc.Pingable] struct {
routingTable *RouteTable
moduleClients *xsync.MapOf[string, optional.Option[T]]
factory rpc.ClientFactory[T]
}

func NewClientManager[T rpc.Pingable](ctx context.Context, routingTable *RouteTable, factory rpc.ClientFactory[T]) *RouteClientManager[T] {
svc := &RouteClientManager[T]{
routingTable: routingTable,
moduleClients: xsync.NewMapOf[string, optional.Option[T]](),
factory: factory,
}
routeUpdates := svc.routingTable.Subscribe()
go func() {
for {
select {
case <-ctx.Done():
return
case module := <-routeUpdates:
svc.moduleClients.Delete(module)
}
}
}()
return svc
}

func (s *RouteClientManager[T]) LookupClient(module string) optional.Option[T] {
res, _ := s.moduleClients.LoadOrCompute(module, func() optional.Option[T] {
route, ok := s.routingTable.Current().GetForModule(module).Get()
if !ok {
return optional.None[T]()
}
return optional.Some[T](rpc.Dial(s.factory, route.String(), log.Error))
})
return res
}

0 comments on commit fedc798

Please sign in to comment.