Skip to content

Commit

Permalink
POC: single transaction for inserting to system_package2 per partition
Browse files Browse the repository at this point in the history
  • Loading branch information
psegedy committed Nov 27, 2023
1 parent 46dafd4 commit 2b7cd4e
Showing 1 changed file with 49 additions and 41 deletions.
90 changes: 49 additions & 41 deletions tasks/migration/migrate_system_package_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package migration

import (
"app/base/core"
"app/base/database"
"app/base/models"
"app/base/utils"
"app/evaluator"
Expand Down Expand Up @@ -49,7 +50,6 @@ type Package struct {
}

func MigrateSystemPackageData() {
var wg sync.WaitGroup
var partitions []string

err := tasks.WithReadReplicaTx(func(db *gorm.DB) error {
Expand All @@ -63,49 +63,57 @@ func MigrateSystemPackageData() {
}

for i, part := range partitions {
utils.LogInfo("#", i, "partition", part, "Migrating partition")
accSys := getAccSys(part, i)

// process at most 4 systems at once
guard := make(chan struct{}, 4)

for _, as := range accSys {
guard <- struct{}{}
wg.Add(1)
go func(as AccSys, i int, part string) {
defer func() {
<-guard
wg.Done()
}()
updates := getUpdates(as, part, i)
for _, u := range updates {
updateData := getUpdateData(u, as, part, i)
latestApplicable, latestInstallable := getEvraApplicability(updateData)
applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
if applicableID != 0 && installableID != 0 {
// insert ids to system_package2
err = tasks.WithTx(func(db *gorm.DB) error {
return db.Table("system_package2").
Where("installable_id IS NULL AND applicable_id IS NULL").
Save(models.SystemPackage{
RhAccountID: as.RhAccountID,
SystemID: as.SystemID,
PackageID: u.PackageID,
NameID: u.NameID,
InstallableID: &installableID,
ApplicableID: &applicableID,
}).Error
})
if err != nil {
utils.LogWarn("#", i, "Failed to update system_package2")
}
processPartition(part, i)
}
}

func processPartition(part string, i int) {
utils.LogInfo("#", i, "partition", part, "Migrating partition")

var wg sync.WaitGroup
tx := database.Db.Begin()
defer tx.Commit()

accSys := getAccSys(part, i)
// process at most 4 systems at once
guard := make(chan struct{}, 4)

for _, as := range accSys {
guard <- struct{}{}
wg.Add(1)
go func(as AccSys, i int, part string) {
defer func() {
<-guard
wg.Done()
}()
updates := getUpdates(as, part, i)
for _, u := range updates {
updateData := getUpdateData(u, as, part, i)
latestApplicable, latestInstallable := getEvraApplicability(updateData)
applicableID, installableID := getPackageIDs(u, i, latestApplicable, latestInstallable)
if applicableID != 0 && installableID != 0 {
// insert ids to system_package2
err := tasks.WithTx(func(db *gorm.DB) error {
return db.Table("system_package2").
Where("installable_id IS NULL AND applicable_id IS NULL").
Save(models.SystemPackage{
RhAccountID: as.RhAccountID,
SystemID: as.SystemID,
PackageID: u.PackageID,
NameID: u.NameID,
InstallableID: &installableID,
ApplicableID: &applicableID,
}).Error
})
if err != nil {
utils.LogWarn("#", i, "Failed to update system_package2")
}
}
}(as, i, part)
}
wg.Wait()
utils.LogInfo("#", i, "partition", part, "Partition migrated")
}
}(as, i, part)
}
wg.Wait()
utils.LogInfo("#", i, "partition", part, "Partition migrated")
}

func getAccSys(part string, i int) []AccSys {
Expand Down

0 comments on commit 2b7cd4e

Please sign in to comment.