Skip to content

Commit

Permalink
Weighted routing logic change
Browse files Browse the repository at this point in the history
  • Loading branch information
vinupa committed Mar 2, 2025
1 parent 0184bec commit 320a005
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 96 deletions.
23 changes: 2 additions & 21 deletions adapter/internal/oasparser/envoyconf/routes_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,26 +127,9 @@ func generateRouteMatch(routeRegex string) *routev3.RouteMatch {
return match
}

func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, ratelimitCriteria *ratelimitCriteria, mirrorClusterNames []string, isBackendBasedAIRatelimitEnabled bool, descriptorValueForBackendBasedAIRatelimit string, weightedCluster *routev3.WeightedCluster_ClusterWeight, isWeighted bool) (action *routev3.Route_Route) {
func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, ratelimitCriteria *ratelimitCriteria, mirrorClusterNames []string, isBackendBasedAIRatelimitEnabled bool, descriptorValueForBackendBasedAIRatelimit string, weightedCluster *routev3.WeightedCluster, isWeighted bool) (action *routev3.Route_Route) {

if isWeighted {
// check if weightedCluster is already in the list
exists := false
for i, existingWeightedCluster := range weightedClusters {
if existingWeightedCluster.Name == weightedCluster.Name {
if existingWeightedCluster.Weight.GetValue() == weightedCluster.Weight.GetValue() {
exists = true
} else {
// Remove the existing entry with the same name but different weight
weightedClusters = append(weightedClusters[:i], weightedClusters[i+1:]...)
}
}
}

// if not existing, add to the list
if !exists {
weightedClusters = append(weightedClusters, weightedCluster)
}
action = &routev3.Route_Route{
Route: &routev3.RouteAction{
HostRewriteSpecifier: &routev3.RouteAction_AutoHostRewrite{
Expand All @@ -157,9 +140,7 @@ func generateRouteAction(apiType string, routeConfig *model.EndpointConfig, rate
UpgradeConfigs: getUpgradeConfig(apiType),
MaxStreamDuration: getMaxStreamDuration(apiType),
ClusterSpecifier: &routev3.RouteAction_WeightedClusters{
WeightedClusters: &routev3.WeightedCluster{
Clusters: weightedClusters,
},
WeightedClusters: weightedCluster,
},
},
}
Expand Down
192 changes: 121 additions & 71 deletions adapter/internal/oasparser/envoyconf/routes_with_clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,59 +263,33 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
mirrorClusterNames := map[string][]string{}
resourcePath := resource.GetPath()
endpoint := resource.GetEndpoints()

allWeightsAreOne := true
for _, ep := range endpoint.Endpoints {
basePath := ""
basePath = strings.TrimSuffix(ep.Basepath, "/")

//existingClusterName := getExistingClusterName(*endpoint, processedEndpoints)

clusterName = getClusterName(endpoint.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, ep.Host+ep.Basepath)
cluster, address, err := processEndpoints(clusterName, endpoint, timeout, basePath, &ep)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, cluster...)
endpoints = append(endpoints, address...)
if ep.Weight != 1 {
allWeightsAreOne = false
break
}

// Creating clusters for request mirroring endpoints
for _, op := range resource.GetOperations() {
if op.GetMirrorEndpointClusters() != nil && len(op.GetMirrorEndpointClusters()) > 0 {
mirrorEndpointClusters := op.GetMirrorEndpointClusters()
for _, mirrorEndpointCluster := range mirrorEndpointClusters {
for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints {
mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/")
existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints)
var mirrorClusterName string
if existingMirrorClusterName == "" {
mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, mirrorEndpoint.Host+mirrorEndpoint.Basepath)
mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath, &mirrorEndpoint)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, mirrorCluster...)
endpoints = append(endpoints, mirrorAddress...)
processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster
}
} else {
mirrorClusterName = existingMirrorClusterName
}
if _, exists := mirrorClusterNames[op.GetID()]; !exists {
mirrorClusterNames[op.GetID()] = []string{}
}
mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName)
}
}
}
}
if len(endpoint.Endpoints) > 1 && !allWeightsAreOne {
logger.LoggerOasparser.Infof("Multiple endpoints detected with weights that are not all equal to 1 for resource: %s", resourcePath)
basePath := ""
basePath = strings.TrimSuffix(endpoint.Endpoints[0].Basepath, "/")

for _, ep := range endpoint.Endpoints {
clusterName = getClusterName(endpoint.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, ep.Host+ep.Basepath)
cluster, address, err := processEndpoints(clusterName, endpoint, timeout, basePath, &ep)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, cluster...)
endpoints = append(endpoints, address...)
}
}

