Skip to content

Commit

Permalink
fix: 修改全局队列启动方式
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Jun 11, 2023
1 parent 2a82783 commit 0d73cf4
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 101 deletions.
159 changes: 79 additions & 80 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,109 +6,108 @@
```go
/*
使用方法:
1. 准备全局的任务队列,用于存放任务
1. 创建工作池
2. 定义需要的任务func
3. 遍历任务数,放入全局队列
4. 创建且启动工作池
*/
4. 启动工作池
*/

func TestTaskPool1(t *testing.T) {


// 准备存放任务的地方,全局任务队列
var allTask []*workerpool.Task

// 需要处理的任务
tt := func(data interface{}) error {
taskID := data.(int)
// 业务逻辑

time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
}

// 准备多个个任务
for i := 1; i <= 1000; i++ {

// 需要做的任务
task := workerpool.NewTask(tt, i)

// 所有的任务放入全局队列中
allTask = append(allTask, task)
}

// 建立一个工作池
// input:待处理的任务对列;池数量
pool := workerpool.NewPool(allTask, 5)
pool.Run() // 启动
// 建立一个工作池
// input:池数量
pool := workerpool.NewPool(5)


// 需要处理的任务
tt := func(data interface{}) error {
taskID := data.(int)
// 业务逻辑

time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
}

// 准备多个个任务
for i := 1; i <= 1000; i++ {

// 需要做的任务
task := workerpool.NewTask(tt, i)

// 所有的任务放入全局队列中
pool.AddGlobalQueue(task)
}
pool.Run() // 启动

}

```

### 示例2
```go
/*
使用方法:
1. 准备全局的任务队列,用于存放任务
1. 创建工作池
2. 定义需要的任务func
3. 遍历任务数,放入全局队列
4. 创建且启动工作池
4. 启动工作池
*/

func TestTaskPool2(t *testing.T) {

// 存放任务的全局队列
var allTask []*workerpool.Task
// 准备100个任务
for i := 1; i <= 100; i++ {

// 需要做的任务
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)

/*
业务逻辑
*/
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
}, i)

// 所有的任务放入list中
allTask = append(allTask, task)
}

// 建立一个池,
// input:待处理的任务对列;池数量
// 建立一个池,
// input:池数量

pool := workerpool.NewPool(5)

// 准备100个任务
for i := 1; i <= 100; i++ {

// 需要做的任务
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)

pool := workerpool.NewPool(allTask, 5)
// 启动在后台等待执行
go pool.RunBackground()
/*
业务逻辑
*/
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
}, i)

for {
taskID := rand.Intn(100) + 20

// 模拟一个退出条件
if taskID%7 == 0 {
klog.Info("taskID: ", taskID, "pool stop!")
pool.StopBackground()
break
}

time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
// 模拟后续加入pool
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
// 所有的任务放入list中
pool.AddGlobalQueue(task)
}


// 启动在后台等待执行
go pool.RunBackground()


for {
taskID := rand.Intn(100) + 20

// 模拟一个退出条件
if taskID%7 == 0 {
klog.Info("taskID: ", taskID, "pool stop!")
pool.StopBackground()
break
}

time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
// 模拟后续加入pool
task := workerpool.NewTask(func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
}, taskID)

pool.AddTask(task)
}
fmt.Println("finished...")
pool.AddTask(task)
}

fmt.Println("finished...")
}


Expand Down
16 changes: 7 additions & 9 deletions example/example1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (

/*
使用方法:
1. 准备全局的任务队列,用于存放任务
1. 创建工作池
2. 定义需要的任务func
3. 遍历任务数,放入全局队列
4. 创建且启动工作池
4. 启动工作池
*/

func TestTaskPool1(t *testing.T) {

// 准备存放任务的地方,全局任务队列
var allTask []*workerpool.Task
// 建立一个工作池
// input:池数量
pool := workerpool.NewPool(5)


// 需要处理的任务
tt := func(data interface{}) error {
Expand All @@ -37,12 +39,8 @@ func TestTaskPool1(t *testing.T) {
task := workerpool.NewTask(tt, i)

// 所有的任务放入全局队列中
allTask = append(allTask, task)
pool.AddGlobalQueue(task)
}

// 建立一个工作池
// input:待处理的任务对列;池数量
pool := workerpool.NewPool(allTask, 5)
pool.Run() // 启动

}
17 changes: 8 additions & 9 deletions example/example2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@ import (

/*
使用方法:
1. 准备全局的任务队列,用于存放任务
1. 创建工作池
2. 定义需要的任务func
3. 遍历任务数,放入全局队列
4. 创建且启动工作池
4. 启动工作池
*/

func TestTaskPool2(t *testing.T) {

// 存放任务的全局队列
var allTask []*workerpool.Task
// 建立一个池,
// input:池数量

pool := workerpool.NewPool(5)

// 准备100个任务
for i := 1; i <= 100; i++ {

Expand All @@ -37,13 +40,9 @@ func TestTaskPool2(t *testing.T) {
}, i)

// 所有的任务放入list中
allTask = append(allTask, task)
pool.AddGlobalQueue(task)
}

// 建立一个池,
// input:待处理的任务对列;池数量

pool := workerpool.NewPool(allTask, 5)

// 启动在后台等待执行
go pool.RunBackground()
Expand Down
22 changes: 19 additions & 3 deletions pkg/workerpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,33 @@ type Pool struct {
}

// NewPool 建立一个pool
func NewPool(tasks []*Task, concurrency int) *Pool {
func NewPool(concurrency int) *Pool {
return &Pool{
Tasks: tasks,
Tasks: make([]*Task, 0),
concurrency: concurrency,
collector: make(chan *Task, 10),
runBackground: make(chan bool),
}
}

// AddGlobalQueue 加入工作池的全局队列,静态加入,用于启动工作池前的任务加入时使用,
// 在工作池启动后,推荐使用AddTask() 方法动态加入工作池
func (p *Pool) AddGlobalQueue(task *Task) {
p.Tasks = append(p.Tasks, task)
}

func (p *Pool) Run() {
// 总共会开启p.concurrency个goroutine (因为Start函数)
for i := 1; i <= p.concurrency; i++ {
worker := newWorker(p.collector, i)
worker.start(&p.wg)
}

for len(p.Tasks) == 0 {
klog.Error("no task in global queue...")
time.Sleep(time.Millisecond)
}

// 把好的任务放入collector
for i := range p.Tasks {
p.collector <- p.Tasks[i]
Expand All @@ -50,7 +61,7 @@ func (p *Pool) Run() {
p.wg.Wait()
}

// AddTask 把任务放入chan
// AddTask 把任务放入chan,当工作池启动后,动态加入使用
func (p *Pool) AddTask(task *Task) {
// 放入chan
p.collector <- task
Expand All @@ -74,6 +85,11 @@ func (p *Pool) RunBackground() {
go workers.startBackground()
}

for len(p.Tasks) == 0 {
klog.Error("no task in global queue...")
time.Sleep(time.Millisecond)
}

for i := range p.Tasks {
p.collector <- p.Tasks[i]
}
Expand Down

0 comments on commit 0d73cf4

Please sign in to comment.