Skip to content

Commit

Permalink
rfac: Improve cron scaler (#6166)
Browse files Browse the repository at this point in the history
Signed-off-by: rickbrouwer <[email protected]>
  • Loading branch information
rickbrouwer authored Oct 21, 2024
1 parent 1a49f5f commit cf4026f
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 56 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ New deprecation(s):

### Other

- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))
- **Cron scaler**: Simplify cron scaler code ([#6056](https://github.com/kedacore/keda/issues/6056))

## v2.15.1

Expand Down
92 changes: 38 additions & 54 deletions pkg/scalers/cron_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ import (
)

const (
defaultDesiredReplicas = 1
cronMetricType = "External"
cronMetricType = "External"
)

type cronScaler struct {
metricType v2.MetricTargetType
metadata cronMetadata
logger logr.Logger
metricType v2.MetricTargetType
metadata cronMetadata
logger logr.Logger
startSchedule cron.Schedule
endSchedule cron.Schedule
}

type cronMetadata struct {
Expand All @@ -35,21 +36,11 @@ type cronMetadata struct {
}

func (m *cronMetadata) Validate() error {
if m.Timezone == "" {
return fmt.Errorf("no timezone specified")
}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
if m.Start == "" {
return fmt.Errorf("no start schedule specified")
}
if _, err := parser.Parse(m.Start); err != nil {
return fmt.Errorf("error parsing start schedule: %w", err)
}

if m.End == "" {
return fmt.Errorf("no end schedule specified")
}
if _, err := parser.Parse(m.End); err != nil {
return fmt.Errorf("error parsing end schedule: %w", err)
}
Expand All @@ -65,7 +56,6 @@ func (m *cronMetadata) Validate() error {
return nil
}

// NewCronScaler creates a new cronScaler
func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
metricType, err := GetMetricTargetType(config)
if err != nil {
Expand All @@ -77,25 +67,23 @@ func NewCronScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
return nil, fmt.Errorf("error parsing cron metadata: %w", err)
}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)

startSchedule, _ := parser.Parse(meta.Start)
endSchedule, _ := parser.Parse(meta.End)

return &cronScaler{
metricType: metricType,
metadata: meta,
logger: InitializeLogger(config, "cron_scaler"),
metricType: metricType,
metadata: meta,
logger: InitializeLogger(config, "cron_scaler"),
startSchedule: startSchedule,
endSchedule: endSchedule,
}, nil
}

func getCronTime(location *time.Location, spec string) (int64, error) {
c := cron.New(cron.WithLocation(location))
_, err := c.AddFunc(spec, func() { _ = fmt.Sprintf("Cron initialized for location %s", location.String()) })
if err != nil {
return 0, err
}

c.Start()
cronTime := c.Entries()[0].Next.Unix()
c.Stop()

return cronTime, nil
func getCronTime(location *time.Location, schedule cron.Schedule) time.Time {
// Use the pre-parsed cron schedule directly to get the next time
return schedule.Next(time.Now().In(location))
}

func parseCronMetadata(config *scalersconfig.ScalerConfig) (cronMetadata, error) {
Expand Down Expand Up @@ -131,37 +119,33 @@ func (s *cronScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
return []v2.MetricSpec{metricSpec}
}

// GetMetricsAndActivity returns value for a supported metric and an error if there is a problem getting the metric
func (s *cronScaler) GetMetricsAndActivity(_ context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
var defaultDesiredReplicas = int64(defaultDesiredReplicas)

location, err := time.LoadLocation(s.metadata.Timezone)
if err != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone. Error: %w", err)
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("unable to load timezone: %w", err)
}

// Since we are considering the timestamp here and not the exact time, timezone does matter.
currentTime := time.Now().Unix()
currentTime := time.Now().In(location)

nextStartTime, startTimecronErr := getCronTime(location, s.metadata.Start)
if startTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error initializing start cron: %w", startTimecronErr)
}
// Use the pre-parsed schedules to get the next start and end times
nextStartTime := getCronTime(location, s.startSchedule)
nextEndTime := getCronTime(location, s.endSchedule)

isWithinInterval := false

nextEndTime, endTimecronErr := getCronTime(location, s.metadata.End)
if endTimecronErr != nil {
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error intializing end cron: %w", endTimecronErr)
if nextStartTime.Before(nextEndTime) {
// Interval within the same day
isWithinInterval = currentTime.After(nextStartTime) && currentTime.Before(nextEndTime)
} else {
// Interval spans midnight
isWithinInterval = currentTime.After(nextStartTime) || currentTime.Before(nextEndTime)
}

switch {
case nextStartTime < nextEndTime && currentTime < nextStartTime:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
case currentTime <= nextEndTime:
metric := GenerateMetricInMili(metricName, float64(s.metadata.DesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, true, nil
default:
metric := GenerateMetricInMili(metricName, float64(defaultDesiredReplicas))
return []external_metrics.ExternalMetricValue{metric}, false, nil
metricValue := float64(1)
if isWithinInterval {
metricValue = float64(s.metadata.DesiredReplicas)
}

metric := GenerateMetricInMili(metricName, metricValue)
return []external_metrics.ExternalMetricValue{metric}, isWithinInterval, nil
}
14 changes: 13 additions & 1 deletion pkg/scalers/cron_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"

"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
Expand Down Expand Up @@ -121,7 +122,18 @@ func TestCronGetMetricSpecForScaling(t *testing.T) {
if err != nil {
t.Fatal("Could not parse metadata:", err)
}
mockCronScaler := cronScaler{"", meta, logr.Discard()}

parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
startSchedule, _ := parser.Parse(meta.Start)
endSchedule, _ := parser.Parse(meta.End)

mockCronScaler := cronScaler{
metricType: "",
metadata: meta,
logger: logr.Discard(),
startSchedule: startSchedule,
endSchedule: endSchedule,
}

metricSpec := mockCronScaler.GetMetricSpecForScaling(context.Background())
metricName := metricSpec[0].External.Metric.Name
Expand Down

0 comments on commit cf4026f

Please sign in to comment.