diff --git a/sqle/model/instance_audit_plan.go b/sqle/model/instance_audit_plan.go index cdc43faff..a1c5ccc73 100644 --- a/sqle/model/instance_audit_plan.go +++ b/sqle/model/instance_audit_plan.go @@ -370,8 +370,7 @@ func (s *Storage) DeleteInstanceAuditPlan(instanceAuditPlanId string) error { return s.Tx(func(txDB *gorm.DB) error { // 删除队列表中数据 err := txDB.Exec(`DELETE FROM sql_manage_queues USING sql_manage_queues - JOIN audit_plans_v2 ap ON ap.id=sql_manage_queues.source_id - JOIN instance_audit_plans iap ON iap.id = ap.instance_audit_plan_id + JOIN instance_audit_plans iap ON iap.id = sql_manage_queues.source_id WHERE iap.ID = ?`, instanceAuditPlanId).Error if err != nil { return err @@ -398,7 +397,7 @@ func (s *Storage) DeleteAuditPlan(auditPlanID int) error { return s.Tx(func(txDB *gorm.DB) error { // 删除队列表中数据 err := txDB.Exec(`DELETE FROM sql_manage_queues USING sql_manage_queues - JOIN audit_plans_v2 ap ON ap.id=sql_manage_queues.source_id + JOIN audit_plans_v2 ap ON ap.instance_audit_plan_id=sql_manage_queues.source_id WHERE ap.id = ?`, auditPlanID).Error if err != nil { return err @@ -419,13 +418,15 @@ func (s *Storage) DeleteAuditPlan(auditPlanID int) error { }) } +var ErrAuditPlanNotFound = errors.New(errors.DataNotExist, fmt.Errorf("cant find audit plan")) + func (s *Storage) GetAuditPlanDetailByInstAuditPlanIdAndType(instAuditPlanId string, auditPlanType string) (*AuditPlanDetail, error) { ap, exist, err := s.GetAuditPlanDetailByType(instAuditPlanId, auditPlanType) if err != nil { return nil, err } if !exist { - return nil, fmt.Errorf("cant find audit plan by id %s", instAuditPlanId) + return nil, ErrAuditPlanNotFound } return ap, errors.New(errors.ConnectStorageError, err) } @@ -436,10 +437,13 @@ func (s *Storage) GetAuditPlanDetailByType(InstanceAuditPlanId, auditPlanType st Where("instance_audit_plans.id = ? AND audit_plans_v2.type = ?", InstanceAuditPlanId, auditPlanType). Select("audit_plans_v2.*,instance_audit_plans.project_id,instance_audit_plans.db_type,instance_audit_plans.token,instance_audit_plans.instance_id,instance_audit_plans.create_user_id"). Scan(&auditPlanDetail).Error - if err == gorm.ErrRecordNotFound { - return auditPlanDetail, false, nil + if err != nil { + return auditPlanDetail, false, errors.New(errors.ConnectStorageError, err) } - return auditPlanDetail, true, errors.New(errors.ConnectStorageError, err) + if auditPlanDetail == nil { + return nil, false, nil + } + return auditPlanDetail, true, nil } func (s *Storage) GetInstanceAuditPlanByInstanceID(instanceID int64) (*InstanceAuditPlan, bool, error) { @@ -474,7 +478,7 @@ func (s *Storage) PushSQLToManagerSQLQueue(sqls []*SQLManageQueue) error { func (s *Storage) PullSQLFromManagerSQLQueue() ([]*SQLManageQueue, error) { sqls := []*SQLManageQueue{} - err := s.db.Find(&sqls).Limit(100).Error + err := s.db.Limit(1000).Find(&sqls).Error return sqls, err } @@ -482,6 +486,21 @@ func (s *Storage) RemoveSQLFromQueue(txDB *gorm.DB, sql *SQLManageQueue) error { return txDB.Unscoped().Delete(sql).Error } +func (s *Storage) DeleteSQLManageRecordBySourceId(sourceId string) error { + return s.Tx(func(txDB *gorm.DB) error { + err := txDB.Exec(`UPDATE sql_manage_record smr + LEFT JOIN sql_manage_record_processes smrp ON smrp.sql_manage_record_id = smr.id + SET smr.deleted_at = now(), + smrp.deleted_at = now() + WHERE smr.source_id = ?`, sourceId).Error + + if err != nil { + return err + } + return nil + }) +} + func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error { const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " + "VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " + diff --git a/sqle/server/auditplan/job_task_handler.go b/sqle/server/auditplan/job_task_handler.go index e4fca607e..745d0e5af 100644 --- a/sqle/server/auditplan/job_task_handler.go +++ b/sqle/server/auditplan/job_task_handler.go @@ -2,7 +2,9 @@ package auditplan import ( "database/sql" + "errors" "fmt" + "strings" "time" driverV2 "github.com/actiontech/sqle/sqle/driver/v2" @@ -124,7 +126,7 @@ func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) { } func BatchAuditSQLs(sqlList []*model.SQLManageRecord, isSkipAuditedSql bool) ([]*model.SQLManageRecord, error) { - + s := model.GetStorage() // SQL聚合 sqlMap := make(map[string][]*model.SQLManageRecord) for _, sql := range sqlList { @@ -153,6 +155,20 @@ func BatchAuditSQLs(sqlList []*model.SQLManageRecord, isSkipAuditedSql bool) ([] return nil, err } resp, err := meta.Handler.Audit(sqls) + // 当管控队列表中sql出栈审核时扫描任务被删除,则清空已经save到管控表的sql。 + if err != nil && errors.Is(err, model.ErrAuditPlanNotFound) { + log.NewEntry().Warnf("audit sqls in task fail %v,cant find audit plan by id %s", err, sqls[0].SourceId) + err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId) + if err != nil { + log.NewEntry().Errorf("delete sql manage record fail %v", err) + } + for k := range sqlMap { + if strings.HasPrefix(k, sqls[0].SourceId+":") { + delete(sqlMap, k) + } + } + continue + } if err != nil { log.NewEntry().Errorf("audit sqls in task fail %v,ignore audit result", err) auditSQLs = append(auditSQLs, sqls...)