From 60b6490778948f008346b414862373c9633dc1de Mon Sep 17 00:00:00 2001 From: googs1025 Date: Mon, 31 Jul 2023 23:08:25 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BB=A3=E7=A0=81=E6=B3=A8=E8=A7=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/example1_test.go | 4 +++- example/example2_test.go | 2 -- pkg/workerpool/pool.go | 18 +++++++++++------- pkg/workerpool/task.go | 9 ++++----- pkg/workerpool/worker.go | 10 ++++++---- 5 files changed, 24 insertions(+), 19 deletions(-) diff --git a/example/example1_test.go b/example/example1_test.go index 0565f7b..aad7f5e 100644 --- a/example/example1_test.go +++ b/example/example1_test.go @@ -21,7 +21,6 @@ func TestTaskPool1(t *testing.T) { // input:池数量 pool := workerpool.NewPool(5) - // 需要处理的任务 tt := func(data interface{}) error { taskID := data.(int) @@ -43,4 +42,7 @@ func TestTaskPool1(t *testing.T) { } pool.Run() // 启动 + + + } diff --git a/example/example2_test.go b/example/example2_test.go index f704a9a..195f6e1 100644 --- a/example/example2_test.go +++ b/example/example2_test.go @@ -43,11 +43,9 @@ func TestTaskPool2(t *testing.T) { pool.AddGlobalQueue(task) } - // 启动在后台等待执行 go pool.RunBackground() - for { taskID := rand.Intn(100) + 20 diff --git a/pkg/workerpool/pool.go b/pkg/workerpool/pool.go index ad55308..eecdf38 100644 --- a/pkg/workerpool/pool.go +++ b/pkg/workerpool/pool.go @@ -11,11 +11,11 @@ type Pool struct { // list 装task Tasks []*Task Workers []*worker - // 工作池数量 concurrency int - // 用来装 - collector chan *Task + // collector 用来输入所有Task对象的chan + collector chan *Task + // runBackground 后台运行时,结束时需要传入的标示 runBackground chan bool wg sync.WaitGroup } @@ -24,6 +24,7 @@ type Pool struct { func NewPool(concurrency int) *Pool { return &Pool{ Tasks: make([]*Task, 0), + //Workers: make([]*worker, 0), concurrency: concurrency, collector: make(chan *Task, 10), runBackground: make(chan bool), @@ -36,8 +37,12 @@ func (p *Pool) AddGlobalQueue(task *Task) { p.Tasks = append(p.Tasks, task) } +// Run 启动pool,使用Run()方法调用时,只能使用AddGlobalQueue加入全局队列, +// 一旦Run启动后,就不允许调用AddTask加入Task,如果需动态加入pool,可以使用 +// RunBackground方法 func (p *Pool) Run() { - // 总共会开启p.concurrency个goroutine (因为Start函数) + // 总共会开启p.concurrency个goroutine + // 启动pool中的每个worker都传入collector chan for i := 1; i <= p.concurrency; i++ { worker := newWorker(p.collector, i) worker.start(&p.wg) @@ -48,15 +53,14 @@ func (p *Pool) Run() { time.Sleep(time.Millisecond) } - // 把好的任务放入collector + // 把放在tasks列表的的任务放入collector for i := range p.Tasks { p.collector <- p.Tasks[i] } - // 关闭通道 + // 注意,这里需要close chan。 close(p.collector) - // 阻塞,等待所有的goroutine执行完毕 p.wg.Wait() } diff --git a/pkg/workerpool/task.go b/pkg/workerpool/task.go index 2dce7c4..82a3ed9 100644 --- a/pkg/workerpool/task.go +++ b/pkg/workerpool/task.go @@ -5,8 +5,7 @@ import ( ) /* - 本质:用全局的切片分配任务给多个workers并发处理。 - + 本质:用全局的切片分配任务给多个workers并发处理。 */ // Task 一个具体任务需求 @@ -25,7 +24,7 @@ func NewTask(f func(interface{}) error, data interface{}) *Task { } // process 执行任务的函数。 -func process(workerID int, task *Task) { - klog.Info("worker: ", workerID, ", processes task: ", task.Data) - task.Err = task.f(task.Data) // 执行任务。如果任务执行错误,赋值err +func (t *Task) process(workerID int) { + klog.Info("worker: ", workerID, ", processes task: ", t.Data) + t.Err = t.f(t.Data) // 执行任务。如果任务执行错误,赋值err } diff --git a/pkg/workerpool/worker.go b/pkg/workerpool/worker.go index f1e432a..f382b7e 100644 --- a/pkg/workerpool/worker.go +++ b/pkg/workerpool/worker.go @@ -14,7 +14,7 @@ type worker struct { quit chan bool } -// 建立新的消费者 +// newWorker 建立新的消费者 func newWorker(channel chan *Task, ID int) *worker { return &worker{ ID: ID, @@ -23,25 +23,27 @@ func newWorker(channel chan *Task, ID int) *worker { } } -// Start 执行,遍历taskChan,每个任务都启一个goroutine执行。 +// start 执行任务,遍历taskChan,每个worker都启一个goroutine执行。 func (wr *worker) start(wg *sync.WaitGroup) { klog.Info("Starting worker: ", wr.ID) wg.Add(1) go func() { defer wg.Done() + // 不断从chan中取出task执行 for task := range wr.taskChan { - process(wr.ID, task) + task.process(wr.ID) } }() } +// startBackground 后台执行 func (wr *worker) startBackground() { klog.Info("Starting worker background: ", wr.ID) for { select { case task := <-wr.taskChan: - process(wr.ID, task) + task.process(wr.ID) case <-wr.quit: return }