Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Selenium Grid scaler exposes sum of pending and ongoing sessions to KEDA #6368

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ Here is an overview of all new **experimental** features:
- **General**: Centralize and improve automaxprocs configuration with proper structured logging ([#5970](https://github.com/kedacore/keda/issues/5970))
- **General**: Paused ScaledObject count is reported correctly after operator restart ([#6321](https://github.com/kedacore/keda/issues/6321))
- **General**: ScaledJobs ready status is set to true once the problem has recovered ([#6329](https://github.com/kedacore/keda/pull/6329))
- **Selenium Grid Scaler**: Exposes sum of pending and ongoing sessions to KEDA ([#6368](https://github.com/kedacore/keda/pull/6368))

### Deprecations

Expand Down
95 changes: 47 additions & 48 deletions pkg/scalers/selenium_grid_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ type seleniumGridScalerMetadata struct {
BrowserName string `keda:"name=browserName, order=triggerMetadata"`
SessionBrowserName string `keda:"name=sessionBrowserName, order=triggerMetadata, optional"`
ActivationThreshold int64 `keda:"name=activationThreshold, order=triggerMetadata, optional"`
BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, optional, default=latest"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional, default=false"`
PlatformName string `keda:"name=platformName, order=triggerMetadata, optional, default=linux"`
NodeMaxSessions int `keda:"name=nodeMaxSessions, order=triggerMetadata, optional, default=1"`
BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, default=latest"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
PlatformName string `keda:"name=platformName, order=triggerMetadata, default=linux"`
NodeMaxSessions int64 `keda:"name=nodeMaxSessions, order=triggerMetadata, default=1"`

TargetValue int64
}
Expand All @@ -55,9 +55,9 @@ type Data struct {
}

type Grid struct {
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
TotalSlots int `json:"totalSlots"`
SessionCount int64 `json:"sessionCount"`
MaxSession int64 `json:"maxSession"`
TotalSlots int64 `json:"totalSlots"`
}

type NodesInfo struct {
Expand All @@ -71,17 +71,17 @@ type SessionsInfo struct {
type Nodes []struct {
ID string `json:"id"`
Status string `json:"status"`
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
SessionCount int64 `json:"sessionCount"`
MaxSession int64 `json:"maxSession"`
SlotCount int64 `json:"slotCount"`
Stereotypes string `json:"stereotypes"`
Sessions Sessions `json:"sessions"`
}

type ReservedNodes struct {
ID string `json:"id"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
MaxSession int64 `json:"maxSession"`
SlotCount int64 `json:"slotCount"`
}

type Sessions []struct {
Expand All @@ -102,7 +102,7 @@ type Capability struct {
}

type Stereotypes []struct {
Slots int `json:"slots"`
Slots int64 `json:"slots"`
Stereotype Capability `json:"stereotype"`
}

Expand Down Expand Up @@ -148,6 +148,7 @@ func parseSeleniumGridScalerMetadata(config *scalersconfig.ScalerConfig) (*selen
if meta.SessionBrowserName == "" {
meta.SessionBrowserName = meta.BrowserName
}

return meta, nil
}

Expand All @@ -160,18 +161,18 @@ func (s *seleniumGridScaler) Close(context.Context) error {
}

func (s *seleniumGridScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
sessions, err := s.getSessionsCount(ctx, s.logger)
newRequestNodes, onGoingSessions, err := s.getSessionsQueueLength(ctx, s.logger)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error requesting selenium grid endpoint: %w", err)
}

metric := GenerateMetricInMili(metricName, float64(sessions))
metric := GenerateMetricInMili(metricName, float64(newRequestNodes+onGoingSessions))

return []external_metrics.ExternalMetricValue{metric}, sessions > s.metadata.ActivationThreshold, nil
return []external_metrics.ExternalMetricValue{metric}, (newRequestNodes + onGoingSessions) > s.metadata.ActivationThreshold, nil
}

func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
metricName := kedautil.NormalizeString(fmt.Sprintf("seleniumgrid-%s", s.metadata.BrowserName))
metricName := kedautil.NormalizeString(fmt.Sprintf("selenium-grid-%s-%s-%s", s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.PlatformName))
externalMetric := &v2.ExternalMetricSource{
Metric: v2.MetricIdentifier{
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),
Expand All @@ -184,18 +185,18 @@ func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.Metri
return []v2.MetricSpec{metricSpec}
}

func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.Logger) (int64, error) {
func (s *seleniumGridScaler) getSessionsQueueLength(ctx context.Context, logger logr.Logger) (int64, int64, error) {
body, err := json.Marshal(map[string]string{
"query": "{ grid { sessionCount, maxSession, totalSlots }, nodesInfo { nodes { id, status, sessionCount, maxSession, slotCount, stereotypes, sessions { id, capabilities, slot { id, stereotype } } } }, sessionsInfo { sessionQueueRequests } }",
})

if err != nil {
return -1, err
return -1, -1, err
}

req, err := http.NewRequestWithContext(ctx, "POST", s.metadata.URL, bytes.NewBuffer(body))
if err != nil {
return -1, err
return -1, -1, err
}

if (s.metadata.AuthType == "" || strings.EqualFold(s.metadata.AuthType, "Basic")) && s.metadata.Username != "" && s.metadata.Password != "" {
Expand All @@ -206,28 +207,28 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L

res, err := s.httpClient.Do(req)
if err != nil {
return -1, err
return -1, -1, err
}

if res.StatusCode != http.StatusOK {
msg := fmt.Sprintf("selenium grid returned %d", res.StatusCode)
return -1, errors.New(msg)
return -1, -1, errors.New(msg)
}

defer res.Body.Close()
b, err := io.ReadAll(res.Body)
if err != nil {
return -1, err
return -1, -1, err
}
v, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger)
newRequestNodes, onGoingSession, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger)
if err != nil {
return -1, err
return -1, -1, err
}
return v, nil
return newRequestNodes, onGoingSession, nil
}

func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int {
var matchingSlots int
func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int64 {
var matchingSlots int64
for _, stereotype := range stereotypes {
if checkCapabilitiesMatch(stereotype.Stereotype, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSlots += stereotype.Slots
Expand All @@ -236,8 +237,8 @@ func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability,
return matchingSlots
}

func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int {
var matchingSessions int
func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int64 {
var matchingSessions int64
for _, session := range sessions {
var capability = Capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
Expand Down Expand Up @@ -274,7 +275,7 @@ func checkCapabilitiesMatch(capability Capability, requestCapability Capability,
return browserNameMatches && browserVersionMatches && platformNameMatches
}

func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int) int {
func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int64) int64 {
for _, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
return reservedNode.SlotCount
Expand All @@ -283,7 +284,7 @@ func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availa
return availableSlots
}

func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int, maxSession int) []ReservedNodes {
func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int64, maxSession int64) []ReservedNodes {
for i, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
// Update remaining available slots for the reserved node
Expand All @@ -295,17 +296,15 @@ func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotC
return append(reservedNodes, ReservedNodes{ID: nodeID, SlotCount: slotCount, MaxSession: maxSession})
}

func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int, logger logr.Logger) (int64, error) {
// The returned count of the number of new Nodes will be scaled up
var count int64
func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int64, logger logr.Logger) (int64, int64, error) {
// Track the number of available slots on existing Nodes in the Grid that can be reserved for the matched requests
var availableSlots int
var availableSlots int64
// Track the number of matched requests in the sessions queue that will be served by this scaler
var queueSlots int
var queueSlots int64

var seleniumResponse = SeleniumResponse{}
if err := json.Unmarshal(b, &seleniumResponse); err != nil {
return 0, err
return 0, 0, err
}

var sessionQueueRequests = seleniumResponse.Data.SessionsInfo.SessionQueueRequests
Expand All @@ -314,6 +313,7 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
var reservedNodes []ReservedNodes
// Track the list of new Nodes that will be scaled up, each with the number of available slots given by the scaler parameter `nodeMaxSessions`
var newRequestNodes []ReservedNodes
var onGoingSessions int64
for requestIndex, sessionQueueRequest := range sessionQueueRequests {
var isRequestMatched bool
var requestCapability = Capability{}
Expand All @@ -332,20 +332,22 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}

var isRequestReserved bool
var sumOfCurrentSessionsMatch int64
// Check if the matched request can be assigned to available slots of existing Nodes in the Grid
for _, node := range nodes {
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
sumOfCurrentSessionsMatch += currentSessionsMatch
// Check if node is UP and has available slots (maxSession > sessionCount)
if strings.EqualFold(node.Status, "UP") && checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount) > 0 {
var stereotypes = Stereotypes{}
var availableSlotsMatch int
var availableSlotsMatch int64
if err := json.Unmarshal([]byte(node.Stereotypes), &stereotypes); err == nil {
// Count available slots that match the request capability and scaler metadata
availableSlotsMatch += countMatchingSlotsStereotypes(stereotypes, requestCapability, browserName, browserVersion, sessionBrowserName, platformName)
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling node stereotypes: %s", err))
}
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
// Count remaining available slots can be reserved for this request
var availableSlotsCanBeReserved = checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount)
// Reserve one available slot for the request if available slots match is greater than current sessions match
Expand All @@ -357,6 +359,9 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}
}
}
if sumOfCurrentSessionsMatch > onGoingSessions {
onGoingSessions = sumOfCurrentSessionsMatch
}
// Check if the matched request can be assigned to available slots of new Nodes that will be scaled up, since the scaler parameter `nodeMaxSessions` can be greater than 1
if !isRequestReserved {
for _, newRequestNode := range newRequestNodes {
Expand All @@ -373,11 +378,5 @@ func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion s
}
}

if queueSlots > availableSlots {
count = int64(len(newRequestNodes))
} else {
count = 0
}

return count, nil
return int64(len(newRequestNodes)), onGoingSessions, nil
}
Loading