diff --git a/example/example1_test.go b/example/example1_test.go index aad7f5e..e1e2fbc 100644 --- a/example/example1_test.go +++ b/example/example1_test.go @@ -1,6 +1,7 @@ package example import ( + "fmt" "github.com/myconcurrencytools/workpoolframework/pkg/workerpool" "k8s.io/klog/v2" "testing" @@ -8,11 +9,11 @@ import ( ) /* - 使用方法: - 1. 创建工作池 - 2. 定义需要的任务func - 3. 遍历任务数,放入全局队列 - 4. 启动工作池 + 使用方法: + 1. 创建工作池 + 2. 定义需要的任务func + 3. 遍历任务数,放入全局队列 + 4. 启动工作池 */ func TestTaskPool1(t *testing.T) { @@ -35,14 +36,11 @@ func TestTaskPool1(t *testing.T) { for i := 1; i <= 1000; i++ { // 需要做的任务 - task := workerpool.NewTask(tt, i) + task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", i), i, tt) // 所有的任务放入全局队列中 pool.AddGlobalQueue(task) } pool.Run() // 启动 - - - -} +} \ No newline at end of file diff --git a/example/example2_test.go b/example/example2_test.go index 195f6e1..920559d 100644 --- a/example/example2_test.go +++ b/example/example2_test.go @@ -10,11 +10,11 @@ import ( ) /* - 使用方法: - 1. 创建工作池 - 2. 定义需要的任务func - 3. 遍历任务数,放入全局队列 - 4. 启动工作池 + 使用方法: + 1. 创建工作池 + 2. 定义需要的任务func + 3. 遍历任务数,放入全局队列 + 4. 启动工作池 */ func TestTaskPool2(t *testing.T) { @@ -28,16 +28,16 @@ func TestTaskPool2(t *testing.T) { for i := 1; i <= 100; i++ { // 需要做的任务 - task := workerpool.NewTask(func(data interface{}) error { + task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", i), i, func(data interface{}) error { taskID := data.(int) /* - 业务逻辑 + 业务逻辑 */ time.Sleep(100 * time.Millisecond) klog.Info("Task ", taskID, " processed") return nil - }, i) + }) // 所有的任务放入list中 pool.AddGlobalQueue(task) @@ -58,15 +58,15 @@ func TestTaskPool2(t *testing.T) { time.Sleep(time.Duration(rand.Intn(5)) * time.Second) // 模拟后续加入pool - task := workerpool.NewTask(func(data interface{}) error { + task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", taskID), taskID, func(data interface{}) error { taskID := data.(int) time.Sleep(100 * time.Millisecond) klog.Info("Task ", taskID, " processed") return nil - }, taskID) + }) pool.AddTask(task) } fmt.Println("finished...") -} +} \ No newline at end of file diff --git a/example/http_example/cmd/cmd.go b/example/http_example/cmd/cmd.go new file mode 100644 index 0000000..c20cecc --- /dev/null +++ b/example/http_example/cmd/cmd.go @@ -0,0 +1,31 @@ +package cmd + +import ( + "fmt" + "github.com/spf13/cobra" + "os" +) + +var runCmd = &cobra.Command{ + Use: "run", + Short: "test-server", + Long: "", +} + +var ( + debug bool + serverPort string +) + +func init() { + runCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "debug mode") + runCmd.PersistentFlags().StringVarP(&serverPort, "port", "p", "8080", "server port") + runCmd.AddCommand(httpServerCmd()) +} + +func Execute() { + if err := runCmd.Execute(); err != nil { + fmt.Printf("cmd err: %s\n", err) + os.Exit(1) + } +} \ No newline at end of file diff --git a/example/http_example/cmd/httpServer.go b/example/http_example/cmd/httpServer.go new file mode 100644 index 0000000..9668f59 --- /dev/null +++ b/example/http_example/cmd/httpServer.go @@ -0,0 +1,24 @@ +package cmd + +import ( + "github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/common" + "github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/server" + "github.com/spf13/cobra" +) + +func httpServerCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "httpServer", + Short: "run http server", + Long: "", + Run: func(cmd *cobra.Command, args []string) { + cfg := &common.ServerConfig{ + Debug: debug, + Port: serverPort, + } + // 启动http server + server.HttpServer(cfg) + }, + } + return cmd +} diff --git a/example/http_example/main.go b/example/http_example/main.go new file mode 100644 index 0000000..0ac8e90 --- /dev/null +++ b/example/http_example/main.go @@ -0,0 +1,7 @@ +package main + +import "github.com/myconcurrencytools/workpoolframework/example/http_example/cmd" + +func main() { + cmd.Execute() +} diff --git a/example/http_example/pkg/common/config.go b/example/http_example/pkg/common/config.go new file mode 100644 index 0000000..66a0c19 --- /dev/null +++ b/example/http_example/pkg/common/config.go @@ -0,0 +1,6 @@ +package common + +type ServerConfig struct { + Debug bool + Port string +} diff --git a/example/http_example/pkg/scheduler/scheduler.go b/example/http_example/pkg/scheduler/scheduler.go new file mode 100644 index 0000000..5620ecd --- /dev/null +++ b/example/http_example/pkg/scheduler/scheduler.go @@ -0,0 +1,32 @@ +package scheduler + +import ( + "github.com/myconcurrencytools/workpoolframework/pkg/workerpool" + "log" +) + +type Scheduler struct { + pool *workerpool.Pool +} + +func NewScheduler(workerNum int) *Scheduler { + s := &Scheduler{ + pool: workerpool.NewPool(workerNum), + } + return s +} + +func (s *Scheduler) Start() { + go func() { + log.Printf("start scheduler...") + s.pool.RunBackground() + }() +} + +func (s *Scheduler) Stop() { + s.pool.StopBackground() +} + +func (s *Scheduler) AddTask(task workerpool.Task) { + s.pool.AddTask(task) +} diff --git a/example/http_example/pkg/server/model/mock_map.go b/example/http_example/pkg/server/model/mock_map.go new file mode 100644 index 0000000..3dc0041 --- /dev/null +++ b/example/http_example/pkg/server/model/mock_map.go @@ -0,0 +1,9 @@ +package model + +type MockMap map[string]*MyTask + +var TaskMap MockMap + +func init() { + TaskMap = MockMap{} +} diff --git a/example/http_example/pkg/server/model/task.go b/example/http_example/pkg/server/model/task.go new file mode 100644 index 0000000..d3b4d9c --- /dev/null +++ b/example/http_example/pkg/server/model/task.go @@ -0,0 +1,49 @@ +package model + +import "fmt" + +const ( + TaskRunning = "running" + TaskFail = "fail" + TaskSuccess = "success" +) + +type MyTask struct { + TaskName string `json:"taskName"` + TaskType string `json:"taskType"` + Input interface{} `json:"input"` + f func(data interface{}) error + Err error + Status string +} + +func (my *MyTask) ChooseTaskType() { + if my.TaskType == "string" { + my.f = func(data interface{}) error { + fmt.Println("string task...", data) + return nil + } + } else { + my.f = func(data interface{}) error { + fmt.Println("int task...", data) + return nil + } + } +} + +func (my *MyTask) Execute() error { + + my.Status = TaskRunning + + if err := my.f(my.Input); err != nil { + my.Err = err + my.Status = TaskFail + return err + } + my.Status = TaskSuccess + return nil +} + +func (my *MyTask) GetTaskName() string { + return my.TaskName +} diff --git a/example/http_example/pkg/server/server.go b/example/http_example/pkg/server/server.go new file mode 100644 index 0000000..cf74aea --- /dev/null +++ b/example/http_example/pkg/server/server.go @@ -0,0 +1,64 @@ +package server + +import ( + "fmt" + "github.com/gin-gonic/gin" + "github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/common" + "github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/scheduler" + "github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/server/model" +) + +func HttpServer(c *common.ServerConfig) { + + if !c.Debug { + gin.SetMode(gin.ReleaseMode) + } + + r := gin.New() + + // 启动调度器 + s := scheduler.NewScheduler(6) + s.Start() + + r.GET("/test", func(c *gin.Context) { + c.String(200, "测试用") + }) + + // 执行任务接口 + /* + url: http://127.0.0.1:8080/start + body入参: + { + "taskName": "tttt", + "taskType": "int", + "input": "aaaa" + } + */ + r.POST("/start", func(c *gin.Context) { + var myTask model.MyTask + if err := c.ShouldBindJSON(&myTask); err != nil { + + fmt.Errorf("do operation action error: %s", err) + c.JSON(400, gin.H{"message": "start task error"}) + return + } + myTask.ChooseTaskType() + s.AddTask(&myTask) + + model.TaskMap[myTask.TaskName] = &myTask + c.JSON(200, gin.H{"message": "start task success"}) + }) + + // 查询任务状态接口 + /* + http://127.0.0.1:8080/task?taskName=tttt + */ + r.GET("/task", func(c *gin.Context) { + taskName := c.Query("taskName") + my := model.TaskMap[taskName] + c.JSON(200, gin.H{"message": my.Status}) + }) + + err := r.Run(fmt.Sprintf(":%v", c.Port)) + fmt.Println(err) +} diff --git a/example/scheduler/scheduler.go b/example/scheduler/scheduler.go new file mode 100644 index 0000000..5620ecd --- /dev/null +++ b/example/scheduler/scheduler.go @@ -0,0 +1,32 @@ +package scheduler + +import ( + "github.com/myconcurrencytools/workpoolframework/pkg/workerpool" + "log" +) + +type Scheduler struct { + pool *workerpool.Pool +} + +func NewScheduler(workerNum int) *Scheduler { + s := &Scheduler{ + pool: workerpool.NewPool(workerNum), + } + return s +} + +func (s *Scheduler) Start() { + go func() { + log.Printf("start scheduler...") + s.pool.RunBackground() + }() +} + +func (s *Scheduler) Stop() { + s.pool.StopBackground() +} + +func (s *Scheduler) AddTask(task workerpool.Task) { + s.pool.AddTask(task) +} diff --git a/example/scheduler/scheduler_test.go b/example/scheduler/scheduler_test.go new file mode 100644 index 0000000..1fcc72e --- /dev/null +++ b/example/scheduler/scheduler_test.go @@ -0,0 +1,24 @@ +package scheduler + +import ( + "fmt" + "github.com/myconcurrencytools/workpoolframework/pkg/workerpool" + "testing" + "time" +) + +func TestScheduler(t *testing.T) { + s := NewScheduler(5) + + s.Start() + + tsk := workerpool.NewTaskInstance("task1", "aaa", func(i interface{}) error { + fmt.Println(i) + return nil + }) + + s.AddTask(tsk) + + <-time.After(time.Second * 60) + s.Stop() +} diff --git a/go.mod b/go.mod index c5e56a6..90cff34 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,38 @@ module github.com/myconcurrencytools/workpoolframework go 1.18 -require k8s.io/klog/v2 v2.90.1 +require ( + github.com/gin-gonic/gin v1.9.1 + github.com/spf13/cobra v1.7.0 + k8s.io/klog/v2 v2.90.1 +) -require github.com/go-logr/logr v1.2.0 // indirect +require ( + github.com/bytedance/sonic v1.9.1 // indirect + github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect + github.com/gabriel-vasile/mimetype v1.4.2 // indirect + github.com/gin-contrib/sse v0.1.0 // indirect + github.com/go-logr/logr v1.2.0 // indirect + github.com/go-playground/locales v0.14.1 // indirect + github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/go-playground/validator/v10 v10.14.0 // indirect + github.com/goccy/go-json v0.10.2 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.4 // indirect + github.com/leodido/go-urn v1.2.4 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.0.8 // indirect + github.com/spf13/pflag v1.0.5 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + github.com/ugorji/go/codec v1.2.11 // indirect + golang.org/x/arch v0.3.0 // indirect + golang.org/x/crypto v0.9.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum index c4fea7a..e9f0f0f 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,97 @@ +github.com/bytedance/sonic v1.5.0/go.mod h1:ED5hyg4y6t3/9Ku1R6dU/4KyJ48DZ4jPhfY1O2AihPM= +github.com/bytedance/sonic v1.9.1 h1:6iJ6NqdoxCDr6mbY8h18oSO+cShGSMRGCEo7F2h0x8s= +github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZXU064P/U= +github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= +github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= +github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gabriel-vasile/mimetype v1.4.2 h1:w5qFW6JKBz9Y393Y4q372O9A7cUSequkh1Q7OhCmWKU= +github.com/gabriel-vasile/mimetype v1.4.2/go.mod h1:zApsH/mKG4w07erKIaJPFiX0Tsq9BFQgN3qGY5GnNgA= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= +github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg= +github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU= github.com/go-logr/logr v1.2.0 h1:QK40JKJyMdUDz+h+xvCsru/bJhvG0UxvePV0ufL/AcE= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js= +github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= +github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= +github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q= +github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= +github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= +github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= +github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU= +github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg= +golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k= +golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= +golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= +golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= +google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw= k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= diff --git a/pkg/workerpool/pool.go b/pkg/workerpool/pool.go index eecdf38..c443f20 100644 --- a/pkg/workerpool/pool.go +++ b/pkg/workerpool/pool.go @@ -9,12 +9,12 @@ import ( // Pool 工作池 type Pool struct { // list 装task - Tasks []*Task + Tasks []Task Workers []*worker // 工作池数量 concurrency int // collector 用来输入所有Task对象的chan - collector chan *Task + collector chan Task // runBackground 后台运行时,结束时需要传入的标示 runBackground chan bool wg sync.WaitGroup @@ -23,17 +23,17 @@ type Pool struct { // NewPool 建立一个pool func NewPool(concurrency int) *Pool { return &Pool{ - Tasks: make([]*Task, 0), - //Workers: make([]*worker, 0), + Tasks: make([]Task, 0), + Workers: make([]*worker, 0), concurrency: concurrency, - collector: make(chan *Task, 10), + collector: make(chan Task, 10), runBackground: make(chan bool), } } // AddGlobalQueue 加入工作池的全局队列,静态加入,用于启动工作池前的任务加入时使用, // 在工作池启动后,推荐使用AddTask() 方法动态加入工作池 -func (p *Pool) AddGlobalQueue(task *Task) { +func (p *Pool) AddGlobalQueue(task Task) { p.Tasks = append(p.Tasks, task) } @@ -66,7 +66,7 @@ func (p *Pool) Run() { } // AddTask 把任务放入chan,当工作池启动后,动态加入使用 -func (p *Pool) AddTask(task *Task) { +func (p *Pool) AddTask(task Task) { // 放入chan p.collector <- task } @@ -89,7 +89,7 @@ func (p *Pool) RunBackground() { go workers.startBackground() } - for len(p.Tasks) == 0 { + if len(p.Tasks) == 0 { klog.Error("no task in global queue...") time.Sleep(time.Millisecond) } @@ -110,4 +110,4 @@ func (p *Pool) StopBackground() { p.Workers[i].stop() } p.runBackground <- true -} +} \ No newline at end of file diff --git a/pkg/workerpool/task.go b/pkg/workerpool/task.go index 82a3ed9..453f145 100644 --- a/pkg/workerpool/task.go +++ b/pkg/workerpool/task.go @@ -1,30 +1,39 @@ package workerpool -import ( - "k8s.io/klog/v2" -) - /* 本质:用全局的切片分配任务给多个workers并发处理。 */ -// Task 一个具体任务需求 -type Task struct { +// Task 任务接口,由工作池抽象出的具体执行单元, +// 当pool启动时,会从chan中不断读取Task接口对象执行 +type Task interface { + Execute() error + GetTaskName() string +} + +// TaskInstance 一个具体任务需求 +type TaskInstance struct { + Name string Err error // 返回错误 Data interface{} // 真正的处理数据 f func(interface{}) error // 处理函数 } -// NewTask 建立任务 -func NewTask(f func(interface{}) error, data interface{}) *Task { - return &Task{ +// NewTaskInstance 建立任务 +func NewTaskInstance(name string, data interface{}, f func(interface{}) error) *TaskInstance { + return &TaskInstance{ + Name: name, Data: data, f: f, } } -// process 执行任务的函数。 -func (t *Task) process(workerID int) { - klog.Info("worker: ", workerID, ", processes task: ", t.Data) +func (t *TaskInstance) Execute() error { t.Err = t.f(t.Data) // 执行任务。如果任务执行错误,赋值err + return nil } + +func (t *TaskInstance) GetTaskName() string { + return t.Name +} + diff --git a/pkg/workerpool/worker.go b/pkg/workerpool/worker.go index f382b7e..7178a4e 100644 --- a/pkg/workerpool/worker.go +++ b/pkg/workerpool/worker.go @@ -9,13 +9,13 @@ import ( type worker struct { ID int // 消费者的id // 等待处理的任务chan (每个worker都有一个自己的chan) - taskChan chan *Task + taskChan chan Task // 停止通知 quit chan bool } // newWorker 建立新的消费者 -func newWorker(channel chan *Task, ID int) *worker { +func newWorker(channel chan Task, ID int) *worker { return &worker{ ID: ID, taskChan: channel, @@ -32,7 +32,8 @@ func (wr *worker) start(wg *sync.WaitGroup) { defer wg.Done() // 不断从chan中取出task执行 for task := range wr.taskChan { - task.process(wr.ID) + klog.Info("worker: ", wr.ID, ", processes task: ", task.GetTaskName()) + task.Execute() } }() } @@ -43,7 +44,8 @@ func (wr *worker) startBackground() { for { select { case task := <-wr.taskChan: - task.process(wr.ID) + klog.Info("worker: ", wr.ID, ", processes task: ", task.GetTaskName()) + task.Execute() case <-wr.quit: return } @@ -56,4 +58,4 @@ func (wr *worker) stop() { go func() { wr.quit <- true }() -} +} \ No newline at end of file