-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathcrontask.go
221 lines (203 loc) · 6.12 KB
/
crontask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package task
import (
"errors"
"fmt"
"strings"
"time"
)
type (
//CronTask cron task info define
CronTask struct {
TaskInfo
RawExpress string `json:"express"` //运行周期表达式,当TaskType==TaskType_Cron时有效
time_WeekDay *ExpressSet
time_Month *ExpressSet
time_Day *ExpressSet
time_Hour *ExpressSet
time_Minute *ExpressSet
time_Second *ExpressSet
}
)
// GetConfig get task config info
func (task *CronTask) GetConfig() *TaskConfig {
return &TaskConfig{
TaskID: task.taskID,
TaskType: task.TaskType,
IsRun: task.IsRun,
Handler: task.handler,
DueTime: task.DueTime,
Interval: 0,
Express: task.RawExpress,
TaskData: task.TaskData,
}
}
//Reset first check conf, then reload conf & restart task
//special, TaskID can not be reset
//special, if TaskData is nil, it can not be reset
//special, if Handler is nil, it can not be reset
func (task *CronTask) Reset(conf *TaskConfig) error {
expresslist := strings.Split(conf.Express, " ")
//basic check
if conf.Express == "" {
errmsg := "express is empty"
task.taskService.Logger().Debug(fmt.Sprint("TaskInfo:Reset ", task, conf, "error", errmsg))
return errors.New(errmsg)
}
if len(expresslist) != 6 {
errmsg := "express is wrong format => not 6 parts"
task.taskService.Logger().Debug(fmt.Sprint("TaskInfo:Reset ", task, conf, "error", errmsg))
return errors.New("express is wrong format => not 6 parts")
}
//restart task
task.Stop()
task.IsRun = conf.IsRun
if conf.TaskData != nil {
task.TaskData = conf.TaskData
}
if conf.Handler != nil {
task.handler = conf.Handler
}
task.DueTime = conf.DueTime
task.RawExpress = conf.Express
if task.TaskType == TaskType_Cron {
task.time_WeekDay = parseExpress(expresslist[5], ExpressType_WeekDay)
task.taskService.debugExpress(task.time_WeekDay)
task.time_Month = parseExpress(expresslist[4], ExpressType_Month)
task.taskService.debugExpress(task.time_Month)
task.time_Day = parseExpress(expresslist[3], ExpressType_Day)
task.taskService.debugExpress(task.time_Day)
task.time_Hour = parseExpress(expresslist[2], ExpressType_Hour)
task.taskService.debugExpress(task.time_Hour)
task.time_Minute = parseExpress(expresslist[1], ExpressType_Minute)
task.taskService.debugExpress(task.time_Minute)
task.time_Second = parseExpress(expresslist[0], ExpressType_Second)
task.taskService.debugExpress(task.time_Second)
}
task.Start()
task.taskService.Logger().Debug(fmt.Sprint("TaskInfo:Reset ", task, conf, "success"))
return nil
}
//Start start task
func (task *CronTask) Start() {
if !task.IsRun {
return
}
task.mutex.Lock()
defer task.mutex.Unlock()
if task.State == TaskState_Init || task.State == TaskState_Stop {
task.State = TaskState_Run
startCronTask(task)
}
}
// RunOnce do task only once
// no match Express or Interval
// no recover panic
// support for #6 新增RunOnce方法建议
func (task *CronTask) RunOnce() error {
err := task.handler(task.getTaskContext())
return err
}
// NewCronTask create new cron task
func NewCronTask(taskID string, isRun bool, express string, handler TaskHandle, taskData interface{}) (Task, error) {
task := new(CronTask)
task.initCounters()
task.taskID = taskID
task.TaskType = TaskType_Cron
task.IsRun = isRun
task.handler = handler
task.RawExpress = express
task.TaskData = taskData
expressList := strings.Split(express, " ")
if len(expressList) != 6 {
return nil, errors.New("express is wrong format => not 6 parts")
}
task.time_WeekDay = parseExpress(expressList[5], ExpressType_WeekDay)
task.time_Month = parseExpress(expressList[4], ExpressType_Month)
task.time_Day = parseExpress(expressList[3], ExpressType_Day)
task.time_Hour = parseExpress(expressList[2], ExpressType_Hour)
task.time_Minute = parseExpress(expressList[1], ExpressType_Minute)
task.time_Second = parseExpress(expressList[0], ExpressType_Second)
task.State = TaskState_Init
return task, nil
}
//start cron task
func startCronTask(task *CronTask) {
now := time.Now()
nowSecond := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), now.Second(), 0, time.Local)
afterTime := nowSecond.Add(time.Second).Sub(time.Now().Local())
task.TimeTicker = time.NewTicker(DefaultPeriod)
go func() {
time.Sleep(afterTime)
for {
select {
case <-task.TimeTicker.C:
doCronTask(task)
}
}
}()
}
func doCronTask(task *CronTask) {
taskCtx := task.getTaskContext()
defer func() {
if taskCtx.TimeoutCancel != nil {
taskCtx.TimeoutCancel()
}
task.putTaskContext(taskCtx)
if err := recover(); err != nil {
task.CounterInfo().ErrorCounter.Inc(1)
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), " cron handler recover error => ", err))
if task.taskService.ExceptionHandler != nil {
task.taskService.ExceptionHandler(taskCtx, fmt.Errorf("%v", err))
}
//goroutine panic, restart cron task
startCronTask(task)
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), " goroutine panic, restart CronTask"))
}
}()
handler := func() {
defer func() {
if task.Timeout > 0 {
taskCtx.doneChan <- struct{}{}
}
}()
now := time.Now()
if task.time_WeekDay.IsMatch(now) &&
task.time_Month.IsMatch(now) &&
task.time_Day.IsMatch(now) &&
task.time_Hour.IsMatch(now) &&
task.time_Minute.IsMatch(now) &&
task.time_Second.IsMatch(now) {
//inc run counter
task.CounterInfo().RunCounter.Inc(1)
//do log
if task.taskService != nil && task.taskService.OnBeforeHandler != nil {
task.taskService.OnBeforeHandler(taskCtx)
}
var err error
if !taskCtx.IsEnd {
err = task.handler(taskCtx)
}
if err != nil {
taskCtx.Error = err
task.CounterInfo().ErrorCounter.Inc(1)
if task.taskService != nil && task.taskService.ExceptionHandler != nil {
task.taskService.ExceptionHandler(taskCtx, err)
}
}
if task.taskService != nil && task.taskService.OnEndHandler != nil {
task.taskService.OnEndHandler(taskCtx)
}
}
}
if task.Timeout > 0 {
go handler()
select {
case <-taskCtx.TimeoutContext.Done():
task.taskService.Logger().Debug(fmt.Sprint(task.TaskID(), "do handler timeout."))
case <-taskCtx.doneChan:
return
}
} else {
handler()
}
}