From 0d73cf4d337e5461bddfaaf454169c42d1e98c72 Mon Sep 17 00:00:00 2001 From: googs1025 Date: Sun, 11 Jun 2023 18:23:56 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E6=94=B9=E5=85=A8=E5=B1=80?= =?UTF-8?q?=E9=98=9F=E5=88=97=E5=90=AF=E5=8A=A8=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 159 +++++++++++++++++++-------------------- example/example1_test.go | 16 ++-- example/example2_test.go | 17 ++--- pkg/workerpool/pool.go | 22 +++++- 4 files changed, 113 insertions(+), 101 deletions(-) diff --git a/README.md b/README.md index af4c3f0..bf5ef94 100644 --- a/README.md +++ b/README.md @@ -6,109 +6,108 @@ ```go /* 使用方法: - 1. 准备全局的任务队列,用于存放任务 + 1. 创建工作池 2. 定义需要的任务func 3. 遍历任务数,放入全局队列 - 4. 创建且启动工作池 - */ + 4. 启动工作池 +*/ func TestTaskPool1(t *testing.T) { - - // 准备存放任务的地方,全局任务队列 - var allTask []*workerpool.Task - - // 需要处理的任务 - tt := func(data interface{}) error { - taskID := data.(int) - // 业务逻辑 - - time.Sleep(100 * time.Millisecond) - klog.Info("Task ", taskID, " processed") - return nil - } - - // 准备多个个任务 - for i := 1; i <= 1000; i++ { - - // 需要做的任务 - task := workerpool.NewTask(tt, i) - - // 所有的任务放入全局队列中 - allTask = append(allTask, task) - } - - // 建立一个工作池 - // input:待处理的任务对列;池数量 - pool := workerpool.NewPool(allTask, 5) - pool.Run() // 启动 + // 建立一个工作池 + // input:池数量 + pool := workerpool.NewPool(5) + + + // 需要处理的任务 + tt := func(data interface{}) error { + taskID := data.(int) + // 业务逻辑 + + time.Sleep(100 * time.Millisecond) + klog.Info("Task ", taskID, " processed") + return nil + } + + // 准备多个个任务 + for i := 1; i <= 1000; i++ { + + // 需要做的任务 + task := workerpool.NewTask(tt, i) + + // 所有的任务放入全局队列中 + pool.AddGlobalQueue(task) + } + pool.Run() // 启动 } + ``` ### 示例2 ```go /* 使用方法: - 1. 准备全局的任务队列,用于存放任务 + 1. 创建工作池 2. 定义需要的任务func 3. 遍历任务数,放入全局队列 - 4. 创建且启动工作池 + 4. 启动工作池 */ func TestTaskPool2(t *testing.T) { - // 存放任务的全局队列 - var allTask []*workerpool.Task - // 准备100个任务 - for i := 1; i <= 100; i++ { - - // 需要做的任务 - task := workerpool.NewTask(func(data interface{}) error { - taskID := data.(int) - - /* - 业务逻辑 - */ - time.Sleep(100 * time.Millisecond) - klog.Info("Task ", taskID, " processed") - return nil - }, i) - - // 所有的任务放入list中 - allTask = append(allTask, task) - } - - // 建立一个池, - // input:待处理的任务对列;池数量 + // 建立一个池, + // input:池数量 + + pool := workerpool.NewPool(5) + + // 准备100个任务 + for i := 1; i <= 100; i++ { + + // 需要做的任务 + task := workerpool.NewTask(func(data interface{}) error { + taskID := data.(int) - pool := workerpool.NewPool(allTask, 5) - // 启动在后台等待执行 - go pool.RunBackground() + /* + 业务逻辑 + */ + time.Sleep(100 * time.Millisecond) + klog.Info("Task ", taskID, " processed") + return nil + }, i) - for { - taskID := rand.Intn(100) + 20 - - // 模拟一个退出条件 - if taskID%7 == 0 { - klog.Info("taskID: ", taskID, "pool stop!") - pool.StopBackground() - break - } - - time.Sleep(time.Duration(rand.Intn(5)) * time.Second) - // 模拟后续加入pool - task := workerpool.NewTask(func(data interface{}) error { - taskID := data.(int) - time.Sleep(100 * time.Millisecond) - klog.Info("Task ", taskID, " processed") - return nil + // 所有的任务放入list中 + pool.AddGlobalQueue(task) + } + + + // 启动在后台等待执行 + go pool.RunBackground() + + + for { + taskID := rand.Intn(100) + 20 + + // 模拟一个退出条件 + if taskID%7 == 0 { + klog.Info("taskID: ", taskID, "pool stop!") + pool.StopBackground() + break + } + + time.Sleep(time.Duration(rand.Intn(5)) * time.Second) + // 模拟后续加入pool + task := workerpool.NewTask(func(data interface{}) error { + taskID := data.(int) + time.Sleep(100 * time.Millisecond) + klog.Info("Task ", taskID, " processed") + return nil }, taskID) - pool.AddTask(task) - } - - fmt.Println("finished...") + pool.AddTask(task) + } + + fmt.Println("finished...") } diff --git a/example/example1_test.go b/example/example1_test.go index 6f470ec..0565f7b 100644 --- a/example/example1_test.go +++ b/example/example1_test.go @@ -9,16 +9,18 @@ import ( /* 使用方法: - 1. 准备全局的任务队列,用于存放任务 + 1. 创建工作池 2. 定义需要的任务func 3. 遍历任务数,放入全局队列 - 4. 创建且启动工作池 + 4. 启动工作池 */ func TestTaskPool1(t *testing.T) { - // 准备存放任务的地方,全局任务队列 - var allTask []*workerpool.Task + // 建立一个工作池 + // input:池数量 + pool := workerpool.NewPool(5) + // 需要处理的任务 tt := func(data interface{}) error { @@ -37,12 +39,8 @@ func TestTaskPool1(t *testing.T) { task := workerpool.NewTask(tt, i) // 所有的任务放入全局队列中 - allTask = append(allTask, task) + pool.AddGlobalQueue(task) } - - // 建立一个工作池 - // input:待处理的任务对列;池数量 - pool := workerpool.NewPool(allTask, 5) pool.Run() // 启动 } diff --git a/example/example2_test.go b/example/example2_test.go index 532c976..f704a9a 100644 --- a/example/example2_test.go +++ b/example/example2_test.go @@ -11,16 +11,19 @@ import ( /* 使用方法: - 1. 准备全局的任务队列,用于存放任务 + 1. 创建工作池 2. 定义需要的任务func 3. 遍历任务数,放入全局队列 - 4. 创建且启动工作池 + 4. 启动工作池 */ func TestTaskPool2(t *testing.T) { - // 存放任务的全局队列 - var allTask []*workerpool.Task + // 建立一个池, + // input:池数量 + + pool := workerpool.NewPool(5) + // 准备100个任务 for i := 1; i <= 100; i++ { @@ -37,13 +40,9 @@ func TestTaskPool2(t *testing.T) { }, i) // 所有的任务放入list中 - allTask = append(allTask, task) + pool.AddGlobalQueue(task) } - // 建立一个池, - // input:待处理的任务对列;池数量 - - pool := workerpool.NewPool(allTask, 5) // 启动在后台等待执行 go pool.RunBackground() diff --git a/pkg/workerpool/pool.go b/pkg/workerpool/pool.go index ba0e510..ad55308 100644 --- a/pkg/workerpool/pool.go +++ b/pkg/workerpool/pool.go @@ -21,15 +21,21 @@ type Pool struct { } // NewPool 建立一个pool -func NewPool(tasks []*Task, concurrency int) *Pool { +func NewPool(concurrency int) *Pool { return &Pool{ - Tasks: tasks, + Tasks: make([]*Task, 0), concurrency: concurrency, collector: make(chan *Task, 10), runBackground: make(chan bool), } } +// AddGlobalQueue 加入工作池的全局队列,静态加入,用于启动工作池前的任务加入时使用, +// 在工作池启动后,推荐使用AddTask() 方法动态加入工作池 +func (p *Pool) AddGlobalQueue(task *Task) { + p.Tasks = append(p.Tasks, task) +} + func (p *Pool) Run() { // 总共会开启p.concurrency个goroutine (因为Start函数) for i := 1; i <= p.concurrency; i++ { @@ -37,6 +43,11 @@ func (p *Pool) Run() { worker.start(&p.wg) } + for len(p.Tasks) == 0 { + klog.Error("no task in global queue...") + time.Sleep(time.Millisecond) + } + // 把好的任务放入collector for i := range p.Tasks { p.collector <- p.Tasks[i] @@ -50,7 +61,7 @@ func (p *Pool) Run() { p.wg.Wait() } -// AddTask 把任务放入chan +// AddTask 把任务放入chan,当工作池启动后,动态加入使用 func (p *Pool) AddTask(task *Task) { // 放入chan p.collector <- task @@ -74,6 +85,11 @@ func (p *Pool) RunBackground() { go workers.startBackground() } + for len(p.Tasks) == 0 { + klog.Error("no task in global queue...") + time.Sleep(time.Millisecond) + } + for i := range p.Tasks { p.collector <- p.Tasks[i] }