// Create resource level interceptor clusters if required
clustersI, endpointsI, operationalReqInterceptors, operationalRespInterceptorVal := createInterceptorResourceClusters(adapterInternalAPI,
interceptorCerts, vHost, organizationID, apiRequestInterceptor, apiResponseInterceptor, resource)
clusters = append(clusters, clustersI...)
endpoints = append(endpoints, endpointsI...)
routeParams := genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, false, mirrorClusterNames)
clusterName = getClusterName(endpoint.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, endpoint.Endpoints[0].Host+endpoint.Endpoints[0].Basepath)
routeParams := genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, nil, nil, organizationID,
false, false, nil)

routeP, err := createRoutes(routeParams)
if err != nil {
Expand All @@ -326,15 +300,88 @@ func CreateRoutesWithClusters(adapterInternalAPI *model.AdapterInternalAPI, inte
}
routes = append(routes, routeP...)
if adapterInternalAPI.IsDefaultVersion {
defaultRoutes, errDefaultPath := createRoutes(genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, true, mirrorClusterNames))
defaultRoutes, errDefaultPath := createRoutes(genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, nil, nil, organizationID,
false, false, nil))
if errDefaultPath != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR, "Error while creating routes for API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), removeFirstOccurrence(resource.GetPath(), adapterInternalAPI.GetVersion()), errDefaultPath.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", errDefaultPath)
}
routes = append(routes, defaultRoutes...)
}
}
} else {
logger.LoggerOasparser.Infof("Single endpoint or mulitple endpoints all with weight equal to 1 detected for resource: %s", resourcePath)
for _, ep := range endpoint.Endpoints {
basePath := ""
basePath = strings.TrimSuffix(ep.Basepath, "/")

//existingClusterName := getExistingClusterName(*endpoint, processedEndpoints)

clusterName = getClusterName(endpoint.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, ep.Host+ep.Basepath)
cluster, address, err := processEndpoints(clusterName, endpoint, timeout, basePath, &ep)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, cluster...)
endpoints = append(endpoints, address...)
}

// Creating clusters for request mirroring endpoints
for _, op := range resource.GetOperations() {
if op.GetMirrorEndpointClusters() != nil && len(op.GetMirrorEndpointClusters()) > 0 {
mirrorEndpointClusters := op.GetMirrorEndpointClusters()
for _, mirrorEndpointCluster := range mirrorEndpointClusters {
for _, mirrorEndpoint := range mirrorEndpointCluster.Endpoints {
mirrorBasepath := strings.TrimSuffix(mirrorEndpoint.Basepath, "/")
existingMirrorClusterName := getExistingClusterName(*mirrorEndpointCluster, processedEndpoints)
var mirrorClusterName string
if existingMirrorClusterName == "" {
mirrorClusterName = getClusterName(mirrorEndpointCluster.EndpointPrefix, organizationID, vHost, adapterInternalAPI.GetTitle(), apiVersion, mirrorEndpoint.Host+mirrorEndpoint.Basepath)
mirrorCluster, mirrorAddress, err := processEndpoints(mirrorClusterName, mirrorEndpointCluster, timeout, mirrorBasepath, &mirrorEndpoint)
if err != nil {
logger.LoggerOasparser.ErrorC(logging.PrintError(logging.Error2239, logging.MAJOR, "Error while adding resource level mirror filter endpoints for %s:%v-%v. %v", apiTitle, apiVersion, resourcePath, err.Error()))
} else {
clusters = append(clusters, mirrorCluster...)
endpoints = append(endpoints, mirrorAddress...)
processedEndpoints[mirrorClusterName] = *mirrorEndpointCluster
}
} else {
mirrorClusterName = existingMirrorClusterName
}
if _, exists := mirrorClusterNames[op.GetID()]; !exists {
mirrorClusterNames[op.GetID()] = []string{}
}
mirrorClusterNames[op.GetID()] = append(mirrorClusterNames[op.GetID()], mirrorClusterName)
}
}
}
}

// Create resource level interceptor clusters if required
clustersI, endpointsI, operationalReqInterceptors, operationalRespInterceptorVal := createInterceptorResourceClusters(adapterInternalAPI,
interceptorCerts, vHost, organizationID, apiRequestInterceptor, apiResponseInterceptor, resource)
clusters = append(clusters, clustersI...)
endpoints = append(endpoints, endpointsI...)
routeParams := genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, false, mirrorClusterNames)

routeP, err := createRoutes(routeParams)
if err != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR,
"Error while creating routes for API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(),
adapterInternalAPI.GetVersion(), resource.GetPath(), err.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", err)
}
routes = append(routes, routeP...)
if adapterInternalAPI.IsDefaultVersion {
defaultRoutes, errDefaultPath := createRoutes(genRouteCreateParams(adapterInternalAPI, resource, vHost, basePath, clusterName, *operationalReqInterceptors, *operationalRespInterceptorVal, organizationID,
false, true, mirrorClusterNames))
if errDefaultPath != nil {
logger.LoggerXds.ErrorC(logging.PrintError(logging.Error2231, logging.MAJOR, "Error while creating routes for API %s %s for path: %s Error: %s", adapterInternalAPI.GetTitle(), adapterInternalAPI.GetVersion(), removeFirstOccurrence(resource.GetPath(), adapterInternalAPI.GetVersion()), errDefaultPath.Error()))
return nil, nil, nil, fmt.Errorf("error while creating routes. %v", errDefaultPath)
}
routes = append(routes, defaultRoutes...)
}
}
}
}

