Skip to content

Commit

Permalink
improve config_changes lookup for existing ones
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe committed May 17, 2024
1 parent fe21c62 commit defccec
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 41 deletions.
32 changes: 0 additions & 32 deletions api/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,38 +78,6 @@ func (t TempCache) Insert(item models.ConfigItem) {
t.items[strings.ToLower(item.ID)] = item
}

func (t TempCache) IsChangePersisted(configID, externalChangeID string) (bool, error) {
if configID == "" || externalChangeID == "" {
return false, nil
}

configID = strings.ToLower(configID)
externalChangeID = strings.ToLower(externalChangeID)

if t.changes == nil {
t.changes = make(map[string]struct{})
}

if _, ok := t.changes[configID+externalChangeID]; ok {
return true, nil
}

var result models.ConfigChange
if err := t.ctx.DB().Select("id").Where("config_id = ?", configID).
Where("external_change_id = ?", externalChangeID).
Limit(1).
Find(&result).Error; err != nil {
return false, err
}

if result.ID != "" {
t.changes[configID+externalChangeID] = struct{}{}
return true, nil
}

return false, nil
}

func (t TempCache) Get(id string) (*models.ConfigItem, error) {
id = strings.ToLower(id)
if id == "" {
Expand Down
57 changes: 57 additions & 0 deletions db/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"time"

sw "github.com/RussellLuo/slidingwindow"
"github.com/patrickmn/go-cache"
"github.com/samber/lo"

"github.com/flanksource/commons/collections"
"github.com/flanksource/config-db/api"
Expand All @@ -19,6 +21,8 @@ const (
ChangeTypeTooManyChanges = "TooManyChanges"
)

var configChangesCache = cache.New(time.Hour*24, time.Hour*24)

func GetWorkflowRunCount(ctx api.ScrapeContext, workflowID string) (int64, error) {
var count int64
err := ctx.DB().Table("config_changes").
Expand Down Expand Up @@ -122,6 +126,7 @@ func syncCurrentlyRateLimitedConfigs(ctx api.ScrapeContext, window time.Duration
if err != nil {
return nil, err
}
defer rows.Close()

output := make(map[string]struct{})
for rows.Next() {
Expand Down Expand Up @@ -156,6 +161,7 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {
if err != nil {
return err
}
defer rows.Close()

for rows.Next() {
var configID string
Expand All @@ -179,3 +185,54 @@ func syncWindow(ctx api.ScrapeContext, max int, window time.Duration) error {

return rows.Err()
}

// filterOutPersistedChanges returns only those changes that weren't seen in the db.
func filterOutPersistedChanges(ctx api.ScrapeContext, changes []*models.ConfigChange) ([]*models.ConfigChange, error) {
// use cache to filter out ones that we've already seen before
changes = lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
_, found := configChangesCache.Get(c.ConfigID + c.ExternalChangeId)
if found {
_ = found
}
return !found
})

if len(changes) == 0 {
return nil, nil
}

query := `SELECT config_id, external_change_id
FROM config_changes
WHERE (config_id, external_change_id) IN ?`
args := lo.Map(changes, func(c *models.ConfigChange, _ int) []string {
return []string{c.ConfigID, c.ExternalChangeId}
})

rows, err := ctx.DB().Raw(query, args).Rows()
if err != nil {
return nil, err
}
defer rows.Close()

existing := make(map[string]struct{})
for rows.Next() {
var configID, externalChangeID string
if err := rows.Scan(&configID, &externalChangeID); err != nil {
return nil, err
}

configChangesCache.SetDefault(configID+externalChangeID, struct{}{})
existing[configID+externalChangeID] = struct{}{}
}

newOnes := lo.Filter(changes, func(c *models.ConfigChange, _ int) bool {
_, found := existing[c.ConfigID+c.ExternalChangeId]
return !found
})

if len(newOnes) > 0 {
_ = query
}

return newOnes, nil
}
20 changes: 11 additions & 9 deletions db/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ func shouldExcludeChange(result *v1.ScrapeResult, changeResult v1.ChangeResult)

func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.ConfigItem) ([]*models.ConfigChange, []*models.ConfigChange, error) {
var (
newOnes = []*models.ConfigChange{}
updates = []*models.ConfigChange{}
toInsert = []*models.ConfigChange{}
toUpdate = []*models.ConfigChange{}
)

changes.ProcessRules(result, result.BaseScraper.Transform.Change.Mapping...)
Expand Down Expand Up @@ -274,17 +274,19 @@ func extractChanges(ctx api.ScrapeContext, result *v1.ScrapeResult, ci *models.C
}

if changeResult.UpdateExisting {
updates = append(updates, change)
toUpdate = append(toUpdate, change)
} else {
if ok, err := ctx.TempCache().IsChangePersisted(change.ConfigID, change.ExternalChangeId); err != nil {
return nil, nil, fmt.Errorf("failed to check if change is persisted: %w", err)
} else if !ok {
newOnes = append(newOnes, change)
}
toInsert = append(toInsert, change)
}
}

return newOnes, updates, nil
// Remove the changes that have already been inserted.
newOnes, err := filterOutPersistedChanges(ctx, toInsert)
if err != nil {
return nil, nil, err
}

return newOnes, toUpdate, nil
}

func upsertAnalysis(ctx api.ScrapeContext, result *v1.ScrapeResult) error {
Expand Down

0 comments on commit defccec

Please sign in to comment.