Skip to content

Commit

Permalink
fix(redis): dts_server修复task可能被两个goroutine执行的bug #1599
Browse files Browse the repository at this point in the history
  • Loading branch information
lukemakeit committed Nov 1, 2023
1 parent 4678b76 commit ee21148
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
2 changes: 1 addition & 1 deletion dbm-services/redis/redis-dts/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
SRV_NAME= redis_dts_server

build:clean
build:
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o ./build/$(SRV_NAME) -v main.go

clean:
Expand Down
30 changes: 16 additions & 14 deletions dbm-services/redis/redis-dts/pkg/dtsJob/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,26 +97,27 @@ func (job *DtsJobBase) BgDtsTaskRunnerWithConcurrency(taskType, dbType string) {
}
go func() {
defer close(genChan)
var toExecuteTasks []*tendisdb.TbTendisDTSTask
var toExecuteRows []*tendisdb.TbTendisDTSTask
for {
if !tendisdb.IsAllDtsTasksToForceKill(toExecuteTasks) {
if !tendisdb.IsAllDtsTasksToForceKill(toExecuteRows) {
// 如果所有dts tasks都是 ForceKillTaskTodo 状态,则大概率该dts job用户已强制终止, 无需sleep
// 否则 sleep 10s
time.Sleep(10 * time.Second)
}
toExecuteTasks, err = tendisdb.GetLast30DaysToExecuteTasks(job.BkCloudID, job.ServerIP, taskType, dbType,
toExecuteRows, err = tendisdb.GetLast30DaysToExecuteTasks(job.BkCloudID, job.ServerIP, taskType, dbType,
status, perTaskNum, job.logger)
if err != nil {
continue
}
if len(toExecuteTasks) == 0 {
if len(toExecuteRows) == 0 {
job.logger.Info(fmt.Sprintf("not found to be executed %q task,sleep 10s", taskType),
zap.String("serverIP", job.ServerIP))
continue
}
for _, task01 := range toExecuteTasks {
task02 := task01
genChan <- task02
for _, row := range toExecuteRows {
toDoRow := row
// 将task放入channel,等待消费者goroutine真正处理
genChan <- toDoRow
}
}
}()
Expand Down Expand Up @@ -160,27 +161,28 @@ func (job *DtsJobBase) BgDtsTaskRunnerWithoutLimit(taskType, dbType string) {
go func() {
defer wg.Done()
defer close(genChan)
var toExecuteTasks []*tendisdb.TbTendisDTSTask
var toExecuteRows []*tendisdb.TbTendisDTSTask
for {
// 生产者: 获取task
// 如果所有dts tasks都是 ForceKillTaskTodo 状态,则大概率该dts job用户已强制终止, 无需sleep
// 否则 sleep 10s
if !tendisdb.IsAllDtsTasksToForceKill(toExecuteTasks) {
if !tendisdb.IsAllDtsTasksToForceKill(toExecuteRows) {
time.Sleep(10 * time.Second)
}
toExecuteTasks, err = tendisdb.GetLast30DaysToExecuteTasks(job.BkCloudID, job.ServerIP, taskType, dbType,
toExecuteRows, err = tendisdb.GetLast30DaysToExecuteTasks(job.BkCloudID, job.ServerIP, taskType, dbType,
status, perTaskNum, job.logger)
if err != nil {
continue
}
if len(toExecuteTasks) == 0 {
if len(toExecuteRows) == 0 {
job.logger.Info(fmt.Sprintf("not found to be executed %q task,sleep 10s", taskType),
zap.String("serverIP", job.ServerIP))
continue
}
for _, task01 := range toExecuteTasks {
task02 := task01
genChan <- task02
for _, row := range toExecuteRows {
toDoRow := row
// 将task放入channel,等待消费者goroutine真正处理
genChan <- toDoRow
}
}
}()
Expand Down
4 changes: 2 additions & 2 deletions dbm-services/redis/redis-dts/pkg/scrdbclient/scrdbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (c *Client) DoNew(method, url string, params interface{}, others map[string
"an error occur while invoking client.Do, error:%v,url:%s,params:%s,resp:%s,retry...",
err, req.URL.String(), util.ToString(params), util.ToString(resp))
c.logger.Error(err.Error())
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)
continue
}
if resp.StatusCode != http.StatusOK {
Expand All @@ -193,7 +193,7 @@ func (c *Client) DoNew(method, url string, params interface{}, others map[string
string(bodyBytes), resp.StatusCode, method, req.URL.String(), string(body))
c.logger.Error(err.Error(), zap.String("Authorization", req.Header.Get("Authorization")))
resp.Body.Close()
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)
continue
}
break
Expand Down

0 comments on commit ee21148

Please sign in to comment.