Skip to content

Commit

Permalink
Merge pull request #219 from tbs60/dev_yanafu
Browse files Browse the repository at this point in the history
task创建流程优化
  • Loading branch information
tming authored May 15, 2024
2 parents 01df55c + 966069b commit 44d8868
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 29 deletions.
6 changes: 5 additions & 1 deletion src/backend/booster/server/pkg/api/v1/dcc/distcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,11 @@ func ApplyResource(req *restful.Request, resp *restful.Response) {
Extra: string(extraData),
})
if err != nil {
blog.Errorf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
if err == engine.ErrorProjectNoFound {
blog.Warnf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
} else {
blog.Errorf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
}
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrApplyResourceFailed, Message: err.Error()})
return
}
Expand Down
8 changes: 6 additions & 2 deletions src/backend/booster/server/pkg/api/v2/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ func ApplyResource(req *restful.Request, resp *restful.Response) {

tb, err := defaultManager.CreateTask(param)
if err != nil {
blog.Errorf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
if err == engine.ErrorProjectNoFound {
blog.Warnf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
} else {
blog.Errorf("apply resource: create task failed, url(%s): %v", req.Request.URL.String(), err)
}
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrApplyResourceFailed, Message: err.Error()})
return
}
Expand Down Expand Up @@ -89,7 +93,7 @@ func SendMessage(req *restful.Request, resp *restful.Response) {
return
}
if data, err = defaultManager.SendProjectMessage(param.ProjectID, []byte(param.Extra)); err != nil {
blog.Errorf("send message: send project(%s) message to engine failed, url(%s) message(%s): %v",
blog.Warnf("send message: send project(%s) message to engine failed, url(%s) message(%s): %v",
param.ProjectID, req.Request.URL.String(), param.Extra, err)
api.ReturnRest(&api.RestResponse{Resp: resp, ErrCode: api.ServerErrSendMessageFailed, Message: err.Error()})
return
Expand Down
64 changes: 58 additions & 6 deletions src/backend/booster/server/pkg/manager/normal/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,14 @@ type TaskBasicLayer interface {
// list task basic from cache, return a new pointer
ListTaskBasic(released bool, statusList ...engine.TaskStatusType) ([]*engine.TaskBasic, error)

// init task basic, create task basic table in database
InitTaskBasic(tb *engine.TaskBasic) error
// create task basic, create task basic table in database
CreateTaskBasic(tb *engine.TaskBasic) error

//insert task basic to cache first time
InsertTB(tb *engine.TaskBasic) error

//delete task basic from cache
DeleteTB(tb *engine.TaskBasic)

// update task basic, both database and cache, just update the field implements in task basic
UpdateTaskBasic(tb *engine.TaskBasic) error
Expand Down Expand Up @@ -237,9 +243,23 @@ func (tc *taskBasicLayer) ListTaskBasic(
return rl, nil
}

// InitTaskBasic init a task basic into layer cache and databases.
func (tc *taskBasicLayer) InitTaskBasic(tb *engine.TaskBasic) error {
return tc.updateTaskBasic(tb, true)
// CreateTaskBasic create a task basic in databases.
func (tc *taskBasicLayer) CreateTaskBasic(tb *engine.TaskBasic) error {
egn, err := tc.GetEngineByTypeName(tb.Client.EngineName)
if err != nil {
blog.Errorf("layer: try updating task basic(%s), get engine(%s) failed: %v", tb.ID, tb.Client.EngineName, err)
return err
}
err = engine.CreateTaskBasic(egn, tb)

if err != nil {
blog.Errorf("layer: update task basic(%s) via engine(%s) failed: %v", tb.ID, tb.Client.EngineName, err)
return err

}
blog.Infof("layer: success to init task basic(%s) in status(%s) with engine(%s) and queue(%s)",
tb.ID, tb.Status.Status, tb.Client.EngineName, tb.Client.QueueName)
return nil
}

// UpdateTaskBasic update a existing task basic into layer cache and databases.
Expand Down Expand Up @@ -337,11 +357,43 @@ func (tc *taskBasicLayer) updateTaskBasic(tbRaw *engine.TaskBasic, new bool) err
return nil
}

// InsertTB create a new record of init task in cache, do not need to create in queue
func (tc *taskBasicLayer) InsertTB(tbRaw *engine.TaskBasic) error {
tb := engine.CopyTaskBasic(tbRaw)
blog.Infof("layer: going to insertTB(%s) status(%s) to cache", tb.ID, tb.Status.Status)
tc.tbmLock.Lock()
defer tc.tbmLock.Unlock()
if tb.Status.Status != engine.TaskStatusInit {
return fmt.Errorf("taskId %s is not init status,can not insert into cache", tb.ID)
}
if _, ok := tc.tbm[tb.ID]; ok {
return fmt.Errorf("taskId %s already exist in cache", tb.ID)
}
selfMetric.TaskNumController.Inc(
tb.Client.EngineName.String(), tb.Client.QueueName, string(tb.Status.Status), "")
tc.tbm[tb.ID] = tb
return nil
}

// DeleteTB delete task from cache and queue if task exsited
func (tc *taskBasicLayer) DeleteTB(tb *engine.TaskBasic) {
blog.Infof("layer: going to deleteTB(%s) status(%s) from cache and queue", tb.ID, tb.Status.Status)
tc.tbmLock.Lock()
defer tc.tbmLock.Unlock()

if oldTask, ok := tc.tbm[tb.ID]; ok {
selfMetric.TaskNumController.Dec(
tb.Client.EngineName.String(), oldTask.Client.QueueName, string(oldTask.Status.Status), "")
}
tc.deleteTBFromQueue(tb)
delete(tc.tbm, tb.ID)
}

func (tc *taskBasicLayer) putTB(tb *engine.TaskBasic) {
tc.tbmLock.Lock()
defer tc.tbmLock.Unlock()

blog.Debugf("layer: get lock and going to putTB(%s) to cache and queue", tb.ID)
blog.Debugf("layer: get lock and going to putTB(%s) status(%s) to cache and queue", tb.ID, tb.Status.Status)

// update metric data of task num
// decrease last status num and add current status num, if the status is same as last one, then do nothing
Expand Down
48 changes: 29 additions & 19 deletions src/backend/booster/server/pkg/manager/normal/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (m *manager) createTask(param *mgr.TaskCreateParam) (*engine.TaskBasic, err

pb, egn, err := m.getBasicProject(param.ProjectID)
if err != nil {
blog.Errorf("manager: try creating task, get project(%s) failed: %v", param.ProjectID, err)
blog.Warnf("manager: try creating task, get project(%s) failed: %v", param.ProjectID, err)
return nil, err
}

Expand All @@ -332,18 +332,19 @@ func (m *manager) createTask(param *mgr.TaskCreateParam) (*engine.TaskBasic, err

// lock project when creating task for controlling concurrency
m.layer.LockProject(param.ProjectID)
defer m.layer.UnLockProject(param.ProjectID)

if err = m.invalidConcurrency(pb); err != nil {
blog.Errorf("manager: try creating task, check concurrency for project(%s) in engine(%s) failed: %v",
param.ProjectID, pb.EngineName.String(), err)
m.layer.UnLockProject(param.ProjectID)
return nil, err
}

taskID, err := m.generateTaskID(egn, param.ProjectID)
if err != nil {
blog.Errorf("manager: try creating task, generate taskID for project(%s) in engine(%s) failed: %v",
param.ProjectID, pb.EngineName.String(), err)
m.layer.UnLockProject(param.ProjectID)
return nil, err
}

Expand Down Expand Up @@ -379,14 +380,36 @@ func (m *manager) createTask(param *mgr.TaskCreateParam) (*engine.TaskBasic, err
}
if err = tb.Check(); err != nil {
blog.Errorf("manager: create task basic(%s) check failed: %v", taskID, err)
m.layer.UnLockProject(param.ProjectID)
return nil, err
}
tb.Status.Init()
tb.Status.Message = messageTaskInit
//creat task to cache, if task exsited, return error
if err = m.layer.InsertTB(tb); err != nil {
blog.Errorf("manager: create task basic(%s) insert db failed: %v", taskID, err)
m.layer.UnLockProject(param.ProjectID)
return nil, err
}
m.layer.UnLockProject(param.ProjectID)

ok, err = engine.CheckTaskIDValid(egn, taskID)
if !ok {
if err == nil {
err = fmt.Errorf("task %s is already exsit in db", taskID)
}
blog.Errorf("manager: check task valid(%s) for project(%s) in engine(%s) failed: %v",
taskID, param.ProjectID, pb.EngineName.String(), err)
//check task id failed, now task not in db, delete task from cache directly
m.layer.DeleteTB(tb)
return nil, err
}

if err = m.layer.InitTaskBasic(tb); err != nil {
if err = m.layer.CreateTaskBasic(tb); err != nil {
blog.Errorf("manager: create task basic(%s) for project(%s) in engine(%s) failed: %v",
taskID, param.ProjectID, pb.EngineName.String(), err)
//insert task to db failed, delete task from cache directly
m.layer.DeleteTB(tb)
return nil, err
}
if err = egn.CreateTaskExtension(tb, []byte(param.Extra)); err != nil {
Expand Down Expand Up @@ -417,7 +440,7 @@ func (m *manager) sendProjectMessage(projectID string, data []byte) ([]byte, err

_, egn, err := m.getBasicProject(projectID)
if err != nil {
blog.Errorf("manager: try sending project message, get project(%s) failed: %v", projectID, err)
blog.Warnf("manager: try sending project message, get project(%s) failed: %v", projectID, err)
return nil, err
}

Expand Down Expand Up @@ -535,7 +558,7 @@ func (m *manager) getBasicProject(projectID string) (*engine.ProjectBasic, engin
return pb, egn, nil
}

blog.Errorf("manager: get project(%s) no found", projectID)
blog.Warnf("manager: get project(%s) no found", projectID)
return nil, nil, engine.ErrorProjectNoFound
}

Expand Down Expand Up @@ -619,20 +642,7 @@ func (m *manager) invalidConcurrency(pb *engine.ProjectBasic) error {
}

func (m *manager) generateTaskID(egn engine.Engine, projectID string) (string, error) {
for i := 0; i < 3; i++ {
taskID := generateTaskID(egn.Name().String(), projectID)

ok, err := engine.CheckTaskIDValid(egn, taskID)
if err != nil {
return "", err
}

if ok {
return taskID, nil
}
}

return "", types.ErrorGenerateTaskIDFailed
return generateTaskID(egn.Name().String(), projectID), nil
}

func generateTaskID(egnName string, projectID string) string {
Expand Down
2 changes: 1 addition & 1 deletion src/backend/booster/server/pkg/manager/normal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const (
trackerCheckGapTime = 1 * time.Second
trackerTrackGapTime = 1 * time.Second

keeperHealthCheckGapTime = 10 * time.Second
keeperHealthCheckGapTime = 5 * time.Second
keeperFirstStartGraceTime = 1 * time.Minute
keeperInitTimeout = 20 * time.Second
keeperStartingTimeout = 120 * time.Second
Expand Down

0 comments on commit 44d8868

Please sign in to comment.