Skip to content

Commit

Permalink
fix: tagrecorder fails to synchronize ch_app_table
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhengYa-0110 committed Mar 4, 2025
1 parent a778e6a commit a2cade0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
8 changes: 6 additions & 2 deletions server/controller/tagrecorder/ch_app_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@ func (l *ChAPPLabel) generateNewData(db *mysql.DB) (map[PrometheusAPPLabelKey]my
for _, prometheusLabel := range prometheusLabels {
labelName := prometheusLabel.Name
if slices.Contains(appLabelSlice, labelName) {
labelNameID := labelNameIDMap[labelName]
labelNameID, nameOK := labelNameIDMap[labelName]
labelValue := prometheusLabel.Value
labelValueID := valueNameIDMap[labelValue]
labelValueID, valueOK := valueNameIDMap[labelValue]
if !nameOK || !valueOK {
log.Warningf("label name or value not found in db, labelName: %s, labelValue: %s", labelName, labelValue)
continue
}
keyToItem[PrometheusAPPLabelKey{LabelNameID: labelNameID, LabelValueID: labelValueID}] = mysqlmodel.ChAPPLabel{
LabelNameID: labelNameID,
LabelValue: labelValue,
Expand Down
31 changes: 19 additions & 12 deletions server/controller/tagrecorder/db_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ import (
)

type operator[MT MySQLChModel, KT ChModelKey] interface {
batchPage(keys []KT, items []MT, operateFunc func([]KT, []MT, *mysql.DB), db *mysql.DB)
add(keys []KT, dbItems []MT, db *mysql.DB)
update(oldDBItem MT, updateInfo map[string]interface{}, key KT, db *mysql.DB)
delete(keys []KT, dbItems []MT, db *mysql.DB)
batchPage(keys []KT, items []MT, operateFunc func([]KT, []MT, *mysql.DB) error, db *mysql.DB) error
add(keys []KT, dbItems []MT, db *mysql.DB) error
update(oldDBItem MT, updateInfo map[string]interface{}, key KT, db *mysql.DB) error
delete(keys []KT, dbItems []MT, db *mysql.DB) error
setConfig(config.ControllerConfig)
}

Expand All @@ -46,7 +46,7 @@ func (b *operatorComponent[MT, KT]) setConfig(cfg config.ControllerConfig) {
b.cfg = cfg
}

func (b *operatorComponent[MT, KT]) batchPage(keys []KT, items []MT, operateFunc func([]KT, []MT, *mysql.DB), db *mysql.DB) {
func (b *operatorComponent[MT, KT]) batchPage(keys []KT, items []MT, operateFunc func([]KT, []MT, *mysql.DB) error, db *mysql.DB) error {
count := len(items)
offset := b.cfg.TagRecorderCfg.MySQLBatchSize
var pages int
Expand All @@ -61,35 +61,42 @@ func (b *operatorComponent[MT, KT]) batchPage(keys []KT, items []MT, operateFunc
if end > count {
end = count
}
operateFunc(keys[start:end], items[start:end], db)
err := operateFunc(keys[start:end], items[start:end], db)
if err != nil {
return err
}
}
return nil
}

func (b *operatorComponent[MT, KT]) add(keys []KT, dbItems []MT, db *mysql.DB) {
func (b *operatorComponent[MT, KT]) add(keys []KT, dbItems []MT, db *mysql.DB) error {
err := db.Clauses(clause.OnConflict{
UpdateAll: true,
}).Create(&dbItems).Error
if err != nil {
log.Errorf("add %s (keys: %+v values: %+v) failed: %s", b.resourceTypeName, keys, dbItems, err.Error(), db.LogPrefixORGID) // TODO is key needed?
return
return err
}
log.Infof("add %s (keys: %+v values: %+v) success", b.resourceTypeName, keys, dbItems, db.LogPrefixORGID)
return nil
}

func (b *operatorComponent[MT, KT]) update(oldDBItem MT, updateInfo map[string]interface{}, key KT, db *mysql.DB) {
func (b *operatorComponent[MT, KT]) update(oldDBItem MT, updateInfo map[string]interface{}, key KT, db *mysql.DB) error {
err := db.Model(&oldDBItem).Updates(updateInfo).Error
if err != nil {
log.Errorf("update %s (key: %+v value: %+v) failed: %s", b.resourceTypeName, key, oldDBItem, err.Error(), db.LogPrefixORGID)
return
return err
}
log.Infof("update %s (key: %+v value: %+v, update info: %v) success", b.resourceTypeName, key, oldDBItem, updateInfo, db.LogPrefixORGID)
return nil
}

func (b *operatorComponent[MT, KT]) delete(keys []KT, dbItems []MT, db *mysql.DB) {
func (b *operatorComponent[MT, KT]) delete(keys []KT, dbItems []MT, db *mysql.DB) error {
err := db.Delete(&dbItems).Error
if err != nil {
log.Errorf("delete %s (keys: %+v values: %+v) failed: %s", b.resourceTypeName, keys, dbItems, err.Error(), db.LogPrefixORGID)
return
return err
}
log.Infof("delete %s (keys: %+v values: %+v) success", b.resourceTypeName, keys, dbItems, db.LogPrefixORGID)
return nil
}
15 changes: 12 additions & 3 deletions server/controller/tagrecorder/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,19 @@ func (b *UpdaterComponent[MT, KT]) Refresh() bool {
} else {
updateInfo, ok := b.updaterDG.generateUpdateInfo(oldDBItem, newDBItem)
if ok {
b.dbOperator.update(oldDBItem, updateInfo, key, db)
err := b.dbOperator.update(oldDBItem, updateInfo, key, db)
if err != nil {
log.Errorf("failed to update %s: %s", b.resourceTypeName, err, db.LogPrefixORGID)
}
isUpdate = true
}
}
}
if len(itemsToAdd) > 0 {
b.dbOperator.batchPage(keysToAdd, itemsToAdd, b.dbOperator.add, db) // 1是个占位符
err := b.dbOperator.batchPage(keysToAdd, itemsToAdd, b.dbOperator.add, db) // 1是个占位符
if err != nil {
log.Errorf("failed to add %s: %s", b.resourceTypeName, err, db.LogPrefixORGID)
}
}

for key, oldDBItem := range oldKeyToDBItem {
Expand All @@ -205,7 +211,10 @@ func (b *UpdaterComponent[MT, KT]) Refresh() bool {
}
}
if len(itemsToDelete) > 0 {
b.dbOperator.batchPage(keysToDelete, itemsToDelete, b.dbOperator.delete, db) // 1是个占位符
err := b.dbOperator.batchPage(keysToDelete, itemsToDelete, b.dbOperator.delete, db) // 1是个占位符
if err != nil {
log.Errorf("failed to delete %s: %s", b.resourceTypeName, err, db.LogPrefixORGID)
}
}

if len(itemsToDelete) > 0 && len(itemsToAdd) == 0 && !isUpdate {
Expand Down

0 comments on commit a2cade0

Please sign in to comment.