Skip to content

Commit

Permalink
wait for runners to be ready
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Dec 4, 2024
1 parent b621a89 commit a9649c0
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 202 deletions.
175 changes: 39 additions & 136 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"time"

"connectrpc.com/connect"
"github.com/alecthomas/atomic"
"github.com/alecthomas/kong"
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
subscriptions "github.com/alecthomas/types/pubsub"
"github.com/jackc/pgx/v5"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
Expand Down Expand Up @@ -64,9 +62,11 @@ import (
ftlmaps "github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
internalobservability "github.com/TBD54566975/ftl/internal/observability"
"github.com/TBD54566975/ftl/internal/routing"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/rpc/headers"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
"github.com/TBD54566975/ftl/internal/sha256"
"github.com/TBD54566975/ftl/internal/slices"
status "github.com/TBD54566975/ftl/internal/terminal"
Expand Down Expand Up @@ -212,17 +212,15 @@ type Service struct {
// Map from runnerKey.String() to client.
clients *ttlcache.Cache[string, clients]

// Complete schema synchronised from the database.
schemaState atomic.Value[schemaState]
schemaSyncLock sync.Mutex

config Config

increaseReplicaFailures map[string]int
asyncCallsLock sync.Mutex

clientLock sync.Mutex
routeTableUpdated *subscriptions.Topic[struct{}]
clientLock sync.Mutex
routeTable *routing.RouteTable
}

func New(
Expand Down Expand Up @@ -253,6 +251,8 @@ func New(
ldb := dbleaser.NewDatabaseLeaser(conn)
scheduler := scheduledtask.New(ctx, key, ldb)

routingTable := routing.New(ctx, schemaeventsource.New(ctx, rpc.Dial[ftlv1connect.SchemaServiceClient](ftlv1connect.NewSchemaServiceClient, config.Bind.String(), log.Error)))

svc := &Service{
cm: cm,
sm: sm,
Expand All @@ -263,9 +263,8 @@ func New(
clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)),
config: config,
increaseReplicaFailures: map[string]int{},
routeTableUpdated: subscriptions.New[struct{}](),
routeTable: routingTable,
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})

storage, err := artefacts.NewOCIRegistryStorage(config.Registry)
if err != nil {
Expand Down Expand Up @@ -323,7 +322,6 @@ func New(
}

// Parallel tasks.
parallelTask(svc.syncRoutesAndSchema, "sync-routes-and-schema", time.Second, time.Second, time.Second*5)
parallelTask(svc.heartbeatController, "controller-heartbeat", time.Second, time.Second*3, time.Second*5)
parallelTask(svc.updateControllersList, "update-controllers-list", time.Second, time.Second*5, time.Second*5)
parallelTask(svc.executeAsyncCalls, "execute-async-calls", time.Second, time.Second*5, time.Second*10)
Expand Down Expand Up @@ -381,12 +379,18 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
if err != nil {
return nil, fmt.Errorf("could not get status: %w", err)
}
sroutes := s.schemaState.Load().routes
routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) {
allModules := s.routeTable.Current()
routes := slices.Map(allModules.Schema().Modules, func(module *schema.Module) (out *ftlv1.StatusResponse_Route) {
key := ""
endpoint := ""
if module.Runtime != nil && module.Runtime.Deployment != nil {
key = module.Runtime.Deployment.DeploymentKey
endpoint = module.Runtime.Deployment.Endpoint
}
return &ftlv1.StatusResponse_Route{
Module: route.Module,
Deployment: route.Deployment.String(),
Endpoint: route.Endpoint,
Module: module.Name,
Deployment: key,
Endpoint: endpoint,
}
})
replicas := map[string]int32{}
Expand Down Expand Up @@ -620,7 +624,6 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
}()
deferredDeregistration = true
}
_, err = s.syncRoutesAndSchema(ctx)
if err != nil {
return nil, fmt.Errorf("could not sync routes: %w", err)
}
Expand Down Expand Up @@ -694,11 +697,11 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

