Skip to content

Commit

Permalink
feat: peer to peer routing
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Dec 3, 2024
1 parent 682b4bf commit b10d664
Show file tree
Hide file tree
Showing 11 changed files with 424 additions and 139 deletions.
63 changes: 58 additions & 5 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/alecthomas/kong"
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
pubsub2 "github.com/alecthomas/types/pubsub"
"github.com/jackc/pgx/v5"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
Expand Down Expand Up @@ -216,7 +217,8 @@ type Service struct {
increaseReplicaFailures map[string]int
asyncCallsLock sync.Mutex

clientLock sync.Mutex
clientLock sync.Mutex
routeTableUpdated *pubsub2.Topic[struct{}]
}

func New(
Expand Down Expand Up @@ -257,6 +259,7 @@ func New(
clients: ttlcache.New(ttlcache.WithTTL[string, clients](time.Minute)),
config: config,
increaseReplicaFailures: map[string]int{},
routeTableUpdated: pubsub2.New[struct{}](),
}
svc.schemaState.Store(schemaState{routes: map[string]Route{}, schema: &schema.Schema{}})

Expand Down Expand Up @@ -695,33 +698,58 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
// GetModuleContext retrieves config, secrets and DSNs for a module.
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.GetModuleContextRequest], resp *connect.ServerStream[ftlv1.GetModuleContextResponse]) error {
name := req.Msg.Module
updates := s.routeTableUpdated.Subscribe(nil)
defer s.routeTableUpdated.Unsubscribe(updates)

// Initialize checksum to -1; a zero checksum does occur when the context contains no settings
lastChecksum := int64(-1)

callableModuleNames := []string{}
dbTypes := map[string]modulecontext.DBType{}
deps, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get deployments: %w", err))
}
for _, dep := range deps {
callableModules := map[string]bool{}
if dep.Module == name {
for _, decl := range dep.Schema.Decls {
if db, ok := decl.(*schema.Database); ok {
dbType, err := modulecontext.DBTypeFromString(db.Type)
switch entry := decl.(type) {
case *schema.Database:
dbType, err := modulecontext.DBTypeFromString(entry.Type)
if err != nil {
// Not much we can do here
continue
}
dbTypes[db.Name] = dbType
dbTypes[entry.Name] = dbType
case *schema.Verb:
for _, md := range entry.Metadata {
if calls, ok := md.(*schema.MetadataCalls); ok {
for _, call := range calls.Calls {
callableModules[call.Module] = true
}
}
}
default:

}
}
callableModuleNames = maps.Keys(callableModules)
callableModuleNames = slices.Sort(callableModuleNames)
break
}
}
for {
h := sha.New()

routeTable := map[string]string{}
routes := s.schemaState.Load().routes
for _, module := range callableModuleNames {
if route, ok := routes[module]; ok {
routeTable[module] = route.Endpoint
}
}

configs, err := s.cm.MapForModule(ctx, name)
if err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not get configs: %w", err))
Expand All @@ -737,11 +765,14 @@ func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftl
if err := hashConfigurationMap(h, secrets); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not detect change on secrets: %w", err))
}
if err := hashRoutesTable(h, routeTable); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not detect change on routes: %w", err))
}

checksum := int64(binary.BigEndian.Uint64((h.Sum(nil))[0:8]))

if checksum != lastChecksum {
response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).Build().ToProto()
response := modulecontext.NewBuilder(name).AddConfigs(configs).AddSecrets(secrets).AddRoutes(routeTable).Build().ToProto()

if err := resp.Send(response); err != nil {
return connect.NewError(connect.CodeInternal, fmt.Errorf("could not send response: %w", err))
Expand All @@ -754,6 +785,8 @@ func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftl
case <-ctx.Done():
return nil
case <-time.After(s.config.ModuleUpdateFrequency):
case <-updates:

}
}
}
Expand All @@ -772,6 +805,20 @@ func hashConfigurationMap(h hash.Hash, m map[string][]byte) error {
return nil
}

// hashRoutesTable computes an order invariant checksum on the configuration
// settings supplied in the map.
func hashRoutesTable(h hash.Hash, m map[string]string) error {
keys := maps.Keys(m)
sort.Strings(keys)
for _, k := range keys {
_, err := h.Write(append([]byte(k), m[k]...))
if err != nil {
return fmt.Errorf("error hashing routes: %w", err)
}
}
return nil
}

// hashDatabaseConfiguration computes an order invariant checksum on the database
// configuration settings supplied in the map.
func hashDatabaseConfiguration(h hash.Hash, m map[string]modulecontext.Database) error {
Expand Down Expand Up @@ -1648,6 +1695,7 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e
old := s.schemaState.Load().routes
newRoutes := map[string]Route{}
modulesByName := map[string]*schema.Module{}
changed := false

builtins := schema.Builtins().ToProto().(*schemapb.Module) //nolint:forcetypeassert
modulesByName[builtins.Name], err = schema.ModuleFromProto(builtins)
Expand Down Expand Up @@ -1681,6 +1729,7 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e
continue
}
deploymentLogger.Infof("Deployed %s", v.Key.String())
changed = true
status.UpdateModuleState(ctx, v.Module, status.BuildStateDeployed)
}
if prev, ok := newRoutes[v.Module]; ok {
Expand All @@ -1693,6 +1742,7 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e
if err != nil {
deploymentLogger.Errorf(err, "Failed to set replicas to 0 for deployment %s", prev.Deployment.String())
}
changed = true
}
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: targetEndpoint}
modulesByName[v.Module] = v.Schema
Expand All @@ -1704,6 +1754,9 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e
})
combined := &schema.Schema{Modules: orderedModules}
s.schemaState.Store(schemaState{schema: combined, routes: newRoutes})
if changed {
s.routeTableUpdated.Publish(struct{}{})
}
return time.Second, nil
}

Expand Down
Loading

0 comments on commit b10d664

Please sign in to comment.