Expand Down Expand Up @@ -1108,39 +1155,42 @@ func createRoutes(params *routeCreateParams) (routes []*routev3.Route, err error
}
routeConfig := resource.GetEndpoints().Config

// Extract weighted clusters if they are present
// Extract Weighted Cluster if present
var isWeightedClusters = false
routeEndpoints := resource.GetEndpoints().Endpoints
var weightedCluster routev3.WeightedCluster_ClusterWeight
routeEndpoints := resource.GetEndpoints()
var weightedCluster routev3.WeightedCluster

// Weightmap is used in this case to check if the weights of the endpoints are different from each other.
// If the weights are different, then the weighted cluster configuration would be created.
// But this would ignore scenarios when the weights are the same.
// This logic would need to be modified in order to handle scenarios where the weights of all the endpoints are the same
weightMap := make(map[int32]bool)
for _, endpoint := range routeEndpoints {
for _, endpoint := range routeEndpoints.Endpoints {
weightMap[endpoint.Weight] = true
if len(weightMap) > 1 {
isWeightedClusters = true
break
}
}

// If weighted clusters are present, create the weighted cluster configuration
// Check whether all weights are equal to one
// If all weights are equal to one, then weighted cluster configuration is not applied
allWeightsAreEqualtoOne := true
for _, endpoint := range routeEndpoints.Endpoints {
if endpoint.Weight != 1 {
allWeightsAreEqualtoOne = false
break
}
}
if !allWeightsAreEqualtoOne && len(routeEndpoints.Endpoints) > 1 {
isWeightedClusters = true
}
if isWeightedClusters {
// Extract the host part from the clusterName
parts := strings.Split(clusterName, "_")
hostPart := parts[len(parts)-1]
// Find the matching endpoint and get its weight
var weight uint32
for _, endpoint := range routeEndpoints {
if strings.Contains(hostPart, endpoint.Host) {
weight = uint32(endpoint.Weight)
break
}
for _, endpoint := range routeEndpoints.Endpoints {
clusterName := getClusterName(routeEndpoints.EndpointPrefix, params.organizationID, params.vHost, params.title, params.version, endpoint.Host+params.endpointBasePath)
weightedCluster.Clusters = append(weightedCluster.Clusters, &routev3.WeightedCluster_ClusterWeight{
Name: clusterName,
Weight: &wrapperspb.UInt32Value{Value: uint32(endpoint.Weight)},
})
}
weightedCluster.Name = clusterName
weightedCluster.Weight = &wrapperspb.UInt32Value{Value: weight}
}

// } else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,11 @@ func TestCreateRoutesWithClustersWithMultiplePathPrefixRules(t *testing.T) {
assert.Equal(t, uint32(8081), userServiceClusterPort1, "User Service Cluster's second endpoint port is incorrect.")
assert.Equal(t, uint32(0), userServiceClusterPriority1, "API Level Cluster's second endpoint Priority is incorrect.")

assert.Equal(t, 29, len(routes), "Created number of routes are incorrect.")
assert.Equal(t, 15, len(routes), "Created number of routes are incorrect.") // Changed expected number of routes from 29 to 15
assert.Contains(t, []string{"^/test-api/1\\.0\\.0/orders((?:/.*)*)"}, routes[2].GetMatch().GetSafeRegex().Regex)
assert.Contains(t, []string{"^/test-api/1\\.0\\.0/users((?:/.*)*)"}, routes[18].GetMatch().GetSafeRegex().Regex)
assert.NotEqual(t, routes[1].GetMatch().GetSafeRegex().Regex, routes[17].GetMatch().GetSafeRegex().Regex,
"The route regex for the two paths should not be the same")
assert.Contains(t, []string{"^/test-api/1\\.0\\.0/users((?:/.*)*)"}, routes[14].GetMatch().GetSafeRegex().Regex) // Changed expected route index from 28 to 14
assert.NotEqual(t, routes[1].GetMatch().GetSafeRegex().Regex, routes[12].GetMatch().GetSafeRegex().Regex,
"The route regex for the two paths should not be the same") // Changed expected route index from 17 to 12
}

func TestCreateRoutesWithClustersWithBackendTLSConfigs(t *testing.T) {
Expand Down

0 comments on commit 320a005

Please sign in to comment.