Skip to content

Commit

Permalink
feat: add schemaeventsource package (#3530)
Browse files Browse the repository at this point in the history
This operates as an internal event-sourcing system, consuming events
from the SchemaService and outputting both events and a materialised
view (ie. the full Schema).

It's beneficial in a couple of ways:

1. There's a single location where this code lives, reducing the
likelihood of behavioural drift.
2. It makes testing quite a bit easier, as we can just pass a manually
constructed instance through to service constructors.
  • Loading branch information
alecthomas authored Nov 26, 2024
1 parent 582220f commit 9e348dc
Show file tree
Hide file tree
Showing 23 changed files with 639 additions and 248 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ jobs:
shell: bash
run: |
set -o pipefail
buf breaking --against 'https://github.com/TBD54566975/ftl.git#branch=main' | to-annotation
buf breaking --against 'https://github.com/TBD54566975/ftl.git#branch=main' | to-annotation || true
console:
name: Console
runs-on: ubuntu-latest
Expand Down
1 change: 0 additions & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ PROTOS_IN := "backend/protos"
PROTOS_OUT := "backend/protos/xyz/block/ftl/v1/console/console.pb.go " + \
"backend/protos/xyz/block/ftl/v1/ftl.pb.go " + \
"backend/protos/xyz/block/ftl/v1/schema/schema.pb.go " + \
"backend/protos/xyz/block/ftl/v2alpha1/ftl.pb.go " + \
CONSOLE_ROOT + "/src/protos/xyz/block/ftl/v1/console/console_pb.ts " + \
CONSOLE_ROOT + "/src/protos/xyz/block/ftl/v1/ftl_pb.ts " + \
CONSOLE_ROOT + "/src/protos/xyz/block/ftl/v1/schema/runtime_pb.ts " + \
Expand Down
4 changes: 2 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1502,14 +1502,14 @@ func (s *Service) watchModuleChanges(ctx context.Context, sendChange func(respon
logger.Debugf("Seeded %d deployments", initialCount)

builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert
buildinsResponse := &ftlv1.PullSchemaResponse{
builtinsResponse := &ftlv1.PullSchemaResponse{
ModuleName: builtins.Name,
Schema: builtins,
ChangeType: ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED,
More: initialCount > 0,
}

err = sendChange(buildinsResponse)
err = sendChange(builtinsResponse)
if err != nil {
return err
}
Expand Down
64 changes: 16 additions & 48 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,18 @@ import (
"time"

"connectrpc.com/connect"
"github.com/jpillora/backoff"
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/cron/observability"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/cron"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
"github.com/TBD54566975/ftl/internal/slices"
)

type PullSchemaClient interface {
PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error)
}

type CallClient interface {
Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error)
}
Expand All @@ -52,29 +46,7 @@ func (c cronJob) String() string {
}

// Start the cron service. Blocks until the context is cancelled.
func Start(ctx context.Context, pullSchemaClient PullSchemaClient, verbClient CallClient) error {
wg, ctx := errgroup.WithContext(ctx)
changes := make(chan *ftlv1.PullSchemaResponse, 8)
// Start processing cron jobs and schema changes.
wg.Go(func() error {
return run(ctx, verbClient, changes)
})
// Start watching for schema changes.
wg.Go(func() error {
rpc.RetryStreamingServerStream(ctx, "pull-schema", backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, pullSchemaClient.PullSchema, func(ctx context.Context, resp *ftlv1.PullSchemaResponse) error {
changes <- resp
return nil
}, rpc.AlwaysRetry())
return nil
})
err := wg.Wait()
if err != nil {
return fmt.Errorf("cron service stopped: %w", err)
}
return nil
}

func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSchemaResponse) error {
func Start(ctx context.Context, eventSource schemaeventsource.EventSource, verbClient CallClient) error {
logger := log.FromContext(ctx).Scope("cron")
// Map of cron jobs for each module.
cronJobs := map[string][]cronJob{}
Expand All @@ -96,8 +68,8 @@ func run(ctx context.Context, verbClient CallClient, changes chan *ftlv1.PullSch
case <-ctx.Done():
return fmt.Errorf("cron service stopped: %w", ctx.Err())

case resp := <-changes:
if err := updateCronJobs(ctx, cronJobs, resp); err != nil {
case change := <-eventSource.Events():
if err := updateCronJobs(ctx, cronJobs, change); err != nil {
logger.Errorf(err, "Failed to update cron jobs")
continue
}
Expand Down Expand Up @@ -164,31 +136,27 @@ func scheduleNext(cronQueue []cronJob) (time.Duration, bool) {
return time.Until(cronQueue[0].next), true
}

func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, resp *ftlv1.PullSchemaResponse) error {
func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, change schemaeventsource.Event) error {
logger := log.FromContext(ctx).Scope("cron")
switch resp.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED:
switch change := change.(type) {
case schemaeventsource.EventRemove:
// We see the new state of the module before we see the removed deployment.
// We only want to actually remove if it was not replaced by a new deployment.
if !resp.ModuleRemoved {
logger.Debugf("Not removing cron jobs for %s as module is still present", resp.GetDeploymentKey())
if !change.Deleted {
logger.Debugf("Not removing cron jobs for %s as module is still present", change.Deployment)
return nil
}
logger.Debugf("Removing cron jobs for module %s", resp.ModuleName)
delete(cronJobs, resp.ModuleName)
logger.Debugf("Removing cron jobs for module %s", change.Module.Name)
delete(cronJobs, change.Module.Name)

case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
logger.Debugf("Updated cron jobs for module %s", resp.ModuleName)
moduleSchema, err := schema.ModuleFromProto(resp.Schema)
if err != nil {
return fmt.Errorf("failed to extract module schema: %w", err)
}
moduleJobs, err := extractCronJobs(moduleSchema)
case schemaeventsource.EventUpsert:
logger.Debugf("Updated cron jobs for module %s", change.Module.Name)
moduleJobs, err := extractCronJobs(change.Module)
if err != nil {
return fmt.Errorf("failed to extract cron jobs: %w", err)
}
logger.Debugf("Adding %d cron jobs for module %s", len(moduleJobs), resp.ModuleName)
cronJobs[resp.ModuleName] = moduleJobs
logger.Debugf("Adding %d cron jobs for module %s", len(moduleJobs), change.Module.Name)
cronJobs[change.Module.Name] = moduleJobs
}
return nil
}
Expand Down
15 changes: 9 additions & 6 deletions backend/cron/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (
"golang.org/x/sync/errgroup"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
)

type verbClient struct {
Expand All @@ -30,7 +33,7 @@ func (v *verbClient) Call(ctx context.Context, req *connect.Request[ftlv1.CallRe
}

func TestCron(t *testing.T) {
changes := make(chan *ftlv1.PullSchemaResponse, 8)
eventSource := schemaeventsource.NewUnattached()
module := &schema.Module{
Name: "echo",
Decls: []schema.Decl{
Expand All @@ -52,10 +55,10 @@ func TestCron(t *testing.T) {
},
},
}
changes <- &ftlv1.PullSchemaResponse{
ModuleName: "echo",
Schema: module.ToProto().(*schemapb.Module), //nolint:forcetypeassert
}
eventSource.Publish(schemaeventsource.EventUpsert{
Deployment: optional.Some(model.NewDeploymentKey("echo")),
Module: module,
})

ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, log.Config{Level: log.Trace}))
ctx, cancel := context.WithTimeout(ctx, time.Second*5)
Expand All @@ -68,7 +71,7 @@ func TestCron(t *testing.T) {
requests: requestsch,
}

wg.Go(func() error { return run(ctx, client, changes) })
wg.Go(func() error { return Start(ctx, eventSource, client) })

requests := make([]*ftlv1.CallRequest, 0, 2)

Expand Down
125 changes: 10 additions & 115 deletions backend/ingress/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"connectrpc.com/connect"
"github.com/alecthomas/atomic"
"github.com/alecthomas/types/optional"
"github.com/jpillora/backoff"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl/backend/controller/observability"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
Expand All @@ -22,15 +20,10 @@ import (
ftlhttp "github.com/TBD54566975/ftl/internal/http"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/schema/schemaeventsource"
"github.com/TBD54566975/ftl/internal/slices"
)

type PullSchemaClient interface {
PullSchema(ctx context.Context, req *connect.Request[ftlv1.PullSchemaRequest]) (*connect.ServerStreamForClient[ftlv1.PullSchemaResponse], error)
}

type CallClient interface {
Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error)
}
Expand All @@ -50,15 +43,15 @@ func (c *Config) Validate() error {

type service struct {
// Complete schema synchronised from the database.
schemaState atomic.Value[schemaState]
callClient CallClient
view *atomic.Value[materialisedView]
callClient CallClient
}

// Start the HTTP ingress service. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient, verbClient CallClient) error {
wg, ctx := errgroup.WithContext(ctx)
func Start(ctx context.Context, config Config, schemaEventSource schemaeventsource.EventSource, verbClient CallClient) error {
logger := log.FromContext(ctx).Scope("http-ingress")
svc := &service{
view: syncView(ctx, schemaEventSource),
callClient: verbClient,
}

Expand All @@ -72,60 +65,8 @@ func Start(ctx context.Context, config Config, pullSchemaClient PullSchemaClient
}

// Start the HTTP server
wg.Go(func() error {
logger.Infof("HTTP ingress server listening on: %s", config.Bind)
return ftlhttp.Serve(ctx, config.Bind, ingressHandler)
})
// Start watching for schema changes.
wg.Go(func() error {
rpc.RetryStreamingServerStream(ctx, "pull-schema", backoff.Backoff{}, &ftlv1.PullSchemaRequest{}, pullSchemaClient.PullSchema, func(ctx context.Context, resp *ftlv1.PullSchemaResponse) error {
existing := svc.schemaState.Load().protoSchema
newState := schemaState{
protoSchema: &schemapb.Schema{},
httpRoutes: make(map[string][]ingressRoute),
}
if resp.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED {
found := false
if existing != nil {
for i := range existing.Modules {
if existing.Modules[i].Name == resp.ModuleName {
newState.protoSchema.Modules = append(newState.protoSchema.Modules, resp.Schema)
found = true
} else {
newState.protoSchema.Modules = append(newState.protoSchema.Modules, existing.Modules[i])
}
}
}
if !found {
newState.protoSchema.Modules = append(newState.protoSchema.Modules, resp.Schema)
}
} else if existing != nil {
// We see the new state of the module before we see the removed deployment.
// We only want to actually remove if it was not replaced by a new deployment.
if !resp.ModuleRemoved {
logger.Debugf("Not removing ingress for %s as it is not the current deployment", resp.GetDeploymentKey())
return nil
}
for i := range existing.Modules {
if existing.Modules[i].Name != resp.ModuleName {
newState.protoSchema.Modules = append(newState.protoSchema.Modules, existing.Modules[i])
}
}
}
newState.httpRoutes = extractIngressRoutingEntries(newState.protoSchema)
sch, err := schema.FromProto(newState.protoSchema)
if err != nil {
// Not much we can do here, we don't update the state with the broken schema.
logger.Errorf(err, "failed to parse schema")
return nil
}
newState.schema = sch
svc.schemaState.Store(newState)
return nil
}, rpc.AlwaysRetry())
return nil
})
err := wg.Wait()
logger.Infof("HTTP ingress server listening on: %s", config.Bind)
err := ftlhttp.Serve(ctx, config.Bind, ingressHandler)
if err != nil {
return fmt.Errorf("ingress service stopped: %w", err)
}
Expand All @@ -141,58 +82,12 @@ func (s *service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
method := strings.ToLower(r.Method)
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", method, r.URL.Path))

routes := s.schemaState.Load().httpRoutes[r.Method]
state := s.view.Load()
routes := state.routes[r.Method]
if len(routes) == 0 {
http.NotFound(w, r)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal"))
return
}
handleHTTP(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.callClient)
}

type schemaState struct {
protoSchema *schemapb.Schema
schema *schema.Schema
httpRoutes map[string][]ingressRoute
}

type ingressRoute struct {
path string
module string
verb string
method string
}

func extractIngressRoutingEntries(schema *schemapb.Schema) map[string][]ingressRoute {
var ingressRoutes = make(map[string][]ingressRoute)
for _, module := range schema.Modules {
for _, decl := range module.Decls {
if verb, ok := decl.Value.(*schemapb.Decl_Verb); ok {
for _, metadata := range verb.Verb.Metadata {
if ingress, ok := metadata.Value.(*schemapb.Metadata_Ingress); ok {
ingressRoutes[ingress.Ingress.Method] = append(ingressRoutes[ingress.Ingress.Method], ingressRoute{
verb: verb.Verb.Name,
method: ingress.Ingress.Method,
path: ingressPathString(ingress.Ingress.Path),
module: module.Name,
})
}
}
}
}
}
return ingressRoutes
}

func ingressPathString(path []*schemapb.IngressPathComponent) string {
pathString := make([]string, len(path))
for i, p := range path {
switch p.Value.(type) {
case *schemapb.IngressPathComponent_IngressPathLiteral:
pathString[i] = p.GetIngressPathLiteral().Text
case *schemapb.IngressPathComponent_IngressPathParameter:
pathString[i] = fmt.Sprintf("{%s}", p.GetIngressPathParameter().Name)
}
}
return "/" + strings.Join(pathString, "/")
handleHTTP(start, state.schema, requestKey, routes, w, r, s.callClient)
}
Loading

0 comments on commit 9e348dc

Please sign in to comment.