diff --git a/scheduler/pkg/agent/rproxy.go b/scheduler/pkg/agent/rproxy.go index f0b3d4a2d7..db7a067590 100644 --- a/scheduler/pkg/agent/rproxy.go +++ b/scheduler/pkg/agent/rproxy.go @@ -29,7 +29,6 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/interfaces" "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" "github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics" "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) @@ -76,14 +75,14 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e var originalBody []byte var err error - internalModelName := req.Header.Get(resources.SeldonInternalModelHeader) + internalModelName := req.Header.Get(util.SeldonInternalModelHeader) // externalModelName is the name of the model as it is known to the client, we should not use - // resources.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy) - // however for the metrics we need the actual model name and this is done by using resources.SeldonInternalModelHeader + // util.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy) + // however for the metrics we need the actual model name and this is done by using util.SeldonInternalModelHeader externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName) if err != nil { t.logger.WithError(err).Warnf("cannot extract model name from %s, revert to actual header", internalModelName) - externalModelName = req.Header.Get(resources.SeldonModelHeader) + externalModelName = req.Header.Get(util.SeldonModelHeader) } // to sync between scalingMetricsSetup and scalingMetricsTearDown calls running in go routines @@ -121,7 +120,7 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e // in the case of triton, a request to a model that is not found is considered a bad request // this is likely to increase latency for genuine bad requests as we will retry twice if res.StatusCode == http.StatusNotFound || res.StatusCode == http.StatusBadRequest { - internalModelName := req.Header.Get(resources.SeldonInternalModelHeader) + internalModelName := req.Header.Get(util.SeldonInternalModelHeader) if v2Err := t.loader(internalModelName); v2Err != nil { t.logger.WithError(v2Err).Warnf("cannot load model %s", internalModelName) } @@ -143,26 +142,26 @@ func (t *lazyModelLoadTransport) RoundTrip(req *http.Request) (*http.Response, e func (rp *reverseHTTPProxy) addHandlers(proxy http.Handler) http.Handler { return otelhttp.NewHandler(rp.metrics.AddModelHistogramMetricsHandler(func(w http.ResponseWriter, r *http.Request) { startTime := time.Now() - rp.logger.Debugf("Received request with host %s and internal header %v", r.Host, r.Header.Values(resources.SeldonInternalModelHeader)) + rp.logger.Debugf("Received request with host %s and internal header %v", r.Host, r.Header.Values(util.SeldonInternalModelHeader)) rewriteHostHandler(r) - internalModelName := r.Header.Get(resources.SeldonInternalModelHeader) + internalModelName := r.Header.Get(util.SeldonInternalModelHeader) // externalModelName is the name of the model as it is known to the client, we should not use - // resources.SeldonModelHeader though as it can contain the experiment tag (used for routing by envoy) - // however for the metrics we need the actual model name and this is done by using resources.SeldonInternalModelHeader + // util.SeldonModelHeader though as it can 
contain the experiment tag (used for routing by envoy) + // however for the metrics we need the actual model name and this is done by using util.SeldonInternalModelHeader externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName) if err != nil { rp.logger.WithError(err).Warnf("cannot extract model name from %s, revert to actual header", internalModelName) - externalModelName = r.Header.Get(resources.SeldonModelHeader) + externalModelName = r.Header.Get(util.SeldonModelHeader) } //TODO should we return a 404 if headers not found? if externalModelName == "" || internalModelName == "" { - rp.logger.Warnf("Failed to extract model name %s:[%s] %s:[%s]", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName) + rp.logger.Warnf("Failed to extract model name %s:[%s] %s:[%s]", util.SeldonInternalModelHeader, internalModelName, util.SeldonModelHeader, externalModelName) proxy.ServeHTTP(w, r) return } else { - rp.logger.Debugf("Extracted model name %s:%s %s:%s", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName) + rp.logger.Debugf("Extracted model name %s:%s %s:%s", util.SeldonInternalModelHeader, internalModelName, util.SeldonModelHeader, externalModelName) } if err := rp.stateManager.EnsureLoadModel(internalModelName); err != nil { diff --git a/scheduler/pkg/agent/rproxy_grpc.go b/scheduler/pkg/agent/rproxy_grpc.go index 8a3cdc2570..740fbcfb21 100644 --- a/scheduler/pkg/agent/rproxy_grpc.go +++ b/scheduler/pkg/agent/rproxy_grpc.go @@ -31,7 +31,6 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling" "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelserver_controlplane/oip" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" "github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics" "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) @@ -169,10 +168,10 @@ func (rp *reverseGRPCProxy) extractModelNamesFromContext(ctx context.Context) (s var internalModelName, externalModelName string var inHeader bool if internalModelName, externalModelName, inHeader = extractModelNamesFromHeaders(ctx); inHeader { - rp.logger.Debugf("Extracted model name %s:%s %s:%s", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName) + rp.logger.Debugf("Extracted model name %s:%s %s:%s", util.SeldonInternalModelHeader, internalModelName, util.SeldonModelHeader, externalModelName) return internalModelName, externalModelName, nil } else { - msg := fmt.Sprintf("Failed to extract model name %s:[%s] %s:[%s]", resources.SeldonInternalModelHeader, internalModelName, resources.SeldonModelHeader, externalModelName) + msg := fmt.Sprintf("Failed to extract model name %s:[%s] %s:[%s]", util.SeldonInternalModelHeader, internalModelName, util.SeldonModelHeader, externalModelName) rp.logger.Error(msg) return "", "", status.Error(codes.FailedPrecondition, msg) } @@ -354,10 +353,10 @@ func extractHeader(key string, md metadata.MD) string { func extractModelNamesFromHeaders(ctx context.Context) (string, string, bool) { md, ok := metadata.FromIncomingContext(ctx) if ok { - internalModelName := extractHeader(resources.SeldonInternalModelHeader, md) + internalModelName := extractHeader(util.SeldonInternalModelHeader, md) externalModelName, _, err := util.GetOrignalModelNameAndVersion(internalModelName) if err != nil { - externalModelName = extractHeader(resources.SeldonModelHeader, md) + externalModelName = 
extractHeader(util.SeldonModelHeader, md) } return internalModelName, externalModelName, internalModelName != "" && externalModelName != "" } diff --git a/scheduler/pkg/agent/rproxy_grpc_test.go b/scheduler/pkg/agent/rproxy_grpc_test.go index 7ea196c086..34af87fb96 100644 --- a/scheduler/pkg/agent/rproxy_grpc_test.go +++ b/scheduler/pkg/agent/rproxy_grpc_test.go @@ -26,7 +26,6 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/internal/testing_utils" "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" testing_utils2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils" "github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics" "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" @@ -120,21 +119,21 @@ func TestReverseGRPCServiceSmoke(t *testing.T) { doInfer := func(modelSuffixInternal, modelSuffix string) (*v2.ModelInferResponse, error) { client := v2.NewGRPCInferenceServiceClient(conn) ctx := context.Background() - ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffixInternal, resources.SeldonModelHeader, dummyModelNamePrefix+modelSuffix) + ctx = metadata.AppendToOutgoingContext(ctx, util.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffixInternal, util.SeldonModelHeader, dummyModelNamePrefix+modelSuffix) return client.ModelInfer(ctx, &v2.ModelInferRequest{ModelName: dummyModelNamePrefix}) // note without suffix } doMeta := func(modelSuffix string) (*v2.ModelMetadataResponse, error) { client := v2.NewGRPCInferenceServiceClient(conn) ctx := context.Background() - ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix) + ctx = metadata.AppendToOutgoingContext(ctx, util.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, util.SeldonModelHeader, dummyModelNamePrefix) return client.ModelMetadata(ctx, &v2.ModelMetadataRequest{Name: dummyModelNamePrefix}) // note without suffix } doModelReady := func(modelSuffix string) (*v2.ModelReadyResponse, error) { client := v2.NewGRPCInferenceServiceClient(conn) ctx := context.Background() - ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, resources.SeldonModelHeader, dummyModelNamePrefix) + ctx = metadata.AppendToOutgoingContext(ctx, util.SeldonInternalModelHeader, dummyModelNamePrefix+modelSuffix, util.SeldonModelHeader, dummyModelNamePrefix) return client.ModelReady(ctx, &v2.ModelReadyRequest{Name: dummyModelNamePrefix}) // note without suffix } diff --git a/scheduler/pkg/agent/rproxy_test.go b/scheduler/pkg/agent/rproxy_test.go index 8d5ccc7204..6a63e93fa7 100644 --- a/scheduler/pkg/agent/rproxy_test.go +++ b/scheduler/pkg/agent/rproxy_test.go @@ -29,7 +29,6 @@ import ( "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/interfaces" "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/internal/testing_utils" "github.com/seldonio/seldon-core/scheduler/v2/pkg/agent/modelscaling" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" testing_utils2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils" "github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics" "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" @@ -272,8 +271,8 @@ func TestReverseProxySmoke(t *testing.T) { req, err := http.NewRequest(http.MethodPost, url, nil) 
g.Expect(err).To(BeNil()) req.Header.Set("contentType", "application/json") - req.Header.Set(resources.SeldonModelHeader, test.modelExternalHeader) - req.Header.Set(resources.SeldonInternalModelHeader, test.modelToRequest) + req.Header.Set(util.SeldonModelHeader, test.modelExternalHeader) + req.Header.Set(util.SeldonInternalModelHeader, test.modelToRequest) resp, err := http.DefaultClient.Do(req) g.Expect(err).To(BeNil()) @@ -459,7 +458,7 @@ func TestLazyLoadRoundTripper(t *testing.T) { httpClient.Transport = &lazyModelLoadTransport{ loader, http.DefaultTransport, metricsHandler, modelScalingStatsCollector, log.New()} mockMLServerState.setModelServerUnloaded(dummyModel) - req.Header.Set(resources.SeldonInternalModelHeader, dummyModel) + req.Header.Set(util.SeldonInternalModelHeader, dummyModel) resp, err := httpClient.Do(req) g.Expect(err).To(BeNil()) g.Expect(resp.StatusCode).To(Equal(http.StatusOK)) diff --git a/scheduler/pkg/envoy/processor/incremental.go b/scheduler/pkg/envoy/processor/incremental.go index 4ae707ea2e..f45ced38ad 100644 --- a/scheduler/pkg/envoy/processor/incremental.go +++ b/scheduler/pkg/envoy/processor/incremental.go @@ -24,7 +24,6 @@ import ( "github.com/sirupsen/logrus" "github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/xdscache" "github.com/seldonio/seldon-core/scheduler/v2/pkg/scheduler/cleaner" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" @@ -390,14 +389,14 @@ func (p *IncrementalProcessor) addTrafficForExperiment(routeName string, exp *ex switch exp.ResourceType { case experiment.PipelineResourceType: - var mirrorSplit *resources.PipelineTrafficSplit - trafficSplits := make([]resources.PipelineTrafficSplit, len(exp.Candidates)) + var mirrorSplit *xdscache.PipelineTrafficSplit + trafficSplits := make([]xdscache.PipelineTrafficSplit, len(exp.Candidates)) for _, candidate := range exp.Candidates { - trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight}) + trafficSplits = append(trafficSplits, xdscache.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight}) } if exp.Mirror != nil { - mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent} + mirrorSplit = &xdscache.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent} } p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit) @@ -463,7 +462,7 @@ func (p *IncrementalProcessor) removeExperiment(exp *experiment.Experiment) erro } func getPipelineRouteName(pipelineName string) string { - return fmt.Sprintf("%s.%s", pipelineName, resources.SeldonPipelineHeaderSuffix) + return fmt.Sprintf("%s.%s", pipelineName, util.SeldonPipelineHeaderSuffix) } // TODO make envoy updates for pipelines batched @@ -495,20 +494,20 @@ func (p *IncrementalProcessor) addPipeline(pipelineName string) error { if exp.Deleted { return fmt.Errorf("Experiment on pipeline %s, but %s is deleted", pip.Name, *exp.Default) } - var mirrorSplit *resources.PipelineTrafficSplit - trafficSplits := make([]resources.PipelineTrafficSplit, len(exp.Candidates)) + var mirrorSplit *xdscache.PipelineTrafficSplit + trafficSplits := make([]xdscache.PipelineTrafficSplit, len(exp.Candidates)) for _, candidate := range exp.Candidates { - trafficSplits = append(trafficSplits, resources.PipelineTrafficSplit{PipelineName: 
candidate.Name, TrafficWeight: candidate.Weight}) + trafficSplits = append(trafficSplits, xdscache.PipelineTrafficSplit{PipelineName: candidate.Name, TrafficWeight: candidate.Weight}) } if exp.Mirror != nil { - mirrorSplit = &resources.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent} + mirrorSplit = &xdscache.PipelineTrafficSplit{PipelineName: exp.Mirror.Name, TrafficWeight: exp.Mirror.Percent} } p.xdsCache.AddPipelineRoute(routeName, trafficSplits, mirrorSplit) } else { logger.Infof("Adding normal pipeline route %s", routeName) - p.xdsCache.AddPipelineRoute(routeName, []resources.PipelineTrafficSplit{{PipelineName: pip.Name, TrafficWeight: 100}}, nil) + p.xdsCache.AddPipelineRoute(routeName, []xdscache.PipelineTrafficSplit{{PipelineName: pip.Name, TrafficWeight: 100}}, nil) } return p.updateEnvoy() diff --git a/scheduler/pkg/envoy/processor/incremental_test.go b/scheduler/pkg/envoy/processor/incremental_test.go index 271c6a2528..4ef6317f89 100644 --- a/scheduler/pkg/envoy/processor/incremental_test.go +++ b/scheduler/pkg/envoy/processor/incremental_test.go @@ -31,7 +31,6 @@ import ( "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" "github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/xdscache" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store/experiment" @@ -969,13 +968,13 @@ func createSnapshot(g Gomega, resources []types.Resource, filename string) { g.Expect(err).To(BeNil()) } -func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route { - trafficSplits := make([]resources.Route, 0) +func getTrafficSplits(virtualHost *routev3.VirtualHost) []xdscache.Route { + trafficSplits := make([]xdscache.Route, 0) for _, route := range virtualHost.Routes { - trafficSplit := resources.Route{ + trafficSplit := xdscache.Route{ RouteName: route.Name, - Clusters: make([]resources.TrafficSplit, 0), + Clusters: make([]xdscache.TrafficSplit, 0), } clusterSpecificer := route.GetRoute().GetClusterSpecifier() @@ -987,14 +986,14 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route { weightedClusters := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_WeightedClusters) for _, weightedCluster := range weightedClusters.WeightedClusters.Clusters { - trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{ + trafficSplit.Clusters = append(trafficSplit.Clusters, xdscache.TrafficSplit{ ModelName: weightedCluster.Name, TrafficWeight: weightedCluster.Weight.Value, }) } case *routev3.RouteAction_Cluster: cluster := route.GetRoute().GetClusterSpecifier().(*routev3.RouteAction_Cluster) - trafficSplit.Clusters = append(trafficSplit.Clusters, resources.TrafficSplit{ + trafficSplit.Clusters = append(trafficSplit.Clusters, xdscache.TrafficSplit{ ModelName: cluster.Cluster, TrafficWeight: 100, }) @@ -1003,7 +1002,7 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route { if len(route.GetRoute().RequestMirrorPolicies) > 0 { mirror := route.GetRoute().RequestMirrorPolicies[0] - trafficSplit.Mirror = &resources.TrafficSplit{ModelName: mirror.Cluster, TrafficWeight: mirror.RuntimeFraction.DefaultValue.Numerator} + trafficSplit.Mirror = &xdscache.TrafficSplit{ModelName: mirror.Cluster, TrafficWeight: mirror.RuntimeFraction.DefaultValue.Numerator} } trafficSplits = append(trafficSplits, 
trafficSplit) @@ -1013,12 +1012,12 @@ func getTrafficSplits(virtualHost *routev3.VirtualHost) []resources.Route { return trafficSplits } -func getEndpoints(loadAssignment *endpointv3.ClusterLoadAssignment) []resources.Endpoint { - endpoints := make([]resources.Endpoint, 0) +func getEndpoints(loadAssignment *endpointv3.ClusterLoadAssignment) []xdscache.Endpoint { + endpoints := make([]xdscache.Endpoint, 0) for _, localityLbEndpoint := range loadAssignment.Endpoints { for _, lbEndpoint := range localityLbEndpoint.LbEndpoints { endpointEndpoint := lbEndpoint.HostIdentifier.(*endpointv3.LbEndpoint_Endpoint) - endpoints = append(endpoints, resources.Endpoint{ + endpoints = append(endpoints, xdscache.Endpoint{ UpstreamHost: endpointEndpoint.Endpoint.Address.GetSocketAddress().Address, UpstreamPort: endpointEndpoint.Endpoint.Address.GetSocketAddress().GetPortValue(), }) diff --git a/scheduler/pkg/envoy/resources/cache.go b/scheduler/pkg/envoy/xdscache/cache.go similarity index 98% rename from scheduler/pkg/envoy/resources/cache.go rename to scheduler/pkg/envoy/xdscache/cache.go index 1d63e0ae1d..b21fc5dd99 100644 --- a/scheduler/pkg/envoy/resources/cache.go +++ b/scheduler/pkg/envoy/xdscache/cache.go @@ -7,7 +7,7 @@ Use of this software is governed by the Change License after the Change Date as each is defined in accordance with the LICENSE file. */ -package resources +package xdscache import "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls" diff --git a/scheduler/pkg/envoy/resources/resource.go b/scheduler/pkg/envoy/xdscache/resource.go similarity index 94% rename from scheduler/pkg/envoy/resources/resource.go rename to scheduler/pkg/envoy/xdscache/resource.go index 236329bc60..035c1b7432 100644 --- a/scheduler/pkg/envoy/resources/resource.go +++ b/scheduler/pkg/envoy/xdscache/resource.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package resources +package xdscache import ( "fmt" @@ -48,15 +48,8 @@ import ( const ( SeldonLoggingHeader = "Seldon-Logging" EnvoyLogPathPrefix = "/tmp/request-log" - SeldonModelHeader = "seldon-model" - SeldonPipelineHeader = "pipeline" - SeldonInternalModelHeader = "seldon-internal-model" - SeldonRouteHeader = "x-seldon-route" SeldonRouteSeparator = ":" // Tried % but this seemed to break envoy matching. Maybe % is a special character or connected to regexp. A bug? 
- SeldonModelHeaderSuffix = "model" - SeldonPipelineHeaderSuffix = "pipeline" - DefaultRouteTimeoutSecs = 0 // TODO allow configurable override - ExternalHeaderPrefix = "x-" + DefaultRouteTimeoutSecs = 0 // TODO allow configurable override DefaultRouteConfigurationName = "listener_0" MirrorRouteConfigurationName = "listener_1" TLSRouteConfigurationName = "listener_tls" @@ -349,11 +342,11 @@ func createWeightedModelClusterAction(clusterTraffics []TrafficSplit, mirrorTraf Weight: &wrappers.UInt32Value{ Value: clusterTraffic.TrafficWeight, }, - RequestHeadersToRemove: []string{SeldonInternalModelHeader}, + RequestHeadersToRemove: []string{util.SeldonInternalModelHeader}, RequestHeadersToAdd: []*core.HeaderValueOption{ { Header: &core.HeaderValue{ - Key: SeldonInternalModelHeader, + Key: util.SeldonInternalModelHeader, // note: this is implementation specific for agent and it is exposed here // basically the model versions are flattened and it is loaded as // _ @@ -367,7 +360,7 @@ func createWeightedModelClusterAction(clusterTraffics []TrafficSplit, mirrorTraf ResponseHeadersToAdd: []*core.HeaderValueOption{ { Header: &core.HeaderValue{ - Key: SeldonRouteHeader, + Key: util.SeldonRouteHeader, Value: wrapRouteHeader(util.GetVersionedModelName( clusterTraffic.ModelName, clusterTraffic.ModelVersion)), }, @@ -427,7 +420,7 @@ func makeModelStickySessionEnvoyRoute(routeName string, envoyRoute *route.Route, } envoyRoute.Match.Headers[0] = &route.HeaderMatcher{ - Name: SeldonModelHeader, // Header name we will match on + Name: util.SeldonModelHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ MatchPattern: &matcherv3.StringMatcher_Exact{ @@ -437,7 +430,7 @@ func makeModelStickySessionEnvoyRoute(routeName string, envoyRoute *route.Route, }, } envoyRoute.Match.Headers[1] = &route.HeaderMatcher{ - Name: SeldonRouteHeader, // Header name we will match on + Name: util.SeldonRouteHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ MatchPattern: &matcherv3.StringMatcher_Contains{ @@ -451,14 +444,14 @@ func makeModelStickySessionEnvoyRoute(routeName string, envoyRoute *route.Route, envoyRoute.RequestHeadersToAdd = []*core.HeaderValueOption{ { Header: &core.HeaderValue{ - Key: SeldonInternalModelHeader, + Key: util.SeldonInternalModelHeader, Value: util.GetVersionedModelName( clusterTraffic.ModelName, clusterTraffic.ModelVersion), }, }, { Header: &core.HeaderValue{ - Key: SeldonModelHeader, + Key: util.SeldonModelHeader, Value: clusterTraffic.ModelName, }, }, @@ -466,7 +459,7 @@ func makeModelStickySessionEnvoyRoute(routeName string, envoyRoute *route.Route, envoyRoute.ResponseHeadersToAdd = []*core.HeaderValueOption{ { Header: &core.HeaderValue{ - Key: SeldonRouteHeader, + Key: util.SeldonRouteHeader, Value: wrapRouteHeader(util.GetVersionedModelName( clusterTraffic.ModelName, clusterTraffic.ModelVersion)), }, @@ -504,7 +497,7 @@ func makeModelEnvoyRoute(r *Route, envoyRoute *route.Route, isGrpc, isMirror boo envoyRoute.Match.PathSpecifier = modelRouteMatchPathHttp } envoyRoute.Match.Headers[0] = &route.HeaderMatcher{ - Name: SeldonModelHeader, // Header name we will match on + Name: util.SeldonModelHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ MatchPattern: &matcherv3.StringMatcher_Exact{ @@ -514,7 +507,7 @@ func makeModelEnvoyRoute(r *Route, envoyRoute 
*route.Route, isGrpc, isMirror boo }, } envoyRoute.Match.Headers[1] = &route.HeaderMatcher{ - Name: SeldonRouteHeader, + Name: util.SeldonRouteHeader, HeaderMatchSpecifier: &route.HeaderMatcher_PresentMatch{ PresentMatch: false, }, @@ -538,7 +531,7 @@ func makePipelineEnvoyRoute(r *PipelineRoute, envoyRoute *route.Route, isGrpc, i envoyRoute.Match.PathSpecifier = pipelineRoutePathGrpc } envoyRoute.Match.Headers[0] = &route.HeaderMatcher{ - Name: SeldonModelHeader, // Header name we will match on + Name: util.SeldonModelHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ MatchPattern: &matcherv3.StringMatcher_Exact{ @@ -548,7 +541,7 @@ func makePipelineEnvoyRoute(r *PipelineRoute, envoyRoute *route.Route, isGrpc, i }, } envoyRoute.Match.Headers[1] = &route.HeaderMatcher{ - Name: SeldonRouteHeader, + Name: util.SeldonRouteHeader, HeaderMatchSpecifier: &route.HeaderMatcher_PresentMatch{ PresentMatch: false, }, @@ -562,7 +555,7 @@ func makePipelineEnvoyRoute(r *PipelineRoute, envoyRoute *route.Route, isGrpc, i } func getPipelineModelName(pipelineName string) string { - return fmt.Sprintf("%s.%s", pipelineName, SeldonPipelineHeaderSuffix) + return fmt.Sprintf("%s.%s", pipelineName, util.SeldonPipelineHeaderSuffix) } func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplit, mirrorTraffic *PipelineTrafficSplit, isGrpc bool) *route.Route_Route { @@ -585,7 +578,7 @@ func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplit, RequestHeadersToAdd: []*core.HeaderValueOption{ { Header: &core.HeaderValue{ - Key: SeldonInternalModelHeader, + Key: util.SeldonInternalModelHeader, Value: getPipelineModelName(clusterTraffic.PipelineName), }, }, @@ -593,7 +586,7 @@ func createWeightedPipelineClusterAction(clusterTraffics []PipelineTrafficSplit, ResponseHeadersToAdd: []*core.HeaderValueOption{ { Header: &core.HeaderValue{ - Key: SeldonRouteHeader, + Key: util.SeldonRouteHeader, Value: wrapRouteHeader(getPipelineModelName(clusterTraffic.PipelineName)), }, }, @@ -628,7 +621,7 @@ func makePipelineStickySessionEnvoyRoute(routeName string, envoyRoute *route.Rou } envoyRoute.Match.Headers[0] = &route.HeaderMatcher{ - Name: SeldonRouteHeader, // Header name we will match on + Name: util.SeldonRouteHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ MatchPattern: &matcherv3.StringMatcher_Contains{ @@ -638,7 +631,7 @@ func makePipelineStickySessionEnvoyRoute(routeName string, envoyRoute *route.Rou }, } envoyRoute.Match.Headers[1] = &route.HeaderMatcher{ - Name: SeldonModelHeader, // Header name we will match on + Name: util.SeldonModelHeader, // Header name we will match on HeaderMatchSpecifier: &route.HeaderMatcher_StringMatch{ StringMatch: &matcherv3.StringMatcher{ MatchPattern: &matcherv3.StringMatcher_Exact{ @@ -650,7 +643,7 @@ func makePipelineStickySessionEnvoyRoute(routeName string, envoyRoute *route.Rou envoyRoute.RequestHeadersToAdd = []*core.HeaderValueOption{ { Header: &core.HeaderValue{ - Key: SeldonInternalModelHeader, + Key: util.SeldonInternalModelHeader, Value: getPipelineModelName(clusterTraffic.PipelineName), }, }, @@ -658,7 +651,7 @@ func makePipelineStickySessionEnvoyRoute(routeName string, envoyRoute *route.Rou envoyRoute.ResponseHeadersToAdd = []*core.HeaderValueOption{ { Header: &core.HeaderValue{ - Key: SeldonRouteHeader, + Key: util.SeldonRouteHeader, Value: 
wrapRouteHeader(getPipelineModelName(clusterTraffic.PipelineName)), }, }, @@ -817,8 +810,8 @@ func createHeaderFilter() *anypb.Any { DefaultSourceCode: &core.DataSource{ Specifier: &core.DataSource_InlineString{ InlineString: `function envoy_on_request(request_handle) - local modelHeader = request_handle:headers():get("` + SeldonModelHeader + `") - local routeHeader = request_handle:headers():get("` + SeldonRouteHeader + `") + local modelHeader = request_handle:headers():get("` + util.SeldonModelHeader + `") + local routeHeader = request_handle:headers():get("` + util.SeldonRouteHeader + `") if (modelHeader == nil or modelHeader == '') and (routeHeader == nil or routeHeader == '') then local path = request_handle:headers():get(":path") local i, j = string.find(path,"/v2/models/") @@ -827,9 +820,9 @@ func createHeaderFilter() *anypb.Any { i, j = string.find(s, "/") if i then local model = string.sub(s,0,i-1) - request_handle:headers():add("` + SeldonModelHeader + `",model) + request_handle:headers():add("` + util.SeldonModelHeader + `",model) else - request_handle:headers():add("` + SeldonModelHeader + `",s) + request_handle:headers():add("` + util.SeldonModelHeader + `",s) end else i, j = string.find(path,"/v2/pipelines/") @@ -837,7 +830,7 @@ func createHeaderFilter() *anypb.Any { local s = string.sub(path,j+1) i, j = string.find(s, "/") local model = string.sub(s,0,i-1) - request_handle:headers():add("` + SeldonModelHeader + `",model..".` + SeldonPipelineHeaderSuffix + `") + request_handle:headers():add("` + util.SeldonModelHeader + `",model..".` + util.SeldonPipelineHeaderSuffix + `") end end end diff --git a/scheduler/pkg/envoy/resources/resource_test.go b/scheduler/pkg/envoy/xdscache/resource_test.go similarity index 65% rename from scheduler/pkg/envoy/resources/resource_test.go rename to scheduler/pkg/envoy/xdscache/resource_test.go index a3f697707f..1a21aba696 100644 --- a/scheduler/pkg/envoy/resources/resource_test.go +++ b/scheduler/pkg/envoy/xdscache/resource_test.go @@ -7,7 +7,7 @@ Use of this software is governed by the Change License after the Change Date as each is defined in accordance with the LICENSE file. 
*/ -package resources +package xdscache import ( "testing" @@ -28,132 +28,138 @@ func TestMakeRoute(t *testing.T) { tests := []test{ { name: "one model", - modelRoutes: map[string]Route{"r1": { - RouteName: "r1", - Clusters: []TrafficSplit{ - { - ModelName: "m1", - ModelVersion: 1, - TrafficWeight: 100, - HttpCluster: "h1", - GrpcCluster: "g1", + modelRoutes: map[string]Route{ + "r1": { + RouteName: "r1", + Clusters: []TrafficSplit{ + { + ModelName: "m1", + ModelVersion: 1, + TrafficWeight: 100, + HttpCluster: "h1", + GrpcCluster: "g1", + }, }, }, }, - }, expectedDefaultRoutes: 2, expectedMirrorRoutes: 0, }, { name: "one pipeline", - pipelineRoutes: map[string]PipelineRoute{"r1": { - RouteName: "r1", - Clusters: []PipelineTrafficSplit{ - { - PipelineName: "p1", - TrafficWeight: 100, + pipelineRoutes: map[string]PipelineRoute{ + "r1": { + RouteName: "r1", + Clusters: []PipelineTrafficSplit{ + { + PipelineName: "p1", + TrafficWeight: 100, + }, }, }, }, - }, expectedDefaultRoutes: 2, expectedMirrorRoutes: 0, }, { name: "pipeline experiment", - pipelineRoutes: map[string]PipelineRoute{"r1": { - RouteName: "r1", - Clusters: []PipelineTrafficSplit{ - { - PipelineName: "p1", - TrafficWeight: 50, - }, - { - PipelineName: "p2", - TrafficWeight: 50, + pipelineRoutes: map[string]PipelineRoute{ + "r1": { + RouteName: "r1", + Clusters: []PipelineTrafficSplit{ + { + PipelineName: "p1", + TrafficWeight: 50, + }, + { + PipelineName: "p2", + TrafficWeight: 50, + }, }, }, }, - }, expectedDefaultRoutes: 6, expectedMirrorRoutes: 0, }, { name: "pipeline experiment with mirror", - pipelineRoutes: map[string]PipelineRoute{"r1": { - RouteName: "r1", - Clusters: []PipelineTrafficSplit{ - { - PipelineName: "p1", - TrafficWeight: 50, + pipelineRoutes: map[string]PipelineRoute{ + "r1": { + RouteName: "r1", + Clusters: []PipelineTrafficSplit{ + { + PipelineName: "p1", + TrafficWeight: 50, + }, + { + PipelineName: "p2", + TrafficWeight: 50, + }, }, - { - PipelineName: "p2", - TrafficWeight: 50, + Mirror: &PipelineTrafficSplit{ + PipelineName: "p3", + TrafficWeight: 100, }, }, - Mirror: &PipelineTrafficSplit{ - PipelineName: "p3", - TrafficWeight: 100, - }, - }, }, expectedDefaultRoutes: 6, expectedMirrorRoutes: 2, }, { name: "model experiment", - modelRoutes: map[string]Route{"r1": { - RouteName: "r1", - Clusters: []TrafficSplit{ - { - ModelName: "m1", - ModelVersion: 1, - TrafficWeight: 50, - HttpCluster: "h1", - GrpcCluster: "g1", - }, - { - ModelName: "m2", - ModelVersion: 1, - TrafficWeight: 50, - HttpCluster: "h1", - GrpcCluster: "g1", + modelRoutes: map[string]Route{ + "r1": { + RouteName: "r1", + Clusters: []TrafficSplit{ + { + ModelName: "m1", + ModelVersion: 1, + TrafficWeight: 50, + HttpCluster: "h1", + GrpcCluster: "g1", + }, + { + ModelName: "m2", + ModelVersion: 1, + TrafficWeight: 50, + HttpCluster: "h1", + GrpcCluster: "g1", + }, }, }, }, - }, expectedDefaultRoutes: 6, expectedMirrorRoutes: 0, }, { name: "experiment with model mirror", - modelRoutes: map[string]Route{"r1": { - RouteName: "r1", - Clusters: []TrafficSplit{ - { - ModelName: "m1", - ModelVersion: 1, - TrafficWeight: 50, - HttpCluster: "h1", - GrpcCluster: "g1", + modelRoutes: map[string]Route{ + "r1": { + RouteName: "r1", + Clusters: []TrafficSplit{ + { + ModelName: "m1", + ModelVersion: 1, + TrafficWeight: 50, + HttpCluster: "h1", + GrpcCluster: "g1", + }, + { + ModelName: "m2", + ModelVersion: 1, + TrafficWeight: 50, + HttpCluster: "h1", + GrpcCluster: "g1", + }, }, - { - ModelName: "m2", + Mirror: &TrafficSplit{ + ModelName: "m3", 
ModelVersion: 1, - TrafficWeight: 50, + TrafficWeight: 100, HttpCluster: "h1", GrpcCluster: "g1", }, }, - Mirror: &TrafficSplit{ - ModelName: "m3", - ModelVersion: 1, - TrafficWeight: 100, - HttpCluster: "h1", - GrpcCluster: "g1", - }, - }, }, expectedDefaultRoutes: 6, expectedMirrorRoutes: 2, diff --git a/scheduler/pkg/envoy/xdscache/seldoncache.go b/scheduler/pkg/envoy/xdscache/seldoncache.go index 9e47a36612..374860e6f9 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache.go @@ -17,7 +17,6 @@ import ( seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" ) @@ -42,10 +41,10 @@ const ( type SeldonXDSCache struct { permanentListeners []types.Resource permanentClusters []types.Resource - Routes map[string]resources.Route - Clusters map[string]resources.Cluster - Pipelines map[string]resources.PipelineRoute - Secrets map[string]resources.Secret + Routes map[string]Route + Clusters map[string]Cluster + Pipelines map[string]PipelineRoute + Secrets map[string]Secret PipelineGatewayDetails *PipelineGatewayDetails logger logrus.FieldLogger TLSActive bool @@ -61,10 +60,10 @@ func NewSeldonXDSCache(logger logrus.FieldLogger, pipelineGatewayDetails *Pipeli return &SeldonXDSCache{ permanentListeners: make([]types.Resource, permanentListenerCount), permanentClusters: make([]types.Resource, permanentClusterCount), - Clusters: make(map[string]resources.Cluster), - Routes: make(map[string]resources.Route), - Pipelines: make(map[string]resources.PipelineRoute), - Secrets: make(map[string]resources.Secret), + Clusters: make(map[string]Cluster), + Routes: make(map[string]Route), + Pipelines: make(map[string]PipelineRoute), + Secrets: make(map[string]Secret), PipelineGatewayDetails: pipelineGatewayDetails, logger: logger.WithField("source", "SeldonXDSCache"), } @@ -98,18 +97,18 @@ func (xds *SeldonXDSCache) SetupTLS() error { } func (xds *SeldonXDSCache) AddPermanentListeners() { - var serverSecret *resources.Secret + var serverSecret *Secret if xds.TLSActive { if secret, ok := xds.Secrets[EnvoyDownstreamServerCertName]; ok { serverSecret = &secret } } - xds.permanentListeners[0] = resources.MakeHTTPListener(defaultListenerName, defaultListenerAddress, defaultListenerPort, resources.DefaultRouteConfigurationName, serverSecret) - xds.permanentListeners[1] = resources.MakeHTTPListener(mirrorListenerName, mirrorListenerAddress, mirrorListenerPort, resources.MirrorRouteConfigurationName, serverSecret) + xds.permanentListeners[0] = MakeHTTPListener(defaultListenerName, defaultListenerAddress, defaultListenerPort, DefaultRouteConfigurationName, serverSecret) + xds.permanentListeners[1] = MakeHTTPListener(mirrorListenerName, mirrorListenerAddress, mirrorListenerPort, MirrorRouteConfigurationName, serverSecret) } func (xds *SeldonXDSCache) AddPermanentClusters() { - var clientSecret *resources.Secret + var clientSecret *Secret if xds.TLSActive { if secret, ok := xds.Secrets[EnvoyUpstreamClientCertName]; ok { clientSecret = &secret @@ -117,16 +116,16 @@ func (xds *SeldonXDSCache) AddPermanentClusters() { } // Add pipeline gateway clusters - xds.logger.Infof("Add http pipeline cluster %s host:%s port:%d", resources.PipelineGatewayHttpClusterName, xds.PipelineGatewayDetails.Host, xds.PipelineGatewayDetails.HttpPort) - xds.permanentClusters[0] = resources.MakeCluster(resources.PipelineGatewayHttpClusterName, []resources.Endpoint{ 
+ xds.logger.Infof("Add http pipeline cluster %s host:%s port:%d", PipelineGatewayHttpClusterName, xds.PipelineGatewayDetails.Host, xds.PipelineGatewayDetails.HttpPort) + xds.permanentClusters[0] = MakeCluster(PipelineGatewayHttpClusterName, []Endpoint{ { UpstreamHost: xds.PipelineGatewayDetails.Host, UpstreamPort: uint32(xds.PipelineGatewayDetails.HttpPort), }, }, false, clientSecret) - xds.logger.Infof("Add grpc pipeline cluster %s host:%s port:%d", resources.PipelineGatewayGrpcClusterName, xds.PipelineGatewayDetails.Host, xds.PipelineGatewayDetails.GrpcPort) - xds.permanentClusters[1] = resources.MakeCluster(resources.PipelineGatewayGrpcClusterName, []resources.Endpoint{ + xds.logger.Infof("Add grpc pipeline cluster %s host:%s port:%d", PipelineGatewayGrpcClusterName, xds.PipelineGatewayDetails.Host, xds.PipelineGatewayDetails.GrpcPort) + xds.permanentClusters[1] = MakeCluster(PipelineGatewayGrpcClusterName, []Endpoint{ { UpstreamHost: xds.PipelineGatewayDetails.Host, UpstreamPort: uint32(xds.PipelineGatewayDetails.GrpcPort), @@ -134,15 +133,15 @@ func (xds *SeldonXDSCache) AddPermanentClusters() { }, true, clientSecret) // Add Mirror clusters - xds.logger.Infof("Add http mirror cluster %s host:%s port:%d", resources.MirrorHttpClusterName, mirrorListenerAddress, mirrorListenerPort) - xds.permanentClusters[2] = resources.MakeCluster(resources.MirrorHttpClusterName, []resources.Endpoint{ + xds.logger.Infof("Add http mirror cluster %s host:%s port:%d", MirrorHttpClusterName, mirrorListenerAddress, mirrorListenerPort) + xds.permanentClusters[2] = MakeCluster(MirrorHttpClusterName, []Endpoint{ { UpstreamHost: mirrorListenerAddress, UpstreamPort: mirrorListenerPort, }, }, false, nil) - xds.logger.Infof("Add grpc mirror cluster %s host:%s port:%d", resources.MirrorGrpcClusterName, mirrorListenerAddress, mirrorListenerPort) - xds.permanentClusters[3] = resources.MakeCluster(resources.MirrorGrpcClusterName, []resources.Endpoint{ + xds.logger.Infof("Add grpc mirror cluster %s host:%s port:%d", MirrorGrpcClusterName, mirrorListenerAddress, mirrorListenerPort) + xds.permanentClusters[3] = MakeCluster(MirrorGrpcClusterName, []Endpoint{ { UpstreamHost: mirrorListenerAddress, UpstreamPort: mirrorListenerPort, @@ -153,7 +152,7 @@ func (xds *SeldonXDSCache) AddPermanentClusters() { func (xds *SeldonXDSCache) ClusterContents() []types.Resource { var r []types.Resource - var clientSecret *resources.Secret + var clientSecret *Secret if xds.TLSActive { if secret, ok := xds.Secrets[EnvoyUpstreamClientCertName]; ok { clientSecret = &secret @@ -163,18 +162,18 @@ func (xds *SeldonXDSCache) ClusterContents() []types.Resource { r = append(r, xds.permanentClusters...) for _, c := range xds.Clusters { - endpoints := make([]resources.Endpoint, 0, len(c.Endpoints)) + endpoints := make([]Endpoint, 0, len(c.Endpoints)) for _, value := range c.Endpoints { // Likely to be small (<100?) 
as is number of model replicas endpoints = append(endpoints, value) } - r = append(r, resources.MakeCluster(c.Name, endpoints, c.Grpc, clientSecret)) + r = append(r, MakeCluster(c.Name, endpoints, c.Grpc, clientSecret)) } return r } func (xds *SeldonXDSCache) RouteContents() []types.Resource { - defaultRoutes, mirrorRoutes := resources.MakeRoutes(xds.Routes, xds.Pipelines) + defaultRoutes, mirrorRoutes := MakeRoutes(xds.Routes, xds.Pipelines) return []types.Resource{defaultRoutes, mirrorRoutes} } @@ -187,7 +186,7 @@ func (xds *SeldonXDSCache) SecretContents() []types.Resource { var r []types.Resource for _, s := range xds.Secrets { - secrets := resources.MakeSecretResource(s.Name, s.ValidationSecretName, s.Certificate) + secrets := MakeSecretResource(s.Name, s.ValidationSecretName, s.Certificate) logger.Infof("Adding secrets for %s(%s) of length %d", s.Name, s.ValidationSecretName, len(secrets)) for _, secret := range secrets { r = append(r, secret) @@ -197,11 +196,11 @@ func (xds *SeldonXDSCache) SecretContents() []types.Resource { return r } -func (xds *SeldonXDSCache) AddPipelineRoute(routeName string, trafficSplits []resources.PipelineTrafficSplit, mirror *resources.PipelineTrafficSplit) { +func (xds *SeldonXDSCache) AddPipelineRoute(routeName string, trafficSplits []PipelineTrafficSplit, mirror *PipelineTrafficSplit) { xds.RemovePipelineRoute(routeName) pipelineRoute, ok := xds.Pipelines[routeName] if !ok { - xds.Pipelines[routeName] = resources.PipelineRoute{ + xds.Pipelines[routeName] = PipelineRoute{ RouteName: routeName, Mirror: mirror, Clusters: trafficSplits, @@ -218,7 +217,7 @@ func (xds *SeldonXDSCache) RemovePipelineRoute(pipelineName string) { } func (xds *SeldonXDSCache) AddSecret(name string, validationSecretName string, certificate *seldontls.CertificateStore) { - xds.Secrets[name] = resources.Secret{ + xds.Secrets[name] = Secret{ Name: name, ValidationSecretName: validationSecretName, Certificate: certificate, @@ -234,7 +233,7 @@ func (xds *SeldonXDSCache) AddRouteClusterTraffic( ) { route, ok := xds.Routes[routeName] if !ok { - route = resources.Route{ + route = Route{ RouteName: routeName, LogPayloads: logPayloads, } @@ -245,7 +244,7 @@ func (xds *SeldonXDSCache) AddRouteClusterTraffic( route.LogPayloads = true } - clusterTraffic := resources.TrafficSplit{ + clusterTraffic := TrafficSplit{ ModelName: modelName, ModelVersion: modelVersion, TrafficWeight: trafficPercent, @@ -270,14 +269,14 @@ func (xds *SeldonXDSCache) AddClustersForRoute( ) { logger := xds.logger.WithField("func", "AddClustersForRoute") - routeVersionKey := resources.RouteVersionKey{RouteName: routeName, ModelName: modelName, Version: modelVersion} + routeVersionKey := RouteVersionKey{RouteName: routeName, ModelName: modelName, Version: modelVersion} httpCluster, ok := xds.Clusters[httpClusterName] if !ok { - httpCluster = resources.Cluster{ + httpCluster = Cluster{ Name: httpClusterName, - Endpoints: make(map[string]resources.Endpoint), - Routes: make(map[resources.RouteVersionKey]bool), + Endpoints: make(map[string]Endpoint), + Routes: make(map[RouteVersionKey]bool), Grpc: false, } } @@ -286,10 +285,10 @@ func (xds *SeldonXDSCache) AddClustersForRoute( grpcCluster, ok := xds.Clusters[grpcClusterName] if !ok { - grpcCluster = resources.Cluster{ + grpcCluster = Cluster{ Name: grpcClusterName, - Endpoints: make(map[string]resources.Endpoint), - Routes: make(map[resources.RouteVersionKey]bool), + Endpoints: make(map[string]Endpoint), + Routes: make(map[RouteVersionKey]bool), Grpc: true, } } @@ -300,12 
+299,12 @@ func (xds *SeldonXDSCache) AddClustersForRoute( logger.Warnf("Invalid replica index %d for server %s", replicaIdx, server.Name) } else { httpEndpointName := fmt.Sprintf("%s:%d", replica.GetInferenceSvc(), replica.GetInferenceHttpPort()) - httpCluster.Endpoints[httpEndpointName] = resources.Endpoint{ + httpCluster.Endpoints[httpEndpointName] = Endpoint{ UpstreamHost: replica.GetInferenceSvc(), UpstreamPort: uint32(replica.GetInferenceHttpPort()), } grpcEndpointName := fmt.Sprintf("%s:%d", replica.GetInferenceSvc(), replica.GetInferenceGrpcPort()) - grpcCluster.Endpoints[grpcEndpointName] = resources.Endpoint{ + grpcCluster.Endpoints[grpcEndpointName] = Endpoint{ UpstreamHost: replica.GetInferenceSvc(), UpstreamPort: uint32(replica.GetInferenceGrpcPort()), } @@ -339,13 +338,13 @@ func (xds *SeldonXDSCache) RemoveRoute(routeName string) error { return nil } -func (xds *SeldonXDSCache) removeRouteFromCluster(route resources.Route, cluster resources.TrafficSplit) error { - removeCluster := func(route resources.Route, clusterName string, split resources.TrafficSplit) error { +func (xds *SeldonXDSCache) removeRouteFromCluster(route Route, cluster TrafficSplit) error { + removeCluster := func(route Route, clusterName string, split TrafficSplit) error { cluster, ok := xds.Clusters[clusterName] if !ok { return fmt.Errorf("can't find cluster for route %s cluster %s route %+v", route.RouteName, clusterName, route) } - delete(cluster.Routes, resources.RouteVersionKey{RouteName: route.RouteName, ModelName: split.ModelName, Version: split.ModelVersion}) + delete(cluster.Routes, RouteVersionKey{RouteName: route.RouteName, ModelName: split.ModelName, Version: split.ModelVersion}) if len(cluster.Routes) == 0 { delete(xds.Clusters, clusterName) } diff --git a/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go b/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go index ae1bdc70c7..92d411b9a4 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache_benchmark_test.go @@ -16,8 +16,6 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/sirupsen/logrus" - - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" ) // Prevent compiler from optimising away benchmarks @@ -27,7 +25,7 @@ func benchmarkRouteContents(b *testing.B, numResources uint) { x := NewSeldonXDSCache(logrus.New(), nil) for n := 0; n < int(numResources); n++ { - x.AddPipelineRoute(strconv.Itoa(n), []resources.PipelineTrafficSplit{{PipelineName: strconv.Itoa(n), TrafficWeight: 100}}, nil) + x.AddPipelineRoute(strconv.Itoa(n), []PipelineTrafficSplit{{PipelineName: strconv.Itoa(n), TrafficWeight: 100}}, nil) x.AddRouteClusterTraffic( fmt.Sprintf("model-%d", n), diff --git a/scheduler/pkg/envoy/xdscache/seldoncache_test.go b/scheduler/pkg/envoy/xdscache/seldoncache_test.go index ed1b6478c2..3bc205c58c 100644 --- a/scheduler/pkg/envoy/xdscache/seldoncache_test.go +++ b/scheduler/pkg/envoy/xdscache/seldoncache_test.go @@ -20,7 +20,6 @@ import ( "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler" seldontls "github.com/seldonio/seldon-core/components/tls/v2/pkg/tls" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" "github.com/seldonio/seldon-core/scheduler/v2/pkg/store" ) @@ -80,10 +79,10 @@ func TestAddRemoveHttpAndGrpcRouteVersions(t *testing.T) { g.Expect(len(c.Clusters[grpcCluster1].Endpoints)).To(Equal(1)) g.Expect(c.Clusters[httpCluster1].Grpc).To(BeFalse()) 
g.Expect(c.Clusters[grpcCluster1].Grpc).To(BeTrue()) - g.Expect(c.Clusters[httpCluster1].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster1].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[httpCluster2].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 2}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster2].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 2}]).To(BeTrue()) + g.Expect(c.Clusters[httpCluster1].Routes[RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster1].Routes[RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpCluster2].Routes[RouteVersionKey{RouteName: route1, ModelName: model1, Version: 2}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster2].Routes[RouteVersionKey{RouteName: route1, ModelName: model1, Version: 2}]).To(BeTrue()) addVersionedRoute(c, route2, model1, httpCluster1, grpcCluster1, 100, 1) @@ -91,8 +90,8 @@ func TestAddRemoveHttpAndGrpcRouteVersions(t *testing.T) { g.Expect(len(c.Routes[route2].Clusters)).To(Equal(1)) clusters = c.Routes[route2].Clusters g.Expect(clusters[0].TrafficWeight).To(Equal(uint32(100))) - g.Expect(c.Clusters[httpCluster1].Routes[resources.RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster1].Routes[resources.RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpCluster1].Routes[RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster1].Routes[RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) g.Expect(len(c.Clusters[httpCluster1].Endpoints)).To(Equal(1)) g.Expect(len(c.Clusters[grpcCluster1].Endpoints)).To(Equal(1)) @@ -136,10 +135,10 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForSameModel(t *testing.T) { g.Expect(len(c.Clusters[grpcCluster1].Endpoints)).To(Equal(1)) g.Expect(c.Clusters[httpCluster1].Grpc).To(BeFalse()) g.Expect(c.Clusters[grpcCluster1].Grpc).To(BeTrue()) - g.Expect(c.Clusters[httpCluster1].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster1].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[httpCluster2].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) - g.Expect(c.Clusters[grpcCluster2].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) + g.Expect(c.Clusters[httpCluster1].Routes[RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster1].Routes[RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpCluster2].Routes[RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) + g.Expect(c.Clusters[grpcCluster2].Routes[RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 2}]).To(BeTrue()) err := c.RemoveRoute(routeName) g.Expect(err).To(BeNil()) @@ -176,10 +175,10 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentModels(t *testing.T) { 
g.Expect(len(c.Clusters[grpcClusterModel1].Endpoints)).To(Equal(1)) g.Expect(c.Clusters[httpClusterModel1].Grpc).To(BeFalse()) g.Expect(c.Clusters[grpcClusterModel1].Grpc).To(BeTrue()) - g.Expect(c.Clusters[httpClusterModel1].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcClusterModel1].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[httpClusterModel2].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model2, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcClusterModel2].Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: model2, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpClusterModel1].Routes[RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcClusterModel1].Routes[RouteVersionKey{RouteName: routeName, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpClusterModel2].Routes[RouteVersionKey{RouteName: routeName, ModelName: model2, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcClusterModel2].Routes[RouteVersionKey{RouteName: routeName, ModelName: model2, Version: 1}]).To(BeTrue()) err := c.RemoveRoute(routeName) g.Expect(err).To(BeNil()) @@ -217,10 +216,10 @@ func TestAddRemoveHttpAndGrpcRouteVersionsForDifferentRoutesSameModel(t *testing g.Expect(len(c.Clusters[grpcClusterModel1].Endpoints)).To(Equal(1)) g.Expect(c.Clusters[httpClusterModel1].Grpc).To(BeFalse()) g.Expect(c.Clusters[grpcClusterModel1].Grpc).To(BeTrue()) - g.Expect(c.Clusters[httpClusterModel1].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcClusterModel1].Routes[resources.RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[httpClusterModel1].Routes[resources.RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) - g.Expect(c.Clusters[grpcClusterModel1].Routes[resources.RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpClusterModel1].Routes[RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcClusterModel1].Routes[RouteVersionKey{RouteName: route1, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[httpClusterModel1].Routes[RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) + g.Expect(c.Clusters[grpcClusterModel1].Routes[RouteVersionKey{RouteName: route2, ModelName: model1, Version: 1}]).To(BeTrue()) err := c.RemoveRoute(route1) g.Expect(err).To(BeNil()) @@ -331,21 +330,21 @@ func addCluster( ) { cluster, ok := xds.Clusters[name] if !ok { - cluster = resources.Cluster{ + cluster = Cluster{ Name: name, - Endpoints: make(map[string]resources.Endpoint), - Routes: make(map[resources.RouteVersionKey]bool), + Endpoints: make(map[string]Endpoint), + Routes: make(map[RouteVersionKey]bool), Grpc: isGrpc, } } - cluster.Routes[resources.RouteVersionKey{RouteName: routeName, ModelName: modelName, Version: modelVersion}] = true + cluster.Routes[RouteVersionKey{RouteName: routeName, ModelName: modelName, Version: modelVersion}] = true xds.Clusters[name] = cluster } func addEndpoint(xds *SeldonXDSCache, clusterName, upstreamHost string, upstreamPort uint32) { cluster := xds.Clusters[clusterName] k := fmt.Sprintf("%s:%d", upstreamHost, upstreamPort) - 
cluster.Endpoints[k] = resources.Endpoint{ + cluster.Endpoints[k] = Endpoint{ UpstreamHost: upstreamHost, UpstreamPort: upstreamPort, } diff --git a/scheduler/pkg/envoy/resources/tls.go b/scheduler/pkg/envoy/xdscache/tls.go similarity index 99% rename from scheduler/pkg/envoy/resources/tls.go rename to scheduler/pkg/envoy/xdscache/tls.go index 11d734d401..fe33c3ceb1 100644 --- a/scheduler/pkg/envoy/resources/tls.go +++ b/scheduler/pkg/envoy/xdscache/tls.go @@ -7,7 +7,7 @@ Use of this software is governed by the Change License after the Change Date as each is defined in accordance with the LICENSE file. */ -package resources +package xdscache import ( core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" diff --git a/scheduler/pkg/envoy/resources/tls_test.go b/scheduler/pkg/envoy/xdscache/tls_test.go similarity index 97% rename from scheduler/pkg/envoy/resources/tls_test.go rename to scheduler/pkg/envoy/xdscache/tls_test.go index ee35dc62c2..ae3fea6c5c 100644 --- a/scheduler/pkg/envoy/resources/tls_test.go +++ b/scheduler/pkg/envoy/xdscache/tls_test.go @@ -7,7 +7,7 @@ Use of this software is governed by the Change License after the Change Date as each is defined in accordance with the LICENSE file. */ -package resources +package xdscache import ( "crypto/tls" @@ -28,32 +28,32 @@ type FakeCertificateStore struct { } func (f FakeCertificateStore) GetServerCertificate(info *tls.ClientHelloInfo) (*tls.Certificate, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (f FakeCertificateStore) GetClientCertificate(info *tls.CertificateRequestInfo) (*tls.Certificate, error) { - //TODO implement me + // TODO implement me panic("implement me") } func (f FakeCertificateStore) CreateClientTLSConfig() *tls.Config { - //TODO implement me + // TODO implement me panic("implement me") } func (f FakeCertificateStore) CreateClientTransportCredentials() credentials.TransportCredentials { - //TODO implement me + // TODO implement me panic("implement me") } func (f FakeCertificateStore) CreateServerTLSConfig() *tls.Config { - //TODO implement me + // TODO implement me panic("implement me") } func (f FakeCertificateStore) CreateServerTransportCredentials() credentials.TransportCredentials { - //TODO implement me + // TODO implement me panic("implement me") } diff --git a/scheduler/pkg/kafka/dataflow/server_test.go b/scheduler/pkg/kafka/dataflow/server_test.go index 382eda0d9a..57a0a1cfb7 100644 --- a/scheduler/pkg/kafka/dataflow/server_test.go +++ b/scheduler/pkg/kafka/dataflow/server_test.go @@ -808,7 +808,6 @@ func createTestScheduler(t *testing.T, serverName string) (*ChainerServer, *coor configFilePath := fmt.Sprintf("%s/kafka.json", t.TempDir()) _ = os.WriteFile(configFilePath, []byte(data), 0644) kc, _ := kafka_config.NewKafkaConfig(configFilePath) - b := util.NewRingLoadBalancer(1) b.AddServer(serverName) s, _ := NewChainerServer(logger, eventHub, pipelineServer, "test-ns", b, kc) diff --git a/scheduler/pkg/kafka/gateway/utils.go b/scheduler/pkg/kafka/gateway/utils.go index 9686326bd3..a7f4142ed4 100644 --- a/scheduler/pkg/kafka/gateway/utils.go +++ b/scheduler/pkg/kafka/gateway/utils.go @@ -15,13 +15,13 @@ import ( "google.golang.org/grpc/metadata" - "github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources" + "github.com/seldonio/seldon-core/scheduler/v2/pkg/util" ) func extractHeadersHttp(headers http.Header) map[string][]string { filteredHeaders := make(map[string][]string) for k, v := range headers { - if strings.HasPrefix(k, 
diff --git a/scheduler/pkg/kafka/gateway/utils.go b/scheduler/pkg/kafka/gateway/utils.go
index 9686326bd3..a7f4142ed4 100644
--- a/scheduler/pkg/kafka/gateway/utils.go
+++ b/scheduler/pkg/kafka/gateway/utils.go
@@ -15,13 +15,13 @@ import (

 	"google.golang.org/grpc/metadata"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
+	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
 )

 func extractHeadersHttp(headers http.Header) map[string][]string {
 	filteredHeaders := make(map[string][]string)
 	for k, v := range headers {
-		if strings.HasPrefix(k, resources.ExternalHeaderPrefix) {
+		if strings.HasPrefix(k, util.ExternalHeaderPrefix) {
 			filteredHeaders[k] = v
 		}
 	}
@@ -31,12 +31,12 @@ func extractHeadersHttp(headers http.Header) map[string][]string {
 func extractHeadersGrpc(headers metadata.MD, trailers metadata.MD) map[string][]string {
 	filteredHeaders := make(map[string][]string)
 	for k, v := range headers {
-		if strings.HasPrefix(k, resources.ExternalHeaderPrefix) {
+		if strings.HasPrefix(k, util.ExternalHeaderPrefix) {
 			filteredHeaders[k] = v
 		}
 	}
 	for k, v := range trailers {
-		if strings.HasPrefix(k, resources.ExternalHeaderPrefix) {
+		if strings.HasPrefix(k, util.ExternalHeaderPrefix) {
 			filteredHeaders[k] = v
 		}
 	}
diff --git a/scheduler/pkg/kafka/gateway/worker.go b/scheduler/pkg/kafka/gateway/worker.go
index 21047e7a2d..d126eff71f 100644
--- a/scheduler/pkg/kafka/gateway/worker.go
+++ b/scheduler/pkg/kafka/gateway/worker.go
@@ -37,7 +37,6 @@ import (

 	v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	kafka2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka"
 	pipeline "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/pipeline"
 	seldontracer "github.com/seldonio/seldon-core/scheduler/v2/pkg/tracing"
@@ -313,7 +312,7 @@ func (iw *InferWorker) restRequest(ctx context.Context, job *InferWork, maybeCon
 	}

 	req.Header.Set("Content-Type", "application/json")
-	req.Header.Set(resources.SeldonModelHeader, job.modelName)
+	req.Header.Set(util.SeldonModelHeader, job.modelName)
 	if reqId, ok := job.headers[util.RequestIdHeader]; ok {
 		req.Header[util.RequestIdHeader] = []string{reqId}
 	}
@@ -358,13 +357,13 @@

 // Add all external headers to request metadata
 func addMetadataToOutgoingContext(ctx context.Context, job *InferWork, logger log.FieldLogger) context.Context {
 	for k, v := range job.headers {
-		if strings.HasPrefix(k, resources.ExternalHeaderPrefix) &&
-			k != resources.SeldonRouteHeader { // We don;t want to send x-seldon-route as this will confuse envoy
+		if strings.HasPrefix(k, util.ExternalHeaderPrefix) &&
+			k != util.SeldonRouteHeader { // We don;t want to send x-seldon-route as this will confuse envoy
 			logger.Debugf("Adding outgoing ctx metadata %s:%s", k, v)
 			ctx = metadata.AppendToOutgoingContext(ctx, k, v)
 		}
 	}
-	ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonModelHeader, job.modelName)
+	ctx = metadata.AppendToOutgoingContext(ctx, util.SeldonModelHeader, job.modelName)
 	return ctx
 }
diff --git a/scheduler/pkg/kafka/gateway/worker_test.go b/scheduler/pkg/kafka/gateway/worker_test.go
index 7fb5995f26..e4008e1a7c 100644
--- a/scheduler/pkg/kafka/gateway/worker_test.go
+++ b/scheduler/pkg/kafka/gateway/worker_test.go
@@ -31,7 +31,6 @@ import (

 	v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"
 	kafka_config "github.com/seldonio/seldon-core/components/kafka/v2/pkg/config"
-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	kafka2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka"
 	seldontracer "github.com/seldonio/seldon-core/scheduler/v2/pkg/tracing"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
@@ -482,26 +481,26 @@ func TestAddMetadataToOutgoingContext(t *testing.T) {
 		{
 			name:            "ignore xseldon-route header",
 			ctx:             metadata.NewIncomingContext(context.TODO(), metadata.New(map[string]string{})),
-			job:             &InferWork{modelName: "foo", headers: map[string]string{resources.SeldonRouteHeader: ":a:"}},
-			expectedHeaders: map[string][]string{resources.SeldonModelHeader: {"foo"}},
+			job:             &InferWork{modelName: "foo", headers: map[string]string{util.SeldonRouteHeader: ":a:"}},
+			expectedHeaders: map[string][]string{util.SeldonModelHeader: {"foo"}},
 		},
 		{
 			name:            "pass x-request-id header",
 			ctx:             metadata.NewIncomingContext(context.TODO(), metadata.New(map[string]string{})),
 			job:             &InferWork{modelName: "foo", headers: map[string]string{util.RequestIdHeader: "1234"}},
-			expectedHeaders: map[string][]string{resources.SeldonModelHeader: {"foo"}, util.RequestIdHeader: {"1234"}},
+			expectedHeaders: map[string][]string{util.SeldonModelHeader: {"foo"}, util.RequestIdHeader: {"1234"}},
 		},
 		{
 			name:            "pass custom header",
 			ctx:             metadata.NewIncomingContext(context.TODO(), metadata.New(map[string]string{})),
 			job:             &InferWork{modelName: "foo", headers: map[string]string{"x-myheader": "1234"}},
-			expectedHeaders: map[string][]string{resources.SeldonModelHeader: {"foo"}, "x-myheader": {"1234"}},
+			expectedHeaders: map[string][]string{util.SeldonModelHeader: {"foo"}, "x-myheader": {"1234"}},
 		},
 		{
 			name:            "ignore non x- prefix headers",
 			ctx:             metadata.NewIncomingContext(context.TODO(), metadata.New(map[string]string{})),
 			job:             &InferWork{modelName: "foo", headers: map[string]string{"myheader": "1234"}},
-			expectedHeaders: map[string][]string{resources.SeldonModelHeader: {"foo"}},
+			expectedHeaders: map[string][]string{util.SeldonModelHeader: {"foo"}},
 		},
 	}
 	for _, test := range tests {
diff --git a/scheduler/pkg/kafka/pipeline/grpcserver.go b/scheduler/pkg/kafka/pipeline/grpcserver.go
index 9fc17ea004..1fcaf4ea29 100644
--- a/scheduler/pkg/kafka/pipeline/grpcserver.go
+++ b/scheduler/pkg/kafka/pipeline/grpcserver.go
@@ -26,7 +26,6 @@ import (

 	v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	status2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/pipeline/status"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
@@ -111,16 +110,16 @@ func (g *GatewayGrpcServer) getRequestId(md metadata.MD) string {
 func (g *GatewayGrpcServer) ModelInfer(ctx context.Context, r *v2.ModelInferRequest) (*v2.ModelInferResponse, error) {
 	md, ok := metadata.FromIncomingContext(ctx)
 	if !ok {
-		return nil, status.Errorf(codes.FailedPrecondition, fmt.Sprintf("failed to find any metadata - required %s or %s", resources.SeldonModelHeader, resources.SeldonInternalModelHeader))
+		return nil, status.Errorf(codes.FailedPrecondition, fmt.Sprintf("failed to find any metadata - required %s or %s", util.SeldonModelHeader, util.SeldonInternalModelHeader))
 	}
-	g.logger.Debugf("Seldon model header %v and seldon internal model header %v", md[resources.SeldonModelHeader], md[resources.SeldonInternalModelHeader])
-	header := extractHeader(resources.SeldonInternalModelHeader, md) // Internal model header has precedence
-	if header == "" { // If we don't find internal model header fall back on public one
-		header = extractHeader(resources.SeldonModelHeader, md)
+	g.logger.Debugf("Seldon model header %v and seldon internal model header %v", md[util.SeldonModelHeader], md[util.SeldonInternalModelHeader])
+	header := extractHeader(util.SeldonInternalModelHeader, md) // Internal model header has precedence
+	if header == "" { // If we don't find internal model header fall back on public one
+		header = extractHeader(util.SeldonModelHeader, md)
 	}
 	resourceName, isModel, err := createResourceNameFromHeader(header)
 	if err != nil {
-		return nil, status.Errorf(codes.FailedPrecondition, fmt.Sprintf("failed to find valid header %s, found %s", resources.SeldonModelHeader, resourceName))
+		return nil, status.Errorf(codes.FailedPrecondition, fmt.Sprintf("failed to find valid header %s, found %s", util.SeldonModelHeader, resourceName))
 	}

 	startTime := time.Now()
@@ -159,7 +158,7 @@ func (g *GatewayGrpcServer) ModelInfer(ctx context.Context, r *v2.ModelInferRequ
 func (g *GatewayGrpcServer) ModelReady(ctx context.Context, req *v2.ModelReadyRequest) (*v2.ModelReadyResponse, error) {
 	md, ok := metadata.FromIncomingContext(ctx)
 	if !ok {
-		return nil, status.Errorf(codes.FailedPrecondition, fmt.Sprintf("failed to find any metadata - required %s or %s", resources.SeldonModelHeader, resources.SeldonInternalModelHeader))
+		return nil, status.Errorf(codes.FailedPrecondition, fmt.Sprintf("failed to find any metadata - required %s or %s", util.SeldonModelHeader, util.SeldonInternalModelHeader))
 	}
 	ready, err := g.pipelineReadyChecker.CheckPipelineReady(ctx, req.GetName(), g.getRequestId(md))
 	if err != nil {
diff --git a/scheduler/pkg/kafka/pipeline/grpcserver_test.go b/scheduler/pkg/kafka/pipeline/grpcserver_test.go
index 96d4697a80..43567830c8 100644
--- a/scheduler/pkg/kafka/pipeline/grpcserver_test.go
+++ b/scheduler/pkg/kafka/pipeline/grpcserver_test.go
@@ -24,7 +24,6 @@ import (

 	v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
 )
@@ -133,7 +132,7 @@ func TestGrpcServer(t *testing.T) {
 			g.Expect(err).To(BeNil())
 			client := v2.NewGRPCInferenceServiceClient(conn)
 			ctx := context.TODO()
-			ctx = metadata.AppendToOutgoingContext(ctx, resources.SeldonModelHeader, test.header)
+			ctx = metadata.AppendToOutgoingContext(ctx, util.SeldonModelHeader, test.header)
 			var header, trailer metadata.MD
 			res, err := client.ModelInfer(ctx, test.req, grpc.Header(&header), grpc.Trailer(&trailer))
 			if test.error {
diff --git a/scheduler/pkg/kafka/pipeline/httpserver.go b/scheduler/pkg/kafka/pipeline/httpserver.go
index 13b8c14ed8..b5ba40eae2 100644
--- a/scheduler/pkg/kafka/pipeline/httpserver.go
+++ b/scheduler/pkg/kafka/pipeline/httpserver.go
@@ -23,7 +23,6 @@ import (
 	log "github.com/sirupsen/logrus"
 	"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka/pipeline/status"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/metrics"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
@@ -187,9 +186,9 @@ func (g *GatewayHttpServer) infer(w http.ResponseWriter, req *http.Request, reso
 }

 func getResourceFromHeaders(req *http.Request, logger log.FieldLogger) (string, bool, error) {
-	modelHeader := req.Header.Get(resources.SeldonModelHeader)
+	modelHeader := req.Header.Get(util.SeldonModelHeader)
 	// may have multiple header values due to shadow/mirror processing
-	modelInternalHeader := req.Header.Values(resources.SeldonInternalModelHeader)
+	modelInternalHeader := req.Header.Values(util.SeldonInternalModelHeader)
 	logger.Debugf("Seldon model header %s and seldon internal model header %s", modelHeader, modelInternalHeader)
 	if len(modelInternalHeader) > 0 {
 		return createResourceNameFromHeader(modelInternalHeader[len(modelInternalHeader)-1]) // get last header if multiple
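Both gateway servers above resolve the target resource from the Seldon headers, preferring the internal header and falling back to seldon-model. A minimal, hypothetical client-side sketch of addressing a model through the HTTP gateway (the endpoint, model name, and payload are assumptions, not taken from this change; the header name mirrors util.SeldonModelHeader):

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// seldonModelHeader mirrors util.SeldonModelHeader ("seldon-model") added in this diff.
const seldonModelHeader = "seldon-model"

func main() {
	// Hypothetical v2 inference payload and local gateway address, for illustration only.
	body := []byte(`{"inputs":[{"name":"predict","datatype":"FP32","shape":[1,1],"data":[1.0]}]}`)
	req, err := http.NewRequest(http.MethodPost, "http://localhost:9000/v2/models/iris/infer", bytes.NewReader(body))
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")
	// Address the model by name; a pipeline would typically be addressed with the "<name>.pipeline" form instead.
	req.Header.Set(seldonModelHeader, "iris")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}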
diff --git a/scheduler/pkg/kafka/pipeline/httpserver_test.go b/scheduler/pkg/kafka/pipeline/httpserver_test.go
index 3b42454705..f81685eced 100644
--- a/scheduler/pkg/kafka/pipeline/httpserver_test.go
+++ b/scheduler/pkg/kafka/pipeline/httpserver_test.go
@@ -28,7 +28,6 @@ import (

 	v2 "github.com/seldonio/seldon-core/apis/go/v2/mlops/v2_dataplane"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/internal/testing_utils"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
 )
@@ -174,7 +173,7 @@ func TestHttpServer(t *testing.T) {
 			r := strings.NewReader(test.req)
 			req, err := http.NewRequest(http.MethodPost, url, r)
 			g.Expect(err).To(BeNil())
-			req.Header.Set(resources.SeldonModelHeader, test.header)
+			req.Header.Set(util.SeldonModelHeader, test.header)
 			req.Header.Set("contentType", "application/json")
 			resp, err := http.DefaultClient.Do(req)
 			g.Expect(err).To(BeNil())
diff --git a/scheduler/pkg/kafka/pipeline/kafkamanager.go b/scheduler/pkg/kafka/pipeline/kafkamanager.go
index 81e793e739..8ec148dd35 100644
--- a/scheduler/pkg/kafka/pipeline/kafkamanager.go
+++ b/scheduler/pkg/kafka/pipeline/kafkamanager.go
@@ -24,7 +24,6 @@ import (
 	kafka_config "github.com/seldonio/seldon-core/components/kafka/v2/pkg/config"
 	config_tls "github.com/seldonio/seldon-core/components/tls/v2/pkg/config"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	kafka2 "github.com/seldonio/seldon-core/scheduler/v2/pkg/kafka"
 	seldontracer "github.com/seldonio/seldon-core/scheduler/v2/pkg/tracing"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
@@ -220,7 +219,7 @@ func (km *KafkaManager) Infer(
 		outputTopic = km.topicNamer.GetModelTopicInputs(resourceName)
 	}
 	logger.Debugf("Produce on topic %s with key %s", outputTopic, compositeKey)
-	kafkaHeaders := append(headers, kafka.Header{Key: resources.SeldonPipelineHeader, Value: []byte(resourceName)})
+	kafkaHeaders := append(headers, kafka.Header{Key: util.SeldonPipelineHeader, Value: []byte(resourceName)})
 	kafkaHeaders = addRequestIdToKafkaHeadersIfMissing(kafkaHeaders, requestId)

 	msg := &kafka.Message{
diff --git a/scheduler/pkg/kafka/pipeline/status/model_rest.go b/scheduler/pkg/kafka/pipeline/status/model_rest.go
index 5e2b5c1761..8f8822587c 100644
--- a/scheduler/pkg/kafka/pipeline/status/model_rest.go
+++ b/scheduler/pkg/kafka/pipeline/status/model_rest.go
@@ -19,7 +19,6 @@ import (

 	"github.com/sirupsen/logrus"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
 )
@@ -66,7 +65,7 @@ func (mr *ModelRestCaller) CheckModelReady(ctx context.Context, modelName string
 	if err != nil {
 		return false, err
 	}
-	req.Header.Set(resources.SeldonModelHeader, modelName)
+	req.Header.Set(util.SeldonModelHeader, modelName)
 	req.Header.Set(util.RequestIdHeader, requestId)
 	response, err := mr.httpClient.Do(req)
 	if err != nil {
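The pipeline utils hunk below keys the model-vs-pipeline decision on the suffix constants that now live in util. A rough, hypothetical re-creation of that parsing, for illustration only (the real createResourceNameFromHeader may differ in details such as the bare-name case; the "model" and "pipeline" suffixes mirror the constants added to util/constants.go at the end of this diff):

package main

import (
	"fmt"
	"strings"
)

// These mirror util.SeldonModelHeaderSuffix and util.SeldonPipelineHeaderSuffix from this diff.
const (
	modelSuffix    = "model"
	pipelineSuffix = "pipeline"
)

// parseSeldonHeader is a hypothetical stand-in for createResourceNameFromHeader:
// "iris" or "iris.model" address a model, "mypipe.pipeline" addresses a pipeline.
func parseSeldonHeader(header string) (name string, isModel bool, err error) {
	parts := strings.Split(header, ".")
	switch len(parts) {
	case 1:
		return parts[0], true, nil
	case 2:
		switch parts[1] {
		case pipelineSuffix:
			return parts[0], false, nil
		case modelSuffix:
			return parts[0], true, nil
		}
	}
	return "", false, fmt.Errorf("bad or missing header value %q", header)
}

func main() {
	for _, h := range []string{"iris", "iris.model", "mypipe.pipeline", "bad.value"} {
		name, isModel, err := parseSeldonHeader(h)
		fmt.Println(h, "->", name, isModel, err)
	}
}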
diff --git a/scheduler/pkg/kafka/pipeline/utils.go b/scheduler/pkg/kafka/pipeline/utils.go
index 3941f91876..fe6e0caee7 100644
--- a/scheduler/pkg/kafka/pipeline/utils.go
+++ b/scheduler/pkg/kafka/pipeline/utils.go
@@ -17,7 +17,6 @@ import (
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 	"google.golang.org/grpc/metadata"

-	"github.com/seldonio/seldon-core/scheduler/v2/pkg/envoy/resources"
 	"github.com/seldonio/seldon-core/scheduler/v2/pkg/util"
 )
@@ -30,14 +29,14 @@ func createResourceNameFromHeader(header string) (string, bool, error) {
 		}
 	case 2:
 		switch parts[1] {
-		case resources.SeldonPipelineHeaderSuffix:
+		case util.SeldonPipelineHeaderSuffix:
 			return parts[0], false, nil
-		case resources.SeldonModelHeaderSuffix:
+		case util.SeldonModelHeaderSuffix:
 			return parts[0], true, nil
 		}
 	}
 	return "", false, fmt.Errorf(
-		"Bad or missing header %s %s", resources.SeldonModelHeader, header)
+		"Bad or missing header %s %s", util.SeldonModelHeader, header)
 }

 func addRequestIdToKafkaHeadersIfMissing(headers []kafka.Header, requestId string) []kafka.Header {
@@ -57,7 +56,7 @@ func convertHttpHeadersToKafkaHeaders(httpHeaders http.Header) []kafka.Header {
 	var kafkaHeaders []kafka.Header
 	for k, vals := range httpHeaders {
 		key := strings.ToLower(k)
-		if strings.HasPrefix(key, resources.ExternalHeaderPrefix) {
+		if strings.HasPrefix(key, util.ExternalHeaderPrefix) {
 			for _, headerValue := range vals {
 				kafkaHeaders = append(kafkaHeaders, kafka.Header{Key: key, Value: []byte(headerValue)})
 			}
@@ -69,7 +68,7 @@ func convertHttpHeadersToKafkaHeaders(httpHeaders http.Header) []kafka.Header {
 func convertKafkaHeadersToHttpHeaders(kafkaHeaders []kafka.Header) http.Header {
 	httpHeaders := make(http.Header)
 	for _, kafkaHeader := range kafkaHeaders {
-		if strings.HasPrefix(strings.ToLower(kafkaHeader.Key), resources.ExternalHeaderPrefix) {
+		if strings.HasPrefix(strings.ToLower(kafkaHeader.Key), util.ExternalHeaderPrefix) {
 			if val, ok := httpHeaders[kafkaHeader.Key]; ok {
 				val = append(val, string(kafkaHeader.Value))
 				httpHeaders[kafkaHeader.Key] = val
@@ -84,7 +83,7 @@ func convertKafkaHeadersToHttpHeaders(kafkaHeaders []kafka.Header) http.Header {
 func convertGrpcMetadataToKafkaHeaders(grpcMetadata metadata.MD) []kafka.Header {
 	var kafkaHeaders []kafka.Header
 	for k, vals := range grpcMetadata {
-		if strings.HasPrefix(strings.ToLower(k), resources.ExternalHeaderPrefix) {
+		if strings.HasPrefix(strings.ToLower(k), util.ExternalHeaderPrefix) {
 			for _, headerValue := range vals {
 				kafkaHeaders = append(kafkaHeaders, kafka.Header{Key: k, Value: []byte(headerValue)})
 			}
@@ -96,7 +95,7 @@ func convertGrpcMetadataToKafkaHeaders(grpcMetadata metadata.MD) []kafka.Header
 func convertKafkaHeadersToGrpcMetadata(kafkaHeaders []kafka.Header) metadata.MD {
 	grpcMetadata := make(metadata.MD)
 	for _, kafkaHeader := range kafkaHeaders {
-		if strings.HasPrefix(strings.ToLower(kafkaHeader.Key), resources.ExternalHeaderPrefix) {
+		if strings.HasPrefix(strings.ToLower(kafkaHeader.Key), util.ExternalHeaderPrefix) {
 			if val, ok := grpcMetadata[kafkaHeader.Key]; ok {
 				val = append(val, string(kafkaHeader.Value))
 				grpcMetadata[kafkaHeader.Key] = val
diff --git a/scheduler/pkg/util/constants.go b/scheduler/pkg/util/constants.go
index ea837db8ce..3f61402f06 100644
--- a/scheduler/pkg/util/constants.go
+++ b/scheduler/pkg/util/constants.go
@@ -13,6 +13,18 @@ import (
 	"time"
 )

+// Headers
+const (
+	SeldonModelHeader          = "seldon-model"
+	SeldonPipelineHeader       = "pipeline"
+	SeldonInternalModelHeader  = "seldon-internal-model"
+	SeldonLoggingHeader        = "Seldon-Logging"
+	SeldonRouteHeader          = "x-seldon-route"
+	ExternalHeaderPrefix       = "x-"
+	SeldonModelHeaderSuffix    = "model"
+	SeldonPipelineHeaderSuffix = "pipeline"
+)
+
 // REST
 const (
 	DefaultReverseProxyHTTPPort = 9999