Skip to content

Commit

Permalink
Merge pull request #2763 from actiontech/fix_audit_plan_delete_panic
Browse files Browse the repository at this point in the history
fix: fix the audit plan being deleted, causing sql audit panic
  • Loading branch information
winfredLIN authored Nov 15, 2024
2 parents 79d5650 + 2389562 commit d4ec1e4
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 9 deletions.
35 changes: 27 additions & 8 deletions sqle/model/instance_audit_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,8 +370,7 @@ func (s *Storage) DeleteInstanceAuditPlan(instanceAuditPlanId string) error {
return s.Tx(func(txDB *gorm.DB) error {
// 删除队列表中数据
err := txDB.Exec(`DELETE FROM sql_manage_queues USING sql_manage_queues
JOIN audit_plans_v2 ap ON ap.id=sql_manage_queues.source_id
JOIN instance_audit_plans iap ON iap.id = ap.instance_audit_plan_id
JOIN instance_audit_plans iap ON iap.id = sql_manage_queues.source_id
WHERE iap.ID = ?`, instanceAuditPlanId).Error
if err != nil {
return err
Expand All @@ -398,7 +397,7 @@ func (s *Storage) DeleteAuditPlan(auditPlanID int) error {
return s.Tx(func(txDB *gorm.DB) error {
// 删除队列表中数据
err := txDB.Exec(`DELETE FROM sql_manage_queues USING sql_manage_queues
JOIN audit_plans_v2 ap ON ap.id=sql_manage_queues.source_id
JOIN audit_plans_v2 ap ON ap.instance_audit_plan_id=sql_manage_queues.source_id
WHERE ap.id = ?`, auditPlanID).Error
if err != nil {
return err
Expand All @@ -419,13 +418,15 @@ func (s *Storage) DeleteAuditPlan(auditPlanID int) error {
})
}

var ErrAuditPlanNotFound = errors.New(errors.DataNotExist, fmt.Errorf("cant find audit plan"))

func (s *Storage) GetAuditPlanDetailByInstAuditPlanIdAndType(instAuditPlanId string, auditPlanType string) (*AuditPlanDetail, error) {
ap, exist, err := s.GetAuditPlanDetailByType(instAuditPlanId, auditPlanType)
if err != nil {
return nil, err
}
if !exist {
return nil, fmt.Errorf("cant find audit plan by id %s", instAuditPlanId)
return nil, ErrAuditPlanNotFound
}
return ap, errors.New(errors.ConnectStorageError, err)
}
Expand All @@ -436,10 +437,13 @@ func (s *Storage) GetAuditPlanDetailByType(InstanceAuditPlanId, auditPlanType st
Where("instance_audit_plans.id = ? AND audit_plans_v2.type = ?", InstanceAuditPlanId, auditPlanType).
Select("audit_plans_v2.*,instance_audit_plans.project_id,instance_audit_plans.db_type,instance_audit_plans.token,instance_audit_plans.instance_id,instance_audit_plans.create_user_id").
Scan(&auditPlanDetail).Error
if err == gorm.ErrRecordNotFound {
return auditPlanDetail, false, nil
if err != nil {
return auditPlanDetail, false, errors.New(errors.ConnectStorageError, err)
}
return auditPlanDetail, true, errors.New(errors.ConnectStorageError, err)
if auditPlanDetail == nil {
return nil, false, nil
}
return auditPlanDetail, true, nil
}

func (s *Storage) GetInstanceAuditPlanByInstanceID(instanceID int64) (*InstanceAuditPlan, bool, error) {
Expand Down Expand Up @@ -474,14 +478,29 @@ func (s *Storage) PushSQLToManagerSQLQueue(sqls []*SQLManageQueue) error {

func (s *Storage) PullSQLFromManagerSQLQueue() ([]*SQLManageQueue, error) {
sqls := []*SQLManageQueue{}
err := s.db.Find(&sqls).Limit(100).Error
err := s.db.Limit(1000).Find(&sqls).Error
return sqls, err
}

func (s *Storage) RemoveSQLFromQueue(txDB *gorm.DB, sql *SQLManageQueue) error {
return txDB.Unscoped().Delete(sql).Error
}

func (s *Storage) DeleteSQLManageRecordBySourceId(sourceId string) error {
return s.Tx(func(txDB *gorm.DB) error {
err := txDB.Exec(`UPDATE sql_manage_record smr
LEFT JOIN sql_manage_record_processes smrp ON smrp.sql_manage_record_id = smr.id
SET smr.deleted_at = now(),
smrp.deleted_at = now()
WHERE smr.source_id = ?`, sourceId).Error

if err != nil {
return err
}
return nil
})
}

func (s *Storage) SaveManagerSQL(txDB *gorm.DB, sql *SQLManageRecord) error {
const query = "INSERT INTO `sql_manage_records` (`sql_id`,`source`,`source_id`,`project_id`,`instance_id`,`schema_name`,`sql_fingerprint`, `sql_text`, `info`) " +
"VALUES (?,?,?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE `source` = VALUES(source),`source_id` = VALUES(source_id),`project_id` = VALUES(project_id), `instance_id` = VALUES(instance_id), " +
Expand Down
18 changes: 17 additions & 1 deletion sqle/server/auditplan/job_task_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package auditplan

import (
"database/sql"
"errors"
"fmt"
"strings"
"time"

driverV2 "github.com/actiontech/sqle/sqle/driver/v2"
Expand Down Expand Up @@ -124,7 +126,7 @@ func handlerSQLAudit(entry *logrus.Entry, sqlList []*model.SQLManageRecord) {
}

func BatchAuditSQLs(sqlList []*model.SQLManageRecord, isSkipAuditedSql bool) ([]*model.SQLManageRecord, error) {

s := model.GetStorage()
// SQL聚合
sqlMap := make(map[string][]*model.SQLManageRecord)
for _, sql := range sqlList {
Expand Down Expand Up @@ -153,6 +155,20 @@ func BatchAuditSQLs(sqlList []*model.SQLManageRecord, isSkipAuditedSql bool) ([]
return nil, err
}
resp, err := meta.Handler.Audit(sqls)
// 当管控队列表中sql出栈审核时扫描任务被删除,则清空已经save到管控表的sql。
if err != nil && errors.Is(err, model.ErrAuditPlanNotFound) {
log.NewEntry().Warnf("audit sqls in task fail %v,cant find audit plan by id %s", err, sqls[0].SourceId)
err := s.DeleteSQLManageRecordBySourceId(sqls[0].SourceId)
if err != nil {
log.NewEntry().Errorf("delete sql manage record fail %v", err)
}
for k := range sqlMap {
if strings.HasPrefix(k, sqls[0].SourceId+":") {
delete(sqlMap, k)
}
}
continue
}
if err != nil {
log.NewEntry().Errorf("audit sqls in task fail %v,ignore audit result", err)
auditSQLs = append(auditSQLs, sqls...)
Expand Down

0 comments on commit d4ec1e4

Please sign in to comment.