Skip to content

Commit

Permalink
Merge pull request #1793 from actiontech/feishu-5
Browse files Browse the repository at this point in the history
Feishu 5
  • Loading branch information
ColdWaterLW authored Sep 7, 2023
2 parents 5b5f6ca + 091be2c commit c54e165
Show file tree
Hide file tree
Showing 13 changed files with 414 additions and 197 deletions.
4 changes: 3 additions & 1 deletion spelling_dict.txt
Original file line number Diff line number Diff line change
Expand Up @@ -426,4 +426,6 @@ Tjwvz046g
slowlogs
tjwvz
ejwvz
wiru
wiru
larkapproval
ccer
3 changes: 2 additions & 1 deletion sqle/api/controller/v1/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/actiontech/sqle/sqle/pkg/im/feishu"

"github.com/labstack/echo/v4"
larkContact "github.com/larksuite/oapi-sdk-go/v3/service/contact/v3"
)

type UpdateSMTPConfigurationReqV1 struct {
Expand Down Expand Up @@ -564,7 +565,7 @@ func TestFeishuConfigV1(c echo.Context) error {
}

client := feishu.NewFeishuClient(feishuCfg.AppKey, feishuCfg.AppSecret)
feishuUsers, err := client.GetUsersByEmailOrMobileWithLimitation(email, phone)
feishuUsers, err := client.GetUsersByEmailOrMobileWithLimitation(email, phone, larkContact.UserIdTypeGetUserUserId)
if err != nil {
return c.JSON(http.StatusOK, &TestFeishuConfigResV1{
BaseRes: controller.NewBaseReq(nil),
Expand Down
75 changes: 0 additions & 75 deletions sqle/api/controller/v1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"time"

"github.com/actiontech/sqle/sqle/api/controller"
Expand Down Expand Up @@ -317,24 +316,6 @@ type WorkflowStepResV1 struct {
Reason string `json:"reason,omitempty"`
}

func CheckUserCanOperateStep(user *model.User, workflow *model.Workflow, stepId int) error {
if workflow.Record.Status != model.WorkflowStatusWaitForAudit && workflow.Record.Status != model.WorkflowStatusWaitForExecution {
return fmt.Errorf("workflow status is %s, not allow operate it", workflow.Record.Status)
}
currentStep := workflow.CurrentStep()
if currentStep == nil {
return fmt.Errorf("workflow current step not found")
}
if uint(stepId) != workflow.CurrentStep().ID {
return fmt.Errorf("workflow current step is not %d", stepId)
}

if !workflow.IsOperationUser(user) {
return fmt.Errorf("you are not allow to operate the workflow")
}
return nil
}

// @Deprecated
// @Summary 审批通过
// @Description approve workflow
Expand Down Expand Up @@ -493,34 +474,6 @@ func IsTaskCanExecute(s *model.Storage, taskId string) (bool, error) {
return true, nil
}

func GetNeedExecTaskIds(s *model.Storage, workflow *model.Workflow, user *model.User) (taskIds map[uint] /*task id*/ uint /*user id*/, err error) {
instances, err := s.GetInstancesByWorkflowID(workflow.ID)
if err != nil {
return nil, err
}
// 有不在运维时间内的instances报错
var cannotExecuteInstanceNames []string
for _, inst := range instances {
if len(inst.MaintenancePeriod) != 0 && !inst.MaintenancePeriod.IsWithinScope(time.Now()) {
cannotExecuteInstanceNames = append(cannotExecuteInstanceNames, inst.Name)
}
}
if len(cannotExecuteInstanceNames) > 0 {
return nil, errors.New(errors.TaskActionInvalid,
fmt.Errorf("please go online during instance operation and maintenance time. these instances are not in maintenance time[%v]", strings.Join(cannotExecuteInstanceNames, ",")))
}

// 定时的instances和已上线的跳过
needExecTaskIds := make(map[uint]uint)
for _, instRecord := range workflow.Record.InstanceRecords {
if instRecord.ScheduledAt != nil || instRecord.IsSQLExecuted {
continue
}
needExecTaskIds[instRecord.TaskId] = user.ID
}
return needExecTaskIds, nil
}

func PrepareForTaskExecution(c echo.Context, projectName string, workflow *model.Workflow, user *model.User, TaskId int) error {
if workflow.Record.Status != model.WorkflowStatusWaitForExecution {
return errors.New(errors.DataInvalid, e.New("workflow need to be approved first"))
Expand All @@ -546,29 +499,6 @@ func PrepareForTaskExecution(c echo.Context, projectName string, workflow *model
return e.New("you are not allow to execute the task")
}

func PrepareForWorkflowExecution(c echo.Context, projectName string, workflow *model.Workflow, user *model.User) error {
err := CheckCurrentUserCanOperateWorkflow(c, &model.Project{Name: projectName}, workflow, []uint{})
if err != nil {
return err
}

currentStep := workflow.CurrentStep()
if currentStep == nil {
return errors.New(errors.DataInvalid, fmt.Errorf("workflow current step not found"))
}

if workflow.Record.Status != model.WorkflowStatusWaitForExecution {
return errors.New(errors.DataInvalid,
fmt.Errorf("workflow need to be approved first"))
}

err = CheckUserCanOperateStep(user, workflow, int(currentStep.ID))
if err != nil {
return errors.New(errors.DataInvalid, err)
}
return nil
}

type GetWorkflowTasksResV1 struct {
controller.BaseRes
Data []*GetWorkflowTasksItemV1 `json:"data"`
Expand Down Expand Up @@ -686,7 +616,6 @@ func CheckWorkflowCanCommit(template *model.WorkflowTemplate, tasks []*model.Tas
type GetWorkflowsReqV1 struct {
FilterSubject string `json:"filter_subject" query:"filter_subject"`
FilterWorkflowID string `json:"filter_workflow_id" query:"filter_workflow_id"`
FuzzySearchWorkflowDesc string `json:"fuzzy_search_workflow_desc" query:"fuzzy_search_workflow_desc"`
FilterCreateTimeFrom string `json:"filter_create_time_from" query:"filter_create_time_from"`
FilterCreateTimeTo string `json:"filter_create_time_to" query:"filter_create_time_to"`
FilterCreateUserName string `json:"filter_create_user_name" query:"filter_create_user_name"`
Expand Down Expand Up @@ -805,7 +734,6 @@ func GetGlobalWorkflowsV1(c echo.Context) error {
// @Security ApiKeyAuth
// @Param filter_subject query string false "filter subject"
// @Param filter_workflow_id query string false "filter by workflow_id"
// @Param fuzzy_search_workflow_desc query string false "fuzzy search by workflow description"
// @Param filter_create_time_from query string false "filter create time from"
// @Param filter_create_time_to query string false "filter create time to"
// @Param filter_task_execute_start_time_from query string false "filter_task_execute_start_time_from"
Expand Down Expand Up @@ -853,7 +781,6 @@ func GetWorkflowsV1(c echo.Context) error {

data := map[string]interface{}{
"filter_workflow_id": req.FilterWorkflowID,
"fuzzy_search_workflow_desc": req.FuzzySearchWorkflowDesc,
"filter_subject": req.FilterSubject,
"filter_create_time_from": req.FilterCreateTimeFrom,
"filter_create_time_to": req.FilterCreateTimeTo,
Expand Down Expand Up @@ -1001,7 +928,6 @@ func GetWorkflowV1(c echo.Context) error {

type ExportWorkflowReqV1 struct {
FilterSubject string `json:"filter_subject" query:"filter_subject"`
FuzzySearchWorkflowDesc string `json:"fuzzy_search_workflow_desc" query:"fuzzy_search_workflow_desc"`
FilterCreateTimeFrom string `json:"filter_create_time_from" query:"filter_create_time_from"`
FilterCreateTimeTo string `json:"filter_create_time_to" query:"filter_create_time_to"`
FilterCreateUserName string `json:"filter_create_user_name" query:"filter_create_user_name"`
Expand All @@ -1019,7 +945,6 @@ type ExportWorkflowReqV1 struct {
// @Tags workflow
// @Security ApiKeyAuth
// @Param filter_subject query string false "filter subject"
// @Param fuzzy_search_workflow_desc query string false "fuzzy search by workflow description"
// @Param filter_create_time_from query string false "filter create time from"
// @Param filter_create_time_to query string false "filter create time to"
// @Param filter_task_execute_start_time_from query string false "filter_task_execute_start_time_from"
Expand Down
45 changes: 18 additions & 27 deletions sqle/api/controller/v2/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func ApproveWorkflowV2(c echo.Context) error {

nextStep := workflow.NextStep()

err = v1.CheckUserCanOperateStep(user, workflow, stepId)
err = server.CheckUserCanOperateStep(user, workflow, stepId)
if err != nil {
return controller.JSONBaseErrorReq(c, errors.New(errors.DataInvalid, err))
}
Expand All @@ -108,9 +108,9 @@ func ApproveWorkflowV2(c echo.Context) error {
return controller.JSONBaseErrorReq(c, err)
}

go im.UpdateApprove(workflow.ID, user.Phone, model.ApproveStatusAgree, "")
go im.UpdateApprove(workflow.ID, user, model.ApproveStatusAgree, "")

if nextStep.Template.Typ != model.WorkflowStepTypeSQLExecute {
if nextStep != nil {
go im.CreateApprove(strconv.Itoa(int(workflow.ID)))
}

Expand Down Expand Up @@ -184,7 +184,7 @@ func RejectWorkflowV2(c echo.Context) error {
return controller.JSONBaseErrorReq(c, v1.ErrWorkflowNoAccess)
}

err = v1.CheckUserCanOperateStep(user, workflow, stepId)
err = server.CheckUserCanOperateStep(user, workflow, stepId)
if err != nil {
return controller.JSONBaseErrorReq(c, errors.New(errors.DataInvalid, err))
}
Expand All @@ -202,7 +202,7 @@ func RejectWorkflowV2(c echo.Context) error {
return controller.JSONBaseErrorReq(c, err)
}

go im.UpdateApprove(workflow.ID, user.Phone, model.ApproveStatusRefuse, req.Reason)
go im.UpdateApprove(workflow.ID, user, model.ApproveStatusRefuse, req.Reason)

return c.JSON(http.StatusOK, controller.NewBaseReq(nil))
}
Expand Down Expand Up @@ -262,7 +262,7 @@ func CancelWorkflowV2(c echo.Context) error {
fmt.Errorf("you are not allow to operate the workflow")))
}

go im.CancelApprove(workflow.ID)
go im.BatchCancelApprove([]uint{workflow.ID}, user)

workflow.Record.Status = model.WorkflowStatusCancel
workflow.Record.CurrentWorkflowStepId = 0
Expand Down Expand Up @@ -313,8 +313,12 @@ func BatchCancelWorkflowsV2(c echo.Context) error {
}

projectName := c.Param("project_name")
userName := controller.GetUserName(c)
if err := v1.CheckIsProjectManager(userName, projectName); err != nil {
user, err := controller.GetCurrentUser(c)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

if err := v1.CheckIsProjectManager(user.Name, projectName); err != nil {
return controller.JSONBaseErrorReq(c, err)
}

Expand All @@ -331,7 +335,7 @@ func BatchCancelWorkflowsV2(c echo.Context) error {
workflow.Record.CurrentWorkflowStepId = 0
}

go im.BatchCancelApprove(workflowIds)
go im.BatchCancelApprove(workflowIds, user)

if err := model.GetStorage().BatchUpdateWorkflowStatus(workflows); err != nil {
return controller.JSONBaseErrorReq(c, err)
Expand Down Expand Up @@ -985,7 +989,7 @@ func UpdateWorkflowScheduleV2(c echo.Context) error {
fmt.Errorf("workflow need to be approved first")))
}

err = v1.CheckUserCanOperateStep(user, workflow, int(currentStep.ID))
err = server.CheckUserCanOperateStep(user, workflow, int(currentStep.ID))
if err != nil {
return controller.JSONBaseErrorReq(c, errors.New(errors.DataInvalid, err))
}
Expand Down Expand Up @@ -1052,33 +1056,20 @@ func ExecuteTasksOnWorkflowV2(c echo.Context) error {
return controller.JSONBaseErrorReq(c, v1.ErrWorkflowNoAccess)
}

workflowId = fmt.Sprintf("%v", workflow.ID)

workflow, exist, err = s.GetWorkflowDetailById(workflowId)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if !exist {
return controller.JSONBaseErrorReq(c, v1.ErrWorkflowNoAccess)
}
user, err := controller.GetCurrentUser(c)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}
if err := v1.PrepareForWorkflowExecution(c, projectName, workflow, user); err != nil {
return err
}

needExecTaskIds, err := v1.GetNeedExecTaskIds(s, workflow, user)
if err != nil {
return err
}
workflowId = fmt.Sprintf("%v", workflow.ID)

err = server.ExecuteWorkflow(workflow, needExecTaskIds)
err = server.ExecuteTasksProcess(workflowId, projectName, user)
if err != nil {
return controller.JSONBaseErrorReq(c, err)
}

im.UpdateApprove(workflow.ID, user, model.ApproveStatusAgree, "")

return c.JSON(http.StatusOK, controller.NewBaseReq(nil))
}

Expand Down
58 changes: 55 additions & 3 deletions sqle/model/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package model

import (
"database/sql"
e "errors"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -388,8 +389,9 @@ func (s *Storage) GetWorkflowExpiredHoursOrDefault() (int64, error) {
}

const (
ImTypeDingTalk = "dingTalk"
ImTypeFeishu = "feishu"
ImTypeDingTalk = "dingTalk"
ImTypeFeishu = "feishu"
ImTypeFeishuAudit = "feishu_audit"
)

type IM struct {
Expand Down Expand Up @@ -508,7 +510,7 @@ func (s *Storage) GetDingTalkInstanceListByWorkflowIDs(workflowIds []uint) ([]Di
}

// batch updates ding_talk_instances'status into input status by workflow_ids, the status should be like ApproveStatusXXX in model package.
func (s *Storage) BatchUptateStatusOfDingTalkInstance(workflowIds []uint, status string) error {
func (s *Storage) BatchUpdateStatusOfDingTalkInstance(workflowIds []uint, status string) error {
err := s.db.Model(&DingTalkInstance{}).Where("workflow_id IN (?)", workflowIds).Updates(map[string]interface{}{"status": status}).Error
if err != nil {
return err
Expand All @@ -525,6 +527,56 @@ func (s *Storage) GetDingTalkInstByStatus(status string) ([]DingTalkInstance, er
return dingTalkInstances, nil
}

const (
FeishuAuditStatusInitialized = "INITIALIZED"
FeishuAuditStatusApprove = "APPROVED"
FeishuAuditStatusRejected = "REJECTED"
)

type FeishuInstance struct {
Model
ApproveInstanceCode string `json:"approve_instance" gorm:"column:approve_instance"`
WorkflowId uint `json:"workflow_id" gorm:"column:workflow_id"`
// 审批实例 taskID
TaskID string `json:"task_id" gorm:"column:task_id"`
Status string `json:"status" gorm:"default:\"INITIALIZED\""`
}

func (s *Storage) GetFeishuInstanceListByWorkflowIDs(workflowIds []uint) ([]FeishuInstance, error) {
var feishuInstList []FeishuInstance
err := s.db.Model(&FeishuInstance{}).Where("workflow_id IN (?)", workflowIds).Find(&feishuInstList).Error
if err != nil {
return nil, err
}
return feishuInstList, nil
}

func (s *Storage) BatchUpdateStatusOfFeishuInstance(workflowIds []uint, status string) error {
err := s.db.Model(&FeishuInstance{}).Where("workflow_id IN (?)", workflowIds).Updates(map[string]interface{}{"status": status}).Error
if err != nil {
return err
}
return nil
}

func (s *Storage) GetFeishuInstanceByWorkflowID(workflowId uint) (*FeishuInstance, bool, error) {
fi := new(FeishuInstance)
err := s.db.Where("workflow_id = ?", workflowId).Last(&fi).Error
if e.Is(err, gorm.ErrRecordNotFound) {
return fi, false, nil
}
return fi, true, errors.New(errors.ConnectStorageError, err)
}

func (s *Storage) GetFeishuInstByStatus(status string) ([]FeishuInstance, error) {
var feishuInst []FeishuInstance
err := s.db.Where("status = ?", status).Find(&feishuInst).Error
if err != nil {
return nil, err
}
return feishuInst, nil
}

type PersonaliseConfig struct {
Model
Title string `json:"title" gorm:"column:title"`
Expand Down
1 change: 1 addition & 0 deletions sqle/model/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ var autoMigrateList = []interface{}{
&ManagementPermission{},
&IM{},
&DingTalkInstance{},
&FeishuInstance{},
&SyncInstanceTask{},
&OperationRecord{},
&PersonaliseConfig{},
Expand Down
3 changes: 2 additions & 1 deletion sqle/notification/feishu.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/actiontech/sqle/sqle/log"
"github.com/actiontech/sqle/sqle/model"
"github.com/actiontech/sqle/sqle/pkg/im/feishu"
larkContact "github.com/larksuite/oapi-sdk-go/v3/service/contact/v3"
larkIm "github.com/larksuite/oapi-sdk-go/v3/service/im/v1"
)

Expand Down Expand Up @@ -55,7 +56,7 @@ func (n *FeishuNotifier) Notify(notification Notification, users []*model.User)
}

client := feishu.NewFeishuClient(cfg.AppKey, cfg.AppSecret)
feishuUsers, err := client.GetUsersByEmailOrMobileWithLimitation(emails, mobiles)
feishuUsers, err := client.GetUsersByEmailOrMobileWithLimitation(emails, mobiles, larkContact.UserIdTypeGetUserUserId)
if err != nil {
return fmt.Errorf("get user_ids from feishu failed: %v", err)
}
Expand Down
Loading

0 comments on commit c54e165

Please sign in to comment.