feat: changes rate limiter #549

Open · wants to merge 5 commits into main
162 changes: 162 additions & 0 deletions db/changes_rate_limit.go
@@ -0,0 +1,162 @@
package db

import (
	"sync"
	"time"

	"github.com/flanksource/commons/collections"
	sw "github.com/flanksource/slidingwindow"

	"github.com/flanksource/config-db/api"
	"github.com/flanksource/config-db/db/models"
)

const (
	rateLimitWindow    = time.Hour * 4
	maxChangesInWindow = 100

	ChangeTypeTooManyChanges = "TooManyChanges"
)

var (
	scraperLocks       = sync.Map{}
	configRateLimiters = map[string]*sw.Limiter{}

	// List of configs that are currently being rate limited.
	// It's used to avoid inserting multiple "TooManyChanges" changes for the same config.
	currentlyRateLimitedConfigsPerScraper = sync.Map{}
)

func rateLimitChanges(ctx api.ScrapeContext, newChanges []*models.ConfigChange) ([]*models.ConfigChange, []string, error) {
	if len(newChanges) == 0 || ctx.ScrapeConfig().GetPersistedID() == nil {
		return newChanges, nil, nil
	}

	window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
	max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
	scraperID := ctx.ScrapeConfig().GetPersistedID().String()

	lock, loaded := scraperLocks.LoadOrStore(scraperID, &sync.Mutex{})
	lock.(*sync.Mutex).Lock()
	defer lock.(*sync.Mutex).Unlock()

	_rateLimitedConfigs, _ := currentlyRateLimitedConfigsPerScraper.LoadOrStore(scraperID, map[string]struct{}{})
	rateLimitedConfigs := _rateLimitedConfigs.(map[string]struct{})

	if !loaded {
		// This is the initial sync of the rate limiter with the database.
		// Happens only once for each scrape config.
		if err := syncWindow(ctx, max, window); err != nil {
			return nil, nil, err
		}

		if rlConfigs, err := syncCurrentlyRateLimitedConfigs(ctx, window); err != nil {
			return nil, nil, err
		} else {
			rateLimitedConfigs = rlConfigs
		}
	}

	var rateLimitedConfigsThisRun = make(map[string]struct{})
	var passingNewChanges = make([]*models.ConfigChange, 0, len(newChanges))
	for _, change := range newChanges {
		if _, ok := rateLimitedConfigs[change.ConfigID]; ok {
			rateLimitedConfigsThisRun[change.ConfigID] = struct{}{}
		} else {
			passingNewChanges = append(passingNewChanges, change)
		}
	}

	// Find the configs that were rate limited this run but
	// weren't previously in the rate limited state.
	var newlyRateLimited []string
	for configID := range rateLimitedConfigsThisRun {
		if _, ok := rateLimitedConfigs[configID]; !ok {
			newlyRateLimited = append(newlyRateLimited, configID)
		}
	}

	rateLimitedConfigs = collections.MergeMap(rateLimitedConfigs, rateLimitedConfigsThisRun)
	currentlyRateLimitedConfigsPerScraper.Store(scraperID, rateLimitedConfigs)

	return passingNewChanges, newlyRateLimited, nil
}

func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration) (map[string]struct{}, error) {
	query := `WITH latest_changes AS (
		SELECT
			DISTINCT ON (config_id) config_id,
			change_type
		FROM
			config_changes
			LEFT JOIN config_items ON config_items.id = config_changes.config_id
		WHERE
			scraper_id = ?
			AND NOW() - config_changes.created_at <= ?
		ORDER BY
			config_id,
			config_changes.created_at DESC
	) SELECT config_id FROM latest_changes WHERE change_type = ?`
	rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), window, ChangeTypeTooManyChanges).Rows()
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	output := make(map[string]struct{})
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}

		ctx.Logger.V(3).Infof("config %s is currently rate limited", id)
		output[id] = struct{}{}
	}

	return output, rows.Err()
}

// syncWindow syncs the rate limit window for the scraper with the changes in the db.
func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
	query := `SELECT
		config_id,
		COUNT(*),
		MIN(config_changes.created_at) AS min_created_at
	FROM
		config_changes
		LEFT JOIN config_items ON config_items.id = config_changes.config_id
	WHERE
		scraper_id = ?
		AND change_type != ?
		AND NOW() - config_changes.created_at <= ?
	GROUP BY
		config_id`
	rows, err := ctx.DB().Raw(query, ctx.ScrapeConfig().GetPersistedID(), ChangeTypeTooManyChanges, window).Rows()
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var configID string
		var count int
		var earliest time.Time
		if err := rows.Scan(&configID, &count, &earliest); err != nil {
			return err
		}
		ctx.Logger.V(3).Infof("config %s has %d changes in the last %s", configID, count, window)

		rateLimiter, _ := sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
			win, stopper := sw.NewLocalWindow()
			if count > 0 {
				win.SetStart(earliest)
				win.AddCount(int64(count))
			}
			return win, stopper
		})
		configRateLimiters[configID] = rateLimiter
	}

	return rows.Err()
}
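syncWindow seeds each config's limiter with the change count already recorded in the database, so a scraper restart does not reset the budget. Below is a minimal, self-contained sketch of that seeding pattern, assuming the flanksource/slidingwindow fork keeps the NewLimiter/NewLocalWindow signatures used above and that SetStart/AddCount behave as this PR expects; the limit of 3 and the one-hour offset are purely illustrative.

```go
package main

import (
	"fmt"
	"time"

	sw "github.com/flanksource/slidingwindow"
)

func main() {
	window := 4 * time.Hour
	limit := int64(3) // tiny limit so the behavior is visible

	// Seed the window as if two changes were already recorded in the DB
	// within the current window, mirroring what syncWindow does.
	limiter, _ := sw.NewLimiter(window, limit, func() (sw.Window, sw.StopFunc) {
		win, stop := sw.NewLocalWindow()
		win.SetStart(time.Now().Add(-time.Hour)) // assumed setter on the fork's local window
		win.AddCount(2)
		return win, stop
	})

	// With two of the three allowed changes already consumed, roughly one
	// more Allow() should succeed before this window saturates.
	fmt.Println("first:", limiter.Allow())
	fmt.Println("second:", limiter.Allow())
}
```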
9 changes: 2 additions & 7 deletions db/models/config_change.go
@@ -5,7 +5,6 @@ import (
"time"

v1 "github.com/flanksource/config-db/api/v1"
"github.com/google/uuid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
@@ -15,8 +14,8 @@ type ConfigChange struct {
	ExternalID string `gorm:"-"`
	ConfigType string `gorm:"-"`
	ExternalChangeId string `gorm:"column:external_change_id" json:"external_change_id"`
	ID string `gorm:"primaryKey;unique_index;not null;column:id" json:"id"`
	ConfigID string `gorm:"column:config_id;default:''" json:"config_id"`
	ID string `gorm:"primaryKey;unique_index;not null;column:id;default:generate_ulid()" json:"id"`
	ConfigID string `gorm:"column:config_id" json:"config_id"`
	ChangeType string `gorm:"column:change_type" json:"change_type"`
	Diff *string `gorm:"column:diff" json:"diff,omitempty"`
	Severity string `gorm:"column:severity" json:"severity"`
@@ -62,10 +61,6 @@ func NewConfigChangeFromV1(result v1.ScrapeResult, change v1.ChangeResult) *Conf
}

func (c *ConfigChange) BeforeCreate(tx *gorm.DB) (err error) {
if c.ID == "" {
c.ID = uuid.New().String()
}

tx.Statement.AddClause(clause.OnConflict{DoNothing: true})
return
}
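The model change above moves ID generation from the application (the removed uuid.New() fallback in BeforeCreate) to the database default generate_ulid(). Here is a minimal sketch of that pattern in isolation, assuming a Postgres database that already defines generate_ulid(); the DSN and the Change model are placeholders.

```go
package main

import (
	"fmt"

	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

// Change mirrors the new ConfigChange ID behavior: the ID is no longer set
// in Go, the database's generate_ulid() default fills it in.
type Change struct {
	ID      string `gorm:"primaryKey;default:generate_ulid()"`
	Summary string
}

func main() {
	// Placeholder DSN; generate_ulid() must exist in the target database.
	db, err := gorm.Open(postgres.Open("postgres://localhost/config?sslmode=disable"), &gorm.Config{})
	if err != nil {
		panic(err)
	}

	c := Change{Summary: "example"}
	// gorm omits the empty ID from the INSERT and reads the
	// database-generated value back.
	if err := db.Create(&c).Error; err != nil {
		panic(err)
	}
	fmt.Println(c.ID) // ULID produced by the database default
}
```

Because the ID carries a default tag, the generated value has to come back from the database rather than from Go, which lines up with the explicit clause.Returning{} used in db/update.go below.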
92 changes: 89 additions & 3 deletions db/update.go
@@ -6,6 +6,7 @@ import (
"slices"
"strconv"
"strings"
"sync"
"time"

"github.com/aws/smithy-go/ptr"
@@ -21,6 +22,7 @@ import (
dutyContext "github.com/flanksource/duty/context"
dutyModels "github.com/flanksource/duty/models"
"github.com/flanksource/gomplate/v3"
sw "github.com/flanksource/slidingwindow"
"github.com/google/uuid"
"github.com/hexops/gotextdiff"
"github.com/hexops/gotextdiff/myers"
@@ -31,7 +33,11 @@ import (
"gorm.io/gorm/clause"
)

const configItemsBulkInsertSize = 200
const (
	configItemsBulkInsertSize = 200

	configChangesBulkInsertSize = 200
)

func deleteChangeHandler(ctx api.ScrapeContext, change v1.ChangeResult) error {
	var deletedAt interface{}
@@ -356,8 +362,67 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
return fmt.Errorf("failed to update last scraped time: %w", err)
}

if err := ctx.DB().CreateInBatches(newChanges, configItemsBulkInsertSize).Error; err != nil {
return fmt.Errorf("failed to create config changes: %w", err)
	newChanges, rateLimitedThisRun, err := rateLimitChanges(ctx, newChanges)
	if err != nil {
		return fmt.Errorf("failed to rate limit changes: %w", err)
	}

	// NOTE: Returning just the needed columns didn't work in gorm for some reason.
	// So, returning * for now.
	// returnClause := clause.Returning{Columns: []clause.Column{{Name: "id"}, {Name: "config_id"}, {Name: "external_change_id"}}}
	returnClause := clause.Returning{}

	// NOTE: Ran into a weird gorm behavior where CreateInBatches combined with a return clause
	// panics. So, handling batching manually.
	var rateLimitedAfterInsertion = map[string]struct{}{}
	for _, batch := range lo.Chunk(newChanges, configChangesBulkInsertSize) {
		if err := ctx.DB().Clauses(returnClause).Create(&batch).Error; err != nil {
			return fmt.Errorf("failed to create config changes: %w", err)
		}

		if len(batch) != 0 {
			// The `batch` slice now only contains those changes that were actually inserted.
			// We increase the usage count on the rate limiter for those changes.
			_l, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), &sync.Mutex{})
			lock := _l.(*sync.Mutex)
			lock.Lock()
			for _, b := range batch {
				rateLimiter, ok := configRateLimiters[b.ConfigID]
				if !ok {
					window := ctx.Properties().Duration("changes.max.window", rateLimitWindow)
					max := ctx.Properties().Int("changes.max.count", maxChangesInWindow)
					rateLimiter, _ = sw.NewLimiter(window, int64(max), func() (sw.Window, sw.StopFunc) {
						return sw.NewLocalWindow()
					})
					configRateLimiters[b.ConfigID] = rateLimiter
				}

				if !rateLimiter.Allow() {
					rateLimitedAfterInsertion[b.ConfigID] = struct{}{}
				}
			}
			lock.Unlock()
		}
	}

	syncCurrentlyRatelimitedConfigMap(ctx, newChanges, rateLimitedAfterInsertion)

	// For all the rate limited configs, we add a new "TooManyChanges" change.
	// This is intentionally inserted in a separate batch from the new changes,
	// so that the "TooManyChanges" change gets a later created_at timestamp
	// than the actual changes it summarizes.
	var rateLimitedChanges []*models.ConfigChange
	for _, configID := range rateLimitedThisRun {
		rateLimitedChanges = append(rateLimitedChanges, &models.ConfigChange{
			ConfigID:         configID,
			Summary:          "Changes on this config have been rate limited",
			ChangeType:       ChangeTypeTooManyChanges,
			ExternalChangeId: uuid.New().String(),
		})
	}

	if err := ctx.DB().CreateInBatches(rateLimitedChanges, configChangesBulkInsertSize).Error; err != nil {
		return fmt.Errorf("failed to create rate limited config changes: %w", err)
	}

	if len(changesToUpdate) != 0 {
@@ -411,6 +476,27 @@ func SaveResults(ctx api.ScrapeContext, results []v1.ScrapeResult) error {
	return nil
}

func syncCurrentlyRatelimitedConfigMap(ctx api.ScrapeContext, insertedChanges []*models.ConfigChange, rateLimitedAfterInsertion map[string]struct{}) {
	_l, _ := scraperLocks.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), &sync.Mutex{})
	lock := _l.(*sync.Mutex)
	lock.Lock()

	m, _ := currentlyRateLimitedConfigsPerScraper.LoadOrStore(ctx.ScrapeConfig().GetPersistedID().String(), map[string]struct{}{})
	mm := m.(map[string]struct{})
	for _, c := range insertedChanges {
		if _, ok := rateLimitedAfterInsertion[c.ConfigID]; !ok {
			// Configs that are no longer rate limited are removed from the
			// "currently rate limited" map.
			delete(mm, c.ConfigID)
		} else {
			mm[c.ConfigID] = struct{}{}
		}
	}

	currentlyRateLimitedConfigsPerScraper.Store(ctx.ScrapeConfig().GetPersistedID().String(), mm)
	lock.Unlock()
}

func updateLastScrapedTime(ctx api.ScrapeContext, ids []string) error {
	if len(ids) == 0 {
		return nil
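The manual chunking in SaveResults replaces CreateInBatches because, per the in-code comment, combining it with a RETURNING clause panicked. Here is a stripped-down sketch of the same pattern, assuming a Postgres-backed *gorm.DB; Row and insertInChunks are placeholder names.

```go
package batching

import (
	"github.com/samber/lo"
	"gorm.io/gorm"
	"gorm.io/gorm/clause"
)

// Row stands in for models.ConfigChange: the ID comes from a database default.
type Row struct {
	ID   string `gorm:"primaryKey;default:generate_ulid()"`
	Name string
}

// insertInChunks mirrors the pattern in SaveResults: chunk manually with
// lo.Chunk instead of CreateInBatches so a RETURNING clause can be attached
// to every INSERT, and the database-generated IDs are written back into
// each batch slice.
func insertInChunks(db *gorm.DB, rows []*Row, size int) error {
	for _, batch := range lo.Chunk(rows, size) {
		if err := db.Clauses(clause.Returning{}).Create(&batch).Error; err != nil {
			return err
		}
		// batch now carries the values returned by the database,
		// including the generated primary keys.
	}
	return nil
}
```

Attaching clause.Returning{} to each chunked insert is what lets the caller see the database-generated IDs and, combined with the ON CONFLICT DO NOTHING clause added in ConfigChange.BeforeCreate, which rows were actually inserted.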
1 change: 1 addition & 0 deletions go.mod
@@ -46,6 +46,7 @@ require (
	github.com/flanksource/is-healthy v1.0.7
	github.com/flanksource/ketall v1.1.6
	github.com/flanksource/mapstructure v1.6.0
	github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b
	github.com/go-logr/zapr v1.2.4
	github.com/gobwas/glob v0.2.3
	github.com/gomarkdown/markdown v0.0.0-20230322041520-c84983bdbf2a
2 changes: 2 additions & 0 deletions go.sum
@@ -872,6 +872,8 @@ github.com/flanksource/mapstructure v1.6.0 h1:+1kJ+QsO1SxjAgktfLlpZXetsVSJ0uCLhG
github.com/flanksource/mapstructure v1.6.0/go.mod h1:dttg5+FFE2sp4D/CrcPCVqufNDrBggDaM+08nk5S8Ps=
github.com/flanksource/postq v0.1.3 h1:eTslG04hwxAvntZv8gIUsXKQPLGeLiRPNkZC+kQdL7c=
github.com/flanksource/postq v0.1.3/go.mod h1:AAuaPRhpqxvyF7JPs8X1NMsJVenh80ldpJPDVgWvFf8=
github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b h1:zB2nVrRAUgrZQb2eutgKzWd6ld7syPacrY/XQmz7Wks=
github.com/flanksource/slidingwindow v0.0.0-20240526171711-1e13c04e057b/go.mod h1:+hHPT8Yx+8I6i4SF6WwvwRso532uHlPJ1T029dtHFak=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=