Skip to content

Commit

Permalink
feat: replace runners table with an event stream (#3662)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Dec 9, 2024
1 parent e3a1a35 commit 6cf0ce7
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 680 deletions.
92 changes: 40 additions & 52 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/state"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect"
ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1"
Expand All @@ -57,6 +58,7 @@ import (
cf "github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/deploymentcontext"
"github.com/TBD54566975/ftl/internal/dsn"
"github.com/TBD54566975/ftl/internal/eventstream"
"github.com/TBD54566975/ftl/internal/log"
ftlmaps "github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -215,8 +217,9 @@ type Service struct {
increaseReplicaFailures map[string]int
asyncCallsLock sync.Mutex

clientLock sync.Mutex
routeTable *routing.RouteTable
clientLock sync.Mutex
routeTable *routing.RouteTable
controllerState eventstream.EventStream[state.State]
}

func New(
Expand Down Expand Up @@ -257,6 +260,7 @@ func New(
increaseReplicaFailures: map[string]int{},
routeTable: routingTable,
storage: storage,
controllerState: state.NewInMemoryState(),
}

pubSub := pubsub.New(ctx, conn, optional.Some[pubsub.AsyncCallListener](svc))
Expand Down Expand Up @@ -317,32 +321,17 @@ func New(
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
processes, err := s.dal.GetProcessList(ctx)
if err != nil {
return nil, err
}
out, err := slices.MapErr(processes, func(p dal.Process) (*ftlv1.ProcessListResponse_Process, error) {
var runner *ftlv1.ProcessListResponse_ProcessRunner
if dbr, ok := p.Runner.Get(); ok {
labels, err := structpb.NewStruct(dbr.Labels)
if err != nil {
return nil, fmt.Errorf("could not marshal labels for runner %s: %w", dbr.Key, err)
}
runner = &ftlv1.ProcessListResponse_ProcessRunner{
Key: dbr.Key.String(),
Endpoint: dbr.Endpoint,
Labels: labels,
}
}
labels, err := structpb.NewStruct(p.Labels)
if err != nil {
return nil, fmt.Errorf("could not marshal labels for deployment %s: %w", p.Deployment, err)
currentState := s.controllerState.View()
runners := currentState.Runners()

out, err := slices.MapErr(runners, func(p state.Runner) (*ftlv1.ProcessListResponse_Process, error) {
runner := &ftlv1.ProcessListResponse_ProcessRunner{
Key: p.Key.String(),
Endpoint: p.Endpoint,
}
return &ftlv1.ProcessListResponse_Process{
Deployment: p.Deployment.String(),
MinReplicas: int32(p.MinReplicas),
Labels: labels,
Runner: runner,
Deployment: p.Deployment.String(),
Runner: runner,
}, nil
})
if err != nil {
Expand All @@ -352,7 +341,10 @@ func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.Pr
}

func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusRequest]) (*connect.Response[ftlv1.StatusResponse], error) {
status, err := s.dal.GetStatus(ctx, dalmodel.Controller{Key: s.key, Endpoint: s.config.Bind.String()})
controller := dalmodel.Controller{Key: s.key, Endpoint: s.config.Bind.String()}
currentState := s.controllerState.View()
runners := currentState.Runners()
status, err := s.dal.GetDeploymentStatus(ctx)
if err != nil {
return nil, fmt.Errorf("could not get status: %w", err)
}
Expand All @@ -371,25 +363,20 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
}
})
replicas := map[string]int32{}
protoRunners, err := slices.MapErr(status.Runners, func(r dalmodel.Runner) (*ftlv1.StatusResponse_Runner, error) {
protoRunners, err := slices.MapErr(runners, func(r state.Runner) (*ftlv1.StatusResponse_Runner, error) {
asString := r.Deployment.String()
deployment := &asString
replicas[asString]++
labels, err := structpb.NewStruct(r.Labels)
if err != nil {
return nil, fmt.Errorf("could not marshal attributes for runner %s: %w", r.Key, err)
}
return &ftlv1.StatusResponse_Runner{
Key: r.Key.String(),
Endpoint: r.Endpoint,
Deployment: deployment,
Labels: labels,
}, nil
})
if err != nil {
return nil, err
}
deployments, err := slices.MapErr(status.Deployments, func(d dalmodel.Deployment) (*ftlv1.StatusResponse_Deployment, error) {
deployments, err := slices.MapErr(status, func(d dalmodel.Deployment) (*ftlv1.StatusResponse_Deployment, error) {
labels, err := structpb.NewStruct(d.Labels)
if err != nil {
return nil, fmt.Errorf("could not marshal attributes for deployment %s: %w", d.Key.String(), err)
Expand All @@ -408,13 +395,11 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
return nil, err
}
resp := &ftlv1.StatusResponse{
Controllers: slices.Map(status.Controllers, func(c dalmodel.Controller) *ftlv1.StatusResponse_Controller {
return &ftlv1.StatusResponse_Controller{
Key: c.Key.String(),
Endpoint: c.Endpoint,
Version: ftl.Version,
}
}),
Controllers: []*ftlv1.StatusResponse_Controller{{
Key: controller.Key.String(),
Endpoint: controller.Endpoint,
Version: ftl.Version,
}},
Runners: protoRunners,
Deployments: deployments,
Routes: routes,
Expand Down Expand Up @@ -576,21 +561,19 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
err = s.dal.UpsertRunner(ctx, dalmodel.Runner{
// The created event does not matter if it is a new runner or not.
err = s.controllerState.Publish(&state.RunnerCreatedEvent{
Key: runnerKey,
Endpoint: msg.Endpoint,
Deployment: deploymentKey,
Labels: msg.Labels.AsMap(),
})
if errors.Is(err, libdal.ErrConflict) {
return nil, connect.NewError(connect.CodeAlreadyExists, err)
} else if err != nil {
if err != nil {
return nil, err
}
if !deferredDeregistration {
// Deregister the runner if the Runner disconnects.
defer func() {
err := s.dal.DeregisterRunner(context.Background(), runnerKey)
err := s.controllerState.Publish(&state.RunnerDeletedEvent{Key: runnerKey})
if err != nil {
logger.Errorf(err, "Could not deregister runner %s", runnerStr)
}
Expand Down Expand Up @@ -1149,11 +1132,16 @@ func (s *Service) clientsForEndpoint(endpoint string) clients {

func (s *Service) reapStaleRunners(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
count, err := s.dal.KillStaleRunners(context.Background(), s.config.RunnerTimeout)
if err != nil {
return 0, fmt.Errorf("failed to delete stale runners: %w", err)
} else if count > 0 {
logger.Debugf("Reaped %d stale runners", count)
cs := s.controllerState.View()

for _, runner := range cs.Runners() {
if runner.LastSeen.Add(s.config.RunnerTimeout).Before(time.Now()) {
runnerKey := runner.Key
logger.Debugf("Reaping stale runner %s", runnerKey)
if err := s.controllerState.Publish(&state.RunnerDeletedEvent{Key: runnerKey}); err != nil {
return 0, fmt.Errorf("failed to publish runner deleted event: %w", err)
}
}
}
return s.config.RunnerTimeout, nil
}
Expand Down
Loading

0 comments on commit 6cf0ce7

Please sign in to comment.