Skip to content

Commit

Permalink
fix: 代码注解
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Jul 31, 2023
1 parent 0d73cf4 commit 60b6490
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 19 deletions.
4 changes: 3 additions & 1 deletion example/example1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ func TestTaskPool1(t *testing.T) {
// input:池数量
pool := workerpool.NewPool(5)


// 需要处理的任务
tt := func(data interface{}) error {
taskID := data.(int)
Expand All @@ -43,4 +42,7 @@ func TestTaskPool1(t *testing.T) {
}
pool.Run() // 启动




}
2 changes: 0 additions & 2 deletions example/example2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@ func TestTaskPool2(t *testing.T) {
pool.AddGlobalQueue(task)
}


// 启动在后台等待执行
go pool.RunBackground()


for {
taskID := rand.Intn(100) + 20

Expand Down
18 changes: 11 additions & 7 deletions pkg/workerpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ type Pool struct {
// list 装task
Tasks []*Task
Workers []*worker

// 工作池数量
concurrency int
// 用来装
collector chan *Task
// collector 用来输入所有Task对象的chan
collector chan *Task
// runBackground 后台运行时,结束时需要传入的标示
runBackground chan bool
wg sync.WaitGroup
}
Expand All @@ -24,6 +24,7 @@ type Pool struct {
func NewPool(concurrency int) *Pool {
return &Pool{
Tasks: make([]*Task, 0),
//Workers: make([]*worker, 0),
concurrency: concurrency,
collector: make(chan *Task, 10),
runBackground: make(chan bool),
Expand All @@ -36,8 +37,12 @@ func (p *Pool) AddGlobalQueue(task *Task) {
p.Tasks = append(p.Tasks, task)
}

// Run 启动pool,使用Run()方法调用时,只能使用AddGlobalQueue加入全局队列,
// 一旦Run启动后,就不允许调用AddTask加入Task,如果需动态加入pool,可以使用
// RunBackground方法
func (p *Pool) Run() {
// 总共会开启p.concurrency个goroutine (因为Start函数)
// 总共会开启p.concurrency个goroutine
// 启动pool中的每个worker都传入collector chan
for i := 1; i <= p.concurrency; i++ {
worker := newWorker(p.collector, i)
worker.start(&p.wg)
Expand All @@ -48,15 +53,14 @@ func (p *Pool) Run() {
time.Sleep(time.Millisecond)
}

// 把好的任务放入collector
// 把放在tasks列表的的任务放入collector
for i := range p.Tasks {
p.collector <- p.Tasks[i]

}

// 关闭通道
// 注意,这里需要close chan。
close(p.collector)

// 阻塞,等待所有的goroutine执行完毕
p.wg.Wait()
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/workerpool/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import (
)

/*
本质:用全局的切片分配任务给多个workers并发处理。
本质:用全局的切片分配任务给多个workers并发处理。
*/

// Task 一个具体任务需求
Expand All @@ -25,7 +24,7 @@ func NewTask(f func(interface{}) error, data interface{}) *Task {
}

// process 执行任务的函数。
func process(workerID int, task *Task) {
klog.Info("worker: ", workerID, ", processes task: ", task.Data)
task.Err = task.f(task.Data) // 执行任务。如果任务执行错误,赋值err
func (t *Task) process(workerID int) {
klog.Info("worker: ", workerID, ", processes task: ", t.Data)
t.Err = t.f(t.Data) // 执行任务。如果任务执行错误,赋值err
}
10 changes: 6 additions & 4 deletions pkg/workerpool/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type worker struct {
quit chan bool
}

// 建立新的消费者
// newWorker 建立新的消费者
func newWorker(channel chan *Task, ID int) *worker {
return &worker{
ID: ID,
Expand All @@ -23,25 +23,27 @@ func newWorker(channel chan *Task, ID int) *worker {
}
}

// Start 执行,遍历taskChan,每个任务都启一个goroutine执行
// start 执行任务,遍历taskChan,每个worker都启一个goroutine执行
func (wr *worker) start(wg *sync.WaitGroup) {
klog.Info("Starting worker: ", wr.ID)

wg.Add(1)
go func() {
defer wg.Done()
// 不断从chan中取出task执行
for task := range wr.taskChan {
process(wr.ID, task)
task.process(wr.ID)
}
}()
}

// startBackground 后台执行
func (wr *worker) startBackground() {
klog.Info("Starting worker background: ", wr.ID)
for {
select {
case task := <-wr.taskChan:
process(wr.ID, task)
task.process(wr.ID)
case <-wr.quit:
return
}
Expand Down

0 comments on commit 60b6490

Please sign in to comment.