refactor(envoy): refactoring and optimising the components that build envoy config (#6119)

* adding a proper test

* move some stuff around and delete unused methods

* linting

* cleaning up the test

* some renaming and test updates

* merging v2

* fix rename

* fixing test

* add mirror to route configs

* reverting some test data

* fixing a bug

* changing structure for clarity

* fix model name

* some more optimisations

* cleaning up

* linting

* fix logger

* PR review changes
driev authored Dec 5, 2024
1 parent 881ce1f commit c431854
Showing 10 changed files with 759 additions and 758 deletions.
@@ -0,0 +1,49 @@
Before

goos: linux
goarch: amd64
pkg: github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor
cpu: 12th Gen Intel(R) Core(TM) i7-12800H
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms-20 261 4293734 ns/op 1728869 B/op 20354 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms-20 57 19236652 ns/op 11745105 B/op 148552 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms-20 15 165481121 ns/op 306686874 B/op 4121256 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms-20 1 1449165416 ns/op 4010677584 B/op 54872332 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms-20 100 17107753 ns/op 21719815 B/op 273169 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms-20 27 66313339 ns/op 118376950 B/op 1388585 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms-20 3 519671198 ns/op 967958834 B/op 11993174 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms-20 1 4341797404 ns/op 13883915576 B/op 181619840 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms-20 6369 162639 ns/op 681659 B/op 6429 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms-20 10 102798616 ns/op 264589261 B/op 3582995 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms-20 9 117708448 ns/op 348929572 B/op 4632386 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms-20 2 683663227 ns/op 2238235876 B/op 29665474 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms-20 12 87394856 ns/op 297229676 B/op 4071575 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms-20 9 114736702 ns/op 374778539 B/op 4887162 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms-20 6 340307094 ns/op 1366063312 B/op 16472416 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms-20 1 2402863570 ns/op 7096934200 B/op 93288518 allocs/op
PASS
ok github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor 45.753s


After


goos: linux
goarch: amd64
pkg: github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor
cpu: 12th Gen Intel(R) Core(TM) i7-12800H
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_10ms-20 99 10503076 ns/op 5459504 B/op 75936 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_10ms-20 100 12329506 ns/op 11528623 B/op 147768 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_10ms-20 32 57135215 ns/op 200141562 B/op 2638843 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_10ms-20 3 425881058 ns/op 1155633304 B/op 15271185 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_10ms-20 97 12796395 ns/op 49125454 B/op 644298 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_10ms-20 34 33008128 ns/op 173017588 B/op 2127563 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_10ms-20 4 324180730 ns/op 1990164076 B/op 25634294 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_10ms-20 1 2786099232 ns/op 16107104968 B/op 215067138 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_1_Batch_100ms-20 5761 230485 ns/op 1276192 B/op 14658 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_1_Batch_100ms-20 12 95563752 ns/op 368770140 B/op 5040953 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_1_Batch_100ms-20 9 118337597 ns/op 345704658 B/op 4589376 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_1_Batch_100ms-20 3 531757210 ns/op 2030309925 B/op 26656810 allocs/op
BenchmarkModelUpdate_Models_10_Replicas_10_Batch_100ms-20 18 103613236 ns/op 149228799 B/op 1985478 allocs/op
BenchmarkModelUpdate_Models_100_Replicas_10_Batch_100ms-20 9 113923163 ns/op 225481680 B/op 2822586 allocs/op
BenchmarkModelUpdate_Models_1_000_Replicas_10_Batch_100ms-20 6 272610358 ns/op 945017966 B/op 10781455 allocs/op
BenchmarkModelUpdate_Models_10_000_Replicas_10_Batch_100ms-20 1 1734657705 ns/op 3920402984 B/op 49687374 allocs/op
PASS
ok github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/processor 37.457s
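
At the heaviest configuration (10,000 models, 10ms batch window) the refactor cuts the per-op cost from ~1.45s to ~0.43s (~3.4x) and the allocations from ~54.9M to ~15.3M (~3.6x), though the smallest 10-model cases regress slightly; the win is concentrated in the many-model regimes the scheduler targets. Output in this shape comes from Go's standard benchmark runner (go test -bench ModelUpdate -benchmem). The sketch below shows one way such parameterised benchmarks can be generated — the names follow the pattern above, but the workload and parameters are assumptions, not the actual test code:

package processor_test

import (
	"fmt"
	"testing"
	"time"
)

// simulateBatchedUpdates is a placeholder workload; the real benchmarks
// drive the IncrementalProcessor with model updates batched over a window.
func simulateBatchedUpdates(models, replicas int, window time.Duration) {
	for i := 0; i < models*replicas; i++ {
		_ = fmt.Sprintf("model-%d", i) // stand-in per-model work
	}
}

func BenchmarkModelUpdate(b *testing.B) {
	cases := []struct {
		models, replicas int
		window           time.Duration
	}{
		{10, 1, 10 * time.Millisecond},
		{100, 1, 10 * time.Millisecond},
		{1_000, 10, 100 * time.Millisecond},
	}
	for _, c := range cases {
		name := fmt.Sprintf("Models_%d_Replicas_%d_Batch_%s", c.models, c.replicas, c.window)
		b.Run(name, func(b *testing.B) {
			for i := 0; i < b.N; i++ {
				simulateBatchedUpdates(c.models, c.replicas, c.window)
			}
		})
	}
}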
98 changes: 51 additions & 47 deletions scheduler/pkg/envoy/processor/incremental.go
@@ -79,7 +79,7 @@ func NewIncrementalProcessor(
cache: cache,
nodeID: nodeID,
snapshotVersion: rand.Int63n(1000),
logger: log.WithField("source", "EnvoyServer"),
logger: log.WithField("source", "IncrementalProcessor"),
xdsCache: xdscache.NewSeldonXDSCache(log, pipelineGatewayDetails),
modelStore: modelStore,
experimentServer: experimentServer,
@@ -91,7 +91,7 @@
batchTriggerManual: nil,
}

err := ip.setListeners()
err := ip.init()
if err != nil {
return nil, err
}
@@ -154,22 +154,26 @@ func (p *IncrementalProcessor) handleExperimentEvents(event coordinator.Experime
} else {
if event.UpdatedExperiment {
err := p.experimentUpdate(exp)
var err2 error
if err != nil {
logger.WithError(err).Errorf("Failed to process sync for experiment %s", event.String())
err2 = p.experimentServer.SetStatus(event.ExperimentName, false, err.Error())
p.setExperimentStatus(event, false, err.Error())
} else {
err2 = p.experimentServer.SetStatus(event.ExperimentName, true, "experiment active")
}
if err2 != nil {
logger.WithError(err2).Errorf("Failed to set experiment activation")
p.setExperimentStatus(event, true, "experiment active")
}
}
}
}
}()
}

func (p *IncrementalProcessor) setExperimentStatus(event coordinator.ExperimentEventMsg, active bool, msg string) {
logger := p.logger.WithField("func", "setExperimentStatus")
err := p.experimentServer.SetStatus(event.ExperimentName, active, msg)
if err != nil {
logger.WithError(err).Errorf("Failed to set experiment activation")
}
}

func (p *IncrementalProcessor) handleModelEvents(event coordinator.ModelEventMsg) {
logger := p.logger.WithField("func", "handleModelEvents")
logger.Debugf("Received sync for model %s", event.String())
@@ -182,14 +186,15 @@ func (p *IncrementalProcessor) handleModelEvents(event coordinator.ModelEventMsg
}()
}

func (p *IncrementalProcessor) setListeners() error {
func (p *IncrementalProcessor) init() error {
p.mu.Lock()
defer p.mu.Unlock()
err := p.xdsCache.SetupTLS()
if err != nil {
return err
}
p.xdsCache.AddListeners()
p.xdsCache.AddPermanentListeners()
p.xdsCache.AddPermanentClusters()
return nil
}

@@ -246,45 +251,32 @@ func (p *IncrementalProcessor) removeRouteForServerInEnvoyCache(routeName string
return nil
}

func (p *IncrementalProcessor) updateEnvoyForModelVersion(modelRouteName string, modelVersion *store.ModelVersion, server *store.ServerSnapshot, trafficPercent uint32, isMirror bool) {
func (p *IncrementalProcessor) updateEnvoyForModelVersion(routeName string, modelVersion *store.ModelVersion, server *store.ServerSnapshot, trafficPercent uint32, isMirror bool) {
logger := p.logger.WithField("func", "updateEnvoyForModelVersion")

assignment := modelVersion.GetAssignment() // Get loaded replicas for model
assignment := modelVersion.GetAssignment()
if len(assignment) == 0 {
logger.Debugf("No assigned replicas so returning for %s", modelRouteName)
logger.Debugf("Not updating route: %s - no assigned replicas for %v", routeName, modelVersion)
return
}

clusterNameBase := modelVersion.GetMeta().GetName() + "_" + strconv.FormatInt(int64(modelVersion.GetVersion()), 10)
httpClusterName := clusterNameBase + "_http"
grpcClusterName := clusterNameBase + "_grpc"
p.xdsCache.AddCluster(httpClusterName, modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), false)
for _, replicaIdx := range assignment {
replica, ok := server.Replicas[replicaIdx]
if !ok {
logger.Warnf("Invalid replica index %d for server %s", replicaIdx, server.Name)
} else {
p.xdsCache.AddEndpoint(httpClusterName, replica.GetInferenceSvc(), uint32(replica.GetInferenceHttpPort()))
}
}
p.xdsCache.AddCluster(grpcClusterName, modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), true)
for _, replicaIdx := range assignment {
replica, ok := server.Replicas[replicaIdx]
if !ok {
logger.Warnf("Invalid replica index %d for server %s", replicaIdx, server.Name)
} else {
p.xdsCache.AddEndpoint(grpcClusterName, replica.GetInferenceSvc(), uint32(replica.GetInferenceGrpcPort()))
}
}
modelName := modelVersion.GetMeta().GetName()
modelVersionNumber := modelVersion.GetVersion()
httpClusterName, grpcClusterName := getClusterNames(modelName, modelVersionNumber)
p.xdsCache.AddClustersForRoute(routeName, modelName, httpClusterName, grpcClusterName, modelVersionNumber, assignment, server)

logPayloads := false
if modelVersion.GetDeploymentSpec() != nil {
logPayloads = modelVersion.GetDeploymentSpec().LogPayloads
} else {
logger.Warnf("model %s has not deployment spec", modelVersion.GetModel().GetMeta().GetName())
logger.Warnf("model %s has not deployment spec", modelName)
}
p.xdsCache.AddRouteClusterTraffic(routeName, modelName, httpClusterName, grpcClusterName, modelVersionNumber, trafficPercent, logPayloads, isMirror)
}

p.xdsCache.AddRouteClusterTraffic(modelRouteName, modelVersion.GetModel().GetMeta().GetName(), modelVersion.GetVersion(), trafficPercent, httpClusterName, grpcClusterName, logPayloads, isMirror)
func getClusterNames(modelName string, modelVersionNumber uint32) (string, string) {
clusterNameBase := modelName + "_" + strconv.FormatInt(int64(modelVersionNumber), 10)
httpClusterName := clusterNameBase + "_http"
grpcClusterName := clusterNameBase + "_grpc"
return httpClusterName, grpcClusterName
}

func getTrafficShare(latestModel *store.ModelVersion, lastAvailableModelVersion *store.ModelVersion, weight uint32) (uint32, uint32) {
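
A quick usage sketch of the extracted naming helper (package-local call, values illustrative):

// Model "iris" at version 2 yields a paired HTTP and gRPC cluster name.
httpCluster, grpcCluster := getClusterNames("iris", 2)
// httpCluster == "iris_2_http"
// grpcCluster == "iris_2_grpc"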
@@ -305,7 +297,7 @@ func (p *IncrementalProcessor) addModelTraffic(routeName string, model *store.Mo
if latestModel == nil {
logger.Infof("latest model is nil for model %s route %s", model.Name, routeName)
}
return fmt.Errorf("No live replica for model %s for model route %s", model.Name, routeName)
return fmt.Errorf("no live replica for model %s for model route %s", model.Name, routeName)
}

server, err := p.modelStore.GetServer(latestModel.Server(), false, false)
@@ -321,13 +313,15 @@ func (p *IncrementalProcessor) addModelTraffic(routeName string, model *store.Mo
logger.WithError(err).Errorf("Failed to find server %s for last available model %s", lastAvailableModelVersion.Server(), modelName)
return err
}

logger.Debugf("Splitting traffic between latest %s:%d %d percent and %s:%d %d percent",
modelName,
latestModel.GetVersion(),
trafficLatestModel,
modelName,
lastAvailableModelVersion.GetVersion(),
trafficLastAvailableModel)

p.updateEnvoyForModelVersion(routeName, lastAvailableModelVersion, lastAvailableServer, trafficLastAvailableModel, isMirror)
p.updateEnvoyForModelVersion(routeName, latestModel, server, trafficLatestModel, isMirror)
} else {
@@ -395,12 +389,19 @@ func (p *IncrementalProcessor) addModel(model *store.ModelSnapshot) error {
func (p *IncrementalProcessor) addTrafficForExperiment(routeName string, exp *experiment.Experiment) error {
switch exp.ResourceType {
case experiment.PipelineResourceType:

var mirrorSplit *resources.PipelineTrafficSplit
trafficSplits := make([]resources.PipelineTrafficSplit, 0, len(exp.Candidates))

for _, candidate := range exp.Candidates {
p.xdsCache.AddPipelineRoute(routeName, candidate.Name, candidate.Weight, false)
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
}
if exp.Mirror != nil {
p.xdsCache.AddPipelineRoute(routeName, exp.Mirror.Name, exp.Mirror.Percent, true)
mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
}

p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit)

case experiment.ModelResourceType:
for _, candidate := range exp.Candidates {
candidateModel, err := p.modelStore.GetModel(candidate.Name)
@@ -494,17 +495,20 @@ func (p *IncrementalProcessor) addPipeline(pipelineName string) error {
if exp.Deleted {
return fmt.Errorf("Experiment on pipeline %s, but %s is deleted", pip.Name, *exp.Default)
}
var mirrorSplit *resources.PipelineTrafficSplit
trafficSplits := make([]resources.PipelineTrafficSplit, 0, len(exp.Candidates))

for _, candidate := range exp.Candidates {
logger.Infof("Adding pipeline experiment candidate %s %s %d", routeName, candidate.Name, candidate.Weight)
p.xdsCache.AddPipelineRoute(routeName, candidate.Name, candidate.Weight, false)
trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight})
}
if exp.Mirror != nil {
logger.Infof("Adding pipeline experiment mirror %s %s %d", routeName, exp.Mirror.Name, exp.Mirror.Percent)
p.xdsCache.AddPipelineRoute(routeName, exp.Mirror.Name, exp.Mirror.Percent, true)
mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent}
}

p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit)
} else {
logger.Infof("Adding normal pipeline route %s", routeName)
p.xdsCache.AddPipelineRoute(routeName, pip.Name, 100, false)
p.xdsCache.AddPipelineRoute(routeName, []resources.PipelineTrafficSplit{{PipelineName: pip.Name, TrafficWeight: 100}}, nil)
}

return p.updateEnvoy()
@@ -548,7 +552,7 @@ func (p *IncrementalProcessor) modelUpdate(modelName string) error {

model, err := p.modelStore.GetModel(modelName)
if err != nil {
logger.WithError(err).Warnf("Failed to sync model %s", modelName)
logger.WithError(err).Warnf("sync: Failed to sync model %s", modelName)
if err := p.removeRouteForServerInEnvoyCache(modelName); err != nil {
logger.WithError(err).Errorf("Failed to remove model route from envoy %s", modelName)
p.modelStore.UnlockModel(modelName)
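
The old updateEnvoyForModelVersion duplicated the endpoint loop once for HTTP and once for gRPC; the refactor pushes both behind a single xdsCache.AddClustersForRoute call. The real implementation lives in pkg/envoy/xdscache and is not part of this diff — what follows is a minimal sketch of the consolidation idea, with all types as stand-in assumptions rather than Seldon's API:

package main

import "fmt"

// Stand-ins for the real store/xdscache types (assumptions).
type replica struct {
	svc      string
	httpPort int
	grpcPort int
}

type server struct {
	name     string
	replicas map[int]replica
}

type xdsCache struct{ endpoints map[string][]string }

func (c *xdsCache) addEndpoint(cluster, addr string) {
	c.endpoints[cluster] = append(c.endpoints[cluster], addr)
}

// addClustersForRoute walks the replica assignment once, registering the HTTP
// and gRPC endpoint for each replica in the same pass instead of duplicating
// the loop per protocol as the old code did.
func (c *xdsCache) addClustersForRoute(httpCluster, grpcCluster string, assignment []int, s *server) {
	for _, idx := range assignment {
		r, ok := s.replicas[idx]
		if !ok {
			fmt.Printf("invalid replica index %d for server %s\n", idx, s.name)
			continue
		}
		c.addEndpoint(httpCluster, fmt.Sprintf("%s:%d", r.svc, r.httpPort))
		c.addEndpoint(grpcCluster, fmt.Sprintf("%s:%d", r.svc, r.grpcPort))
	}
}

func main() {
	c := &xdsCache{endpoints: map[string][]string{}}
	s := &server{name: "mlserver", replicas: map[int]replica{0: {"mlserver-0", 9000, 9500}}}
	c.addClustersForRoute("iris_2_http", "iris_2_grpc", []int{0, 1}, s)
	fmt.Println(c.endpoints) // replica 1 is missing and gets skipped with a warning
}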
22 changes: 15 additions & 7 deletions scheduler/pkg/envoy/processor/incremental_test.go
@@ -351,7 +351,7 @@ func TestRollingUpdate(t *testing.T) {
experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil),
pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore),
}
inc.xdsCache.AddListeners()
inc.xdsCache.AddPermanentListeners()
for _, op := range test.ops {
op(inc, g)
}
@@ -421,7 +421,7 @@ func TestDraining(t *testing.T) {
experimentServer: experiment.NewExperimentServer(log.New(), nil, nil, nil),
pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore),
}
inc.xdsCache.AddListeners()
inc.xdsCache.AddPermanentListeners()
for _, op := range test.ops {
op(inc, g)
}
@@ -566,7 +566,7 @@ func TestModelSync(t *testing.T) {
pipelineHandler: pipeline.NewPipelineStore(log.New(), nil, modelStore),
pendingModelVersions: test.pendingModelVersions,
}
inc.xdsCache.AddListeners()
inc.xdsCache.AddPermanentListeners()
for _, op := range test.ops {
op(inc, g)
}
@@ -827,7 +827,8 @@ func TestEnvoySettings(t *testing.T) {
inc.handlePipelinesEvents,
)

inc.xdsCache.AddListeners()
inc.xdsCache.AddPermanentListeners()
inc.xdsCache.AddPermanentClusters()
for _, op := range test.ops {
op(inc, g)
time.Sleep(50 * time.Millisecond) // to allow event handlers to process
@@ -974,7 +975,7 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {
for _, route := range virtualHost.Routes {
trafficSplit := resources.Route{
RouteName: route.Name,
Clusters: make([]resources.TrafficSplits, 0),
Clusters: make([]resources.TrafficSplit, 0),
}

clusterSpecificer := route.GetRoute().GetClusterSpecifier()
@@ -986,20 +987,27 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route {
weightedClusters := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_WeightedClusters)

for _, weightedCluster := range weightedClusters.WeightedClusters.Clusters {
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplits{
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{
ModelName: weightedCluster.Name,
TrafficWeight: weightedCluster.Weight.Value,
})
}
case *routev3.RouteAction_Cluster:
cluster := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_Cluster)
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplits{
trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{
ModelName: cluster.Cluster,
TrafficWeight: 100,
})

}

if len(route.GetRoute().RequestMirrorPolicies) > 0 {
mirror := route.GetRoute().RequestMirrorPolicies[0]
trafficSplit.Mirror = &resources.TrafficSplit{ModelName: mirror.Cluster, TrafficWeight: mirror.RuntimeFraction.DefaultValue.Numerator}
}

trafficSplits = append(trafficSplits, trafficSplit)

}

return trafficSplits
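
The new assertion reads the mirror back out of Envoy's RequestMirrorPolicies; the producing side builds the inverse. A hypothetical construction sketch using the same go-control-plane types the test walks (import aliases, cluster name, and fraction are illustrative, and this is not the actual cache code):

package processor

import (
	corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	routev3 "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
	typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
)

// exampleMirrorAction mirrors all matched traffic to a shadow model's
// HTTP cluster via a single request-mirror policy.
func exampleMirrorAction() *routev3.RouteAction {
	return &routev3.RouteAction{
		RequestMirrorPolicies: []*routev3.RouteAction_RequestMirrorPolicy{{
			Cluster: "iris-shadow_1_http",
			RuntimeFraction: &corev3.RuntimeFractionalPercent{
				DefaultValue: &typev3.FractionalPercent{Numerator: 100},
			},
		}},
	}
}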
2 changes: 1 addition & 1 deletion scheduler/pkg/envoy/processor/server_test.go
@@ -60,7 +60,7 @@ func TestFetch(t *testing.T) {
nodeID: "node_1",
}

err = inc.setListeners()
err = inc.init()
g.Expect(err).To(BeNil())

conn, err := grpc.NewClient(":"+strconv.Itoa(port), grpc.WithTransportCredentials(insecure.NewCredentials()))
13 changes: 6 additions & 7 deletions scheduler/pkg/envoy/resources/cache.go
@@ -23,17 +23,16 @@ type Listener struct {
Address string
Port uint32
RouteConfigurationName string
//RouteNames []string
}

type Route struct {
RouteName string
LogPayloads bool
Clusters []TrafficSplits
Mirrors []TrafficSplits
Clusters []TrafficSplit
Mirror *TrafficSplit
}

type TrafficSplits struct {
type TrafficSplit struct {
ModelName string
ModelVersion uint32
TrafficWeight uint32
@@ -61,11 +60,11 @@ type Endpoint struct {

type PipelineRoute struct {
RouteName string
Clusters []PipelineTrafficSplits
Mirrors []PipelineTrafficSplits
Clusters []PipelineTrafficSplit
Mirror *PipelineTrafficSplit
}

type PipelineTrafficSplits struct {
type PipelineTrafficSplit struct {
PipelineName string
TrafficWeight uint32
}
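
Tying the renamed types together: a pipeline experiment now carries its candidates as one Clusters slice plus at most one optional Mirror, instead of a parallel Mirrors slice. An illustrative value using only the structs above (names and weights made up):

// A 90/10 pipeline canary with a shadow mirror in the new shape.
var canaryRoute = PipelineRoute{
	RouteName: "mypipeline",
	Clusters: []PipelineTrafficSplit{
		{PipelineName: "mypipeline", TrafficWeight: 90},
		{PipelineName: "mypipeline-canary", TrafficWeight: 10},
	},
	Mirror: &PipelineTrafficSplit{PipelineName: "mypipeline-shadow", TrafficWeight: 100},
}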