Intelligent addition or reduction of workers #193

Merged · 4 commits · Oct 11, 2023
133 changes: 82 additions & 51 deletions runner.go
@@ -1,9 +1,9 @@
package boomer

import (
"context"
"fmt"
"log"
"math/rand"
"os"
"runtime/debug"
"strings"
@@ -30,6 +30,7 @@ type runner struct {

tasks []*Task
totalTaskWeight int
runTask []*Task // goroutines execute tasks in the order of this list
Owner:

Why do we need to maintain this array?

Contributor Author:

This array holds the tasks that are executed in a loop, ordered by weight. For example, with weights A-2, B-3, C-4 the resulting order is ABCABCBCC, and every active worker runs tasks in that order. Personally I think this may be a bit better than random selection: it preserves the task weights while also guaranteeing that every task actually gets executed.
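A minimal sketch of the ordering described above (standalone code with illustrative names, not part of the PR): given weights A-2, B-3, C-4, each pass appends every task that still has weight remaining, producing ABCABCBCC. Unlike the PR's setTasks, this sketch keeps a separate counter instead of decrementing each task's Weight field.

package main

import "fmt"

// task is a stand-in for boomer's Task type (illustrative only).
type task struct {
	Name   string
	Weight int
}

// buildRunOrder expands tasks into an execution order by weight:
// A-2, B-3, C-4 -> A B C A B C B C C.
func buildRunOrder(tasks []task) []string {
	total := 0
	for _, t := range tasks {
		total += t.Weight
	}
	remaining := make([]int, len(tasks))
	for i, t := range tasks {
		remaining[i] = t.Weight
	}
	order := make([]string, 0, total)
	for len(order) < total {
		// Each pass appends every task that still has weight left, so
		// heavier tasks appear more often while all tasks stay interleaved.
		for i, t := range tasks {
			if remaining[i] > 0 {
				order = append(order, t.Name)
				remaining[i]--
			}
		}
	}
	return order
}

func main() {
	tasks := []task{{"A", 2}, {"B", 3}, {"C", 4}}
	fmt.Println(buildRunOrder(tasks)) // [A B C A B C B C C]
}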


rateLimiter RateLimiter
rateLimitEnabled bool
@@ -41,9 +42,8 @@ type runner struct {
numClients int32
spawnRate float64

// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
stopChan chan bool
// cancel functions for all running workers (goroutines)
cancelFuncs []context.CancelFunc

// close this channel will stop all goroutines used in runner, including running workers.
shutdownChan chan bool
@@ -120,104 +120,134 @@ func (r *runner) outputOnStop() {
wg.Wait()
}

func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) {
log.Println("Spawning", spawnCount, "clients immediately")

for i := 1; i <= spawnCount; i++ {
// addWorkers starts the worker goroutines and appends their cancel functions to cancelFuncs
func (r *runner) addWorkers(gapCount int) {
for i := 0; i < gapCount; i++ {
select {
case <-quit:
// quit spawning goroutine
return
case <-r.shutdownChan:
return
default:
atomic.AddInt32(&r.numClients, 1)
go func() {
ctx, cancel := context.WithCancel(context.TODO())
r.cancelFuncs = append(r.cancelFuncs, cancel)
go func(ctx context.Context) {
index := 0
for {
select {
case <-quit:
case <-ctx.Done():
return
case <-r.shutdownChan:
return
default:
if r.rateLimitEnabled {
blocked := r.rateLimiter.Acquire()
if !blocked {
task := r.getTask()
task := r.getTask(index)
r.safeRun(task.Fn)
index++
if index == r.totalTaskWeight {
index = 0
}
}
} else {
task := r.getTask()
task := r.getTask(index)
r.safeRun(task.Fn)
index++
if index == r.totalTaskWeight {
index = 0
}
}
}
}
}()
}(ctx)
}
}
}

// reduceWorkers stops the goroutines and removes their cancel functions from cancelFuncs
func (r *runner) reduceWorkers(gapCount int) {
if gapCount == 0 {
return
}
num := len(r.cancelFuncs) - gapCount
for _, cancelFunc := range r.cancelFuncs[num:] {
cancelFunc()
}

r.cancelFuncs = r.cancelFuncs[:num]

}

func (r *runner) spawnWorkers(spawnCount int, spawnCompleteFunc func()) {
log.Println("The total number of clients required is ", spawnCount)

var gapCount int
if spawnCount > int(r.numClients) {
Owner:

We should consider the weight of tasks.

Contributor Author (@MyNextWeekend, Sep 18, 2023):

Apologies if this is not expressed clearly. My understanding is:

1. spawnWorkers should focus only on adding or removing active workers:

func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) {

2. getTask should focus on supplying tasks to the active workers (according to their weights):

func (r *runner) getTask() *Task {

I don't quite follow the point about task weights here; could you clarify?

Contributor Author:

When setTasks is called, a runTask []*Task list is generated according to the weights. Each active goroutine keeps calling getTask to fetch the next task from runTask and execute it. I have pushed the code for the weight handling; please take a look.

Owner:

Yes, you're right. Letting getTask manage the weights directly is fine.

gapCount = spawnCount - int(r.numClients)
log.Printf("The current number of clients is %v, %v clients will be added\n", r.numClients, gapCount)
r.addWorkers(gapCount)
} else {
gapCount = int(r.numClients) - spawnCount
log.Printf("The current number of clients is %v, %v clients will be removed\n", r.numClients, gapCount)
r.reduceWorkers(gapCount)
}

r.numClients = int32(spawnCount)

if spawnCompleteFunc != nil {
spawnCompleteFunc()
go spawnCompleteFunc() // run asynchronously so spawning completes faster
}
}

// setTasks will set the runner's task list AND the total task weight
// which is used to get a random task later
// which is used to get a task later
func (r *runner) setTasks(t []*Task) {
r.tasks = t
if len(r.tasks) == 1 {
r.totalTaskWeight = 1
r.runTask = t
return
}

weightSum := 0
for _, task := range r.tasks {
if task.Weight <= 0 { //Ensure that user input values are legal
task.Weight = 1
}
weightSum += task.Weight
}
r.totalTaskWeight = weightSum
}

func (r *runner) getTask() *Task {
tasksCount := len(r.tasks)
if tasksCount == 1 {
// Fast path
return r.tasks[0]
}

rs := rand.New(rand.NewSource(time.Now().UnixNano()))

totalWeight := r.totalTaskWeight
if totalWeight <= 0 {
// If all the tasks have not weights defined, they have the same chance to run
randNum := rs.Intn(tasksCount)
return r.tasks[randNum]
}

randNum := rs.Intn(totalWeight)
runningSum := 0
for _, task := range r.tasks {
runningSum += task.Weight
if runningSum > randNum {
return task
r.runTask = make([]*Task, r.totalTaskWeight)
index := 0
for weightSum > 0 { //Assign task order according to weight
for _, task := range r.tasks {
if task.Weight > 0 {
r.runTask[index] = task
index++
task.Weight--
weightSum--
}
}
}
}

return nil
func (r *runner) getTask(index int) *Task {
return r.runTask[index]
}

func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) {
Events.Publish(EVENT_SPAWN, spawnCount, spawnRate)

r.stopChan = make(chan bool)
r.numClients = 0

go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc)
r.spawnWorkers(spawnCount, spawnCompleteFunc)
}

func (r *runner) stop() {
// publish the boomer stop event
// user's code can subscribe to this event and do things like cleaning up
Events.Publish(EVENT_STOP)

// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(r.stopChan)
r.reduceWorkers(int(r.numClients)) //Stop all goroutines
r.numClients = 0
}

type localRunner struct {
@@ -344,6 +374,8 @@ func (r *slaveRunner) shutdown() {
if r.rateLimitEnabled {
r.rateLimiter.Stop()
}
r.cancelFuncs = nil
r.numClients = 0
close(r.shutdownChan)
}

@@ -460,7 +492,6 @@ func (r *slaveRunner) onMessage(msgInterface message) {
switch msgType {
case "spawn":
r.state = stateSpawning
r.stop()
r.onSpawnMessage(genericMsg)
case "stop":
r.stop()
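For context on the cancellation pattern introduced above, here is a minimal, self-contained sketch (illustrative names, not the PR's code) of scaling worker goroutines up and down with context.CancelFunc, the way addWorkers and reduceWorkers do: one cancel function is stored per worker, and cancelling the last n entries stops exactly n workers.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// pool keeps one cancel function per worker goroutine, mirroring the
// cancelFuncs slice added in this PR (names here are illustrative).
type pool struct {
	cancels []context.CancelFunc
	active  int64
}

func (p *pool) add(n int) {
	for i := 0; i < n; i++ {
		ctx, cancel := context.WithCancel(context.Background())
		p.cancels = append(p.cancels, cancel)
		atomic.AddInt64(&p.active, 1)
		go func(ctx context.Context) {
			defer atomic.AddInt64(&p.active, -1)
			for {
				select {
				case <-ctx.Done():
					return // this worker was asked to stop
				default:
					time.Sleep(10 * time.Millisecond) // stand-in for running one task
				}
			}
		}(ctx)
	}
}

func (p *pool) reduce(n int) {
	if n <= 0 || n > len(p.cancels) {
		return
	}
	keep := len(p.cancels) - n
	for _, cancel := range p.cancels[keep:] {
		cancel() // each cancel stops exactly one worker
	}
	p.cancels = p.cancels[:keep]
}

func main() {
	p := &pool{}
	p.add(10)
	p.reduce(4)
	time.Sleep(100 * time.Millisecond) // give cancelled workers time to exit
	fmt.Println("workers still running:", atomic.LoadInt64(&p.active)) // expect 6
}

As in the PR, the slice of cancel functions is only touched from the controlling goroutine, so no locking is shown; concurrent callers would need a mutex.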
47 changes: 42 additions & 5 deletions runner_test.go
@@ -72,6 +72,46 @@ var _ = Describe("Test runner", func() {
Expect(hitOutput2.onStop).To(BeTrue())
})

It("test add workers", func() {
taskA := &Task{
Weight: 10,
Fn: func() {
time.Sleep(time.Second)
},
Name: "TaskA",
}

runner := newSlaveRunner("localhost", 5557, []*Task{taskA}, nil)
runner.client = newClient("localhost", 5557, runner.nodeID)
defer runner.shutdown()

runner.addWorkers(10)

currentClients := len(runner.cancelFuncs)
Expect(currentClients).To(BeEquivalentTo(10))
})

It("test reduce workers", func() {
taskA := &Task{
Weight: 10,
Fn: func() {
time.Sleep(time.Second)
},
Name: "TaskA",
}

runner := newSlaveRunner("localhost", 5557, []*Task{taskA}, nil)
runner.client = newClient("localhost", 5557, runner.nodeID)
defer runner.shutdown()

runner.addWorkers(10)
runner.reduceWorkers(5)
runner.reduceWorkers(2)

currentClients := len(runner.cancelFuncs)
Expect(currentClients).To(BeEquivalentTo(3))
})

It("test localrunner", func() {
taskA := &Task{
Weight: 10,
@@ -120,8 +160,7 @@ var _ = Describe("Test runner", func() {
runner.client = newClient("localhost", 5557, runner.nodeID)
defer runner.shutdown()

go runner.spawnWorkers(10, runner.stopChan, runner.spawnComplete)
time.Sleep(10 * time.Millisecond)
runner.spawnWorkers(10, runner.spawnComplete)

currentClients := atomic.LoadInt32(&runner.numClients)
Expect(currentClients).To(BeEquivalentTo(10))
@@ -162,7 +201,7 @@ var _ = Describe("Test runner", func() {

const numToSpawn int = 30

runner.spawnWorkers(numToSpawn, runner.stopChan, runner.spawnComplete)
runner.spawnWorkers(numToSpawn, runner.spawnComplete)
time.Sleep(3 * time.Second)

currentClients := atomic.LoadInt32(&runner.numClients)
@@ -238,7 +277,6 @@ var _ = Describe("Test runner", func() {
}

runner := newSlaveRunner("localhost", 5557, []*Task{taskA}, nil)
runner.stopChan = make(chan bool)

stopped := false
handler := func() {
@@ -301,7 +339,6 @@ var _ = Describe("Test runner", func() {
Eventually(quitMessages).Should(Receive())

runner.state = stateRunning
runner.stopChan = make(chan bool)
runner.onMessage(newGenericMessage("quit", nil, runner.nodeID))
Eventually(quitMessages).Should(Receive())
Expect(runner.state).Should(BeIdenticalTo(stateInit))