routeView := s.routeTable.Current()
// It's not actually ready until it is in the routes table
routes := s.schemaState.Load().routes
var missing []string
for _, module := range s.config.WaitFor {
if _, ok := routes[module]; !ok {
if _, ok := routeView.GetForModule(module).Get(); !ok {
missing = append(missing, module)
}
}
Expand All @@ -714,8 +717,8 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request[ftldeployment.GetDeploymentContextRequest], resp *connect.ServerStream[ftldeployment.GetDeploymentContextResponse]) error {

logger := log.FromContext(ctx)
updates := s.routeTableUpdated.Subscribe(nil)
defer s.routeTableUpdated.Unsubscribe(updates)
updates := s.routeTable.Subscribe()
defer s.routeTable.Unsubscribe(updates)
depName := req.Msg.Deployment
if !strings.HasPrefix(depName, "dpl-") {
// For hot reload endponts we might not have a deployment key
Expand Down Expand Up @@ -766,12 +769,12 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
logger.Debugf("Checking for updated deployment context for: %s", key.String())
h := sha.New()

routeView := s.routeTable.Current()
configs, err := s.cm.MapForModule(ctx, module)
routeTable := map[string]string{}
routes := s.schemaState.Load().routes
for _, module := range callableModuleNames {
if route, ok := routes[module]; ok {
routeTable[module] = route.Endpoint
if route, ok := routeView.GetForModule(module).Get(); ok {
routeTable[module] = route.String()
}
}

Expand Down Expand Up @@ -895,12 +898,12 @@ func (s *Service) PublishEvent(ctx context.Context, req *connect.Request[ftldepl
}

// Add to timeline.
sstate := s.schemaState.Load()
module := req.Msg.Topic.Module
route, ok := sstate.routes[module]
routes := s.routeTable.Current()
route, ok := routes.GetDeployment(module).Get()
if ok {
s.timeline.EnqueueEvent(ctx, &timeline.PubSubPublish{
DeploymentKey: route.Deployment,
DeploymentKey: route,
RequestKey: requestKey,
Time: now,
SourceVerb: schema.Ref{Name: req.Msg.Caller, Module: req.Msg.Topic.Module},
Expand Down Expand Up @@ -937,8 +940,8 @@ func (s *Service) callWithRequest(
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("body is required"))
}

sstate := s.schemaState.Load()
sch := sstate.schema
routes := s.routeTable.Current()
sch := routes.Schema()

verbRef := schema.RefFromProto(req.Msg.Verb)
verb := &schema.Verb{}
Expand All @@ -965,7 +968,7 @@ func (s *Service) callWithRequest(
}

module := verbRef.Module
route, ok := sstate.routes[module]
route, ok := routes.GetForModule(module).Get()
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
Expand Down Expand Up @@ -999,7 +1002,7 @@ func (s *Service) callWithRequest(
}

callEvent := &timeline.Call{
DeploymentKey: route.Deployment,
DeploymentKey: routes.GetDeployment(module).Default(model.NewDeploymentKey("unkown")),

Check failure on line 1005 in backend/controller/controller.go

View workflow job for this annotation

GitHub Actions / Lint

`unkown` is a misspelling of `unknown` (misspell)
RequestKey: requestKey,
ParentRequestKey: parentKey,
StartTime: start,
Expand All @@ -1024,7 +1027,7 @@ func (s *Service) callWithRequest(
return nil, err
}

client := s.clientsForEndpoint(route.Endpoint)
client := s.clientsForEndpoint(route.String())

if pk, ok := parentKey.Get(); ok {
ctx = rpc.WithParentRequestKey(ctx, pk)
Expand All @@ -1042,7 +1045,7 @@ func (s *Service) callWithRequest(
} else {
callEvent.Response = either.RightOf[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
logger.Errorf(err, "Call failed to verb %s for deployment %s", verbRef.String(), route.Deployment)
logger.Errorf(err, "Call failed to verb %s for module %s", verbRef.String(), module)
}

s.timeline.EnqueueEvent(ctx, callEvent)
Expand Down Expand Up @@ -1231,11 +1234,11 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
logger.Tracef("Acquiring async call")

now := time.Now().UTC()
sstate := s.schemaState.Load()
sstate := s.routeTable.Current()

enqueueTimelineEvent := func(call *dal.AsyncCall, err optional.Option[error]) {
module := call.Verb.Module
route, ok := sstate.routes[module]
deployment, ok := sstate.GetDeployment(module).Get()
if ok {
eventType := timeline.AsyncExecuteEventTypeUnkown
switch call.Origin.(type) {
Expand All @@ -1253,7 +1256,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
errStr = optional.Some(e.Error())
}
s.timeline.EnqueueEvent(ctx, &timeline.AsyncExecute{
DeploymentKey: route.Deployment,
DeploymentKey: deployment,
RequestKey: call.ParentRequestKey,
EventType: eventType,
Verb: *call.Verb.ToRef(),
Expand Down Expand Up @@ -1375,7 +1378,8 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *
}
logger.Debugf("Catching async call %s with %s", call.Verb, catchVerb)

sch := s.schemaState.Load().schema
routeView := s.routeTable.Current()
sch := routeView.Schema()

verb := &schema.Verb{}
if err := sch.ResolveToType(call.Verb.ToRef(), verb); err != nil {
Expand Down Expand Up @@ -1685,92 +1689,6 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
return log.FromContext(ctx).AddSink(s.deploymentLogsSink).Attrs(attrs)
}

// Periodically sync the routing table and schema from the DB.
// We do this in a single function so the routing table and schema are always consistent
// And they both need the same info from the DB
func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, err error) {
s.schemaSyncLock.Lock() // This can result in confusing log messages if it is called concurrently
defer s.schemaSyncLock.Unlock()
deployments, err := s.dal.GetActiveDeployments(ctx)
if errors.Is(err, libdal.ErrNotFound) {
deployments = []dalmodel.Deployment{}
} else if err != nil {
return 0, err
}
tx, err := s.dal.Begin(ctx)
if err != nil {
return 0, fmt.Errorf("failed to start transaction %w", err)
}
defer tx.CommitOrRollback(ctx, &err)

old := s.schemaState.Load().routes
newRoutes := map[string]Route{}
modulesByName := map[string]*schema.Module{}
changed := false

builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert
modulesByName[builtins.Name], err = schema.ModuleFromProto(builtins)
if err != nil {
return 0, fmt.Errorf("failed to convert builtins to schema: %w", err)
}
for _, v := range deployments {
deploymentLogger := s.getDeploymentLogger(ctx, v.Key)
deploymentLogger.Tracef("processing deployment %s for route table", v.Key.String())
// Deployments are in order, oldest to newest
// If we see a newer one overwrite an old one that means the new one is read
// And we set its replicas to zero
// It may seem a bit odd to do this here but this is where we are actually updating the routing table
// Which is what makes as a deployment 'live' from a clients POV
if v.Schema.Runtime == nil || v.Schema.Runtime.Deployment == nil {
deploymentLogger.Debugf("Deployment %s has no runtime metadata", v.Key.String())
continue
}
targetEndpoint := v.Schema.Runtime.Deployment.Endpoint
if targetEndpoint == "" {
deploymentLogger.Debugf("Failed to get updated endpoint for deployment %s", v.Key.String())
continue
}
// Check if this is a new route
if oldRoute, oldRouteExists := old[v.Module]; !oldRouteExists || oldRoute.Deployment.String() != v.Key.String() {
// If it is a new route we only add it if we can ping it
// Kube deployments can take a while to come up, so we don't want to add them to the routing table until they are ready.
_, err := s.clientsForEndpoint(targetEndpoint).verb.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
if err != nil {
deploymentLogger.Tracef("Unable to ping %s, not adding to route table", v.Key.String())
continue
}
deploymentLogger.Infof("Deployed %s", v.Key.String())
changed = true
status.UpdateModuleState(ctx, v.Module, status.BuildStateDeployed)
}
if prev, ok := newRoutes[v.Module]; ok {
// We have already seen a route for this module, the existing route must be an old one
// as the deployments are in order
// We have a new route ready to go, so we can just set the old one to 0 replicas
// Do this in a TX so it doesn't happen until the route table is updated
deploymentLogger.Debugf("Setting %s to zero replicas", prev.Deployment)
err := tx.SetDeploymentReplicas(ctx, prev.Deployment, 0)
if err != nil {
deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String())
}
changed = true
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
}

orderedModules := maps.Values(modulesByName)
sort.SliceStable(orderedModules, func(i, j int) bool {
return orderedModules[i].Name < orderedModules[j].Name
})
combined := &schema.Schema{Modules: orderedModules}
s.schemaState.Store(schemaState{schema: combined, routes: newRoutes})
if changed {
s.routeTableUpdated.Publish(struct{}{})
}
return time.Second, nil
}

func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)

Expand Down Expand Up @@ -1817,18 +1735,3 @@ func validateCallBody(body []byte, verb *schema.Verb, sch *schema.Schema) error
}
return nil
}

type Route struct {
Module string
Deployment model.DeploymentKey
Endpoint string
}

type schemaState struct {
schema *schema.Schema
routes map[string]Route
}

func (r Route) String() string {
return fmt.Sprintf("%s -> %s", r.Deployment, r.Endpoint)
}
19 changes: 19 additions & 0 deletions backend/provisioner/runner_scaling_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package provisioner
import (
"context"
"fmt"
"time"

"connectrpc.com/connect"
_ "github.com/go-sql-driver/mysql"
Expand Down Expand Up @@ -60,6 +61,24 @@ func provisionRunner(scaling scaling.RunnerScaling) InMemResourceProvisionerFn {
}
ep := endpoint.MustGet()
endpointURI := ep.String()

runnerClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, endpointURI, log.Error)
// TODO: a proper timeout
timeout := time.After(1 * time.Minute)
for {
_, err := runnerClient.Ping(ctx, connect.NewRequest(&ftlv1.PingRequest{}))
if err == nil {
break
}
select {
case <-ctx.Done():
return nil, fmt.Errorf("context cancelled %w", ctx.Err())
case <-timeout:
return nil, fmt.Errorf("timed out waiting for runner to be ready")
case <-time.After(time.Millisecond * 100):
}
}

runner.Runner.Output = &provisioner.RunnerResource_RunnerResourceOutput{
RunnerUri: endpointURI,
DeploymentKey: deployment,
Expand Down
Loading

0 comments on commit a9649c0

Please sign in to comment.