From 6cf0ce708aa189f26ef6738c7f67455d6504cfa9 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Mon, 9 Dec 2024 13:34:19 +1100 Subject: [PATCH] feat: replace runners table with an event stream (#3662) --- backend/controller/controller.go | 92 +++--- backend/controller/dal/dal.go | 190 +----------- backend/controller/dal/dal_test.go | 55 ---- .../controller/dal/internal/sql/querier.go | 8 - .../controller/dal/internal/sql/queries.sql | 77 +---- .../dal/internal/sql/queries.sql.go | 286 +----------------- backend/controller/dal/model/model.go | 1 - .../pubsub/internal/sql/queries.sql | 7 - .../pubsub/internal/sql/queries.sql.go | 7 - .../controller/sql/databasetesting/devel.go | 3 - .../schema/20241206045126_remove_runners.sql | 5 + backend/controller/state/controllerstate.go | 112 +++++++ .../controller/state/controllerstate_test.go | 54 ++++ internal/eventstream/eventstream.go | 68 +++++ 14 files changed, 285 insertions(+), 680 deletions(-) create mode 100644 backend/controller/sql/schema/20241206045126_remove_runners.sql create mode 100644 backend/controller/state/controllerstate.go create mode 100644 backend/controller/state/controllerstate_test.go create mode 100644 internal/eventstream/eventstream.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index c53b49ddbc..75420a8bc9 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -42,6 +42,7 @@ import ( "github.com/TBD54566975/ftl/backend/controller/observability" "github.com/TBD54566975/ftl/backend/controller/pubsub" "github.com/TBD54566975/ftl/backend/controller/scheduledtask" + "github.com/TBD54566975/ftl/backend/controller/state" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/console/v1/pbconsoleconnect" ftldeployment "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/deployment/v1" @@ -57,6 +58,7 @@ import ( cf "github.com/TBD54566975/ftl/internal/configuration/manager" "github.com/TBD54566975/ftl/internal/deploymentcontext" "github.com/TBD54566975/ftl/internal/dsn" + "github.com/TBD54566975/ftl/internal/eventstream" "github.com/TBD54566975/ftl/internal/log" ftlmaps "github.com/TBD54566975/ftl/internal/maps" "github.com/TBD54566975/ftl/internal/model" @@ -215,8 +217,9 @@ type Service struct { increaseReplicaFailures map[string]int asyncCallsLock sync.Mutex - clientLock sync.Mutex - routeTable *routing.RouteTable + clientLock sync.Mutex + routeTable *routing.RouteTable + controllerState eventstream.EventStream[state.State] } func New( @@ -257,6 +260,7 @@ func New( increaseReplicaFailures: map[string]int{}, routeTable: routingTable, storage: storage, + controllerState: state.NewInMemoryState(), } pubSub := pubsub.New(ctx, conn, optional.Some[pubsub.AsyncCallListener](svc)) @@ -317,32 +321,17 @@ func New( } func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) { - processes, err := s.dal.GetProcessList(ctx) - if err != nil { - return nil, err - } - out, err := slices.MapErr(processes, func(p dal.Process) (*ftlv1.ProcessListResponse_Process, error) { - var runner *ftlv1.ProcessListResponse_ProcessRunner - if dbr, ok := p.Runner.Get(); ok { - labels, err := structpb.NewStruct(dbr.Labels) - if err != nil { - return nil, fmt.Errorf("could not marshal labels for runner %s: %w", dbr.Key, err) - } - runner = &ftlv1.ProcessListResponse_ProcessRunner{ - Key: dbr.Key.String(), - Endpoint: dbr.Endpoint, - Labels: labels, - } - } - labels, err := structpb.NewStruct(p.Labels) - if err != nil { - return nil, fmt.Errorf("could not marshal labels for deployment %s: %w", p.Deployment, err) + currentState := s.controllerState.View() + runners := currentState.Runners() + + out, err := slices.MapErr(runners, func(p state.Runner) (*ftlv1.ProcessListResponse_Process, error) { + runner := &ftlv1.ProcessListResponse_ProcessRunner{ + Key: p.Key.String(), + Endpoint: p.Endpoint, } return &ftlv1.ProcessListResponse_Process{ - Deployment: p.Deployment.String(), - MinReplicas: int32(p.MinReplicas), - Labels: labels, - Runner: runner, + Deployment: p.Deployment.String(), + Runner: runner, }, nil }) if err != nil { @@ -352,7 +341,10 @@ func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.Pr } func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusRequest]) (*connect.Response[ftlv1.StatusResponse], error) { - status, err := s.dal.GetStatus(ctx, dalmodel.Controller{Key: s.key, Endpoint: s.config.Bind.String()}) + controller := dalmodel.Controller{Key: s.key, Endpoint: s.config.Bind.String()} + currentState := s.controllerState.View() + runners := currentState.Runners() + status, err := s.dal.GetDeploymentStatus(ctx) if err != nil { return nil, fmt.Errorf("could not get status: %w", err) } @@ -371,25 +363,20 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR } }) replicas := map[string]int32{} - protoRunners, err := slices.MapErr(status.Runners, func(r dalmodel.Runner) (*ftlv1.StatusResponse_Runner, error) { + protoRunners, err := slices.MapErr(runners, func(r state.Runner) (*ftlv1.StatusResponse_Runner, error) { asString := r.Deployment.String() deployment := &asString replicas[asString]++ - labels, err := structpb.NewStruct(r.Labels) - if err != nil { - return nil, fmt.Errorf("could not marshal attributes for runner %s: %w", r.Key, err) - } return &ftlv1.StatusResponse_Runner{ Key: r.Key.String(), Endpoint: r.Endpoint, Deployment: deployment, - Labels: labels, }, nil }) if err != nil { return nil, err } - deployments, err := slices.MapErr(status.Deployments, func(d dalmodel.Deployment) (*ftlv1.StatusResponse_Deployment, error) { + deployments, err := slices.MapErr(status, func(d dalmodel.Deployment) (*ftlv1.StatusResponse_Deployment, error) { labels, err := structpb.NewStruct(d.Labels) if err != nil { return nil, fmt.Errorf("could not marshal attributes for deployment %s: %w", d.Key.String(), err) @@ -408,13 +395,11 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR return nil, err } resp := &ftlv1.StatusResponse{ - Controllers: slices.Map(status.Controllers, func(c dalmodel.Controller) *ftlv1.StatusResponse_Controller { - return &ftlv1.StatusResponse_Controller{ - Key: c.Key.String(), - Endpoint: c.Endpoint, - Version: ftl.Version, - } - }), + Controllers: []*ftlv1.StatusResponse_Controller{{ + Key: controller.Key.String(), + Endpoint: controller.Endpoint, + Version: ftl.Version, + }}, Runners: protoRunners, Deployments: deployments, Routes: routes, @@ -576,21 +561,19 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) } - err = s.dal.UpsertRunner(ctx, dalmodel.Runner{ + // The created event does not matter if it is a new runner or not. + err = s.controllerState.Publish(&state.RunnerCreatedEvent{ Key: runnerKey, Endpoint: msg.Endpoint, Deployment: deploymentKey, - Labels: msg.Labels.AsMap(), }) - if errors.Is(err, libdal.ErrConflict) { - return nil, connect.NewError(connect.CodeAlreadyExists, err) - } else if err != nil { + if err != nil { return nil, err } if !deferredDeregistration { // Deregister the runner if the Runner disconnects. defer func() { - err := s.dal.DeregisterRunner(context.Background(), runnerKey) + err := s.controllerState.Publish(&state.RunnerDeletedEvent{Key: runnerKey}) if err != nil { logger.Errorf(err, "Could not deregister runner %s", runnerStr) } @@ -1149,11 +1132,16 @@ func (s *Service) clientsForEndpoint(endpoint string) clients { func (s *Service) reapStaleRunners(ctx context.Context) (time.Duration, error) { logger := log.FromContext(ctx) - count, err := s.dal.KillStaleRunners(context.Background(), s.config.RunnerTimeout) - if err != nil { - return 0, fmt.Errorf("failed to delete stale runners: %w", err) - } else if count > 0 { - logger.Debugf("Reaped %d stale runners", count) + cs := s.controllerState.View() + + for _, runner := range cs.Runners() { + if runner.LastSeen.Add(s.config.RunnerTimeout).Before(time.Now()) { + runnerKey := runner.Key + logger.Debugf("Reaping stale runner %s", runnerKey) + if err := s.controllerState.Publish(&state.RunnerDeletedEvent{Key: runnerKey}); err != nil { + return 0, fmt.Errorf("failed to publish runner deleted event: %w", err) + } + } } return s.config.RunnerTimeout, nil } diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index bd043bef37..8e77a1175d 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "time" "github.com/alecthomas/types/optional" inprocesspubsub "github.com/alecthomas/types/pubsub" @@ -16,7 +15,6 @@ import ( dalsql "github.com/TBD54566975/ftl/backend/controller/dal/internal/sql" dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model" "github.com/TBD54566975/ftl/backend/controller/pubsub" - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/timeline" "github.com/TBD54566975/ftl/internal/log" @@ -27,27 +25,6 @@ import ( "github.com/TBD54566975/ftl/internal/slices" ) -func runnerFromDB(row dalsql.GetRunnerRow) dalmodel.Runner { - attrs := model.Labels{} - if err := json.Unmarshal(row.Labels, &attrs); err != nil { - return dalmodel.Runner{} - } - - return dalmodel.Runner{ - Key: row.RunnerKey, - Endpoint: row.Endpoint, - Deployment: row.DeploymentKey, - Labels: attrs, - } -} - -// A Reservation of a Runner. -type Reservation interface { - Runner() dalmodel.Runner - Commit(ctx context.Context) error - Rollback(ctx context.Context) error -} - func New(ctx context.Context, conn libdal.Connection, pubsub *pubsub.Service, registry aregistry.Service) *DAL { var d *DAL db := dalsql.New(conn) @@ -80,14 +57,10 @@ type DAL struct { DeploymentChanges *inprocesspubsub.Topic[DeploymentNotification] } -func (d *DAL) GetStatus(ctx context.Context, controller dalmodel.Controller) (dalmodel.Status, error) { - runners, err := d.db.GetActiveRunners(ctx) - if err != nil { - return dalmodel.Status{}, fmt.Errorf("could not get active runners: %w", libdal.TranslatePGError(err)) - } +func (d *DAL) GetDeploymentStatus(ctx context.Context) ([]dalmodel.Deployment, error) { deployments, err := d.db.GetActiveDeployments(ctx) if err != nil { - return dalmodel.Status{}, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err)) + return nil, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err)) } statusDeployments, err := slices.MapErr(deployments, func(in dalsql.GetActiveDeploymentsRow) (dalmodel.Deployment, error) { labels := model.Labels{} @@ -105,51 +78,10 @@ func (d *DAL) GetStatus(ctx context.Context, controller dalmodel.Controller) (da }, nil }) if err != nil { - return dalmodel.Status{}, fmt.Errorf("could not parse deployments: %w", err) + return nil, fmt.Errorf("could not parse deployments: %w", err) } - domainRunners, err := slices.MapErr(runners, func(in dalsql.GetActiveRunnersRow) (dalmodel.Runner, error) { - attrs := model.Labels{} - if err := json.Unmarshal(in.Labels, &attrs); err != nil { - return dalmodel.Runner{}, fmt.Errorf("invalid attributes JSON for runner %s: %w", in.RunnerKey, err) - } - return dalmodel.Runner{ - Key: in.RunnerKey, - Endpoint: in.Endpoint, - Deployment: in.DeploymentKey, - Labels: attrs, - }, nil - }) - if err != nil { - return dalmodel.Status{}, fmt.Errorf("could not parse runners: %w", err) - } - return dalmodel.Status{ - Controllers: []dalmodel.Controller{controller}, - Deployments: statusDeployments, - Runners: domainRunners, - }, nil -} - -func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.DeploymentKey) ([]dalmodel.Runner, error) { - runners := []dalmodel.Runner{} - rows, err := d.db.GetRunnersForDeployment(ctx, deployment) - if err != nil { - return nil, libdal.TranslatePGError(err) - } - for _, row := range rows { - attrs := model.Labels{} - if err := json.Unmarshal(row.Labels, &attrs); err != nil { - return nil, fmt.Errorf("invalid attributes JSON for runner %d: %w", row.ID, err) - } - - runners = append(runners, dalmodel.Runner{ - Key: row.Key, - Endpoint: row.Endpoint, - Deployment: deployment, - Labels: attrs, - }) - } - return runners, nil + return statusDeployments, nil } func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err error) { @@ -251,68 +183,6 @@ func (d *DAL) GetDeployment(ctx context.Context, key model.DeploymentKey) (*mode return d.loadDeployment(ctx, deployment) } -// UpsertRunner registers or updates a new runner. -// -// ErrConflict will be returned if a runner with the same endpoint and a -// different key already exists. -func (d *DAL) UpsertRunner(ctx context.Context, runner dalmodel.Runner) error { - attrBytes, err := json.Marshal(runner.Labels) - if err != nil { - return fmt.Errorf("failed to JSON encode runner labels: %w", err) - } - deploymentID, err := d.db.UpsertRunner(ctx, dalsql.UpsertRunnerParams{ - Key: runner.Key, - Endpoint: runner.Endpoint, - DeploymentKey: runner.Deployment, - Labels: attrBytes, - }) - if err != nil { - return libdal.TranslatePGError(err) - } - if deploymentID < 0 { - return fmt.Errorf("deployment %s not found", runner.Deployment) - } - return nil -} - -// KillStaleRunners deletes runners that have not had heartbeats for the given duration. -func (d *DAL) KillStaleRunners(ctx context.Context, age time.Duration) (int64, error) { - count, err := d.db.KillStaleRunners(ctx, sqltypes.Duration(age)) - return count, err -} - -// DeregisterRunner deregisters the given runner. -func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error { - count, err := d.db.DeregisterRunner(ctx, key) - if err != nil { - return libdal.TranslatePGError(err) - } - if count == 0 { - return libdal.ErrNotFound - } - return nil -} - -var _ Reservation = (*postgresClaim)(nil) - -type postgresClaim struct { - tx *DAL - runner dalmodel.Runner - cancel context.CancelFunc -} - -func (p *postgresClaim) Commit(ctx context.Context) error { - defer p.cancel() - return libdal.TranslatePGError(p.tx.Commit(ctx)) -} - -func (p *postgresClaim) Rollback(ctx context.Context) error { - defer p.cancel() - return libdal.TranslatePGError(p.tx.Rollback(ctx)) -} - -func (p *postgresClaim) Runner() dalmodel.Runner { return p.runner } - // SetDeploymentReplicas activates the given deployment. func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int) (err error) { // Start the transaction @@ -461,7 +331,6 @@ func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment, Module: in.ModuleName, Language: in.Language, MinReplicas: int(in.Deployment.MinReplicas), - Replicas: optional.Some(int(in.Replicas)), Schema: in.Deployment.Schema, CreatedAt: in.Deployment.CreatedAt, } @@ -557,47 +426,6 @@ type Process struct { Runner optional.Option[ProcessRunner] } -// GetProcessList returns a list of all "processes". -func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error) { - rows, err := d.db.GetProcessList(ctx) - if err != nil { - return nil, libdal.TranslatePGError(err) - } - return slices.MapErr(rows, func(row dalsql.GetProcessListRow) (Process, error) { //nolint:wrapcheck - var runner optional.Option[ProcessRunner] - if endpoint, ok := row.Endpoint.Get(); ok { - var labels model.Labels - if err := json.Unmarshal(row.RunnerLabels.RawMessage, &labels); err != nil { - return Process{}, fmt.Errorf("invalid labels JSON for runner %s: %w", row.RunnerKey, err) - } - - runner = optional.Some(ProcessRunner{ - Key: row.RunnerKey.MustGet(), - Endpoint: endpoint, - Labels: labels, - }) - } - var labels model.Labels - if err := json.Unmarshal(row.DeploymentLabels, &labels); err != nil { - return Process{}, fmt.Errorf("invalid labels JSON for deployment %s: %w", row.DeploymentKey, err) - } - return Process{ - Deployment: row.DeploymentKey, - Labels: labels, - MinReplicas: int(row.MinReplicas), - Runner: runner, - }, nil - }) -} - -func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (dalmodel.Runner, error) { - row, err := d.db.GetRunner(ctx, runnerKey) - if err != nil { - return dalmodel.Runner{}, libdal.TranslatePGError(err) - } - return runnerFromDB(row), nil -} - func (d *DAL) loadDeployment(ctx context.Context, deployment dalsql.GetDeploymentRow) (*model.Deployment, error) { out := &model.Deployment{ Module: deployment.ModuleName, @@ -619,16 +447,6 @@ func (d *DAL) loadDeployment(ctx context.Context, deployment dalsql.GetDeploymen return out, nil } -func (d *DAL) GetActiveRunners(ctx context.Context) ([]dalmodel.Runner, error) { - rows, err := d.db.GetActiveRunners(ctx) - if err != nil { - return nil, libdal.TranslatePGError(err) - } - return slices.Map(rows, func(row dalsql.GetActiveRunnersRow) dalmodel.Runner { - return runnerFromDB(dalsql.GetRunnerRow(row)) - }), nil -} - // Check if a deployment exists that exactly matches the given artefacts and schema. func (*DAL) checkForExistingDeployments(ctx context.Context, tx *DAL, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact) (model.DeploymentKey, error) { schemaBytes, err := schema.ModuleToBytes(moduleSchema) diff --git a/backend/controller/dal/dal_test.go b/backend/controller/dal/dal_test.go index 1870a214c3..7d17ff30f5 100644 --- a/backend/controller/dal/dal_test.go +++ b/backend/controller/dal/dal_test.go @@ -106,66 +106,11 @@ func TestDAL(t *testing.T) { assert.Equal(t, []sha256.SHA256{misshingSHA}, missing) }) - runnerID := model.NewRunnerKey("localhost", "8080") - labels := map[string]any{} - - t.Run("RegisterRunner", func(t *testing.T) { - err = dal.UpsertRunner(ctx, dalmodel.Runner{ - Key: runnerID, - Labels: labels, - Endpoint: "http://localhost:8080", - Deployment: deploymentKey, - }) - assert.NoError(t, err) - }) - t.Run("SetDeploymentReplicas", func(t *testing.T) { err := dal.SetDeploymentReplicas(ctx, deploymentKey, 1) assert.NoError(t, err) }) - t.Run("UpdateRunnerAssigned", func(t *testing.T) { - err := dal.UpsertRunner(ctx, dalmodel.Runner{ - Key: runnerID, - Labels: labels, - Endpoint: "http://localhost:8080", - Deployment: deploymentKey, - }) - assert.NoError(t, err) - }) - - t.Run("GetRunnersForDeployment", func(t *testing.T) { - runners, err := dal.GetRunnersForDeployment(ctx, deploymentKey) - assert.NoError(t, err) - assert.Equal(t, []dalmodel.Runner{{ - Key: runnerID, - Labels: labels, - Endpoint: "http://localhost:8080", - Deployment: deploymentKey, - }}, runners) - }) - - t.Run("UpdateRunnerInvalidDeployment", func(t *testing.T) { - err := dal.UpsertRunner(ctx, dalmodel.Runner{ - Key: runnerID, - Labels: labels, - Endpoint: "http://localhost:8080", - Deployment: model.NewDeploymentKey("test"), - }) - assert.Error(t, err) - assert.IsError(t, err, libdal.ErrConstraint) - }) - - t.Run("DeregisterRunner", func(t *testing.T) { - err = dal.DeregisterRunner(ctx, runnerID) - assert.NoError(t, err) - }) - - t.Run("DeregisterRunnerFailsOnMissing", func(t *testing.T) { - err = dal.DeregisterRunner(ctx, model.NewRunnerKey("localhost", "8080")) - assert.IsError(t, err, libdal.ErrNotFound) - }) - t.Run("VerifyDeploymentNotifications", func(t *testing.T) { t.Skip("Skipping this test since we're not using the deployment notification system") dal.DeploymentChanges.Unsubscribe(deploymentChangesCh) diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index 553fe48330..a9fa323ab1 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -25,12 +25,10 @@ type Querier interface { CreateDeployment(ctx context.Context, moduleName string, schema *schema.Module, key model.DeploymentKey) error DeleteSubscribers(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriberKey, error) DeleteSubscriptions(ctx context.Context, deployment model.DeploymentKey) ([]model.SubscriptionKey, error) - DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) FailAsyncCall(ctx context.Context, error string, iD int64) (bool, error) FailAsyncCallWithRetry(ctx context.Context, arg FailAsyncCallWithRetryParams) (bool, error) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDeploymentSchemasRow, error) GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error) - GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error) // Return the digests that exist in the database. GetArtefactDigests(ctx context.Context, digests [][]byte) ([][]byte, error) GetDeployment(ctx context.Context, key model.DeploymentKey) (GetDeploymentRow, error) @@ -43,10 +41,7 @@ type Querier interface { GetExistingDeploymentForModule(ctx context.Context, name string) (GetExistingDeploymentForModuleRow, error) GetModulesByID(ctx context.Context, ids []int64) ([]Module, error) GetNextEventForSubscription(ctx context.Context, consumptionDelay sqltypes.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) - GetProcessList(ctx context.Context) ([]GetProcessListRow, error) GetRandomSubscriber(ctx context.Context, key model.SubscriptionKey) (GetRandomSubscriberRow, error) - GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error) - GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) GetSchemaForDeployment(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) GetSubscription(ctx context.Context, column1 string, column2 string) (TopicSubscription, error) // Results may not be ready to be scheduled yet due to event consumption delay @@ -58,7 +53,6 @@ type Querier interface { GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent, error) GetZombieAsyncCalls(ctx context.Context, limit int32) ([]AsyncCall, error) InsertSubscriber(ctx context.Context, arg InsertSubscriberParams) error - KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error SetDeploymentDesiredReplicas(ctx context.Context, key model.DeploymentKey, minReplicas int32) error @@ -69,8 +63,6 @@ type Querier interface { // UpdateDeploymentSchema(ctx context.Context, schema *schema.Module, key model.DeploymentKey) error UpsertModule(ctx context.Context, language string, name string) (int64, error) - // Upsert a runner and return the deployment ID that it is assigned to, if any. - UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (int64, error) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error } diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index d7039f94f7..31f4441232 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -62,56 +62,10 @@ WHERE EXISTS (SELECT 1 HAVING COUNT(*) = @count::BIGINT -- Number of unique digests provided ); --- name: UpsertRunner :one --- Upsert a runner and return the deployment ID that it is assigned to, if any. -WITH deployment_rel AS ( - SELECT id FROM deployments d - WHERE d.key = sqlc.arg('deployment_key')::deployment_key - LIMIT 1) -INSERT -INTO runners (key, endpoint, labels, deployment_id, last_seen) -VALUES ($1, - $2, - $3, - (SELECT id FROM deployment_rel), - NOW() AT TIME ZONE 'utc') -ON CONFLICT (key) DO UPDATE SET endpoint = $2, - labels = $3, - last_seen = NOW() AT TIME ZONE 'utc' -RETURNING deployment_id; - --- name: KillStaleRunners :one -WITH matches AS ( - DELETE FROM runners - WHERE last_seen < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL - RETURNING 1) -SELECT COUNT(*) -FROM matches; - --- name: DeregisterRunner :one -WITH matches AS ( - DELETE FROM runners - WHERE key = sqlc.arg('key')::runner_key - RETURNING 1) -SELECT COUNT(*) -FROM matches; - --- name: GetActiveRunners :many -SELECT DISTINCT ON (r.key) r.key AS runner_key, - r.endpoint, - r.labels, - r.last_seen, - r.module_name, - d.key AS deployment_key -FROM runners r - INNER JOIN deployments d on d.id = r.deployment_id -ORDER BY r.key; - -- name: GetActiveDeployments :many -SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas +SELECT sqlc.embed(d), m.name AS module_name, m.language FROM deployments d JOIN modules m ON d.module_id = m.id - LEFT JOIN runners r ON d.id = r.deployment_id WHERE min_replicas > 0 GROUP BY d.id, m.name, m.language ORDER BY d.last_activated_at; @@ -129,18 +83,6 @@ SELECT key, schema FROM deployments WHERE min_replicas > 0; -- name: GetSchemaForDeployment :one SELECT schema FROM deployments WHERE key = sqlc.arg('key')::deployment_key; --- name: GetProcessList :many -SELECT d.min_replicas, - d.key AS deployment_key, - d.labels deployment_labels, - r.key AS runner_key, - r.endpoint, - r.labels AS runner_labels -FROM deployments d - LEFT JOIN runners r on d.id = r.deployment_id -WHERE d.min_replicas > 0 -ORDER BY d.key; - -- name: SetDeploymentDesiredReplicas :exec UPDATE deployments SET min_replicas = $2, last_activated_at = CASE WHEN min_replicas = 0 THEN (NOW() AT TIME ZONE 'utc') ELSE last_activated_at END @@ -155,23 +97,6 @@ WHERE m.name = $1 AND min_replicas > 0 LIMIT 1; --- name: GetRunner :one -SELECT DISTINCT ON (r.key) r.key AS runner_key, - r.endpoint, - r.labels, - r.last_seen, - r.module_name, - d.key AS deployment_key -FROM runners r - INNER JOIN deployments d on d.id = r.deployment_id -WHERE r.key = sqlc.arg('key')::runner_key; - --- name: GetRunnersForDeployment :many -SELECT * -FROM runners r - INNER JOIN deployments d on r.deployment_id = d.id -WHERE d.key = sqlc.arg('key')::deployment_key; - -- name: SucceedAsyncCall :one UPDATE async_calls SET diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index 14fbf39572..8f38355a6f 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -223,22 +223,6 @@ func (q *Queries) DeleteSubscriptions(ctx context.Context, deployment model.Depl return items, nil } -const deregisterRunner = `-- name: DeregisterRunner :one -WITH matches AS ( - DELETE FROM runners - WHERE key = $1::runner_key - RETURNING 1) -SELECT COUNT(*) -FROM matches -` - -func (q *Queries) DeregisterRunner(ctx context.Context, key model.RunnerKey) (int64, error) { - row := q.db.QueryRowContext(ctx, deregisterRunner, key) - var count int64 - err := row.Scan(&count) - return count, err -} - const failAsyncCall = `-- name: FailAsyncCall :one UPDATE async_calls SET @@ -350,10 +334,9 @@ func (q *Queries) GetActiveDeploymentSchemas(ctx context.Context) ([]GetActiveDe } const getActiveDeployments = `-- name: GetActiveDeployments :many -SELECT d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, d.last_activated_at, m.name AS module_name, m.language, COUNT(r.id) AS replicas +SELECT d.id, d.created_at, d.module_id, d.key, d.schema, d.labels, d.min_replicas, d.last_activated_at, m.name AS module_name, m.language FROM deployments d JOIN modules m ON d.module_id = m.id - LEFT JOIN runners r ON d.id = r.deployment_id WHERE min_replicas > 0 GROUP BY d.id, m.name, m.language ORDER BY d.last_activated_at @@ -363,7 +346,6 @@ type GetActiveDeploymentsRow struct { Deployment Deployment ModuleName string Language string - Replicas int64 } func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeploymentsRow, error) { @@ -386,58 +368,6 @@ func (q *Queries) GetActiveDeployments(ctx context.Context) ([]GetActiveDeployme &i.Deployment.LastActivatedAt, &i.ModuleName, &i.Language, - &i.Replicas, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - -const getActiveRunners = `-- name: GetActiveRunners :many -SELECT DISTINCT ON (r.key) r.key AS runner_key, - r.endpoint, - r.labels, - r.last_seen, - r.module_name, - d.key AS deployment_key -FROM runners r - INNER JOIN deployments d on d.id = r.deployment_id -ORDER BY r.key -` - -type GetActiveRunnersRow struct { - RunnerKey model.RunnerKey - Endpoint string - Labels json.RawMessage - LastSeen time.Time - ModuleName optional.Option[string] - DeploymentKey model.DeploymentKey -} - -func (q *Queries) GetActiveRunners(ctx context.Context) ([]GetActiveRunnersRow, error) { - rows, err := q.db.QueryContext(ctx, getActiveRunners) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetActiveRunnersRow - for rows.Next() { - var i GetActiveRunnersRow - if err := rows.Scan( - &i.RunnerKey, - &i.Endpoint, - &i.Labels, - &i.LastSeen, - &i.ModuleName, - &i.DeploymentKey, ); err != nil { return nil, err } @@ -815,58 +745,6 @@ func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDe return i, err } -const getProcessList = `-- name: GetProcessList :many -SELECT d.min_replicas, - d.key AS deployment_key, - d.labels deployment_labels, - r.key AS runner_key, - r.endpoint, - r.labels AS runner_labels -FROM deployments d - LEFT JOIN runners r on d.id = r.deployment_id -WHERE d.min_replicas > 0 -ORDER BY d.key -` - -type GetProcessListRow struct { - MinReplicas int32 - DeploymentKey model.DeploymentKey - DeploymentLabels json.RawMessage - RunnerKey optional.Option[model.RunnerKey] - Endpoint optional.Option[string] - RunnerLabels pqtype.NullRawMessage -} - -func (q *Queries) GetProcessList(ctx context.Context) ([]GetProcessListRow, error) { - rows, err := q.db.QueryContext(ctx, getProcessList) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetProcessListRow - for rows.Next() { - var i GetProcessListRow - if err := rows.Scan( - &i.MinReplicas, - &i.DeploymentKey, - &i.DeploymentLabels, - &i.RunnerKey, - &i.Endpoint, - &i.RunnerLabels, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const getRandomSubscriber = `-- name: GetRandomSubscriber :one SELECT subscribers.sink as sink, @@ -906,107 +784,6 @@ func (q *Queries) GetRandomSubscriber(ctx context.Context, key model.Subscriptio return i, err } -const getRunner = `-- name: GetRunner :one -SELECT DISTINCT ON (r.key) r.key AS runner_key, - r.endpoint, - r.labels, - r.last_seen, - r.module_name, - d.key AS deployment_key -FROM runners r - INNER JOIN deployments d on d.id = r.deployment_id -WHERE r.key = $1::runner_key -` - -type GetRunnerRow struct { - RunnerKey model.RunnerKey - Endpoint string - Labels json.RawMessage - LastSeen time.Time - ModuleName optional.Option[string] - DeploymentKey model.DeploymentKey -} - -func (q *Queries) GetRunner(ctx context.Context, key model.RunnerKey) (GetRunnerRow, error) { - row := q.db.QueryRowContext(ctx, getRunner, key) - var i GetRunnerRow - err := row.Scan( - &i.RunnerKey, - &i.Endpoint, - &i.Labels, - &i.LastSeen, - &i.ModuleName, - &i.DeploymentKey, - ) - return i, err -} - -const getRunnersForDeployment = `-- name: GetRunnersForDeployment :many -SELECT r.id, r.key, created, last_seen, endpoint, module_name, deployment_id, r.labels, d.id, created_at, module_id, d.key, schema, d.labels, min_replicas, last_activated_at -FROM runners r - INNER JOIN deployments d on r.deployment_id = d.id -WHERE d.key = $1::deployment_key -` - -type GetRunnersForDeploymentRow struct { - ID int64 - Key model.RunnerKey - Created time.Time - LastSeen time.Time - Endpoint string - ModuleName optional.Option[string] - DeploymentID int64 - Labels json.RawMessage - ID_2 int64 - CreatedAt time.Time - ModuleID int64 - Key_2 model.DeploymentKey - Schema *schema.Module - Labels_2 json.RawMessage - MinReplicas int32 - LastActivatedAt time.Time -} - -func (q *Queries) GetRunnersForDeployment(ctx context.Context, key model.DeploymentKey) ([]GetRunnersForDeploymentRow, error) { - rows, err := q.db.QueryContext(ctx, getRunnersForDeployment, key) - if err != nil { - return nil, err - } - defer rows.Close() - var items []GetRunnersForDeploymentRow - for rows.Next() { - var i GetRunnersForDeploymentRow - if err := rows.Scan( - &i.ID, - &i.Key, - &i.Created, - &i.LastSeen, - &i.Endpoint, - &i.ModuleName, - &i.DeploymentID, - &i.Labels, - &i.ID_2, - &i.CreatedAt, - &i.ModuleID, - &i.Key_2, - &i.Schema, - &i.Labels_2, - &i.MinReplicas, - &i.LastActivatedAt, - ); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const getSchemaForDeployment = `-- name: GetSchemaForDeployment :one SELECT schema FROM deployments WHERE key = $1::deployment_key ` @@ -1048,12 +825,6 @@ func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 s } const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many -WITH runner_count AS ( - SELECT count(r.deployment_id) as runner_count, - r.deployment_id as deployment - FROM runners r - GROUP BY deployment -) SELECT subs.key::subscription_key as key, curser.key as cursor, @@ -1062,7 +833,6 @@ SELECT deployments.key as deployment_key, curser.request_key as request_key FROM topic_subscriptions subs - JOIN runner_count on subs.deployment_id = runner_count.deployment JOIN deployments ON subs.deployment_id = deployments.id LEFT JOIN topics ON subs.topic_id = topics.id LEFT JOIN topic_events curser ON subs.cursor = curser.id @@ -1263,22 +1033,6 @@ func (q *Queries) InsertSubscriber(ctx context.Context, arg InsertSubscriberPara return err } -const killStaleRunners = `-- name: KillStaleRunners :one -WITH matches AS ( - DELETE FROM runners - WHERE last_seen < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL - RETURNING 1) -SELECT COUNT(*) -FROM matches -` - -func (q *Queries) KillStaleRunners(ctx context.Context, timeout sqltypes.Duration) (int64, error) { - row := q.db.QueryRowContext(ctx, killStaleRunners, timeout) - var count int64 - err := row.Scan(&count) - return count, err -} - const loadAsyncCall = `-- name: LoadAsyncCall :one SELECT id, created_at, verb, state, origin, scheduled_at, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context, request FROM async_calls @@ -1430,44 +1184,6 @@ func (q *Queries) UpsertModule(ctx context.Context, language string, name string return id, err } -const upsertRunner = `-- name: UpsertRunner :one -WITH deployment_rel AS ( - SELECT id FROM deployments d - WHERE d.key = $4::deployment_key - LIMIT 1) -INSERT -INTO runners (key, endpoint, labels, deployment_id, last_seen) -VALUES ($1, - $2, - $3, - (SELECT id FROM deployment_rel), - NOW() AT TIME ZONE 'utc') -ON CONFLICT (key) DO UPDATE SET endpoint = $2, - labels = $3, - last_seen = NOW() AT TIME ZONE 'utc' -RETURNING deployment_id -` - -type UpsertRunnerParams struct { - Key model.RunnerKey - Endpoint string - Labels json.RawMessage - DeploymentKey model.DeploymentKey -} - -// Upsert a runner and return the deployment ID that it is assigned to, if any. -func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (int64, error) { - row := q.db.QueryRowContext(ctx, upsertRunner, - arg.Key, - arg.Endpoint, - arg.Labels, - arg.DeploymentKey, - ) - var deployment_id int64 - err := row.Scan(&deployment_id) - return deployment_id, err -} - const upsertSubscription = `-- name: UpsertSubscription :one INSERT INTO topic_subscriptions ( key, diff --git a/backend/controller/dal/model/model.go b/backend/controller/dal/model/model.go index e3e1ec8eb8..6f7cf47d77 100644 --- a/backend/controller/dal/model/model.go +++ b/backend/controller/dal/model/model.go @@ -63,7 +63,6 @@ type Deployment struct { Language string Module string MinReplicas int - Replicas optional.Option[int] // Depending on the query this may or may not be populated. Schema *schema.Module CreatedAt time.Time Labels model.Labels diff --git a/backend/controller/pubsub/internal/sql/queries.sql b/backend/controller/pubsub/internal/sql/queries.sql index 68e94abc16..4de0dd2890 100644 --- a/backend/controller/pubsub/internal/sql/queries.sql +++ b/backend/controller/pubsub/internal/sql/queries.sql @@ -117,12 +117,6 @@ VALUES ( -- Sorting ensures that brand new events (that may not be ready for consumption) -- don't prevent older events from being consumed -- We also make sure that the subscription belongs to a deployment that has at least one runner -WITH runner_count AS ( - SELECT count(r.deployment_id) as runner_count, - r.deployment_id as deployment - FROM runners r - GROUP BY deployment -) SELECT subs.key::subscription_key as key, curser.key as cursor, @@ -131,7 +125,6 @@ SELECT deployments.key as deployment_key, curser.request_key as request_key FROM topic_subscriptions subs - JOIN runner_count on subs.deployment_id = runner_count.deployment JOIN deployments ON subs.deployment_id = deployments.id LEFT JOIN topics ON subs.topic_id = topics.id LEFT JOIN topic_events curser ON subs.cursor = curser.id diff --git a/backend/controller/pubsub/internal/sql/queries.sql.go b/backend/controller/pubsub/internal/sql/queries.sql.go index a66d0ae1db..557d841e3d 100644 --- a/backend/controller/pubsub/internal/sql/queries.sql.go +++ b/backend/controller/pubsub/internal/sql/queries.sql.go @@ -233,12 +233,6 @@ func (q *Queries) GetSubscription(ctx context.Context, column1 string, column2 s } const getSubscriptionsNeedingUpdate = `-- name: GetSubscriptionsNeedingUpdate :many -WITH runner_count AS ( - SELECT count(r.deployment_id) as runner_count, - r.deployment_id as deployment - FROM runners r - GROUP BY deployment -) SELECT subs.key::subscription_key as key, curser.key as cursor, @@ -247,7 +241,6 @@ SELECT deployments.key as deployment_key, curser.request_key as request_key FROM topic_subscriptions subs - JOIN runner_count on subs.deployment_id = runner_count.deployment JOIN deployments ON subs.deployment_id = deployments.id LEFT JOIN topics ON subs.topic_id = topics.id LEFT JOIN topic_events curser ON subs.cursor = curser.id diff --git a/backend/controller/sql/databasetesting/devel.go b/backend/controller/sql/databasetesting/devel.go index e2f04774c9..71d5942c60 100644 --- a/backend/controller/sql/databasetesting/devel.go +++ b/backend/controller/sql/databasetesting/devel.go @@ -87,9 +87,6 @@ func CreateForDevel(ctx context.Context, dsn string, recreate bool) (*stdsql.DB, WITH deleted AS ( DELETE FROM async_calls RETURNING 1 - ), deleted_runners AS ( - DELETE FROM runners - RETURNING 1 ) SELECT COUNT(*) `) diff --git a/backend/controller/sql/schema/20241206045126_remove_runners.sql b/backend/controller/sql/schema/20241206045126_remove_runners.sql new file mode 100644 index 0000000000..898141cbd9 --- /dev/null +++ b/backend/controller/sql/schema/20241206045126_remove_runners.sql @@ -0,0 +1,5 @@ +-- migrate:up +DROP TABLE runners; + +-- migrate:down + diff --git a/backend/controller/state/controllerstate.go b/backend/controller/state/controllerstate.go new file mode 100644 index 0000000000..f6e927dea7 --- /dev/null +++ b/backend/controller/state/controllerstate.go @@ -0,0 +1,112 @@ +package state + +import ( + "fmt" + "time" + + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/internal/eventstream" + "github.com/TBD54566975/ftl/internal/model" +) + +type State struct { + runners map[string]*Runner + runnersByDeployment map[string][]*Runner +} + +func NewInMemoryState() eventstream.EventStream[State] { + return eventstream.NewInMemory(State{ + runners: map[string]*Runner{}, + runnersByDeployment: map[string][]*Runner{}, + }) +} + +func (r *State) Runner(s string) optional.Option[Runner] { + result, ok := r.runners[s] + if ok { + return optional.Ptr(result) + } + return optional.None[Runner]() +} + +func (r *State) Runners() []Runner { + var ret []Runner + for _, v := range r.runners { + ret = append(ret, *v) + } + return ret +} + +func (r *State) RunnersForDeployment(deployment string) []Runner { + var ret []Runner + for _, v := range r.runnersByDeployment[deployment] { + ret = append(ret, *v) + } + return ret +} + +type Runner struct { + Key model.RunnerKey + Create time.Time + LastSeen time.Time + Endpoint string + Module string + Deployment model.DeploymentKey +} + +var _ eventstream.Event[State] = (*RunnerCreatedEvent)(nil) +var _ eventstream.Event[State] = (*RunnerHeartbeatEvent)(nil) +var _ eventstream.Event[State] = (*RunnerDeletedEvent)(nil) + +type RunnerCreatedEvent struct { + Key model.RunnerKey + Create time.Time + Endpoint string + Module string + Deployment model.DeploymentKey +} + +func (r *RunnerCreatedEvent) Handle(t State) (State, error) { + if existing := t.runners[r.Key.String()]; existing != nil { + return t, nil + } + n := Runner{ + Key: r.Key, + Create: r.Create, + LastSeen: r.Create, + Endpoint: r.Endpoint, + Module: r.Module, + Deployment: r.Deployment, + } + t.runners[r.Key.String()] = &n + t.runnersByDeployment[r.Deployment.String()] = append(t.runnersByDeployment[r.Deployment.String()], &n) + return t, nil +} + +type RunnerHeartbeatEvent struct { + Key model.RunnerKey + LastSeen time.Time +} + +func (r *RunnerHeartbeatEvent) Handle(t State) (State, error) { + existing := t.runners[r.Key.String()] + if existing == nil { + return t, fmt.Errorf("runner %s not found", r.Key) + } + existing.LastSeen = r.LastSeen + return t, nil +} + +type RunnerDeletedEvent struct { + Key model.RunnerKey +} + +func (r RunnerDeletedEvent) Handle(t State) (State, error) { + existing := t.runners[r.Key.String()] + if existing != nil { + delete(t.runners, r.Key.String()) + + } + return t, nil +} diff --git a/backend/controller/state/controllerstate_test.go b/backend/controller/state/controllerstate_test.go new file mode 100644 index 0000000000..fc06a4d685 --- /dev/null +++ b/backend/controller/state/controllerstate_test.go @@ -0,0 +1,54 @@ +package state_test + +import ( + "testing" + "time" + + "github.com/alecthomas/assert/v2" + + "github.com/TBD54566975/ftl/backend/controller/state" + "github.com/TBD54566975/ftl/internal/model" +) + +func TestRunnerState(t *testing.T) { + cs := state.NewInMemoryState() + view := cs.View() + assert.Equal(t, 0, len(view.Runners())) + key := model.NewLocalRunnerKey(1) + create := time.Now() + endpoint := "http://localhost:8080" + module := "test" + deploymentKey := model.NewDeploymentKey(module) + err := cs.Publish(&state.RunnerCreatedEvent{ + Key: key, + Create: create, + Endpoint: endpoint, + Module: module, + Deployment: deploymentKey, + }) + assert.NoError(t, err) + view = cs.View() + assert.Equal(t, 1, len(view.Runners())) + assert.Equal(t, key, view.Runners()[0].Key) + assert.Equal(t, create, view.Runners()[0].Create) + assert.Equal(t, create, view.Runners()[0].LastSeen) + assert.Equal(t, endpoint, view.Runners()[0].Endpoint) + assert.Equal(t, module, view.Runners()[0].Module) + assert.Equal(t, deploymentKey, view.Runners()[0].Deployment) + seen := time.Now() + err = cs.Publish(&state.RunnerHeartbeatEvent{ + Key: key, + LastSeen: seen, + }) + assert.NoError(t, err) + view = cs.View() + assert.Equal(t, seen, view.Runners()[0].LastSeen) + + err = cs.Publish(&state.RunnerDeletedEvent{ + Key: key, + }) + assert.NoError(t, err) + view = cs.View() + assert.Equal(t, 0, len(view.Runners())) + +} diff --git a/internal/eventstream/eventstream.go b/internal/eventstream/eventstream.go new file mode 100644 index 0000000000..758ce61b93 --- /dev/null +++ b/internal/eventstream/eventstream.go @@ -0,0 +1,68 @@ +package eventstream + +import ( + "fmt" + "sync" + + "github.com/alecthomas/types/pubsub" + + "github.com/TBD54566975/ftl/internal/reflect" +) + +// EventStream is a stream of events that can be published and subscribed to, that update a materialized view +type EventStream[View any] interface { + Publish(event Event[View]) error + + View() View + + Subscribe() <-chan Event[View] +} + +// StreamView is a view of an event stream that can be subscribed to, without modifying the stream. +type StreamView[View any] interface { + View() View + + // Subscribe to the event stream. The channel will only receive events that are published after the subscription. + Subscribe() <-chan Event[View] +} + +type Event[View any] interface { + + // Handle applies the event to the view + Handle(view View) (View, error) +} + +func NewInMemory[View any](initial View) EventStream[View] { + return &inMemoryEventStream[View]{ + view: initial, + topic: pubsub.New[Event[View]](), + } +} + +type inMemoryEventStream[View any] struct { + view View + lock sync.Mutex + topic *pubsub.Topic[Event[View]] +} + +func (i *inMemoryEventStream[T]) Publish(e Event[T]) error { + i.lock.Lock() + defer i.lock.Unlock() + + newView, err := e.Handle(reflect.DeepCopy(i.view)) + if err != nil { + return fmt.Errorf("failed to handle event: %w", err) + } + i.view = newView + i.topic.Publish(e) + return nil +} + +func (i *inMemoryEventStream[T]) View() T { + return i.view +} + +func (i *inMemoryEventStream[T]) Subscribe() <-chan Event[T] { + ret := i.topic.Subscribe(nil) + return ret +}