From c431854028555d2928b18e7165c388720103b384 Mon Sep 17 00:00:00 2001 From: Niall D <4562759+driev@users.noreply.github.com> Date: Thu, 5 Dec 2024 16:38:57 +0000 Subject: [PATCH] refactor(envoy): refactoring and optimising the components that build envoy config (#6119) * adding a proper test * move some stuff around and delete unused methods * linting * cleaning up the test * some renaming and test updates * merging v2 * fix rename * fixing test * add mirror to route configs * reverting some test data * fixing a bug * changing structure for clarity * fix model name * some more optimisations * cleaning up * linting * fix logger * PR review changes --- .../batch_PR#6119_pre_refactor.txt | 49 ++ scheduler/pkg/envoy/processor/incremental.go | 98 +-- .../pkg/envoy/processor/incremental_test.go | 22 +- scheduler/pkg/envoy/processor/server_test.go | 2 +- scheduler/pkg/envoy/resources/cache.go | 13 +- scheduler/pkg/envoy/resources/resource.go | 600 ++++++++---------- .../pkg/envoy/resources/resource_test.go | 180 +++--- scheduler/pkg/envoy/xdscache/seldoncache.go | 308 +++++---- .../xdscache/seldoncache_benchmark_test.go | 9 +- .../pkg/envoy/xdscache/seldoncache_test.go | 236 ++++--- 10 files changed, 759 insertions(+), 758 deletions(-) create mode 100644 scheduler/pkg/envoy/processor/benchmark_results/batch_PR#6119_pre_refactor.txt diff --git a/scheduler/pkg/envoy/processor/benchmark_results/batch_PR#6119_pre_refactor.txt b/scheduler/pkg/envoy/processor/benchmark_results/batch_PR#6119_pre_refactor.txt new file mode 100644 index 0000000000..12e007fde2 --- /dev/null +++ b/scheduler/pkg/envoy/processor/benchmark_results/batch_PR#6119_pre_refactor.txt @@ -0,0 +1,49 @@ +goos: linux +goarch: amd64 +pkg: github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor +cpu: 12th Gen Intel(R) Core(TM) i7-12800H +BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms-20 261 4293734 ns/op 1728869 B/op 20354 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms-20 57 
19236652 ns/op 11745105 B/op 148552 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms-20 15 165481121 ns/op 306686874 B/op 4121256 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms-20 1 1449165416 ns/op 4010677584 B/op 54872332 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms-20 100 17107753 ns/op 21719815 B/op 273169 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms-20 27 66313339 ns/op 118376950 B/op 1388585 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms-20 3 519671198 ns/op 967958834 B/op 11993174 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms-20 1 4341797404 ns/op 13883915576 B/op 181619840 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms-20 6369 162639 ns/op 681659 B/op 6429 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms-20 10 102798616 ns/op 264589261 B/op 3582995 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms-20 9 117708448 ns/op 348929572 B/op 4632386 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms-20 2 683663227 ns/op 2238235876 B/op 29665474 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms-20 12 87394856 ns/op 297229676 B/op 4071575 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms-20 9 114736702 ns/op 374778539 B/op 4887162 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms-20 6 340307094 ns/op 1366063312 B/op 16472416 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms-20 1 2402863570 ns/op 7096934200 B/op 93288518 allocs/op +PASS +ok github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor 45.753s + + +After + + +goos: linux +goarch: amd64 +pkg: github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor +cpu: 12th Gen Intel(R) Core(TM) i7-12800H +BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms-20 99 10503076 ns/op 5459504 B/op 75936 allocs/op 
+BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms-20 100 12329506 ns/op 11528623 B/op 147768 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms-20 32 57135215 ns/op 200141562 B/op 2638843 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms-20 3 425881058 ns/op 1155633304 B/op 15271185 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms-20 97 12796395 ns/op 49125454 B/op 644298 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms-20 34 33008128 ns/op 173017588 B/op 2127563 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms-20 4 324180730 ns/op 1990164076 B/op 25634294 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms-20 1 2786099232 ns/op 16107104968 B/op 215067138 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms-20 5761 230485 ns/op 1276192 B/op 14658 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms-20 12 95563752 ns/op 368770140 B/op 5040953 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms-20 9 118337597 ns/op 345704658 B/op 4589376 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms-20 3 531757210 ns/op 2030309925 B/op 26656810 allocs/op +BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms-20 18 103613236 ns/op 149228799 B/op 1985478 allocs/op +BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms-20 9 113923163 ns/op 225481680 B/op 2822586 allocs/op +BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms-20 6 272610358 ns/op 945017966 B/op 10781455 allocs/op +BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms-20 1 1734657705 ns/op 3920402984 B/op 49687374 allocs/op +PASS +ok github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor 37.457s diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index fd167081c1..4ae707ea2e 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ 
b/scheduler/pkg/envoy/processor/incremental.go @@ -79,7 +79,7 @@ func NewIncrementalProcessor( cache: cache, nodeID: nodeID, snapshotVersion: rand.Int63n(1000), - logger: log.WithField("source", "EnvoyServer"), + logger: log.WithField("source", "IncrementalProcessor"), xdsCache: xdscache.NewSeldonXDSCache(log, pipelineGatewayDetails), modelStore: modelStore, experimentServer: experimentServer, @@ -91,7 +91,7 @@ func NewIncrementalProcessor( batchTriggerManual: nil, } - err := ip.setListeners() + err := ip.init() if err != nil { return nil, err } @@ -154,15 +154,11 @@ func (p *IncrementalProcessor) handleExperimentEvents(event coordinator.Experime } else { if event.UpdatedExperiment { err := p.experimentUpdate(exp) - var err2 error if err != nil { logger.WithError(err).Errorf("Failed to process sync for experiment %s", event.String()) - err2 = p.experimentServer.SetStatus(event.ExperimentName, false, err.Error()) + p.setExperimentStatus(event, false, err.Error()) } else { - err2 = p.experimentServer.SetStatus(event.ExperimentName, true, "experiment active") - } - if err2 != nil { - logger.WithError(err2).Errorf("Failed to set experiment activation") + p.setExperimentStatus(event, true, "experiment active") } } } @@ -170,6 +166,14 @@ func (p *IncrementalProcessor) handleExperimentEvents(event coordinator.Experime }() } +func (p *IncrementalProcessor) setExperimentStatus(event coordinator.ExperimentEventMsg, active bool, msg string) { + logger := p.logger.WithField("func", "setExperimentStatus") + err := p.experimentServer.SetStatus(event.ExperimentName, active, msg) + if err != nil { + logger.WithError(err).Errorf("Failed to set experiment activation") + } +} + func (p *IncrementalProcessor) handleModelEvents(event coordinator.ModelEventMsg) { logger := p.logger.WithField("func", "handleModelEvents") logger.Debugf("Received sync for model %s", event.String()) @@ -182,14 +186,15 @@ func (p *IncrementalProcessor) handleModelEvents(event coordinator.ModelEventMsg }() } 
-func (p *IncrementalProcessor) setListeners() error { +func (p *IncrementalProcessor) init() error { p.mu.Lock() defer p.mu.Unlock() err := p.xdsCache.SetupTLS() if err != nil { return err } - p.xdsCache.AddListeners() + p.xdsCache.AddPermanentListeners() + p.xdsCache.AddPermanentClusters() return nil } @@ -246,45 +251,32 @@ func (p *IncrementalProcessor) removeRouteForServerInEnvoyCache(routeName string return nil } -func (p *IncrementalProcessor) updateEnvoyForModelVersion(modelRouteName string, modelVersion *store.ModelVersion, server *store.ServerSnapshot, trafficPercent uint32, isMirror bool) { +func (p *IncrementalProcessor) updateEnvoyForModelVersion(routeName string, modelVersion *store.ModelVersion, server *store.ServerSnapshot, trafficPercent uint32, isMirror bool) { logger := p.logger.WithField("func", "updateEnvoyForModelVersion") - - assignment := modelVersion.GetAssignment() // Get loaded replicas for model + assignment := modelVersion.GetAssignment() if len(assignment) == 0 { - logger.Debugf("No assigned replicas so returning for %s", modelRouteName) + logger.Debugf("Not updating route: %s - no assigned replicas for %v", routeName, modelVersion) return } - - clusterNameBase := modelVersion.GetMeta().GetName() + "_" + strconv.FormatInt(int64(modelVersion.GetVersion()), 10) - httpClusterName := clusterNameBase + "_http" - grpcClusterName := clusterNameBase + "_grpc" - p.xdsCache.AddCluster(httpClusterName, modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), false) - for _, replicaIdx := range assignment { - replica, ok := server.Replicas[replicaIdx] - if !ok { - logger.Warnf("Invalid replica index %d for server %s", replicaIdx, server.Name) - } else { - p.xdsCache.AddEndpoint(httpClusterName, replica.GetInferenceSvc(), uint32(replica.GetInferenceHttpPort())) - } - } - p.xdsCache.AddCluster(grpcClusterName, modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), true) - for _, 
replicaIdx := range assignment { - replica, ok := server.Replicas[replicaIdx] - if !ok { - logger.Warnf("Invalid replica index %d for server %s", replicaIdx, server.Name) - } else { - p.xdsCache.AddEndpoint(grpcClusterName, replica.GetInferenceSvc(), uint32(replica.GetInferenceGrpcPort())) - } - } + modelName := modelVersion.GetMeta().GetName() + modelVersionNumber := modelVersion.GetVersion() + httpClusterName, grpcClusterName := getClusterNames(modelName, modelVersionNumber) + p.xdsCache.AddClustersForRoute(routeName, modelName, httpClusterName, grpcClusterName, modelVersionNumber, assignment, server) logPayloads := false if modelVersion.GetDeploymentSpec() != nil { logPayloads = modelVersion.GetDeploymentSpec().LogPayloads } else { - logger.Warnf("model %s has not deployment spec", modelVersion.GetModel().GetMeta().GetName()) + logger.Warnf("model %s has not deployment spec", modelName) } + p.xdsCache.AddRouteClusterTraffic(routeName, modelName, httpClusterName, grpcClusterName, modelVersionNumber, trafficPercent, logPayloads, isMirror) +} - p.xdsCache.AddRouteClusterTraffic(modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), trafficPercent, httpClusterName, grpcClusterName, logPayloads, isMirror) +func getClusterNames(modelVersion string, modelVersionNumber uint32) (string, string) { + clusterNameBase := modelVersion + "_" + strconv.FormatInt(int64(modelVersionNumber), 10) + httpClusterName := clusterNameBase + "_http" + grpcClusterName := clusterNameBase + "_grpc" + return httpClusterName, grpcClusterName } func getTrafficShare(latestModel *store.ModelVersion, lastAvailableModelVersion *store.ModelVersion, weight uint32) (uint32, uint32) { @@ -305,7 +297,7 @@ func (p *IncrementalProcessor) addModelTraffic(routeName string, model *store.Mo if latestModel == nil { logger.Infof("latest model is nil for model %s route %s", model.Name, routeName) } - return fmt.Errorf("No live replica for model %s for model route %s", model.Name, 
routeName) + return fmt.Errorf("no live replica for model %s for model route %s", model.Name, routeName) } server, err := p.modelStore.GetServer(latestModel.Server(), false, false) @@ -321,6 +313,7 @@ func (p *IncrementalProcessor) addModelTraffic(routeName string, model *store.Mo logger.WithError(err).Errorf("Failed to find server %s for last available model %s", lastAvailableModelVersion.Server(), modelName) return err } + logger.Debugf("Splitting traffic between latest %s:%d %d percent and %s:%d %d percent", modelName, latestModel.GetVersion(), @@ -328,6 +321,7 @@ func (p *IncrementalProcessor) addModelTraffic(routeName string, model *store.Mo modelName, lastAvailableModelVersion.GetVersion(), trafficLastAvailableModel) + p.updateEnvoyForModelVersion(routeName, lastAvailableModelVersion, lastAvailableServer, trafficLastAvailableModel, isMirror) p.updateEnvoyForModelVersion(routeName, latestModel, server, trafficLatestModel, isMirror) } else { @@ -395,12 +389,19 @@ func (p *IncrementalProcessor) addModel(model *store.ModelSnapshot) error { func (p *IncrementalProcessor) addTrafficForExperiment(routeName string, exp *experiment.Experiment) error { switch exp.ResourceType { case experiment.PipelineResourceType: + + var mirrorSplit *resources.PipelineTrafficSplit + trafficSplits := make([]resources.PipelineTrafficSplit, 0, len(exp.Candidates)) // length 0 with reserved capacity: candidates are appended below, so no zero-value splits are left in front + for _, candidate := range exp.Candidates { - p.xdsCache.AddPipelineRoute(routeName, candidate.Name, candidate.Weight, false) + trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight}) } if exp.Mirror != nil { - p.xdsCache.AddPipelineRoute(routeName, exp.Mirror.Name, exp.Mirror.Percent, true) + mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent} } + + p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit) + case experiment.ModelResourceType: for _, candidate := range exp.Candidates { 
candidateModel, err := p.modelStore.GetModel(candidate.Name) @@ -494,17 +495,20 @@ func (p *IncrementalProcessor) addPipeline(pipelineName string) error { if exp.Deleted { return fmt.Errorf("Experiment on pipeline %s, but %s is deleted", pip.Name, *exp.Default) } + var mirrorSplit *resources.PipelineTrafficSplit + trafficSplits := make([]resources.PipelineTrafficSplit, 0, len(exp.Candidates)) // length 0 with reserved capacity: candidates are appended below, so no zero-value splits are left in front + for _, candidate := range exp.Candidates { - logger.Infof("Adding pipeline experiment candidate %s %s %d", routeName, candidate.Name, candidate.Weight) - p.xdsCache.AddPipelineRoute(routeName, candidate.Name, candidate.Weight, false) + trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight}) } if exp.Mirror != nil { - logger.Infof("Adding pipeline experiment mirror %s %s %d", routeName, exp.Mirror.Name, exp.Mirror.Percent) - p.xdsCache.AddPipelineRoute(routeName, exp.Mirror.Name, exp.Mirror.Percent, true) + mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent} } + + p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit) } else { logger.Infof("Adding normal pipeline route %s", routeName) - p.xdsCache.AddPipelineRoute(routeName, pip.Name, 100, false) + p.xdsCache.AddPipelineRoute(routeName, []resources.PipelineTrafficSplit{{PipelineName: pip.Name, TrafficWeight: 100}}, nil) } return p.updateEnvoy() @@ -548,7 +552,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error { model, err := p.modelStore.GetModel(modelName) if err != nil { - logger.WithError(err).Warnf("Failed to sync model %s", modelName) + logger.WithError(err).Warnf("sync: Failed to sync model %s", modelName) if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil { logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName) p.modelStore.UnlockModel(modelName) diff --git a/scheduler/pkg/envoy/processor/incremental_test.go 
b/scheduler/pkg/envoy/processor/incremental_test.go index cd9aded73b..271c6a2528 100644 --- a/scheduler/pkg/envoy/processor/incremental_test.go +++ b/scheduler/pkg/envoy/processor/incremental_test.go @@ -351,7 +351,7 @@ func TestRollingUpdate(t *testing.T) { experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil), pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore), } - inc.xdsCache.AddListeners() + inc.xdsCache.AddPermanentListeners() for _, op := range test.ops { op(inc, g) } @@ -421,7 +421,7 @@ func TestDraining(t *testing.T) { experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil), pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore), } - inc.xdsCache.AddListeners() + inc.xdsCache.AddPermanentListeners() for _, op := range test.ops { op(inc, g) } @@ -566,7 +566,7 @@ func TestModelSync(t *testing.T) { pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore), pendingModelVersions: test.pendingModelVersions, } - inc.xdsCache.AddListeners() + inc.xdsCache.AddPermanentListeners() for _, op := range test.ops { op(inc, g) } @@ -827,7 +827,8 @@ func TestEnvoySettings(t *testing.T) { inc.handlePipelinesEvents, ) - inc.xdsCache.AddListeners() + inc.xdsCache.AddPermanentListeners() + inc.xdsCache.AddPermanentClusters() for _, op := range test.ops { op(inc, g) time.Sleep(50 * time.Millisecond) // to allow event handlers to process @@ -974,7 +975,7 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route { for _, route := range virtualHost.Routes { trafficSplit := resources.Route{ RouteName: route.Name, - Clusters: make([]resources.TrafficSplits, 0), + Clusters: make([]resources.TrafficSplit, 0), } clusterSpecificer := route.GetRoute().GetClusterSpecifier() @@ -986,20 +987,27 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route { weightedClusters := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_WeightedClusters) for _, 
weightedCluster := range weightedClusters.WeightedClusters.Clusters { - trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplits{ + trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{ ModelName: weightedCluster.Name, TrafficWeight: weightedCluster.Weight.Value, }) } case *routev3.RouteAction_Cluster: cluster := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_Cluster) - trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplits{ + trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{ ModelName: cluster.Cluster, TrafficWeight: 100, }) } + if len(route.GetRoute().RequestMirrorPolicies) > 0 { + mirror := route.GetRoute().RequestMirrorPolicies[0] + trafficSplit.Mirror = &resources.TrafficSplit{ModelName: mirror.Cluster, TrafficWeight: mirror.RuntimeFraction.DefaultValue.Numerator} + } + + trafficSplits = append(trafficSplits, trafficSplit) + } return trafficSplits diff --git a/scheduler/pkg/envoy/processor/server_test.go b/scheduler/pkg/envoy/processor/server_test.go index 571b8beeae..871cfb059c 100644 --- a/scheduler/pkg/envoy/processor/server_test.go +++ b/scheduler/pkg/envoy/processor/server_test.go @@ -60,7 +60,7 @@ func TestFetch(t *testing.T) { nodeID: "node_1", } - err = inc.setListeners() + err = inc.init() g.Expect(err).To(BeNil()) conn, err := grpc.NewClient(":"+strconv.Itoa(port), grpc.WithTransportCredentials(insecure.NewCredentials())) diff --git a/scheduler/pkg/envoy/resources/cache.go b/scheduler/pkg/envoy/resources/cache.go index e6a795627f..1d63e0ae1d 100644 --- a/scheduler/pkg/envoy/resources/cache.go +++ b/scheduler/pkg/envoy/resources/cache.go @@ -23,17 +23,16 @@ type Listener struct { Address string Port uint32 RouteConfigurationName string - //RouteNames []string } type Route struct { RouteName string LogPayloads bool - Clusters []TrafficSplits - Mirrors []TrafficSplits + Clusters []TrafficSplit + Mirror *TrafficSplit } -type TrafficSplits struct 
{ +type TrafficSplit struct { ModelName string ModelVersion uint32 TrafficWeight uint32 @@ -61,11 +60,11 @@ type Endpoint struct { type PipelineRoute struct { RouteName string - Clusters []PipelineTrafficSplits - Mirrors []PipelineTrafficSplits + Clusters []PipelineTrafficSplit + Mirror *PipelineTrafficSplit } -type PipelineTrafficSplits struct { +type PipelineTrafficSplit struct { PipelineName string TrafficWeight uint32 } diff --git a/scheduler/pkg/envoy/resources/resource.go b/scheduler/pkg/envoy/resources/resource.go index 85df13a899..236329bc60 100644 --- a/scheduler/pkg/envoy/resources/resource.go +++ b/scheduler/pkg/envoy/resources/resource.go @@ -62,6 +62,90 @@ const ( TLSRouteConfigurationName = "listener_tls" ) +var ( + pipelineRoutePathHttp = &route.RouteMatch_Prefix{Prefix: "/v2"} + pipelineRoutePathGrpc = &route.RouteMatch_Prefix{Prefix: "/inference.GRPCInferenceService"} +) + +func MakeHTTPListener(listenerName, address string, + port uint32, + routeConfigurationName string, + serverSecret *Secret, +) *listener.Listener { + routerConfig, _ := anypb.New(&router.Router{}) + // HTTP filter configuration + manager := &hcm.HttpConnectionManager{ + CodecType: hcm.HttpConnectionManager_AUTO, + StatPrefix: listenerName, + AlwaysSetRequestIdInResponse: false, + GenerateRequestId: &wrappers.BoolValue{Value: false}, + RouteSpecifier: &hcm.HttpConnectionManager_Rds{ + Rds: &hcm.Rds{ + ConfigSource: makeConfigSource(), + RouteConfigName: routeConfigurationName, + }, + }, + HttpFilters: []*hcm.HttpFilter{ + { + Name: "envoy.filters.http.tap", + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: createTapConfig(), + }, + }, + { + Name: "envoy.filters.http.lua", + ConfigType: &hcm.HttpFilter_TypedConfig{ + TypedConfig: createHeaderFilter(), + }, + }, + { + Name: wellknown.Router, + ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: routerConfig}, + }, + }, + AccessLog: []*accesslog.AccessLog{ + { + Name: "envoy.access_loggers.file", + ConfigType: 
&accesslog.AccessLog_TypedConfig{ + TypedConfig: createAccessLogConfig(), + }, + }, + }, + } + pbst, err := anypb.New(manager) + if err != nil { + panic(err) + } + + return &listener.Listener{ + Name: listenerName, + Address: &core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Protocol: core.SocketAddress_TCP, + Address: address, + PortSpecifier: &core.SocketAddress_PortValue{ + PortValue: port, + }, + }, + }, + }, + FilterChains: []*listener.FilterChain{ + { + Filters: []*listener.Filter{ + { + Name: wellknown.HTTPConnectionManager, + ConfigType: &listener.Filter_TypedConfig{ + TypedConfig: pbst, + }, + }, + }, + TransportSocket: createDownstreamTransportSocket(serverSecret), // Add TLS if needed + }, + }, + } +} + func MakeCluster(clusterName string, eps []Endpoint, isGrpc bool, clientSecret *Secret) *cluster.Cluster { if isGrpc { // Need to ensure http 2 is used @@ -133,14 +217,105 @@ func MakeEndpoint(clusterName string, eps []Endpoint) *endpoint.ClusterLoadAssig } } +func MakeRoutes(routes map[string]Route, pipelines map[string]PipelineRoute) (*route.RouteConfiguration, *route.RouteConfiguration) { + rts := make([]*route.Route, 2*(len(routes)+len(pipelines))+ + countModelStickySessions(routes)+ + countPipelineStickySessions(pipelines)) + + for i := range rts { + rts[i] = &route.Route{ + Match: &route.RouteMatch{ + Headers: make([]*route.HeaderMatcher, 2), // We always do 2 header matches + }, + } + } + + rtsMirrors := make([]*route.Route, countModelMirrors(routes)+ + countPipelineMirrors(pipelines)) + + for i := range rtsMirrors { + rtsMirrors[i] = &route.Route{ + Match: &route.RouteMatch{ + Headers: make([]*route.HeaderMatcher, 2), // We always do 2 header matches + }, + } + } + + rtsIndex := 0 + mirrorsIndex := 0 + + for _, r := range routes { + for _, clusterTraffic := range r.Clusters { // it's an experiment, so create some sticky session routes + if isExperiment(r.Clusters) { + 
makeModelStickySessionEnvoyRoute(r.RouteName, rts[rtsIndex], r.LogPayloads, &clusterTraffic, false) + rtsIndex++ + makeModelStickySessionEnvoyRoute(r.RouteName, rts[rtsIndex], r.LogPayloads, &clusterTraffic, true) + rtsIndex++ + } + } + makeModelEnvoyRoute(&r, rts[rtsIndex], false, false) + rtsIndex++ + makeModelEnvoyRoute(&r, rts[rtsIndex], true, false) + rtsIndex++ + + if r.Mirror != nil { + makeModelEnvoyRoute(&r, rtsMirrors[mirrorsIndex], false, true) + mirrorsIndex++ + makeModelEnvoyRoute(&r, rtsMirrors[mirrorsIndex], true, true) + mirrorsIndex++ + } + } + + // Create Pipeline Routes + for _, r := range pipelines { + if isExperiment(r.Clusters) { // it's an experiment, so create some sticky session routes + for _, clusterTraffic := range r.Clusters { + makePipelineStickySessionEnvoyRoute(r.RouteName, rts[rtsIndex], &clusterTraffic, false) + rtsIndex++ + makePipelineStickySessionEnvoyRoute(r.RouteName, rts[rtsIndex], &clusterTraffic, true) + rtsIndex++ + } + } + + makePipelineEnvoyRoute(&r, rts[rtsIndex], false, false) + rtsIndex++ + makePipelineEnvoyRoute(&r, rts[rtsIndex], true, false) + rtsIndex++ + + if r.Mirror != nil { + makePipelineEnvoyRoute(&r, rtsMirrors[mirrorsIndex], false, true) + mirrorsIndex++ + makePipelineEnvoyRoute(&r, rtsMirrors[mirrorsIndex], true, true) + mirrorsIndex++ + } + } + + return &route.RouteConfiguration{ + Name: DefaultRouteConfigurationName, + VirtualHosts: []*route.VirtualHost{{ + Name: "seldon_service", + Domains: []string{"*"}, + Routes: rts, + }}, + }, + &route.RouteConfiguration{ + Name: MirrorRouteConfigurationName, + VirtualHosts: []*route.VirtualHost{{ + Name: "seldon_mirror", + Domains: []string{"*"}, + Routes: rtsMirrors, + }}, + } +} + func wrapRouteHeader(key string) string { return fmt.Sprintf("%s%s%s", SeldonRouteSeparator, key, SeldonRouteSeparator) } -func createMirrorRouteAction(trafficWeight uint32, rest bool) []*route.RouteAction_RequestMirrorPolicy { +func createMirrorRouteAction(trafficWeight uint32, isGrpc 
bool) []*route.RouteAction_RequestMirrorPolicy { var mirrors []*route.RouteAction_RequestMirrorPolicy clusterName := MirrorHttpClusterName - if !rest { + if isGrpc { clusterName = MirrorGrpcClusterName } mirrors = append(mirrors, &route.RouteAction_RequestMirrorPolicy{ @@ -157,14 +332,14 @@ func createMirrorRouteAction(trafficWeight uint32, rest bool) []*route.RouteActi // weighted clusters do not play well with session affinity see https://github.com/envoyproxy/envoy/issues/8167 // Traffic shifting may need to be reinvesigated https://github.com/envoyproxy/envoy/pull/18207 -func createWeightedModelClusterAction(clusterTraffics []TrafficSplits, mirrorTraffics []TrafficSplits, rest bool) *route.Route_Route { +func createWeightedModelClusterAction(clusterTraffics []TrafficSplit, mirrorTraffic *TrafficSplit, isGrpc bool) *route.Route_Route { // Add Weighted Clusters with given traffic percentages to each internal model var splits []*route.WeightedCluster_ClusterWeight var mirrors []*route.RouteAction_RequestMirrorPolicy var totWeight uint32 for _, clusterTraffic := range clusterTraffics { clusterName := clusterTraffic.HttpCluster - if !rest { + if isGrpc { clusterName = clusterTraffic.GrpcCluster } totWeight = totWeight + clusterTraffic.TrafficWeight @@ -201,8 +376,8 @@ func createWeightedModelClusterAction(clusterTraffics []TrafficSplits, mirrorTra }) } - if len(mirrorTraffics) > 0 { - mirrors = createMirrorRouteAction(mirrorTraffics[0].TrafficWeight, rest) + if mirrorTraffic != nil { + mirrors = createMirrorRouteAction(mirrorTraffic.TrafficWeight, isGrpc) } action := &route.Route_Route{ Route: &route.RouteAction{ @@ -242,57 +417,26 @@ func getRouteName(routeName string, isPipeline bool, isGrpc bool, isMirror bool) return fmt.Sprintf("%s%s%s%s", routeName, pipelineSuffix, httpSuffix, mirrorSuffix) } -func makeModelHttpRoute(r *Route, rt *route.Route, isMirror bool) { - rt.Name = getRouteName(r.RouteName, false, false, isMirror) - rt.Match.PathSpecifier = 
modelRouteMatchPathHttp - rt.Match.Headers[0] = &route.HeaderMatcher{ - Name: SeldonModelHeader, // Header name we will match on - HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ - StringMatch: &matcherv3.StringMatcher{ - MatchPattern: &matcherv3.StringMatcher_Exact{ - Exact: r.RouteName, - }, - }, - }, - } - rt.Match.Headers[1] = &route.HeaderMatcher{ - Name: SeldonRouteHeader, - HeaderMatchSpecifier: &route.HeaderMatcher_PresentMatch{ - PresentMatch: false, - }, - } - - if isMirror { - rt.Action = createWeightedModelClusterAction(r.Mirrors, []TrafficSplits{}, true) - } else { - rt.Action = createWeightedModelClusterAction(r.Clusters, r.Mirrors, true) - } - - if r.LogPayloads { - rt.ResponseHeadersToAdd = modelRouteHeaders - } -} - -func makeModelStickySessionRoute(r *Route, clusterTraffic *TrafficSplits, rt *route.Route, isGrpc bool) { +func makeModelStickySessionEnvoyRoute(routeName string, envoyRoute *route.Route, logPayloads bool, clusterTraffic *TrafficSplit, isGrpc bool) { if isGrpc { - rt.Name = r.RouteName + "_grpc_experiment" - rt.Match.PathSpecifier = modelRouteMatchPathGrpc + envoyRoute.Name = routeName + "_grpc_experiment" + envoyRoute.Match.PathSpecifier = modelRouteMatchPathGrpc } else { - rt.Name = r.RouteName + "_http_experiment" - rt.Match.PathSpecifier = modelRouteMatchPathHttp + envoyRoute.Name = routeName + "_http_experiment" + envoyRoute.Match.PathSpecifier = modelRouteMatchPathHttp } - rt.Match.Headers[0] = &route.HeaderMatcher{ + envoyRoute.Match.Headers[0] = &route.HeaderMatcher{ Name: SeldonModelHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ MatchPattern: &matcherv3.StringMatcher_Exact{ - Exact: r.RouteName, + Exact: routeName, }, }, }, } - rt.Match.Headers[1] = &route.HeaderMatcher{ + envoyRoute.Match.Headers[1] = &route.HeaderMatcher{ Name: SeldonRouteHeader, // Header name we will match on HeaderMatchSpecifier: 
&route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ @@ -304,7 +448,7 @@ func makeModelStickySessionRoute(r *Route, clusterTraffic *TrafficSplits, rt *ro }, } - rt.RequestHeadersToAdd = []*core.HeaderValueOption{ + envoyRoute.RequestHeadersToAdd = []*core.HeaderValueOption{ { Header: &core.HeaderValue{ Key: SeldonInternalModelHeader, @@ -319,7 +463,7 @@ func makeModelStickySessionRoute(r *Route, clusterTraffic *TrafficSplits, rt *ro }, }, } - rt.ResponseHeadersToAdd = []*core.HeaderValueOption{ + envoyRoute.ResponseHeadersToAdd = []*core.HeaderValueOption{ { Header: &core.HeaderValue{ Key: SeldonRouteHeader, @@ -329,7 +473,7 @@ func makeModelStickySessionRoute(r *Route, clusterTraffic *TrafficSplits, rt *ro }, } if isGrpc { - rt.Action = &route.Route_Route{ + envoyRoute.Action = &route.Route_Route{ Route: &route.RouteAction{ Timeout: &duration.Duration{Seconds: DefaultRouteTimeoutSecs}, ClusterSpecifier: &route.RouteAction_Cluster{ @@ -338,7 +482,7 @@ func makeModelStickySessionRoute(r *Route, clusterTraffic *TrafficSplits, rt *ro }, } } else { - rt.Action = &route.Route_Route{ + envoyRoute.Action = &route.Route_Route{ Route: &route.RouteAction{ Timeout: &duration.Duration{Seconds: DefaultRouteTimeoutSecs}, ClusterSpecifier: &route.RouteAction_Cluster{ @@ -347,15 +491,19 @@ func makeModelStickySessionRoute(r *Route, clusterTraffic *TrafficSplits, rt *ro }, } } - if r.LogPayloads { - rt.ResponseHeadersToAdd = append(rt.RequestHeadersToAdd, modelRouteHeaders...) + if logPayloads { + envoyRoute.ResponseHeadersToAdd = append(envoyRoute.RequestHeadersToAdd, modelRouteHeaders...) // NOTE(review): this replaces the ResponseHeadersToAdd list set above (the one carrying SeldonRouteHeader) with the request headers followed by modelRouteHeaders; confirm whether append(envoyRoute.ResponseHeadersToAdd, ...) was intended
} } -func makeModelGrpcRoute(r *Route, rt *route.Route, isMirror bool) { - rt.Name = getRouteName(r.RouteName, false, true, isMirror) - rt.Match.PathSpecifier = modelRouteMatchPathGrpc - rt.Match.Headers[0] = &route.HeaderMatcher{ +func makeModelEnvoyRoute(r *Route, envoyRoute *route.Route, isGrpc, isMirror bool) { + envoyRoute.Name = getRouteName(r.RouteName, false, isGrpc, isMirror) + if isGrpc { + envoyRoute.Match.PathSpecifier = modelRouteMatchPathGrpc + } else { + envoyRoute.Match.PathSpecifier = modelRouteMatchPathHttp + } + envoyRoute.Match.Headers[0] = &route.HeaderMatcher{ Name: SeldonModelHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ @@ -365,7 +513,7 @@ func makeModelGrpcRoute(r *Route, rt *route.Route, isMirror bool) { }, }, } - rt.Match.Headers[1] = &route.HeaderMatcher{ + envoyRoute.Match.Headers[1] = &route.HeaderMatcher{ Name: SeldonRouteHeader, HeaderMatchSpecifier: &route.HeaderMatcher_PresentMatch{ PresentMatch: false, @@ -373,33 +521,58 @@ func makeModelGrpcRoute(r *Route, rt *route.Route, isMirror bool) { } if isMirror { - rt.Action = createWeightedModelClusterAction(r.Mirrors, []TrafficSplits{}, false) + envoyRoute.Action = createWeightedModelClusterAction([]TrafficSplit{*r.Mirror}, nil, isGrpc) } else { - rt.Action = createWeightedModelClusterAction(r.Clusters, r.Mirrors, false) + envoyRoute.Action = createWeightedModelClusterAction(r.Clusters, r.Mirror, isGrpc) } if r.LogPayloads { - rt.ResponseHeadersToAdd = append(rt.RequestHeadersToAdd, modelRouteHeaders...) 
+ envoyRoute.ResponseHeadersToAdd = modelRouteHeaders } } -var ( - pipelineRoutePathHttp = &route.RouteMatch_Prefix{Prefix: "/v2"} - pipelineRoutePathGrpc = &route.RouteMatch_Prefix{Prefix: "/inference.GRPCInferenceService"} -) +func makePipelineEnvoyRoute(r *PipelineRoute, envoyRoute *route.Route, isGrpc, isMirror bool) { + envoyRoute.Name = getRouteName(r.RouteName, true, isGrpc, isMirror) + envoyRoute.Match.PathSpecifier = pipelineRoutePathHttp + if isGrpc { + envoyRoute.Match.PathSpecifier = pipelineRoutePathGrpc + } + envoyRoute.Match.Headers[0] = &route.HeaderMatcher{ + Name: SeldonModelHeader, // Header name we will match on + HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ + StringMatch: &matcherv3.StringMatcher{ + MatchPattern: &matcherv3.StringMatcher_Exact{ + Exact: r.RouteName, + }, + }, + }, + } + envoyRoute.Match.Headers[1] = &route.HeaderMatcher{ + Name: SeldonRouteHeader, + HeaderMatchSpecifier: &route.HeaderMatcher_PresentMatch{ + PresentMatch: false, + }, + } + + if isMirror { + envoyRoute.Action = createWeightedPipelineClusterAction([]PipelineTrafficSplit{*r.Mirror}, nil, isGrpc) + } else { + envoyRoute.Action = createWeightedPipelineClusterAction(r.Clusters, r.Mirror, isGrpc) + } +} func getPipelineModelName(pipelineName string) string { return fmt.Sprintf("%s.%s", pipelineName, SeldonPipelineHeaderSuffix) } -func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplits, mirrorTraffics []PipelineTrafficSplits, rest bool) *route.Route_Route { +func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplit, mirrorTraffic *PipelineTrafficSplit, isGrpc bool) *route.Route_Route { // Add Weighted Clusters with given traffic percentages to each internal model var splits []*route.WeightedCluster_ClusterWeight var mirrors []*route.RouteAction_RequestMirrorPolicy var totWeight uint32 for _, clusterTraffic := range clusterTraffics { clusterName := PipelineGatewayHttpClusterName - if !rest { + if isGrpc { 
clusterName = PipelineGatewayGrpcClusterName } totWeight = totWeight + clusterTraffic.TrafficWeight @@ -428,8 +601,8 @@ func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplits }) } - if len(mirrorTraffics) > 0 { - mirrors = createMirrorRouteAction(mirrorTraffics[0].TrafficWeight, rest) + if mirrorTraffic != nil { + mirrors = createMirrorRouteAction(mirrorTraffic.TrafficWeight, isGrpc) } action := &route.Route_Route{ Route: &route.RouteAction{ @@ -445,70 +618,16 @@ func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplits return action } -func makePipelineHttpRoute(r *PipelineRoute, rt *route.Route, isMirror bool) { - rt.Name = getRouteName(r.RouteName, true, false, isMirror) - rt.Match.PathSpecifier = pipelineRoutePathHttp - rt.Match.Headers[0] = &route.HeaderMatcher{ - Name: SeldonModelHeader, // Header name we will match on - HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ - StringMatch: &matcherv3.StringMatcher{ - MatchPattern: &matcherv3.StringMatcher_Exact{ - Exact: r.RouteName, - }, - }, - }, - } - rt.Match.Headers[1] = &route.HeaderMatcher{ - Name: SeldonRouteHeader, - HeaderMatchSpecifier: &route.HeaderMatcher_PresentMatch{ - PresentMatch: false, - }, - } - - if isMirror { - rt.Action = createWeightedPipelineClusterAction(r.Mirrors, []PipelineTrafficSplits{}, true) - } else { - rt.Action = createWeightedPipelineClusterAction(r.Clusters, r.Mirrors, true) - } -} - -func makePipelineGrpcRoute(r *PipelineRoute, rt *route.Route, isMirror bool) { - rt.Name = getRouteName(r.RouteName, true, true, isMirror) - rt.Match.PathSpecifier = pipelineRoutePathGrpc - rt.Match.Headers[0] = &route.HeaderMatcher{ - Name: SeldonModelHeader, // Header name we will match on - HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ - StringMatch: &matcherv3.StringMatcher{ - MatchPattern: &matcherv3.StringMatcher_Exact{ - Exact: r.RouteName, - }, - }, - }, - } - rt.Match.Headers[1] = &route.HeaderMatcher{ - Name: 
SeldonRouteHeader, - HeaderMatchSpecifier: &route.HeaderMatcher_PresentMatch{ - PresentMatch: false, - }, - } - - if isMirror { - rt.Action = createWeightedPipelineClusterAction(r.Mirrors, []PipelineTrafficSplits{}, false) - } else { - rt.Action = createWeightedPipelineClusterAction(r.Clusters, r.Mirrors, false) - } -} - -func makePipelineStickySessionRoute(r *PipelineRoute, clusterTraffic *PipelineTrafficSplits, rt *route.Route, isGrpc bool) { +func makePipelineStickySessionEnvoyRoute(routeName string, envoyRoute *route.Route, clusterTraffic *PipelineTrafficSplit, isGrpc bool) { if isGrpc { - rt.Name = r.RouteName + "_grpc_experiment" - rt.Match.PathSpecifier = pipelineRoutePathGrpc + envoyRoute.Name = routeName + "_grpc_experiment" + envoyRoute.Match.PathSpecifier = pipelineRoutePathGrpc } else { - rt.Name = r.RouteName + "_http_experiment" - rt.Match.PathSpecifier = pipelineRoutePathHttp + envoyRoute.Name = routeName + "_http_experiment" + envoyRoute.Match.PathSpecifier = pipelineRoutePathHttp } - rt.Match.Headers[0] = &route.HeaderMatcher{ + envoyRoute.Match.Headers[0] = &route.HeaderMatcher{ Name: SeldonRouteHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ @@ -518,17 +637,17 @@ func makePipelineStickySessionRoute(r *PipelineRoute, clusterTraffic *PipelineTr }, }, } - rt.Match.Headers[1] = &route.HeaderMatcher{ + envoyRoute.Match.Headers[1] = &route.HeaderMatcher{ Name: SeldonModelHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ MatchPattern: &matcherv3.StringMatcher_Exact{ - Exact: r.RouteName, + Exact: routeName, }, }, }, } - rt.RequestHeadersToAdd = []*core.HeaderValueOption{ + envoyRoute.RequestHeadersToAdd = []*core.HeaderValueOption{ { Header: &core.HeaderValue{ Key: SeldonInternalModelHeader, @@ -536,7 +655,7 @@ func makePipelineStickySessionRoute(r *PipelineRoute, clusterTraffic 
*PipelineTr }, }, } - rt.ResponseHeadersToAdd = []*core.HeaderValueOption{ + envoyRoute.ResponseHeadersToAdd = []*core.HeaderValueOption{ { Header: &core.HeaderValue{ Key: SeldonRouteHeader, @@ -545,7 +664,7 @@ func makePipelineStickySessionRoute(r *PipelineRoute, clusterTraffic *PipelineTr }, } if isGrpc { - rt.Action = &route.Route_Route{ + envoyRoute.Action = &route.Route_Route{ Route: &route.RouteAction{ Timeout: &duration.Duration{Seconds: DefaultRouteTimeoutSecs}, ClusterSpecifier: &route.RouteAction_Cluster{ @@ -554,7 +673,7 @@ func makePipelineStickySessionRoute(r *PipelineRoute, clusterTraffic *PipelineTr }, } } else { - rt.Action = &route.Route_Route{ + envoyRoute.Action = &route.Route_Route{ Route: &route.RouteAction{ Timeout: &duration.Duration{Seconds: DefaultRouteTimeoutSecs}, ClusterSpecifier: &route.RouteAction_Cluster{ @@ -565,156 +684,50 @@ func makePipelineStickySessionRoute(r *PipelineRoute, clusterTraffic *PipelineTr } } -// This will allow sticky sessions for a) any experiment b) any in progression rollout -func isModelExperiment(r *Route) bool { - return len(r.Clusters) > 1 -} - -func isPipelineExperiment(r *PipelineRoute) bool { - return len(r.Clusters) > 1 -} - -func calcNumberOfModelStickySessionsNeeded(modelRoutes []*Route) int { +func countModelStickySessions(routes map[string]Route) int { count := 0 - for _, r := range modelRoutes { - if isModelExperiment(r) { + for _, r := range routes { + if isExperiment(r.Clusters) { count = count + (len(r.Clusters) * 2) // REST and GRPC routes for each model in an experiment } } return count } -func calcNumberOfPipelineStickySessionsNeeded(pipelineRoutes []*PipelineRoute) int { +func countPipelineStickySessions(pipelineRoutes map[string]PipelineRoute) int { count := 0 for _, r := range pipelineRoutes { - if isPipelineExperiment(r) { + if isExperiment(r.Clusters) { count = count + (len(r.Clusters) * 2) // REST and GRPC routes for each model in an experiment } } return count } -func 
calcNumberOfModelMirrorsNeeded(modelRoutes []*Route) int { +func isExperiment[T any](clusters []T) bool { + return len(clusters) > 1 +} + +func countModelMirrors(models map[string]Route) int { count := 0 - for _, r := range modelRoutes { - if len(r.Mirrors) > 0 { + for _, r := range models { + if r.Mirror != nil { count = count + 2 // REST and gRPC } } return count } -func calcNumberOfPipelineMirrorsNeeded(pipelineRoutes []*PipelineRoute) int { +func countPipelineMirrors(pipelines map[string]PipelineRoute) int { count := 0 - for _, r := range pipelineRoutes { - if len(r.Mirrors) > 0 { + for _, r := range pipelines { + if r.Mirror != nil { count = count + 2 // REST and gRPC } } return count } -func MakeRoute(modelRoutes []*Route, pipelineRoutes []*PipelineRoute) (*route.RouteConfiguration, *route.RouteConfiguration) { - rts := make([]*route.Route, 2*(len(modelRoutes)+ - len(pipelineRoutes))+ - calcNumberOfModelStickySessionsNeeded(modelRoutes)+ - calcNumberOfPipelineStickySessionsNeeded(pipelineRoutes)) - // Pre-allocate objects for better CPU pipelining - // Warning: assumes a fixes number of route-match headers - for i := 0; i < len(rts); i++ { - rts[i] = &route.Route{ - Match: &route.RouteMatch{ - Headers: make([]*route.HeaderMatcher, 2), // We always do 2 header matches - }, - } - } - - idx := 0 - - // Create Model Routes - for _, r := range modelRoutes { - for _, clusterTraffic := range r.Clusters { - if isModelExperiment(r) { - makeModelStickySessionRoute(r, &clusterTraffic, rts[idx], false) - idx++ - makeModelStickySessionRoute(r, &clusterTraffic, rts[idx], true) - idx++ - } - } - makeModelHttpRoute(r, rts[idx], false) - idx++ - makeModelGrpcRoute(r, rts[idx], false) - idx++ - } - - // Create Pipeline Routes - for _, r := range pipelineRoutes { - if isPipelineExperiment(r) { - for _, clusterTraffic := range r.Clusters { - makePipelineStickySessionRoute(r, &clusterTraffic, rts[idx], false) - idx++ - makePipelineStickySessionRoute(r, &clusterTraffic, rts[idx], 
true) - idx++ - } - } - makePipelineHttpRoute(r, rts[idx], false) - idx++ - makePipelineGrpcRoute(r, rts[idx], false) - idx++ - } - - rtsMirrors := make([]*route.Route, calcNumberOfModelMirrorsNeeded(modelRoutes)+ - calcNumberOfPipelineMirrorsNeeded(pipelineRoutes)) - // Pre-allocate objects for better CPU pipelining - // Warning: assumes a fixes number of route-match headers - for i := 0; i < len(rtsMirrors); i++ { - rtsMirrors[i] = &route.Route{ - Match: &route.RouteMatch{ - Headers: make([]*route.HeaderMatcher, 2), // We always do 2 header matches - }, - } - } - - idx = 0 - - // Create Model Mirror Routes - for _, r := range modelRoutes { - if len(r.Mirrors) > 0 { - makeModelHttpRoute(r, rtsMirrors[idx], true) - idx++ - makeModelGrpcRoute(r, rtsMirrors[idx], true) - idx++ - } - } - - // Create Pipeline Mirror Routes - for _, r := range pipelineRoutes { - if len(r.Mirrors) > 0 { - makePipelineHttpRoute(r, rtsMirrors[idx], true) - idx++ - makePipelineGrpcRoute(r, rtsMirrors[idx], true) - idx++ - } - } - - return &route.RouteConfiguration{ - Name: DefaultRouteConfigurationName, - VirtualHosts: []*route.VirtualHost{{ - Name: "seldon_service", - Domains: []string{"*"}, - Routes: rts, - }}, - }, - &route.RouteConfiguration{ - Name: MirrorRouteConfigurationName, - VirtualHosts: []*route.VirtualHost{{ - Name: "seldon_mirror", - Domains: []string{"*"}, - Routes: rtsMirrors, - }}, - } -} - func createTapConfig() *anypb.Any { // Create Tap Config tapFilter := tapfilter.Tap{ @@ -840,85 +853,6 @@ end return luaAny } -func MakeHTTPListener(listenerName, address string, - port uint32, - routeConfigurationName string, - serverSecret *Secret, -) *listener.Listener { - routerConfig, _ := anypb.New(&router.Router{}) - // HTTP filter configuration - manager := &hcm.HttpConnectionManager{ - CodecType: hcm.HttpConnectionManager_AUTO, - StatPrefix: listenerName, - AlwaysSetRequestIdInResponse: false, - GenerateRequestId: &wrappers.BoolValue{Value: false}, - RouteSpecifier: 
&hcm.HttpConnectionManager_Rds{ - Rds: &hcm.Rds{ - ConfigSource: makeConfigSource(), - RouteConfigName: routeConfigurationName, - }, - }, - HttpFilters: []*hcm.HttpFilter{ - { - Name: "envoy.filters.http.tap", - ConfigType: &hcm.HttpFilter_TypedConfig{ - TypedConfig: createTapConfig(), - }, - }, - { - Name: "envoy.filters.http.lua", - ConfigType: &hcm.HttpFilter_TypedConfig{ - TypedConfig: createHeaderFilter(), - }, - }, - { - Name: wellknown.Router, - ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: routerConfig}, - }, - }, - AccessLog: []*accesslog.AccessLog{ - { - Name: "envoy.access_loggers.file", - ConfigType: &accesslog.AccessLog_TypedConfig{ - TypedConfig: createAccessLogConfig(), - }, - }, - }, - } - pbst, err := anypb.New(manager) - if err != nil { - panic(err) - } - - return &listener.Listener{ - Name: listenerName, - Address: &core.Address{ - Address: &core.Address_SocketAddress{ - SocketAddress: &core.SocketAddress{ - Protocol: core.SocketAddress_TCP, - Address: address, - PortSpecifier: &core.SocketAddress_PortValue{ - PortValue: port, - }, - }, - }, - }, - FilterChains: []*listener.FilterChain{ - { - Filters: []*listener.Filter{ - { - Name: wellknown.HTTPConnectionManager, - ConfigType: &listener.Filter_TypedConfig{ - TypedConfig: pbst, - }, - }, - }, - TransportSocket: createDownstreamTransportSocket(serverSecret), // Add TLS if needed - }, - }, - } -} - func makeConfigSource() *core.ConfigSource { source := &core.ConfigSource{} source.ResourceApiVersion = resource.DefaultAPIVersion diff --git a/scheduler/pkg/envoy/resources/resource_test.go b/scheduler/pkg/envoy/resources/resource_test.go index 0a19c6b7a1..a3f697707f 100644 --- a/scheduler/pkg/envoy/resources/resource_test.go +++ b/scheduler/pkg/envoy/resources/resource_test.go @@ -19,8 +19,8 @@ func TestMakeRoute(t *testing.T) { g := NewGomegaWithT(t) type test struct { name string - modelRoutes []*Route - pipelineRoutes []*PipelineRoute + modelRoutes map[string]Route + pipelineRoutes 
map[string]PipelineRoute expectedDefaultRoutes int expectedMirrorRoutes int } @@ -28,142 +28,132 @@ func TestMakeRoute(t *testing.T) { tests := []test{ { name: "one model", - modelRoutes: []*Route{ - { - RouteName: "r1", - Clusters: []TrafficSplits{ - { - ModelName: "m1", - ModelVersion: 1, - TrafficWeight: 100, - HttpCluster: "h1", - GrpcCluster: "g1", - }, + modelRoutes: map[string]Route{"r1": { + RouteName: "r1", + Clusters: []TrafficSplit{ + { + ModelName: "m1", + ModelVersion: 1, + TrafficWeight: 100, + HttpCluster: "h1", + GrpcCluster: "g1", }, }, }, + }, expectedDefaultRoutes: 2, expectedMirrorRoutes: 0, }, { name: "one pipeline", - pipelineRoutes: []*PipelineRoute{ - { - RouteName: "r1", - Clusters: []PipelineTrafficSplits{ - { - PipelineName: "p1", - TrafficWeight: 100, - }, + pipelineRoutes: map[string]PipelineRoute{"r1": { + RouteName: "r1", + Clusters: []PipelineTrafficSplit{ + { + PipelineName: "p1", + TrafficWeight: 100, }, }, }, + }, expectedDefaultRoutes: 2, expectedMirrorRoutes: 0, }, { name: "pipeline experiment", - pipelineRoutes: []*PipelineRoute{ - { - RouteName: "r1", - Clusters: []PipelineTrafficSplits{ - { - PipelineName: "p1", - TrafficWeight: 50, - }, - { - PipelineName: "p2", - TrafficWeight: 50, - }, + pipelineRoutes: map[string]PipelineRoute{"r1": { + RouteName: "r1", + Clusters: []PipelineTrafficSplit{ + { + PipelineName: "p1", + TrafficWeight: 50, + }, + { + PipelineName: "p2", + TrafficWeight: 50, }, }, }, + }, expectedDefaultRoutes: 6, expectedMirrorRoutes: 0, }, { name: "pipeline experiment with mirror", - pipelineRoutes: []*PipelineRoute{ - { - RouteName: "r1", - Clusters: []PipelineTrafficSplits{ - { - PipelineName: "p1", - TrafficWeight: 50, - }, - { - PipelineName: "p2", - TrafficWeight: 50, - }, + pipelineRoutes: map[string]PipelineRoute{"r1": { + RouteName: "r1", + Clusters: []PipelineTrafficSplit{ + { + PipelineName: "p1", + TrafficWeight: 50, }, - Mirrors: []PipelineTrafficSplits{ - { - PipelineName: "p3", - TrafficWeight: 
100, - }, + { + PipelineName: "p2", + TrafficWeight: 50, }, }, + Mirror: &PipelineTrafficSplit{ + PipelineName: "p3", + TrafficWeight: 100, + }, + }, }, expectedDefaultRoutes: 6, expectedMirrorRoutes: 2, }, { name: "model experiment", - modelRoutes: []*Route{ - { - RouteName: "r1", - Clusters: []TrafficSplits{ - { - ModelName: "m1", - ModelVersion: 1, - TrafficWeight: 50, - HttpCluster: "h1", - GrpcCluster: "g1", - }, - { - ModelName: "m2", - ModelVersion: 1, - TrafficWeight: 50, - HttpCluster: "h1", - GrpcCluster: "g1", - }, + modelRoutes: map[string]Route{"r1": { + RouteName: "r1", + Clusters: []TrafficSplit{ + { + ModelName: "m1", + ModelVersion: 1, + TrafficWeight: 50, + HttpCluster: "h1", + GrpcCluster: "g1", + }, + { + ModelName: "m2", + ModelVersion: 1, + TrafficWeight: 50, + HttpCluster: "h1", + GrpcCluster: "g1", }, }, }, + }, expectedDefaultRoutes: 6, expectedMirrorRoutes: 0, }, { name: "experiment with model mirror", - modelRoutes: []*Route{ - { - RouteName: "r1", - Clusters: []TrafficSplits{ - { - ModelName: "m1", - ModelVersion: 1, - TrafficWeight: 50, - HttpCluster: "h1", - GrpcCluster: "g1", - }, - { - ModelName: "m2", - ModelVersion: 1, - TrafficWeight: 50, - HttpCluster: "h1", - GrpcCluster: "g1", - }, + modelRoutes: map[string]Route{"r1": { + RouteName: "r1", + Clusters: []TrafficSplit{ + { + ModelName: "m1", + ModelVersion: 1, + TrafficWeight: 50, + HttpCluster: "h1", + GrpcCluster: "g1", }, - Mirrors: []TrafficSplits{ - { - ModelName: "m3", - ModelVersion: 1, - TrafficWeight: 100, - HttpCluster: "h1", - GrpcCluster: "g1", - }, + { + ModelName: "m2", + ModelVersion: 1, + TrafficWeight: 50, + HttpCluster: "h1", + GrpcCluster: "g1", }, }, + Mirror: &TrafficSplit{ + ModelName: "m3", + ModelVersion: 1, + TrafficWeight: 100, + HttpCluster: "h1", + GrpcCluster: "g1", + }, + }, }, expectedDefaultRoutes: 6, expectedMirrorRoutes: 2, @@ -172,7 +162,7 @@ func TestMakeRoute(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - 
rcDef, rcMirror := MakeRoute(test.modelRoutes, test.pipelineRoutes) + rcDef, rcMirror := MakeRoutes(test.modelRoutes, test.pipelineRoutes) g.Expect(len(rcDef.VirtualHosts[0].Routes)).To(Equal(test.expectedDefaultRoutes)) g.Expect(len(rcMirror.VirtualHosts[0].Routes)).To(Equal(test.expectedMirrorRoutes)) }) diff --git a/scheduler/pkg/envoy/xdscache/seldoncache.go b/scheduler/pkg/envoy/xdscache/seldoncache.go index 9ecc0ea27a..9e47a36612 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache.go @@ -18,6 +18,7 @@ import ( seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls" "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" ) const ( @@ -29,6 +30,9 @@ const ( mirrorListenerAddress = "0.0.0.0" mirrorListenerPort uint32 = 9001 + permanentListenerCount int = 2 // seldon_service and seldon_mirrors + permanentClusterCount int = 4 // pipeline gateway * 2 + model gateway * 2 + EnvoyDownstreamServerCertName = "downstream_server" EnvoyDownstreamClientCertName = "downstream_client" EnvoyUpstreamServerCertName = "upstream_server" @@ -36,7 +40,8 @@ const ( ) type SeldonXDSCache struct { - Listeners map[string]resources.Listener + permanentListeners []types.Resource + permanentClusters []types.Resource Routes map[string]resources.Route Clusters map[string]resources.Cluster Pipelines map[string]resources.PipelineRoute @@ -54,19 +59,56 @@ type PipelineGatewayDetails struct { func NewSeldonXDSCache(logger logrus.FieldLogger, pipelineGatewayDetails *PipelineGatewayDetails) *SeldonXDSCache { return &SeldonXDSCache{ - Listeners: make(map[string]resources.Listener), + permanentListeners: make([]types.Resource, permanentListenerCount), + permanentClusters: make([]types.Resource, permanentClusterCount), Clusters: make(map[string]resources.Cluster), Routes: make(map[string]resources.Route), Pipelines: make(map[string]resources.PipelineRoute), Secrets: 
make(map[string]resources.Secret), PipelineGatewayDetails: pipelineGatewayDetails, - logger: logger.WithField("source", "XDSCache"), + logger: logger.WithField("source", "SeldonXDSCache"), } } -func (xds *SeldonXDSCache) ClusterContents() []types.Resource { - var r []types.Resource +func (xds *SeldonXDSCache) SetupTLS() error { + logger := xds.logger.WithField("func", "SetupTLS") + protocol := seldontls.GetSecurityProtocolFromEnv(seldontls.EnvSecurityPrefixEnvoy) + if protocol == seldontls.SecurityProtocolSSL { + xds.TLSActive = true + + // Envoy client to talk to agent or Pipelinegateway + logger.Info("Upstream TLS active") + tlsUpstreamClient, err := seldontls.NewCertificateStore(seldontls.Prefix(seldontls.EnvSecurityPrefixEnvoyUpstreamClient), + seldontls.ValidationPrefix(seldontls.EnvSecurityPrefixEnvoyUpstreamServer)) + if err != nil { + return err + } + xds.AddSecret(EnvoyUpstreamClientCertName, EnvoyUpstreamServerCertName, tlsUpstreamClient) + + // Envoy listener - external calls to Seldon + logger.Info("Downstream TLS active") + tlsDownstreamServer, err := seldontls.NewCertificateStore(seldontls.Prefix(seldontls.EnvSecurityPrefixEnvoyDownstreamServer), + seldontls.ValidationPrefix(seldontls.EnvSecurityPrefixEnvoyDownstreamClient)) + if err != nil { + return err + } + xds.AddSecret(EnvoyDownstreamServerCertName, EnvoyDownstreamClientCertName, tlsDownstreamServer) + } + return nil +} +func (xds *SeldonXDSCache) AddPermanentListeners() { + var serverSecret *resources.Secret + if xds.TLSActive { + if secret, ok := xds.Secrets[EnvoyDownstreamServerCertName]; ok { + serverSecret = &secret + } + } + xds.permanentListeners[0] = resources.MakeHTTPListener(defaultListenerName, defaultListenerAddress, defaultListenerPort, resources.DefaultRouteConfigurationName, serverSecret) + xds.permanentListeners[1] = resources.MakeHTTPListener(mirrorListenerName, mirrorListenerAddress, mirrorListenerPort, resources.MirrorRouteConfigurationName, serverSecret) +} + +func (xds 
*SeldonXDSCache) AddPermanentClusters() { var clientSecret *resources.Secret if xds.TLSActive { if secret, ok := xds.Secrets[EnvoyUpstreamClientCertName]; ok { @@ -76,35 +118,49 @@ func (xds *SeldonXDSCache) ClusterContents() []types.Resource { // Add pipeline gateway clusters xds.logger.Infof("Add http pipeline cluster %s host:%s port:%d", resources.PipelineGatewayHttpClusterName, xds.PipelineGatewayDetails.Host, xds.PipelineGatewayDetails.HttpPort) - r = append(r, resources.MakeCluster(resources.PipelineGatewayHttpClusterName, []resources.Endpoint{ + xds.permanentClusters[0] = resources.MakeCluster(resources.PipelineGatewayHttpClusterName, []resources.Endpoint{ { UpstreamHost: xds.PipelineGatewayDetails.Host, UpstreamPort: uint32(xds.PipelineGatewayDetails.HttpPort), }, - }, false, clientSecret)) + }, false, clientSecret) + xds.logger.Infof("Add grpc pipeline cluster %s host:%s port:%d", resources.PipelineGatewayGrpcClusterName, xds.PipelineGatewayDetails.Host, xds.PipelineGatewayDetails.GrpcPort) - r = append(r, resources.MakeCluster(resources.PipelineGatewayGrpcClusterName, []resources.Endpoint{ + xds.permanentClusters[1] = resources.MakeCluster(resources.PipelineGatewayGrpcClusterName, []resources.Endpoint{ { UpstreamHost: xds.PipelineGatewayDetails.Host, UpstreamPort: uint32(xds.PipelineGatewayDetails.GrpcPort), }, - }, true, clientSecret)) + }, true, clientSecret) // Add Mirror clusters xds.logger.Infof("Add http mirror cluster %s host:%s port:%d", resources.MirrorHttpClusterName, mirrorListenerAddress, mirrorListenerPort) - r = append(r, resources.MakeCluster(resources.MirrorHttpClusterName, []resources.Endpoint{ + xds.permanentClusters[2] = resources.MakeCluster(resources.MirrorHttpClusterName, []resources.Endpoint{ { UpstreamHost: mirrorListenerAddress, UpstreamPort: mirrorListenerPort, }, - }, false, nil)) + }, false, nil) xds.logger.Infof("Add grpc mirror cluster %s host:%s port:%d", resources.MirrorGrpcClusterName, mirrorListenerAddress, 
mirrorListenerPort) - r = append(r, resources.MakeCluster(resources.MirrorGrpcClusterName, []resources.Endpoint{ + xds.permanentClusters[3] = resources.MakeCluster(resources.MirrorGrpcClusterName, []resources.Endpoint{ { UpstreamHost: mirrorListenerAddress, UpstreamPort: mirrorListenerPort, }, - }, true, nil)) + }, true, nil) +} + +func (xds *SeldonXDSCache) ClusterContents() []types.Resource { + var r []types.Resource + + var clientSecret *resources.Secret + if xds.TLSActive { + if secret, ok := xds.Secrets[EnvoyUpstreamClientCertName]; ok { + clientSecret = &secret + } + } + + r = append(r, xds.permanentClusters...) for _, c := range xds.Clusters { endpoints := make([]resources.Endpoint, 0, len(c.Endpoints)) @@ -118,41 +174,12 @@ func (xds *SeldonXDSCache) ClusterContents() []types.Resource { } func (xds *SeldonXDSCache) RouteContents() []types.Resource { - routesArray := make([]*resources.Route, len(xds.Routes)) - rIdx := 0 - for _, r := range xds.Routes { // This could be very large as is equal to number of models (100k?) 
- modelRoute := r - routesArray[rIdx] = &modelRoute - rIdx++ - } - - pipelinesArray := make([]*resources.PipelineRoute, len(xds.Pipelines)) - pIdx := 0 - for _, r := range xds.Pipelines { // Likely to be less pipelines than models - pipelineRoute := r - pipelinesArray[pIdx] = &pipelineRoute - pIdx++ - } - - defaultRoutes, mirrorRoutes := resources.MakeRoute(routesArray, pipelinesArray) + defaultRoutes, mirrorRoutes := resources.MakeRoutes(xds.Routes, xds.Pipelines) return []types.Resource{defaultRoutes, mirrorRoutes} } func (xds *SeldonXDSCache) ListenerContents() []types.Resource { - var r []types.Resource - - var serverSecret *resources.Secret - if xds.TLSActive { - if secret, ok := xds.Secrets[EnvoyDownstreamServerCertName]; ok { - serverSecret = &secret - } - } - - for _, l := range xds.Listeners { - r = append(r, resources.MakeHTTPListener(l.Name, l.Address, l.Port, l.RouteConfigurationName, serverSecret)) - } - - return r + return xds.permanentListeners } func (xds *SeldonXDSCache) SecretContents() []types.Resource { @@ -170,42 +197,18 @@ func (xds *SeldonXDSCache) SecretContents() []types.Resource { return r } -func (xds *SeldonXDSCache) AddPipelineRoute(routeName string, pipelineName string, trafficWeight uint32, isMirror bool) { +func (xds *SeldonXDSCache) AddPipelineRoute(routeName string, trafficSplits []resources.PipelineTrafficSplit, mirror *resources.PipelineTrafficSplit) { + xds.RemovePipelineRoute(routeName) pipelineRoute, ok := xds.Pipelines[routeName] if !ok { - if isMirror { - xds.Pipelines[routeName] = resources.PipelineRoute{ - RouteName: routeName, - Mirrors: []resources.PipelineTrafficSplits{ - { - PipelineName: pipelineName, - TrafficWeight: trafficWeight, - }, - }, - } - } else { - xds.Pipelines[routeName] = resources.PipelineRoute{ - RouteName: routeName, - Clusters: []resources.PipelineTrafficSplits{ - { - PipelineName: pipelineName, - TrafficWeight: trafficWeight, - }, - }, - } + xds.Pipelines[routeName] = resources.PipelineRoute{ + 
RouteName: routeName, + Mirror: mirror, + Clusters: trafficSplits, } } else { - if isMirror { - pipelineRoute.Mirrors = append(pipelineRoute.Mirrors, resources.PipelineTrafficSplits{ - PipelineName: pipelineName, - TrafficWeight: trafficWeight, - }) - } else { - pipelineRoute.Clusters = append(pipelineRoute.Clusters, resources.PipelineTrafficSplits{ - PipelineName: pipelineName, - TrafficWeight: trafficWeight, - }) - } + pipelineRoute.Mirror = mirror + pipelineRoute.Clusters = trafficSplits xds.Pipelines[routeName] = pipelineRoute } } @@ -222,55 +225,10 @@ func (xds *SeldonXDSCache) AddSecret(name string, validationSecretName string, c } } -func (xds *SeldonXDSCache) SetupTLS() error { - logger := xds.logger.WithField("func", "SetupTLS") - protocol := seldontls.GetSecurityProtocolFromEnv(seldontls.EnvSecurityPrefixEnvoy) - if protocol == seldontls.SecurityProtocolSSL { - xds.TLSActive = true - - // Envoy client to talk to agent or Pipelinegateway - logger.Info("Upstream TLS active") - tlsUpstreamClient, err := seldontls.NewCertificateStore(seldontls.Prefix(seldontls.EnvSecurityPrefixEnvoyUpstreamClient), - seldontls.ValidationPrefix(seldontls.EnvSecurityPrefixEnvoyUpstreamServer)) - if err != nil { - return err - } - xds.AddSecret(EnvoyUpstreamClientCertName, EnvoyUpstreamServerCertName, tlsUpstreamClient) - - // Envoy listener - external calls to Seldon - logger.Info("Downstream TLS active") - tlsDownstreamServer, err := seldontls.NewCertificateStore(seldontls.Prefix(seldontls.EnvSecurityPrefixEnvoyDownstreamServer), - seldontls.ValidationPrefix(seldontls.EnvSecurityPrefixEnvoyDownstreamClient)) - if err != nil { - return err - } - xds.AddSecret(EnvoyDownstreamServerCertName, EnvoyDownstreamClientCertName, tlsDownstreamServer) - } - return nil -} - -func (xds *SeldonXDSCache) AddListeners() { - xds.Listeners[defaultListenerName] = resources.Listener{ - Name: defaultListenerName, - Address: defaultListenerAddress, - Port: defaultListenerPort, - 
RouteConfigurationName: resources.DefaultRouteConfigurationName, - } - xds.Listeners[mirrorListenerName] = resources.Listener{ - Name: mirrorListenerName, - Address: mirrorListenerAddress, - Port: mirrorListenerPort, - RouteConfigurationName: resources.MirrorRouteConfigurationName, - } -} - func (xds *SeldonXDSCache) AddRouteClusterTraffic( - routeName string, - modelName string, + routeName, modelName, httpClusterName, grpcClusterName string, modelVersion uint32, trafficPercent uint32, - httpClusterName string, - grpcClusterName string, logPayloads bool, isMirror bool, ) { @@ -281,65 +239,80 @@ func (xds *SeldonXDSCache) AddRouteClusterTraffic( LogPayloads: logPayloads, } } + // Always log payloads if any version wants it - so during a rolling update if one wants it then it will done if logPayloads { route.LogPayloads = true } - clusterTraffic := resources.TrafficSplits{ + clusterTraffic := resources.TrafficSplit{ ModelName: modelName, ModelVersion: modelVersion, TrafficWeight: trafficPercent, HttpCluster: httpClusterName, GrpcCluster: grpcClusterName, } + if isMirror { - route.Mirrors = append(route.Mirrors, clusterTraffic) + route.Mirror = &clusterTraffic } else { route.Clusters = append(route.Clusters, clusterTraffic) } + xds.Routes[routeName] = route } -func (xds *SeldonXDSCache) AddCluster( - name string, - routeName string, - modelName string, +func (xds *SeldonXDSCache) AddClustersForRoute( + routeName, modelName, httpClusterName, grpcClusterName string, modelVersion uint32, - isGrpc bool, + assignment []int, + server *store.ServerSnapshot, ) { - cluster, ok := xds.Clusters[name] + logger := xds.logger.WithField("func", "AddClustersForRoute") + + routeVersionKey := resources.RouteVersionKey{RouteName: routeName, ModelName: modelName, Version: modelVersion} + + httpCluster, ok := xds.Clusters[httpClusterName] if !ok { - cluster = resources.Cluster{ - Name: name, + httpCluster = resources.Cluster{ + Name: httpClusterName, Endpoints: 
make(map[string]resources.Endpoint), Routes: make(map[resources.RouteVersionKey]bool), - Grpc: isGrpc, + Grpc: false, } } - cluster.Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: modelName, Version: modelVersion}] = true - xds.Clusters[name] = cluster -} + xds.Clusters[httpClusterName] = httpCluster + httpCluster.Routes[routeVersionKey] = true -func (xds *SeldonXDSCache) removeRouteFromCluster(routeName string, route resources.Route, cluster resources.TrafficSplits) error { - httpCluster, ok := xds.Clusters[cluster.HttpCluster] + grpcCluster, ok := xds.Clusters[grpcClusterName] if !ok { - return fmt.Errorf("Can't find http cluster for route %s cluster %s route %+v", routeName, cluster.HttpCluster, route) - } - delete(httpCluster.Routes, resources.RouteVersionKey{RouteName: routeName, ModelName: cluster.ModelName, Version: cluster.ModelVersion}) - if len(httpCluster.Routes) == 0 { - delete(xds.Clusters, cluster.HttpCluster) + grpcCluster = resources.Cluster{ + Name: grpcClusterName, + Endpoints: make(map[string]resources.Endpoint), + Routes: make(map[resources.RouteVersionKey]bool), + Grpc: true, + } } - grpcCluster, ok := xds.Clusters[cluster.GrpcCluster] - if !ok { - return fmt.Errorf("Can't find grpc cluster for route %s cluster %s route %+v", routeName, cluster.GrpcCluster, route) - } - delete(grpcCluster.Routes, resources.RouteVersionKey{RouteName: routeName, ModelName: cluster.ModelName, Version: cluster.ModelVersion}) - if len(grpcCluster.Routes) == 0 { - delete(xds.Clusters, cluster.GrpcCluster) + for _, replicaIdx := range assignment { + replica, ok := server.Replicas[replicaIdx] + if !ok { + logger.Warnf("Invalid replica index %d for server %s", replicaIdx, server.Name) + } else { + httpEndpointName := fmt.Sprintf("%s:%d", replica.GetInferenceSvc(), replica.GetInferenceHttpPort()) + httpCluster.Endpoints[httpEndpointName] = resources.Endpoint{ + UpstreamHost: replica.GetInferenceSvc(), + UpstreamPort: 
uint32(replica.GetInferenceHttpPort()), + } + grpcEndpointName := fmt.Sprintf("%s:%d", replica.GetInferenceSvc(), replica.GetInferenceGrpcPort()) + grpcCluster.Endpoints[grpcEndpointName] = resources.Endpoint{ + UpstreamHost: replica.GetInferenceSvc(), + UpstreamPort: uint32(replica.GetInferenceGrpcPort()), + } + } } - return nil + xds.Clusters[grpcClusterName] = grpcCluster + grpcCluster.Routes[routeVersionKey] = true } func (xds *SeldonXDSCache) RemoveRoute(routeName string) error { @@ -352,13 +325,13 @@ func (xds *SeldonXDSCache) RemoveRoute(routeName string) error { } delete(xds.Routes, routeName) for _, cluster := range route.Clusters { - err := xds.removeRouteFromCluster(routeName, route, cluster) + err := xds.removeRouteFromCluster(route, cluster) if err != nil { return err } } - for _, mirror := range route.Mirrors { - err := xds.removeRouteFromCluster(routeName, route, mirror) + if route.Mirror != nil { + err := xds.removeRouteFromCluster(route, *route.Mirror) if err != nil { return err } @@ -366,13 +339,26 @@ func (xds *SeldonXDSCache) RemoveRoute(routeName string) error { return nil } -func (xds *SeldonXDSCache) AddEndpoint(clusterName, upstreamHost string, upstreamPort uint32) { - cluster := xds.Clusters[clusterName] - k := fmt.Sprintf("%s:%d", upstreamHost, upstreamPort) - cluster.Endpoints[k] = resources.Endpoint{ - UpstreamHost: upstreamHost, - UpstreamPort: upstreamPort, +func (xds *SeldonXDSCache) removeRouteFromCluster(route resources.Route, cluster resources.TrafficSplit) error { + removeCluster := func(route resources.Route, clusterName string, split resources.TrafficSplit) error { + cluster, ok := xds.Clusters[clusterName] + if !ok { + return fmt.Errorf("can't find cluster for route %s cluster %s route %+v", route.RouteName, clusterName, route) + } + delete(cluster.Routes, resources.RouteVersionKey{RouteName: route.RouteName, ModelName: split.ModelName, Version: split.ModelVersion}) + if len(cluster.Routes) == 0 { + delete(xds.Clusters, 
clusterName) + } + return nil } - xds.Clusters[clusterName] = cluster + err := removeCluster(route, cluster.HttpCluster, cluster) + if err != nil { + return err + } + err = removeCluster(route, cluster.GrpcCluster, cluster) + if err != nil { + return err + } + return nil } diff --git a/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go b/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go index e131c8b6c7..ae1bdc70c7 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go @@ -16,6 +16,8 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/sirupsen/logrus" + + "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" ) // Prevent compiler from optimising away benchmarks @@ -25,14 +27,15 @@ func benchmarkRouteContents(b *testing.B, numResources uint) { x := NewSeldonXDSCache(logrus.New(), nil) for n := 0; n < int(numResources); n++ { - x.AddPipelineRoute(strconv.Itoa(n), strconv.Itoa(n), 100, false) + x.AddPipelineRoute(strconv.Itoa(n), []resources.PipelineTrafficSplit{{PipelineName: strconv.Itoa(n), TrafficWeight: 100}}, nil) + x.AddRouteClusterTraffic( fmt.Sprintf("model-%d", n), fmt.Sprintf("model-%d", n), - 1, - 100, "http", "grpc", + 1, + 100, false, false, ) diff --git a/scheduler/pkg/envoy/xdscache/seldoncache_test.go b/scheduler/pkg/envoy/xdscache/seldoncache_test.go index c3682b03b9..ed1b6478c2 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache_test.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache_test.go @@ -17,9 +17,11 @@ import ( "github.com/otiai10/copy" log "github.com/sirupsen/logrus" + "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls" "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" ) // Checks a cluster remains until all routes are removed @@ -28,28 +30,22 @@ 
func TestAddRemoveHttpAndGrpcRoute(t *testing.T) { logger := log.New() c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) - addVersionedRoute := func(c *SeldonXDSCache, routeName string, modelName string, httpCluster string, grpcCluster string, traffic uint32, version uint32) { - c.AddCluster(httpCluster, routeName, modelName, version, false) - c.AddCluster(grpcCluster, routeName, modelName, version, true) - c.AddRouteClusterTraffic(routeName, modelName, version, traffic, httpCluster, grpcCluster, true, false) - c.AddEndpoint(httpCluster, "0.0.0.0", 9000) - c.AddEndpoint(grpcCluster, "0.0.0.0", 9001) - } - - httpCluster := "http1" - grpcCluster := "grpc1" + httpCluster := "m1_1_http" + grpcCluster := "m1_1_grpc" model1 := "m1" - model2 := "m2" - addVersionedRoute(c, model1, model1, httpCluster, grpcCluster, 100, 1) - addVersionedRoute(c, model2, model2, httpCluster, grpcCluster, 100, 1) + route1 := "r1" + route2 := "r2" + + addVersionedRoute(c, route1, model1, httpCluster, grpcCluster, 100, 1) + addVersionedRoute(c, route2, model1, httpCluster, grpcCluster, 100, 1) - err := c.RemoveRoute(model1) + err := c.RemoveRoute(route1) g.Expect(err).To(BeNil()) _, ok := c.Clusters[httpCluster] g.Expect(ok).To(BeTrue()) // http Cluster remains as r2 still connected _, ok = c.Clusters[grpcCluster] g.Expect(ok).To(BeTrue()) // grpc Cluster remains as r2 still connected - err = c.RemoveRoute(model2) + err = c.RemoveRoute(route2) g.Expect(err).To(BeNil()) _, ok = c.Clusters[httpCluster] g.Expect(ok).To(BeFalse()) // http Cluster removed @@ -62,59 +58,55 @@ func TestAddRemoveHttpAndGrpcRouteVersions(t *testing.T) { g := NewGomegaWithT(t) logger := log.New() - addVersionedRoute := func(c *SeldonXDSCache, routeName string, modelName string, httpCluster string, grpcCluster string, traffic uint32, version uint32) { - c.AddCluster(httpCluster, routeName, modelName, version, false) - c.AddCluster(grpcCluster, routeName, modelName, version, true) - 
c.AddRouteClusterTraffic(routeName, modelName, version, traffic, httpCluster, grpcCluster, true, false) - c.AddEndpoint(httpCluster, "0.0.0.0", 9000) - c.AddEndpoint(grpcCluster, "0.0.0.0", 9001) - } - c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) - httpCluster := "http1" - grpcCluster := "grpc1" + httpCluster1 := "m1_1_http" + grpcCluster1 := "m1_1_grpc" + httpCluster2 := "m1_2_http" + grpcCluster2 := "m1_2_grpc" model1 := "m1" - model2 := "m2" - addVersionedRoute(c, model1, model1, httpCluster, grpcCluster, 40, 1) - addVersionedRoute(c, model1, model1, httpCluster, grpcCluster, 60, 2) + route1 := "r1" + route2 := "r2" + + addVersionedRoute(c, route1, model1, httpCluster1, grpcCluster1, 40, 1) + addVersionedRoute(c, route1, model1, httpCluster2, grpcCluster2, 60, 2) // check what we have added - g.Expect(len(c.Routes[model1].Clusters)).To(Equal(2)) - clusters := c.Routes[model1].Clusters + g.Expect(len(c.Routes[route1].Clusters)).To(Equal(2)) + clusters := c.Routes[route1].Clusters g.Expect(clusters[0].TrafficWeight).To(Equal(uint32(40))) g.Expect(clusters[1].TrafficWeight).To(Equal(uint32(60))) - g.Expect(len(c.Clusters[httpCluster].Endpoints)).To(Equal(1)) - g.Expect(len(c.Clusters[grpcCluster].Endpoints)).To(Equal(1)) - g.Expect(c.Clusters[httpCluster].Grpc).To(BeFalse()) - g.Expect(c.Clusters[grpcCluster].Grpc).To(BeTrue()) - g.Expect(c.Clusters[httpCluster].Routes[resources.RouteVersionKey{RouteName: model1, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster].Routes[resources.RouteVersionKey{RouteName: model1, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[httpCluster].Routes[resources.RouteVersionKey{RouteName: model1, ModelName: model1, Version: 2}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster].Routes[resources.RouteVersionKey{RouteName: model1, ModelName: model1, Version: 2}]).To(BeTrue()) + g.Expect(len(c.Clusters[httpCluster1].Endpoints)).To(Equal(1)) + 
g.Expect(len(c.Clusters[grpcCluster1].Endpoints)).To(Equal(1)) + g.Expect(c.Clusters[httpCluster1].Grpc).To(BeFalse()) + g.Expect(c.Clusters[grpcCluster1].Grpc).To(BeTrue()) + g.Expect(c.Clusters[httpCluster1].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster1].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpCluster2].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 2}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster2].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 2}]).To(BeTrue()) - addVersionedRoute(c, model2, model2, httpCluster, grpcCluster, 100, 1) + addVersionedRoute(c, route2, model1, httpCluster1, grpcCluster1, 100, 1) // check what we added - g.Expect(len(c.Routes[model2].Clusters)).To(Equal(1)) - clusters = c.Routes[model2].Clusters + g.Expect(len(c.Routes[route2].Clusters)).To(Equal(1)) + clusters = c.Routes[route2].Clusters g.Expect(clusters[0].TrafficWeight).To(Equal(uint32(100))) - g.Expect(c.Clusters[httpCluster].Routes[resources.RouteVersionKey{RouteName: model2, ModelName: model2, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster].Routes[resources.RouteVersionKey{RouteName: model2, ModelName: model2, Version: 1}]).To(BeTrue()) - g.Expect(len(c.Clusters[httpCluster].Endpoints)).To(Equal(1)) - g.Expect(len(c.Clusters[grpcCluster].Endpoints)).To(Equal(1)) + g.Expect(c.Clusters[httpCluster1].Routes[resources.RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster1].Routes[resources.RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(len(c.Clusters[httpCluster1].Endpoints)).To(Equal(1)) + g.Expect(len(c.Clusters[grpcCluster1].Endpoints)).To(Equal(1)) - err := c.RemoveRoute(model1) + err := c.RemoveRoute(route1) 
g.Expect(err).To(BeNil()) - _, ok := c.Clusters[httpCluster] + _, ok := c.Clusters[httpCluster1] g.Expect(ok).To(BeTrue()) // http Cluster remains as r2 still connected - _, ok = c.Clusters[grpcCluster] + _, ok = c.Clusters[grpcCluster1] g.Expect(ok).To(BeTrue()) // grpc Cluster remains as r2 still connected - err = c.RemoveRoute(model2) + err = c.RemoveRoute(route2) g.Expect(err).To(BeNil()) - _, ok = c.Clusters[httpCluster] + _, ok = c.Clusters[httpCluster1] g.Expect(ok).To(BeFalse()) // http Cluster removed - _, ok = c.Clusters[grpcCluster] + _, ok = c.Clusters[grpcCluster1] g.Expect(ok).To(BeFalse()) // grpc Cluster removed } @@ -123,42 +115,37 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForSameModel(t *testing.T) { g := NewGomegaWithT(t) logger := log.New() - addVersionedRoute := func(c *SeldonXDSCache, routeName string, modelName string, httpCluster string, grpcCluster string, traffic uint32, version uint32) { - c.AddCluster(httpCluster, routeName, modelName, version, false) - c.AddCluster(grpcCluster, routeName, modelName, version, true) - c.AddRouteClusterTraffic(routeName, modelName, version, traffic, httpCluster, grpcCluster, true, false) - c.AddEndpoint(httpCluster, "0.0.0.0", 9000) - c.AddEndpoint(grpcCluster, "0.0.0.0", 9001) - } - c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) routeName := "r1" - httpCluster := "http1" - grpcCluster := "grpc1" + httpCluster1 := "m1_1_http" + grpcCluster1 := "m1_1_grpc" + httpCluster2 := "m1_2_http" + grpcCluster2 := "m1_2_grpc" model1 := "m1" - addVersionedRoute(c, routeName, model1, httpCluster, grpcCluster, 40, 1) - addVersionedRoute(c, routeName, model1, httpCluster, grpcCluster, 60, 2) + + addVersionedRoute(c, routeName, model1, httpCluster1, grpcCluster1, 40, 1) + addVersionedRoute(c, routeName, model1, httpCluster2, grpcCluster2, 60, 2) // check what we have added g.Expect(len(c.Routes[routeName].Clusters)).To(Equal(2)) clusters := c.Routes[routeName].Clusters 
g.Expect(clusters[0].TrafficWeight).To(Equal(uint32(40))) g.Expect(clusters[1].TrafficWeight).To(Equal(uint32(60))) - g.Expect(len(c.Clusters[httpCluster].Endpoints)).To(Equal(1)) - g.Expect(len(c.Clusters[grpcCluster].Endpoints)).To(Equal(1)) - g.Expect(c.Clusters[httpCluster].Grpc).To(BeFalse()) - g.Expect(c.Clusters[grpcCluster].Grpc).To(BeTrue()) - g.Expect(c.Clusters[httpCluster].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[httpCluster].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) + g.Expect(len(c.Clusters[httpCluster1].Endpoints)).To(Equal(1)) + g.Expect(len(c.Clusters[grpcCluster1].Endpoints)).To(Equal(1)) + g.Expect(c.Clusters[httpCluster1].Grpc).To(BeFalse()) + g.Expect(c.Clusters[grpcCluster1].Grpc).To(BeTrue()) + g.Expect(c.Clusters[httpCluster1].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster1].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpCluster2].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster2].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) err := c.RemoveRoute(routeName) g.Expect(err).To(BeNil()) - _, ok := c.Clusters[httpCluster] + _, ok := c.Clusters[httpCluster1] g.Expect(ok).To(BeFalse()) // http Cluster removed - _, ok = c.Clusters[grpcCluster] + _, ok = c.Clusters[grpcCluster1] g.Expect(ok).To(BeFalse()) // grpc Cluster removed } @@ -167,40 
+154,34 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentModels(t *testing.T) { g := NewGomegaWithT(t) logger := log.New() - addVersionedRoute := func(c *SeldonXDSCache, modelRouteName string, modelName string, httpCluster string, grpcCluster string, traffic uint32, version uint32) { - c.AddCluster(httpCluster, modelRouteName, modelName, version, false) - c.AddCluster(grpcCluster, modelRouteName, modelName, version, true) - c.AddRouteClusterTraffic(modelRouteName, modelName, version, traffic, httpCluster, grpcCluster, true, false) - c.AddEndpoint(httpCluster, "0.0.0.0", 9000) - c.AddEndpoint(grpcCluster, "0.0.0.0", 9001) - } - c := NewSeldonXDSCache(logger, &PipelineGatewayDetails{}) - httpClusterModel1 := "model1_http1" - grpcClusterModel1 := "model1_grpc1" - httpClusterModel2 := "model2_http1" - grpcClusterModel2 := "model2_grpc1" + httpClusterModel1 := "m1_1_http" + grpcClusterModel1 := "m1_1_grpc" + httpClusterModel2 := "m2_1_http" + grpcClusterModel2 := "m2_1_grpc" model1 := "m1" model2 := "m2" - addVersionedRoute(c, model1, model1, httpClusterModel1, grpcClusterModel1, 40, 1) - addVersionedRoute(c, model1, model2, httpClusterModel2, grpcClusterModel2, 60, 1) + routeName := "r1" + + addVersionedRoute(c, routeName, model1, httpClusterModel1, grpcClusterModel1, 40, 1) + addVersionedRoute(c, routeName, model2, httpClusterModel2, grpcClusterModel2, 60, 1) // check what we have added - g.Expect(len(c.Routes[model1].Clusters)).To(Equal(2)) - clusters := c.Routes[model1].Clusters + g.Expect(len(c.Routes[routeName].Clusters)).To(Equal(2)) + clusters := c.Routes[routeName].Clusters g.Expect(clusters[0].TrafficWeight).To(Equal(uint32(40))) g.Expect(clusters[1].TrafficWeight).To(Equal(uint32(60))) g.Expect(len(c.Clusters[httpClusterModel1].Endpoints)).To(Equal(1)) g.Expect(len(c.Clusters[grpcClusterModel1].Endpoints)).To(Equal(1)) g.Expect(c.Clusters[httpClusterModel1].Grpc).To(BeFalse()) g.Expect(c.Clusters[grpcClusterModel1].Grpc).To(BeTrue()) - 
g.Expect(c.Clusters[httpClusterModel1].Routes[resources.RouteVersionKey{RouteName: model1, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcClusterModel1].Routes[resources.RouteVersionKey{RouteName: model1, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[httpClusterModel2].Routes[resources.RouteVersionKey{RouteName: model1, ModelName: model2, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcClusterModel2].Routes[resources.RouteVersionKey{RouteName: model1, ModelName: model2, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpClusterModel1].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcClusterModel1].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpClusterModel2].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model2, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcClusterModel2].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model2, Version: 1}]).To(BeTrue()) - err := c.RemoveRoute(model1) + err := c.RemoveRoute(routeName) g.Expect(err).To(BeNil()) _, ok := c.Clusters[httpClusterModel1] g.Expect(ok).To(BeFalse()) // http Cluster removed @@ -216,21 +197,14 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentRoutesSameModel(t *testing g := NewGomegaWithT(t) logger := log.New() - addVersionedRoute := func(c *SeldonXDSCache, modelRouteName string, modelName string, httpCluster string, grpcCluster string, traffic uint32, version uint32) { - c.AddCluster(httpCluster, modelRouteName, modelName, version, false) - c.AddCluster(grpcCluster, modelRouteName, modelName, version, true) - c.AddRouteClusterTraffic(modelRouteName, modelName, version, traffic, httpCluster, grpcCluster, true, false) - c.AddEndpoint(httpCluster, "0.0.0.0", 9000) - c.AddEndpoint(grpcCluster, "0.0.0.0", 9001) - } - c := NewSeldonXDSCache(logger, 
&PipelineGatewayDetails{}) route1 := "r1" route2 := "r2" - httpClusterModel1 := "model1_http1" - grpcClusterModel1 := "model1_grpc1" + httpClusterModel1 := "m1_1_http" + grpcClusterModel1 := "m1_1_grpc" model1 := "m1" + addVersionedRoute(c, route1, model1, httpClusterModel1, grpcClusterModel1, 100, 1) addVersionedRoute(c, route2, model1, httpClusterModel1, grpcClusterModel1, 100, 1) @@ -324,3 +298,57 @@ func TestSetupTLS(t *testing.T) { }) } } + +func addVersionedRoute(c *SeldonXDSCache, modelRouteName string, modelName string, httpCluster string, grpcCluster string, traffic uint32, version uint32) { + modelVersion := store.NewModelVersion( + &scheduler.Model{ + Meta: &scheduler.MetaData{Name: modelName}, + DeploymentSpec: &scheduler.DeploymentSpec{LogPayloads: false}, + }, + version, + "server", + map[int]store.ReplicaStatus{ + 1: {State: store.Loaded}, + }, + false, + store.ModelAvailable, + ) + + addCluster(c, httpCluster, modelRouteName, modelName, version, false) + addCluster(c, grpcCluster, modelRouteName, modelName, version, true) + c.AddRouteClusterTraffic(modelRouteName, modelName, httpCluster, grpcCluster, modelVersion.GetVersion(), traffic, false, false) + addEndpoint(c, httpCluster, "0.0.0.0", 9000) + addEndpoint(c, grpcCluster, "0.0.0.0", 9001) +} + +func addCluster( + xds *SeldonXDSCache, + name string, + routeName string, + modelName string, + modelVersion uint32, + isGrpc bool, +) { + cluster, ok := xds.Clusters[name] + if !ok { + cluster = resources.Cluster{ + Name: name, + Endpoints: make(map[string]resources.Endpoint), + Routes: make(map[resources.RouteVersionKey]bool), + Grpc: isGrpc, + } + } + cluster.Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: modelName, Version: modelVersion}] = true + xds.Clusters[name] = cluster +} + +func addEndpoint(xds *SeldonXDSCache, clusterName, upstreamHost string, upstreamPort uint32) { + cluster := xds.Clusters[clusterName] + k := fmt.Sprintf("%s:%d", upstreamHost, upstreamPort) + 
cluster.Endpoints[k] = resources.Endpoint{ + UpstreamHost: upstreamHost, + UpstreamPort: upstreamPort, + } + + xds.Clusters[clusterName] = cluster +}