diff --git a/spelling_dict.txt b/spelling_dict.txt index 44c59a8736..67f8dbd455 100644 --- a/spelling_dict.txt +++ b/spelling_dict.txt @@ -426,4 +426,6 @@ Tjwvz046g slowlogs tjwvz ejwvz -wiru \ No newline at end of file +wiru +larkapproval +ccer \ No newline at end of file diff --git a/sqle/api/controller/v1/configuration.go b/sqle/api/controller/v1/configuration.go index ede9d7835e..7b4f177988 100644 --- a/sqle/api/controller/v1/configuration.go +++ b/sqle/api/controller/v1/configuration.go @@ -17,6 +17,7 @@ import ( "github.com/actiontech/sqle/sqle/pkg/im/feishu" "github.com/labstack/echo/v4" + larkContact "github.com/larksuite/oapi-sdk-go/v3/service/contact/v3" ) type UpdateSMTPConfigurationReqV1 struct { @@ -564,7 +565,7 @@ func TestFeishuConfigV1(c echo.Context) error { } client := feishu.NewFeishuClient(feishuCfg.AppKey, feishuCfg.AppSecret) - feishuUsers, err := client.GetUsersByEmailOrMobileWithLimitation(email, phone) + feishuUsers, err := client.GetUsersByEmailOrMobileWithLimitation(email, phone, larkContact.UserIdTypeGetUserUserId) if err != nil { return c.JSON(http.StatusOK, &TestFeishuConfigResV1{ BaseRes: controller.NewBaseReq(nil), diff --git a/sqle/api/controller/v1/workflow.go b/sqle/api/controller/v1/workflow.go index e4688ac511..6414e689b0 100644 --- a/sqle/api/controller/v1/workflow.go +++ b/sqle/api/controller/v1/workflow.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "strconv" - "strings" "time" "github.com/actiontech/sqle/sqle/api/controller" @@ -317,24 +316,6 @@ type WorkflowStepResV1 struct { Reason string `json:"reason,omitempty"` } -func CheckUserCanOperateStep(user *model.User, workflow *model.Workflow, stepId int) error { - if workflow.Record.Status != model.WorkflowStatusWaitForAudit && workflow.Record.Status != model.WorkflowStatusWaitForExecution { - return fmt.Errorf("workflow status is %s, not allow operate it", workflow.Record.Status) - } - currentStep := workflow.CurrentStep() - if currentStep == nil { - return fmt.Errorf("workflow current step not found") - } - if uint(stepId) != workflow.CurrentStep().ID { - return fmt.Errorf("workflow current step is not %d", stepId) - } - - if !workflow.IsOperationUser(user) { - return fmt.Errorf("you are not allow to operate the workflow") - } - return nil -} - // @Deprecated // @Summary 审批通过 // @Description approve workflow @@ -493,34 +474,6 @@ func IsTaskCanExecute(s *model.Storage, taskId string) (bool, error) { return true, nil } -func GetNeedExecTaskIds(s *model.Storage, workflow *model.Workflow, user *model.User) (taskIds map[uint] /*task id*/ uint /*user id*/, err error) { - instances, err := s.GetInstancesByWorkflowID(workflow.ID) - if err != nil { - return nil, err - } - // 有不在运维时间内的instances报错 - var cannotExecuteInstanceNames []string - for _, inst := range instances { - if len(inst.MaintenancePeriod) != 0 && !inst.MaintenancePeriod.IsWithinScope(time.Now()) { - cannotExecuteInstanceNames = append(cannotExecuteInstanceNames, inst.Name) - } - } - if len(cannotExecuteInstanceNames) > 0 { - return nil, errors.New(errors.TaskActionInvalid, - fmt.Errorf("please go online during instance operation and maintenance time. these instances are not in maintenance time[%v]", strings.Join(cannotExecuteInstanceNames, ","))) - } - - // 定时的instances和已上线的跳过 - needExecTaskIds := make(map[uint]uint) - for _, instRecord := range workflow.Record.InstanceRecords { - if instRecord.ScheduledAt != nil || instRecord.IsSQLExecuted { - continue - } - needExecTaskIds[instRecord.TaskId] = user.ID - } - return needExecTaskIds, nil -} - func PrepareForTaskExecution(c echo.Context, projectName string, workflow *model.Workflow, user *model.User, TaskId int) error { if workflow.Record.Status != model.WorkflowStatusWaitForExecution { return errors.New(errors.DataInvalid, e.New("workflow need to be approved first")) @@ -546,29 +499,6 @@ func PrepareForTaskExecution(c echo.Context, projectName string, workflow *model return e.New("you are not allow to execute the task") } -func PrepareForWorkflowExecution(c echo.Context, projectName string, workflow *model.Workflow, user *model.User) error { - err := CheckCurrentUserCanOperateWorkflow(c, &model.Project{Name: projectName}, workflow, []uint{}) - if err != nil { - return err - } - - currentStep := workflow.CurrentStep() - if currentStep == nil { - return errors.New(errors.DataInvalid, fmt.Errorf("workflow current step not found")) - } - - if workflow.Record.Status != model.WorkflowStatusWaitForExecution { - return errors.New(errors.DataInvalid, - fmt.Errorf("workflow need to be approved first")) - } - - err = CheckUserCanOperateStep(user, workflow, int(currentStep.ID)) - if err != nil { - return errors.New(errors.DataInvalid, err) - } - return nil -} - type GetWorkflowTasksResV1 struct { controller.BaseRes Data []*GetWorkflowTasksItemV1 `json:"data"` @@ -686,7 +616,6 @@ func CheckWorkflowCanCommit(template *model.WorkflowTemplate, tasks []*model.Tas type GetWorkflowsReqV1 struct { FilterSubject string `json:"filter_subject" query:"filter_subject"` FilterWorkflowID string `json:"filter_workflow_id" query:"filter_workflow_id"` - FuzzySearchWorkflowDesc string `json:"fuzzy_search_workflow_desc" query:"fuzzy_search_workflow_desc"` FilterCreateTimeFrom string `json:"filter_create_time_from" query:"filter_create_time_from"` FilterCreateTimeTo string `json:"filter_create_time_to" query:"filter_create_time_to"` FilterCreateUserName string `json:"filter_create_user_name" query:"filter_create_user_name"` @@ -805,7 +734,6 @@ func GetGlobalWorkflowsV1(c echo.Context) error { // @Security ApiKeyAuth // @Param filter_subject query string false "filter subject" // @Param filter_workflow_id query string false "filter by workflow_id" -// @Param fuzzy_search_workflow_desc query string false "fuzzy search by workflow description" // @Param filter_create_time_from query string false "filter create time from" // @Param filter_create_time_to query string false "filter create time to" // @Param filter_task_execute_start_time_from query string false "filter_task_execute_start_time_from" @@ -853,7 +781,6 @@ func GetWorkflowsV1(c echo.Context) error { data := map[string]interface{}{ "filter_workflow_id": req.FilterWorkflowID, - "fuzzy_search_workflow_desc": req.FuzzySearchWorkflowDesc, "filter_subject": req.FilterSubject, "filter_create_time_from": req.FilterCreateTimeFrom, "filter_create_time_to": req.FilterCreateTimeTo, @@ -1001,7 +928,6 @@ func GetWorkflowV1(c echo.Context) error { type ExportWorkflowReqV1 struct { FilterSubject string `json:"filter_subject" query:"filter_subject"` - FuzzySearchWorkflowDesc string `json:"fuzzy_search_workflow_desc" query:"fuzzy_search_workflow_desc"` FilterCreateTimeFrom string `json:"filter_create_time_from" query:"filter_create_time_from"` FilterCreateTimeTo string `json:"filter_create_time_to" query:"filter_create_time_to"` FilterCreateUserName string `json:"filter_create_user_name" query:"filter_create_user_name"` @@ -1019,7 +945,6 @@ type ExportWorkflowReqV1 struct { // @Tags workflow // @Security ApiKeyAuth // @Param filter_subject query string false "filter subject" -// @Param fuzzy_search_workflow_desc query string false "fuzzy search by workflow description" // @Param filter_create_time_from query string false "filter create time from" // @Param filter_create_time_to query string false "filter create time to" // @Param filter_task_execute_start_time_from query string false "filter_task_execute_start_time_from" diff --git a/sqle/api/controller/v2/workflow.go b/sqle/api/controller/v2/workflow.go index 226c1d2e8a..42890833d0 100644 --- a/sqle/api/controller/v2/workflow.go +++ b/sqle/api/controller/v2/workflow.go @@ -99,7 +99,7 @@ func ApproveWorkflowV2(c echo.Context) error { nextStep := workflow.NextStep() - err = v1.CheckUserCanOperateStep(user, workflow, stepId) + err = server.CheckUserCanOperateStep(user, workflow, stepId) if err != nil { return controller.JSONBaseErrorReq(c, errors.New(errors.DataInvalid, err)) } @@ -108,9 +108,9 @@ func ApproveWorkflowV2(c echo.Context) error { return controller.JSONBaseErrorReq(c, err) } - go im.UpdateApprove(workflow.ID, user.Phone, model.ApproveStatusAgree, "") + go im.UpdateApprove(workflow.ID, user, model.ApproveStatusAgree, "") - if nextStep.Template.Typ != model.WorkflowStepTypeSQLExecute { + if nextStep != nil { go im.CreateApprove(strconv.Itoa(int(workflow.ID))) } @@ -184,7 +184,7 @@ func RejectWorkflowV2(c echo.Context) error { return controller.JSONBaseErrorReq(c, v1.ErrWorkflowNoAccess) } - err = v1.CheckUserCanOperateStep(user, workflow, stepId) + err = server.CheckUserCanOperateStep(user, workflow, stepId) if err != nil { return controller.JSONBaseErrorReq(c, errors.New(errors.DataInvalid, err)) } @@ -202,7 +202,7 @@ func RejectWorkflowV2(c echo.Context) error { return controller.JSONBaseErrorReq(c, err) } - go im.UpdateApprove(workflow.ID, user.Phone, model.ApproveStatusRefuse, req.Reason) + go im.UpdateApprove(workflow.ID, user, model.ApproveStatusRefuse, req.Reason) return c.JSON(http.StatusOK, controller.NewBaseReq(nil)) } @@ -262,7 +262,7 @@ func CancelWorkflowV2(c echo.Context) error { fmt.Errorf("you are not allow to operate the workflow"))) } - go im.CancelApprove(workflow.ID) + go im.BatchCancelApprove([]uint{workflow.ID}, user) workflow.Record.Status = model.WorkflowStatusCancel workflow.Record.CurrentWorkflowStepId = 0 @@ -313,8 +313,12 @@ func BatchCancelWorkflowsV2(c echo.Context) error { } projectName := c.Param("project_name") - userName := controller.GetUserName(c) - if err := v1.CheckIsProjectManager(userName, projectName); err != nil { + user, err := controller.GetCurrentUser(c) + if err != nil { + return controller.JSONBaseErrorReq(c, err) + } + + if err := v1.CheckIsProjectManager(user.Name, projectName); err != nil { return controller.JSONBaseErrorReq(c, err) } @@ -331,7 +335,7 @@ func BatchCancelWorkflowsV2(c echo.Context) error { workflow.Record.CurrentWorkflowStepId = 0 } - go im.BatchCancelApprove(workflowIds) + go im.BatchCancelApprove(workflowIds, user) if err := model.GetStorage().BatchUpdateWorkflowStatus(workflows); err != nil { return controller.JSONBaseErrorReq(c, err) @@ -985,7 +989,7 @@ func UpdateWorkflowScheduleV2(c echo.Context) error { fmt.Errorf("workflow need to be approved first"))) } - err = v1.CheckUserCanOperateStep(user, workflow, int(currentStep.ID)) + err = server.CheckUserCanOperateStep(user, workflow, int(currentStep.ID)) if err != nil { return controller.JSONBaseErrorReq(c, errors.New(errors.DataInvalid, err)) } @@ -1052,33 +1056,20 @@ func ExecuteTasksOnWorkflowV2(c echo.Context) error { return controller.JSONBaseErrorReq(c, v1.ErrWorkflowNoAccess) } - workflowId = fmt.Sprintf("%v", workflow.ID) - - workflow, exist, err = s.GetWorkflowDetailById(workflowId) - if err != nil { - return controller.JSONBaseErrorReq(c, err) - } - if !exist { - return controller.JSONBaseErrorReq(c, v1.ErrWorkflowNoAccess) - } user, err := controller.GetCurrentUser(c) if err != nil { return controller.JSONBaseErrorReq(c, err) } - if err := v1.PrepareForWorkflowExecution(c, projectName, workflow, user); err != nil { - return err - } - needExecTaskIds, err := v1.GetNeedExecTaskIds(s, workflow, user) - if err != nil { - return err - } + workflowId = fmt.Sprintf("%v", workflow.ID) - err = server.ExecuteWorkflow(workflow, needExecTaskIds) + err = server.ExecuteTasksProcess(workflowId, projectName, user) if err != nil { return controller.JSONBaseErrorReq(c, err) } + im.UpdateApprove(workflow.ID, user, model.ApproveStatusAgree, "") + return c.JSON(http.StatusOK, controller.NewBaseReq(nil)) } diff --git a/sqle/model/configuration.go b/sqle/model/configuration.go index de6e10a238..2cbf603efd 100644 --- a/sqle/model/configuration.go +++ b/sqle/model/configuration.go @@ -2,6 +2,7 @@ package model import ( "database/sql" + e "errors" "fmt" "strconv" "strings" @@ -388,8 +389,9 @@ func (s *Storage) GetWorkflowExpiredHoursOrDefault() (int64, error) { } const ( - ImTypeDingTalk = "dingTalk" - ImTypeFeishu = "feishu" + ImTypeDingTalk = "dingTalk" + ImTypeFeishu = "feishu" + ImTypeFeishuAudit = "feishu_audit" ) type IM struct { @@ -508,7 +510,7 @@ func (s *Storage) GetDingTalkInstanceListByWorkflowIDs(workflowIds []uint) ([]Di } // batch updates ding_talk_instances'status into input status by workflow_ids, the status should be like ApproveStatusXXX in model package. -func (s *Storage) BatchUptateStatusOfDingTalkInstance(workflowIds []uint, status string) error { +func (s *Storage) BatchUpdateStatusOfDingTalkInstance(workflowIds []uint, status string) error { err := s.db.Model(&DingTalkInstance{}).Where("workflow_id IN (?)", workflowIds).Updates(map[string]interface{}{"status": status}).Error if err != nil { return err @@ -525,6 +527,56 @@ func (s *Storage) GetDingTalkInstByStatus(status string) ([]DingTalkInstance, er return dingTalkInstances, nil } +const ( + FeishuAuditStatusInitialized = "INITIALIZED" + FeishuAuditStatusApprove = "APPROVED" + FeishuAuditStatusRejected = "REJECTED" +) + +type FeishuInstance struct { + Model + ApproveInstanceCode string `json:"approve_instance" gorm:"column:approve_instance"` + WorkflowId uint `json:"workflow_id" gorm:"column:workflow_id"` + // 审批实例 taskID + TaskID string `json:"task_id" gorm:"column:task_id"` + Status string `json:"status" gorm:"default:\"INITIALIZED\""` +} + +func (s *Storage) GetFeishuInstanceListByWorkflowIDs(workflowIds []uint) ([]FeishuInstance, error) { + var feishuInstList []FeishuInstance + err := s.db.Model(&FeishuInstance{}).Where("workflow_id IN (?)", workflowIds).Find(&feishuInstList).Error + if err != nil { + return nil, err + } + return feishuInstList, nil +} + +func (s *Storage) BatchUpdateStatusOfFeishuInstance(workflowIds []uint, status string) error { + err := s.db.Model(&FeishuInstance{}).Where("workflow_id IN (?)", workflowIds).Updates(map[string]interface{}{"status": status}).Error + if err != nil { + return err + } + return nil +} + +func (s *Storage) GetFeishuInstanceByWorkflowID(workflowId uint) (*FeishuInstance, bool, error) { + fi := new(FeishuInstance) + err := s.db.Where("workflow_id = ?", workflowId).Last(&fi).Error + if e.Is(err, gorm.ErrRecordNotFound) { + return fi, false, nil + } + return fi, true, errors.New(errors.ConnectStorageError, err) +} + +func (s *Storage) GetFeishuInstByStatus(status string) ([]FeishuInstance, error) { + var feishuInst []FeishuInstance + err := s.db.Where("status = ?", status).Find(&feishuInst).Error + if err != nil { + return nil, err + } + return feishuInst, nil +} + type PersonaliseConfig struct { Model Title string `json:"title" gorm:"column:title"` diff --git a/sqle/model/utils.go b/sqle/model/utils.go index bd37bc5041..a5aac2e898 100644 --- a/sqle/model/utils.go +++ b/sqle/model/utils.go @@ -144,6 +144,7 @@ var autoMigrateList = []interface{}{ &ManagementPermission{}, &IM{}, &DingTalkInstance{}, + &FeishuInstance{}, &SyncInstanceTask{}, &OperationRecord{}, &PersonaliseConfig{}, diff --git a/sqle/notification/feishu.go b/sqle/notification/feishu.go index 4826be105e..3cf5594fee 100644 --- a/sqle/notification/feishu.go +++ b/sqle/notification/feishu.go @@ -7,6 +7,7 @@ import ( "github.com/actiontech/sqle/sqle/log" "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/pkg/im/feishu" + larkContact "github.com/larksuite/oapi-sdk-go/v3/service/contact/v3" larkIm "github.com/larksuite/oapi-sdk-go/v3/service/im/v1" ) @@ -55,7 +56,7 @@ func (n *FeishuNotifier) Notify(notification Notification, users []*model.User) } client := feishu.NewFeishuClient(cfg.AppKey, cfg.AppSecret) - feishuUsers, err := client.GetUsersByEmailOrMobileWithLimitation(emails, mobiles) + feishuUsers, err := client.GetUsersByEmailOrMobileWithLimitation(emails, mobiles, larkContact.UserIdTypeGetUserUserId) if err != nil { return fmt.Errorf("get user_ids from feishu failed: %v", err) } diff --git a/sqle/pkg/im/feishu/feishu.go b/sqle/pkg/im/feishu/feishu.go index d3ea72b969..dbcfa1af67 100644 --- a/sqle/pkg/im/feishu/feishu.go +++ b/sqle/pkg/im/feishu/feishu.go @@ -3,7 +3,10 @@ package feishu import ( "context" "fmt" + "time" + "github.com/actiontech/sqle/sqle/log" + "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/utils" lark "github.com/larksuite/oapi-sdk-go/v3" @@ -18,7 +21,7 @@ type FeishuClient struct { } func NewFeishuClient(appId, appSecret string) *FeishuClient { - return &FeishuClient{client: lark.NewClient(appId, appSecret)} + return &FeishuClient{client: lark.NewClient(appId, appSecret, lark.WithReqTimeout(30*time.Second))} } type UserContactInfo struct { @@ -30,7 +33,7 @@ const MaxCountOfIdThatUsedToFindUser = 50 // 查询限制每次最多50条emails和mobiles,https://open.feishu.cn/document/uAjLw4CM/ukTMukTMukTM/reference/contact-v3/user/batch_get_id // 每次最多查询50个邮箱和50个手机号,如果超出50个,只查询前50个 -func (f *FeishuClient) GetUsersByEmailOrMobileWithLimitation(emails, mobiles []string) (map[string]*UserContactInfo, error) { +func (f *FeishuClient) GetUsersByEmailOrMobileWithLimitation(emails, mobiles []string, userType string) (map[string]*UserContactInfo, error) { tempEmails, tempMobiles := emails, mobiles if len(emails) > MaxCountOfIdThatUsedToFindUser { tempEmails = emails[:MaxCountOfIdThatUsedToFindUser] @@ -40,7 +43,7 @@ func (f *FeishuClient) GetUsersByEmailOrMobileWithLimitation(emails, mobiles []s } req := larkContact.NewBatchGetIdUserReqBuilder(). - UserIdType(`user_id`). + UserIdType(userType). Body(larkContact.NewBatchGetIdUserReqBodyBuilder(). Emails(tempEmails). Mobiles(tempMobiles). @@ -112,3 +115,54 @@ func (f FeishuClient) SendMessage(receiveIdType, receiveId, msgType, content str return nil } + +func (f FeishuClient) GetFeishuUserIdList(users []*model.User, userType string) ([]string, error) { + var emails, mobiles []string + userCount := 0 + for _, u := range users { + if u.Email == "" && u.Phone == "" { + continue + } + if u.Email != "" { + emails = append(emails, u.Email) + } + if u.Phone != "" { + mobiles = append(mobiles, u.Phone) + } + userCount++ + if userCount == MaxCountOfIdThatUsedToFindUser { + log.NewEntry().Infof("user %v exceed max count %v", u.Name, MaxCountOfIdThatUsedToFindUser) + break + } + } + + feishuUserMap, err := f.GetUsersByEmailOrMobileWithLimitation(emails, mobiles, userType) + if err != nil { + return nil, err + } + + userIDs := make([]string, 0, len(feishuUserMap)) + for feishuUserID := range feishuUserMap { + userIDs = append(userIDs, feishuUserID) + } + + return userIDs, nil +} + +func (f FeishuClient) GetFeishuUserInfo(userID string, userType string) (*larkContact.User, error) { + req := larkContact.NewGetUserReqBuilder(). + UserId(userID). + UserIdType(userType). + Build() + + resp, err := f.client.Contact.User.Get(context.Background(), req) + if err != nil { + return nil, err + } + + if !resp.Success() { + return nil, fmt.Errorf("get user failed: respCode=%v, respMsg=%v", resp.Code, resp.Msg) + } + + return resp.Data.User, nil +} diff --git a/sqle/pkg/im/im.go b/sqle/pkg/im/im.go index ba48804aa0..b2c0c5ae49 100644 --- a/sqle/pkg/im/im.go +++ b/sqle/pkg/im/im.go @@ -1,13 +1,13 @@ package im import ( + "context" "fmt" "strings" "github.com/actiontech/sqle/sqle/log" "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/pkg/im/dingding" - "github.com/sirupsen/logrus" ) var ( @@ -48,6 +48,11 @@ func CreateApprovalTemplate(imType string) { log.NewEntry().Errorf("create approval template error: %v", err) return } + case model.ImTypeFeishuAudit: + if err := CreateFeishuAuditTemplate(context.TODO(), im); err != nil { + log.NewEntry().Errorf("create feishu audit template error: %v", err) + return + } } } @@ -64,13 +69,8 @@ func CreateApprove(id string) { return } - if workflow.CreateUser.Phone == "" { - newLog.Error("create user phone is empty") - return - } - - if len(workflow.Record.Steps) == 1 || workflow.CurrentStep() == workflow.Record.Steps[len(workflow.Record.Steps)-1] { - newLog.Infof("workflow %v only has one approve step or has been approved, no need to create approve instance", workflow.ID) + if workflow.CurrentStep() == nil { + newLog.Infof("workflow %v has no current step, no need to create approve instance", workflow.ID) return } @@ -83,10 +83,28 @@ func CreateApprove(id string) { } for _, im := range ims { + if !im.IsEnable { + continue + } + + systemVariables, err := s.GetAllSystemVariables() + if err != nil { + newLog.Errorf("get sqle url system variables error: %v", err) + continue + } + + sqleUrl := systemVariables[model.SystemVariableSqleUrl].Value + workflowUrl := fmt.Sprintf("%v/project/%s/order/%s", sqleUrl, workflow.Project.Name, workflow.WorkflowId) + if sqleUrl == "" { + newLog.Errorf("sqle url is empty") + workflowUrl = "" + } + switch im.Type { case model.ImTypeDingTalk: - if !im.IsEnable { - continue + if workflow.CreateUser.Phone == "" { + newLog.Error("create user phone is empty") + return } var tableRows []string @@ -110,7 +128,7 @@ func CreateApprove(id string) { continue } - var userIds []*string + userIds := make([]*string, 0, len(users)) for _, user := range users { if user.Phone == "" { newLog.Infof("user %v phone is empty, skip", user.ID) @@ -126,30 +144,22 @@ func CreateApprove(id string) { userIds = append(userIds, userId) } - systemVariables, err := s.GetAllSystemVariables() - if err != nil { - newLog.Errorf("get sqle url system variables error: %v", err) - continue - } - - sqleUrl := systemVariables[model.SystemVariableSqleUrl].Value - workflowUrl := fmt.Sprintf("%v/project/%s/order/%s", sqleUrl, workflow.Project.Name, workflow.WorkflowId) - if sqleUrl == "" { - newLog.Errorf("sqle url is empty") - workflowUrl = "" - } - if err := dingTalk.CreateApprovalInstance(workflow.Subject, workflow.ID, createUserId, userIds, auditResult, workflow.Project.Name, workflow.Desc, workflowUrl); err != nil { newLog.Errorf("create dingtalk approval instance error: %v", err) continue } + case model.ImTypeFeishuAudit: + if err := CreateFeishuAuditInst(context.TODO(), im, workflow, users, workflowUrl); err != nil { + newLog.Errorf("create feishu audit instance error: %v", err) + continue + } default: newLog.Errorf("im type %s not found", im.Type) } } } -func UpdateApprove(workflowId uint, phone, status, reason string) { +func UpdateApprove(workflowId uint, user *model.User, status, reason string) { newLog := log.NewEntry() s := model.GetStorage() @@ -160,18 +170,18 @@ func UpdateApprove(workflowId uint, phone, status, reason string) { } for _, im := range ims { + if !im.IsEnable { + continue + } + switch im.Type { case model.ImTypeDingTalk: - if !im.IsEnable { - continue - } - dingTalk := &dingding.DingTalk{ AppKey: im.AppKey, AppSecret: im.AppSecret, } - userID, err := dingTalk.GetUserIDByPhone(phone) + userID, err := dingTalk.GetUserIDByPhone(user.Phone) if err != nil { newLog.Errorf("get user id by phone error: %v", err) continue @@ -181,61 +191,18 @@ func UpdateApprove(workflowId uint, phone, status, reason string) { newLog.Errorf("update approval status error: %v", err) continue } + case model.ImTypeFeishuAudit: + if err := UpdateFeishuAuditStatus(context.Background(), im, workflowId, user, status, reason); err != nil { + newLog.Errorf("update feishu audit status error: %v", err) + continue + } } } } -func CancelApprove(workflowID uint) { - newLog := log.NewEntry() - s := model.GetStorage() - dingTalkInst, exist, err := s.GetDingTalkInstanceByWorkflowID(workflowID) - if err != nil { - newLog.Errorf("get dingtalk instance by workflow id error: %v", err) - return - } - if !exist { - newLog.Infof("dingtalk instance not exist, workflow id: %v", workflowID) - return - } - // 如果在钉钉上已经同意或者拒绝<=>dingtalk instance的status不为initialized - // 则只修改钉钉工单状态为取消,不调用取消钉钉工单的API - if dingTalkInst.Status != model.ApproveStatusInitialized { - newLog.Infof("the dingtalk instance cannot be canceled if its status is not initialized, workflow id: %v", workflowID) - } else { - go DingTalkCancelApprove(s, newLog, dingTalkInst.ApproveInstanceCode) - } - // 关闭工单需要修改工单下的钉钉工单的状态 - dingTalkInst.Status = model.ApproveStatusCancel - if err := s.Save(&dingTalkInst); err != nil { - newLog.Errorf("save ding talk instance error: %v", err) - } -} - -func BatchCancelApprove(workflowIds []uint) { +func BatchCancelApprove(workflowIds []uint, user *model.User) { newLog := log.NewEntry() s := model.GetStorage() - instances, err := s.GetDingTalkInstanceListByWorkflowIDs(workflowIds) - if err != nil { - newLog.Errorf("get dingtalk instance list by workflowid slice error: %v", err) - return - } - // batch update ding_talk_instances'status into canceled - err = s.BatchUptateStatusOfDingTalkInstance(workflowIds, model.ApproveStatusCancel) - if err != nil { - newLog.Errorf("batch update ding_talk_instances'status into canceled, error: %v", err) - } - for idx, instance := range instances { - // 如果在钉钉上已经同意或者拒绝<=>dingtalk instance的status不为initialized - // 则只修改钉钉工单状态为取消,不调用取消钉钉工单的API - if instances[idx].Status != model.ApproveStatusInitialized { - newLog.Infof("the dingtalk instance cannot be canceled if its status is not initialized, workflow id: %v", instance.WorkflowId) - continue - } - go DingTalkCancelApprove(s, newLog, instance.ApproveInstanceCode) - } -} - -func DingTalkCancelApprove(s *model.Storage, newLog *logrus.Entry, approveInstanceCode string) { ims, err := s.GetAllIMConfig() if err != nil { newLog.Errorf("get im config error: %v", err) @@ -243,19 +210,49 @@ func DingTalkCancelApprove(s *model.Storage, newLog *logrus.Entry, approveInstan } for _, im := range ims { + if !im.IsEnable { + continue + } + switch im.Type { case model.ImTypeDingTalk: - if !im.IsEnable { - continue - } - dingTalk := &dingding.DingTalk{ AppKey: im.AppKey, AppSecret: im.AppSecret, } - if err := dingTalk.CancelApprovalInstance(approveInstanceCode); err != nil { - newLog.Errorf("cancel dingtalk approval instance error: %v", err) + // batch update ding_talk_instances'status into canceled + err = s.BatchUpdateStatusOfDingTalkInstance(workflowIds, model.ApproveStatusCancel) + if err != nil { + newLog.Errorf("batch update ding_talk_instances'status into canceled, error: %v", err) + return + } + + dingTalkInstList, err := s.GetDingTalkInstanceListByWorkflowIDs(workflowIds) + if err != nil { + newLog.Errorf("get dingtalk dingTalkInst list by workflow id slice error: %v", err) + return + } + + for _, dingTalkInst := range dingTalkInstList { + inst := dingTalkInst + // 如果在钉钉上已经同意或者拒绝<=>dingtalk instance的status不为initialized + // 则只修改钉钉工单状态为取消,不调用取消钉钉工单的API + if inst.Status != model.ApproveStatusInitialized { + newLog.Infof("the dingtalk dingTalkInst cannot be canceled if its status is not initialized, workflow id: %v", dingTalkInst.WorkflowId) + continue + } + + go func() { + if err := dingTalk.CancelApprovalInstance(inst.ApproveInstanceCode); err != nil { + newLog.Errorf("cancel dingtalk approval instance error: %v,instant id: %v", err, inst.ID) + } + }() + } + case model.ImTypeFeishuAudit: + err = CancelFeishuAuditInst(context.TODO(), im, workflowIds, user) + if err != nil { + newLog.Errorf("cancel feishu audit instance error: %v", err) return } default: diff --git a/sqle/pkg/im/im_ce.go b/sqle/pkg/im/im_ce.go new file mode 100644 index 0000000000..803bd748d8 --- /dev/null +++ b/sqle/pkg/im/im_ce.go @@ -0,0 +1,29 @@ +//go:build !enterprise +// +build !enterprise + +package im + +import ( + "context" + e "errors" + + "github.com/actiontech/sqle/sqle/model" +) + +var ErrCommunityEditionNotSupportFeishuAudit = e.New("community edition not support feishu audit") + +func CreateFeishuAuditTemplate(ctx context.Context, im model.IM) error { + return ErrCommunityEditionNotSupportFeishuAudit +} + +func CreateFeishuAuditInst(ctx context.Context, im model.IM, workflow *model.Workflow, assignUsers []*model.User, url string) error { + return ErrCommunityEditionNotSupportFeishuAudit +} + +func UpdateFeishuAuditStatus(ctx context.Context, im model.IM, workflowId uint, user *model.User, status string, reason string) error { + return ErrCommunityEditionNotSupportFeishuAudit +} + +func CancelFeishuAuditInst(ctx context.Context, im model.IM, workflowIDs []uint, user *model.User) error { + return ErrCommunityEditionNotSupportFeishuAudit +} diff --git a/sqle/server/feishu_ce.go b/sqle/server/feishu_ce.go new file mode 100644 index 0000000000..1fc49b7fca --- /dev/null +++ b/sqle/server/feishu_ce.go @@ -0,0 +1,24 @@ +//go:build !enterprise +// +build !enterprise + +package server + +import ( + "time" + + "github.com/sirupsen/logrus" +) + +type FeishuJob struct { + BaseJob +} + +func NewFeishuJob(entry *logrus.Entry) ServerJob { + f := new(FeishuJob) + f.BaseJob = *NewBaseJob(entry, 60*time.Second, f.feishuRotation) + return f +} + +func (j *FeishuJob) feishuRotation(entry *logrus.Entry) { + // do nothing +} diff --git a/sqle/server/manager.go b/sqle/server/manager.go index 6fca28ca3c..b0f67b19ef 100644 --- a/sqle/server/manager.go +++ b/sqle/server/manager.go @@ -16,6 +16,7 @@ type ServerJob interface { var OnlyRunOnLeaderJobs = []func(entry *logrus.Entry) ServerJob{ NewCleanJob, NewDingTalkJob, + NewFeishuJob, } var RunOnAllJobs = []func(entry *logrus.Entry) ServerJob{ diff --git a/sqle/server/workflow_schedule.go b/sqle/server/workflow_schedule.go index 70cb5e83ce..ebec281615 100644 --- a/sqle/server/workflow_schedule.go +++ b/sqle/server/workflow_schedule.go @@ -3,6 +3,7 @@ package server import ( "fmt" "strconv" + "strings" "sync" "time" @@ -11,10 +12,11 @@ import ( "github.com/actiontech/sqle/sqle/log" "github.com/actiontech/sqle/sqle/model" "github.com/actiontech/sqle/sqle/notification" - "github.com/sirupsen/logrus" ) +var ErrWorkflowNoAccess = errors.New(errors.DataNotExist, fmt.Errorf("workflow is not exist or you can't access it")) + type WorkflowScheduleJob struct { BaseJob } @@ -259,3 +261,140 @@ func RejectWorkflowProcess(workflow *model.Workflow, reason string, user *model. return nil } + +func ExecuteTasksProcess(workflowId string, projectName string, user *model.User) error { + s := model.GetStorage() + workflow, exist, err := s.GetWorkflowDetailById(workflowId) + if err != nil { + return err + } + if !exist { + return err + } + + if err := PrepareForWorkflowExecution(projectName, workflow, user); err != nil { + return err + } + + needExecTaskIds, err := GetNeedExecTaskIds(s, workflow, user) + if err != nil { + return err + } + + err = ExecuteWorkflow(workflow, needExecTaskIds) + if err != nil { + return err + } + + return nil +} + +func PrepareForWorkflowExecution(projectName string, workflow *model.Workflow, user *model.User) error { + err := CheckCurrentUserCanOperateWorkflowByUser(user, &model.Project{Name: projectName}, workflow, []uint{}) + if err != nil { + return err + } + + currentStep := workflow.CurrentStep() + if currentStep == nil { + return errors.New(errors.DataInvalid, fmt.Errorf("workflow current step not found")) + } + + if workflow.Record.Status != model.WorkflowStatusWaitForExecution { + return errors.New(errors.DataInvalid, + fmt.Errorf("workflow need to be approved first")) + } + + err = CheckUserCanOperateStep(user, workflow, int(currentStep.ID)) + if err != nil { + return errors.New(errors.DataInvalid, err) + } + return nil +} + +func GetNeedExecTaskIds(s *model.Storage, workflow *model.Workflow, user *model.User) (taskIds map[uint] /*task id*/ uint /*user id*/, err error) { + instances, err := s.GetInstancesByWorkflowID(workflow.ID) + if err != nil { + return nil, err + } + // 有不在运维时间内的instances报错 + var cannotExecuteInstanceNames []string + for _, inst := range instances { + if len(inst.MaintenancePeriod) != 0 && !inst.MaintenancePeriod.IsWithinScope(time.Now()) { + cannotExecuteInstanceNames = append(cannotExecuteInstanceNames, inst.Name) + } + } + if len(cannotExecuteInstanceNames) > 0 { + return nil, errors.New(errors.TaskActionInvalid, + fmt.Errorf("please go online during instance operation and maintenance time. these instances are not in maintenance time[%v]", strings.Join(cannotExecuteInstanceNames, ","))) + } + + // 定时的instances和已上线的跳过 + needExecTaskIds := make(map[uint]uint) + for _, instRecord := range workflow.Record.InstanceRecords { + if instRecord.ScheduledAt != nil || instRecord.IsSQLExecuted { + continue + } + needExecTaskIds[instRecord.TaskId] = user.ID + } + return needExecTaskIds, nil +} + +func CheckCurrentUserCanOperateWorkflowByUser(user *model.User, project *model.Project, workflow *model.Workflow, ops []uint) error { + if user.Name == model.DefaultAdminUser { + return nil + } + + s := model.GetStorage() + + isManager, err := s.IsProjectManager(user.Name, project.Name) + if err != nil { + return err + } + if isManager { + return nil + } + + access, err := s.UserCanAccessWorkflow(user, workflow) + if err != nil { + return err + } + if access { + return nil + } + if len(ops) > 0 { + instances, err := s.GetInstancesByWorkflowID(workflow.ID) + if err != nil { + return err + } + ok, err := s.CheckUserHasOpToInstances(user, instances, ops) + if err != nil { + return err + } + if ok { + return nil + } + } + + return ErrWorkflowNoAccess +} + +func CheckUserCanOperateStep(user *model.User, workflow *model.Workflow, stepId int) error { + if workflow.Record.Status != model.WorkflowStatusWaitForAudit && workflow.Record.Status != model.WorkflowStatusWaitForExecution { + return fmt.Errorf("workflow status is %s, not allow operate it", workflow.Record.Status) + } + + currentStep := workflow.CurrentStep() + if currentStep == nil { + return fmt.Errorf("workflow current step not found") + } + if uint(stepId) != workflow.CurrentStep().ID { + return fmt.Errorf("workflow current step is not %d", stepId) + } + + if !workflow.IsOperationUser(user) { + return fmt.Errorf("you are not allow to operate the workflow") + } + + return nil +}