Skip to content

Commit

Permalink
feat: Schema changes for using schema as provisioning input (#3608)
Browse files Browse the repository at this point in the history
Commits should describe what the intent is, hopefully.
  • Loading branch information
jvmakine authored Dec 3, 2024
1 parent 5e0d18c commit 0d6f2bf
Show file tree
Hide file tree
Showing 15 changed files with 97 additions and 54 deletions.
5 changes: 3 additions & 2 deletions backend/provisioner/controller_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
)

// NewControllerProvisioner creates a new provisioner that uses the FTL controller to provision modules
func NewControllerProvisioner(client ftlv1connect.ControllerServiceClient) *InMemProvisioner {
return NewEmbeddedProvisioner(map[ResourceType]InMemResourceProvisionerFn{
ResourceTypeModule: func(ctx context.Context, rc *provisioner.ResourceContext, module, _ string, previous *provisioner.Resource) (*provisioner.Resource, error) {
return NewEmbeddedProvisioner(map[schema.ResourceType]InMemResourceProvisionerFn{
schema.ResourceTypeModule: func(ctx context.Context, rc *provisioner.ResourceContext, module, _ string, previous *provisioner.Resource) (*provisioner.Resource, error) {
mod, ok := rc.Resource.Resource.(*provisioner.Resource_Module)
if !ok {
panic(fmt.Errorf("unexpected resource type: %T", rc.Resource.Resource))
Expand Down
9 changes: 5 additions & 4 deletions backend/provisioner/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/provisioner"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
)

// MockProvisioner is a mock implementation of the Provisioner interface
Expand Down Expand Up @@ -89,8 +90,8 @@ func TestDeployment_Progress(t *testing.T) {
mock := &MockProvisioner{}

registry := provisioner.ProvisionerRegistry{}
registry.Register("mock", mock, provisioner.ResourceTypePostgres)
registry.Register("mock", mock, provisioner.ResourceTypeMysql)
registry.Register("mock", mock, schema.ResourceTypePostgres)
registry.Register("mock", mock, schema.ResourceTypeMysql)

graph := &provisioner.ResourceGraph{}
graph.AddNode(&proto.Resource{ResourceId: "a", Resource: &proto.Resource_Mysql{}})
Expand Down Expand Up @@ -165,8 +166,8 @@ func TestDeployment_Progress(t *testing.T) {
}

registry := provisioner.ProvisionerRegistry{}
registry.Register("mockdb", dbMock, provisioner.ResourceTypePostgres)
registry.Register("mockmod", moduleMock, provisioner.ResourceTypeModule)
registry.Register("mockdb", dbMock, schema.ResourceTypePostgres)
registry.Register("mockmod", moduleMock, schema.ResourceTypeModule)

// Check that the deployment finishes without errors
graph := &provisioner.ResourceGraph{}
Expand Down
11 changes: 6 additions & 5 deletions backend/provisioner/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/TBD54566975/ftl/internal/dev"
"github.com/TBD54566975/ftl/internal/dsn"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/strcase"
)

Expand All @@ -24,11 +25,11 @@ var pubSubNameLimit = 249 // 255 (filename limit) - 6 (partition id)

// NewDevProvisioner creates a new provisioner that provisions resources locally when running FTL in dev mode
func NewDevProvisioner(postgresPort int, mysqlPort int, recreate bool) *InMemProvisioner {
return NewEmbeddedProvisioner(map[ResourceType]InMemResourceProvisionerFn{
ResourceTypePostgres: provisionPostgres(postgresPort, recreate),
ResourceTypeMysql: provisionMysql(mysqlPort, recreate),
ResourceTypeTopic: provisionTopic(),
ResourceTypeSubscription: provisionSubscription(),
return NewEmbeddedProvisioner(map[schema.ResourceType]InMemResourceProvisionerFn{
schema.ResourceTypePostgres: provisionPostgres(postgresPort, recreate),
schema.ResourceTypeMysql: provisionMysql(mysqlPort, recreate),
schema.ResourceTypeTopic: provisionTopic(),
schema.ResourceTypeSubscription: provisionSubscription(),
})
}
func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
Expand Down
5 changes: 3 additions & 2 deletions backend/provisioner/inmem_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
provisionerconnect "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/provisioner/v1beta1/provisionerpbconnect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema"
)

type inMemProvisioningTask struct {
Expand Down Expand Up @@ -48,10 +49,10 @@ type InMemResourceProvisionerFn func(context.Context, *provisioner.ResourceConte
// finishes the task when all resources are provisioned or an error occurs.
type InMemProvisioner struct {
running *xsync.MapOf[string, *inMemProvisioningTask]
handlers map[ResourceType]InMemResourceProvisionerFn
handlers map[schema.ResourceType]InMemResourceProvisionerFn
}

func NewEmbeddedProvisioner(handlers map[ResourceType]InMemResourceProvisionerFn) *InMemProvisioner {
func NewEmbeddedProvisioner(handlers map[schema.ResourceType]InMemResourceProvisionerFn) *InMemProvisioner {
return &InMemProvisioner{
running: xsync.NewMapOf[string, *inMemProvisioningTask](),
handlers: handlers,
Expand Down
12 changes: 6 additions & 6 deletions backend/provisioner/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ type provisionerPluginConfig struct {
// The default provisioner to use for all resources not matched here
Default string `toml:"default"`
Plugins []struct {
ID string `toml:"id"`
Resources []ResourceType `toml:"resources"`
ID string `toml:"id"`
Resources []schema.ResourceType `toml:"resources"`
} `toml:"plugins"`
}

func (cfg *provisionerPluginConfig) Validate() error {
registeredResources := map[ResourceType]bool{}
registeredResources := map[schema.ResourceType]bool{}
for _, plugin := range cfg.Plugins {
for _, r := range plugin.Resources {
if registeredResources[r] {
Expand All @@ -49,7 +49,7 @@ func (cfg *provisionerPluginConfig) Validate() error {
type ProvisionerBinding struct {
Provisioner provisionerconnect.ProvisionerPluginServiceClient
ID string
Types []ResourceType
Types []schema.ResourceType
}

func (p ProvisionerBinding) String() string {
Expand Down Expand Up @@ -116,7 +116,7 @@ func provisionerIDToProvisioner(ctx context.Context, id string, controller ftlv1
}

// Register to the registry, to be executed after all the previously added handlers
func (reg *ProvisionerRegistry) Register(id string, handler provisionerconnect.ProvisionerPluginServiceClient, types ...ResourceType) *ProvisionerBinding {
func (reg *ProvisionerRegistry) Register(id string, handler provisionerconnect.ProvisionerPluginServiceClient, types ...schema.ResourceType) *ProvisionerBinding {
binding := &ProvisionerBinding{
Provisioner: handler,
Types: types,
Expand Down Expand Up @@ -154,7 +154,7 @@ func (reg *ProvisionerRegistry) CreateDeployment(ctx context.Context, module str
return deployment
}

func getTypes(resources []*provisioner.Resource, types []ResourceType) []*provisioner.Resource {
func getTypes(resources []*provisioner.Resource, types []schema.ResourceType) []*provisioner.Resource {
result := []*provisioner.Resource{}
for _, r := range resources {
for _, t := range types {
Expand Down
33 changes: 10 additions & 23 deletions backend/provisioner/resource_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,27 @@ package provisioner

import (
provisioner "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/provisioner/v1beta1"
)

// ResourceType is a type of resource used to configure provisioners
type ResourceType string

const (
ResourceTypeUnknown ResourceType = "unknown"
ResourceTypePostgres ResourceType = "postgres"
ResourceTypeMysql ResourceType = "mysql"
ResourceTypeModule ResourceType = "module"
ResourceTypeSQLMigration ResourceType = "sql-migration"
ResourceTypeTopic ResourceType = "topic"
ResourceTypeSubscription ResourceType = "subscription"
ResourceTypeRunner ResourceType = "runner"
"github.com/TBD54566975/ftl/internal/schema"
)

// TypeOf returns the resource type of the given resource
func TypeOf(r *provisioner.Resource) ResourceType {
func TypeOf(r *provisioner.Resource) schema.ResourceType {
switch r.Resource.(type) {
case *provisioner.Resource_Module:
return ResourceTypeModule
return schema.ResourceTypeModule
case *provisioner.Resource_Mysql:
return ResourceTypeMysql
return schema.ResourceTypeMysql
case *provisioner.Resource_Postgres:
return ResourceTypePostgres
return schema.ResourceTypePostgres
case *provisioner.Resource_SqlMigration:
return ResourceTypeSQLMigration
return schema.ResourceTypeSQLMigration
case *provisioner.Resource_Topic:
return ResourceTypeTopic
return schema.ResourceTypeTopic
case *provisioner.Resource_Subscription:
return ResourceTypeSubscription
return schema.ResourceTypeSubscription
case *provisioner.Resource_Runner:
return ResourceTypeRunner
return schema.ResourceTypeRunner
default:
return ResourceTypeUnknown
return schema.ResourceTypeUnknown
}
}
4 changes: 2 additions & 2 deletions backend/provisioner/runner_scaling_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (

// NewRunnerScalingProvisioner creates a new provisioner that provisions resources locally when running FTL in dev mode
func NewRunnerScalingProvisioner(runners scaling.RunnerScaling, client ftlv1connect.ControllerServiceClient) *InMemProvisioner {
return NewEmbeddedProvisioner(map[ResourceType]InMemResourceProvisionerFn{
ResourceTypeRunner: provisionRunner(runners, client),
return NewEmbeddedProvisioner(map[schema.ResourceType]InMemResourceProvisionerFn{
schema.ResourceTypeRunner: provisionRunner(runners, client),
})
}

Expand Down
4 changes: 2 additions & 2 deletions backend/provisioner/sql_migration_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ const tenMB = 1024 * 1024 * 10

// NewSQLMigrationProvisioner creates a new provisioner that provisions database migrations
func NewSQLMigrationProvisioner(registryConfig artefacts.RegistryConfig) *InMemProvisioner {
return NewEmbeddedProvisioner(map[ResourceType]InMemResourceProvisionerFn{
ResourceTypeSQLMigration: provisionSQLMigration(registryConfig),
return NewEmbeddedProvisioner(map[schema.ResourceType]InMemResourceProvisionerFn{
schema.ResourceTypeSQLMigration: provisionSQLMigration(registryConfig),
})
}

Expand Down
17 changes: 9 additions & 8 deletions frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/TBD54566975/ftl/internal/observability"
"github.com/TBD54566975/ftl/internal/projectconfig"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

Expand Down Expand Up @@ -257,27 +258,27 @@ func (s *serveCommonConfig) run(
Bindings: []*provisioner.ProvisionerBinding{
{
Provisioner: provisioner.NewDevProvisioner(s.DBPort, s.MysqlPort, s.Recreate),
Types: []provisioner.ResourceType{
provisioner.ResourceTypeMysql,
provisioner.ResourceTypePostgres,
provisioner.ResourceTypeTopic,
provisioner.ResourceTypeSubscription,
Types: []schema.ResourceType{
schema.ResourceTypeMysql,
schema.ResourceTypePostgres,
schema.ResourceTypeTopic,
schema.ResourceTypeSubscription,
},
ID: "dev",
},
{
Provisioner: provisioner.NewSQLMigrationProvisioner(registry),
Types: []provisioner.ResourceType{provisioner.ResourceTypeSQLMigration},
Types: []schema.ResourceType{schema.ResourceTypeSQLMigration},
ID: "migration",
},
{
Provisioner: provisioner.NewControllerProvisioner(controllerClient),
Types: []provisioner.ResourceType{provisioner.ResourceTypeModule},
Types: []schema.ResourceType{schema.ResourceTypeModule},
ID: "controller",
},
{
Provisioner: provisioner.NewRunnerScalingProvisioner(runnerScaling, controllerClient),
Types: []provisioner.ResourceType{provisioner.ResourceTypeRunner},
Types: []schema.ResourceType{schema.ResourceTypeRunner},
ID: "runner",
},
},
Expand Down
2 changes: 2 additions & 0 deletions internal/schema/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,10 @@ type DatabaseRuntime struct {
WriteConnector DatabaseConnector `parser:"" protobuf:"2"`
}

var _ Runtime = (*DatabaseRuntime)(nil)
var _ Symbol = (*DatabaseRuntime)(nil)

func (d *DatabaseRuntime) runtime() {}
func (d *DatabaseRuntime) Position() Position { return d.ReadConnector.Position() }
func (d *DatabaseRuntime) schemaSymbol() {}
func (d *DatabaseRuntime) String() string {
Expand Down
4 changes: 4 additions & 0 deletions internal/schema/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type ModuleRuntime struct {
Image string `protobuf:"6,optional"`
}

var _ Runtime = (*ModuleRuntime)(nil)

func (m *ModuleRuntime) runtime() {}

type Module struct {
Pos Position `parser:"" protobuf:"1,optional"`

Expand Down
31 changes: 31 additions & 0 deletions internal/schema/provisioned.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package schema

// ResourceType is a type of resource used to configure provisioners
type ResourceType string

const (
ResourceTypeUnknown ResourceType = "unknown"
ResourceTypePostgres ResourceType = "postgres"
ResourceTypeMysql ResourceType = "mysql"
ResourceTypeModule ResourceType = "module"
ResourceTypeSQLMigration ResourceType = "sql-migration"
ResourceTypeTopic ResourceType = "topic"
ResourceTypeSubscription ResourceType = "subscription"
ResourceTypeRunner ResourceType = "runner"
)

type ProvisionedResource struct {
// Kind is the kind of resource provisioned.
Kind string
// ID of the provisioned resource.
ID string
// Config is the subset of the schema element's configuration that is used to create the resource.
// changes to this config are used to check if the resource needs to be updated.
Config any
}

// Provisioned is a schema element that provisioner acts on to create a runtime resource.
type Provisioned interface {
// Returns the resources provisioned from this schema element.
GetProvisioned() []*ProvisionedResource
}
6 changes: 6 additions & 0 deletions internal/schema/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package schema

// Runtime is data that is populated at runtime by an FTL subsystem.
type Runtime interface {
runtime()
}
4 changes: 4 additions & 0 deletions internal/schema/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type TopicRuntime struct {
TopicID string `parser:"" protobuf:"2"`
}

var _ Runtime = (*TopicRuntime)(nil)

func (t *TopicRuntime) runtime() {}

func (t *TopicRuntime) ToProto() *schemapb.TopicRuntime {
return &schemapb.TopicRuntime{
KafkaBrokers: t.KafkaBrokers,
Expand Down
4 changes: 4 additions & 0 deletions internal/schema/verb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ type VerbRuntime struct {
KafkaBrokers []string `protobuf:"3,optional"`
}

var _ Runtime = (*VerbRuntime)(nil)

func (v *VerbRuntime) runtime() {}

//protobuf:2
type Verb struct {
Pos Position `parser:"" protobuf:"1,optional"`
Expand Down

0 comments on commit 0d6f2bf

Please sign in to comment.