Skip to content

Commit

Permalink
feat: 修改Task为接口对象方便扩展,并增加example范例
Browse files Browse the repository at this point in the history
  • Loading branch information
googs1025 committed Aug 2, 2023
1 parent 60b6490 commit 55f3845
Show file tree
Hide file tree
Showing 17 changed files with 461 additions and 49 deletions.
18 changes: 8 additions & 10 deletions example/example1_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
package example

import (
"fmt"
"github.com/myconcurrencytools/workpoolframework/pkg/workerpool"
"k8s.io/klog/v2"
"testing"
"time"
)

/*
使用方法:
1. 创建工作池
2. 定义需要的任务func
3. 遍历任务数,放入全局队列
4. 启动工作池
使用方法:
1. 创建工作池
2. 定义需要的任务func
3. 遍历任务数,放入全局队列
4. 启动工作池
*/

func TestTaskPool1(t *testing.T) {
Expand All @@ -35,14 +36,11 @@ func TestTaskPool1(t *testing.T) {
for i := 1; i <= 1000; i++ {

// 需要做的任务
task := workerpool.NewTask(tt, i)
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", i), i, tt)

// 所有的任务放入全局队列中
pool.AddGlobalQueue(task)
}
pool.Run() // 启动




}
}
22 changes: 11 additions & 11 deletions example/example2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
)

/*
使用方法:
1. 创建工作池
2. 定义需要的任务func
3. 遍历任务数,放入全局队列
4. 启动工作池
使用方法:
1. 创建工作池
2. 定义需要的任务func
3. 遍历任务数,放入全局队列
4. 启动工作池
*/

func TestTaskPool2(t *testing.T) {
Expand All @@ -28,16 +28,16 @@ func TestTaskPool2(t *testing.T) {
for i := 1; i <= 100; i++ {

// 需要做的任务
task := workerpool.NewTask(func(data interface{}) error {
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", i), i, func(data interface{}) error {
taskID := data.(int)

/*
业务逻辑
业务逻辑
*/
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
}, i)
})

// 所有的任务放入list中
pool.AddGlobalQueue(task)
Expand All @@ -58,15 +58,15 @@ func TestTaskPool2(t *testing.T) {

time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
// 模拟后续加入pool
task := workerpool.NewTask(func(data interface{}) error {
task := workerpool.NewTaskInstance(fmt.Sprintf("task-%v", taskID), taskID, func(data interface{}) error {
taskID := data.(int)
time.Sleep(100 * time.Millisecond)
klog.Info("Task ", taskID, " processed")
return nil
}, taskID)
})

pool.AddTask(task)
}

fmt.Println("finished...")
}
}
31 changes: 31 additions & 0 deletions example/http_example/cmd/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package cmd

import (
"fmt"
"github.com/spf13/cobra"
"os"
)

var runCmd = &cobra.Command{
Use: "run",
Short: "test-server",
Long: "",
}

var (
debug bool
serverPort string
)

func init() {
runCmd.PersistentFlags().BoolVarP(&debug, "debug", "d", false, "debug mode")
runCmd.PersistentFlags().StringVarP(&serverPort, "port", "p", "8080", "server port")
runCmd.AddCommand(httpServerCmd())
}

func Execute() {
if err := runCmd.Execute(); err != nil {
fmt.Printf("cmd err: %s\n", err)
os.Exit(1)
}
}
24 changes: 24 additions & 0 deletions example/http_example/cmd/httpServer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package cmd

import (
"github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/common"
"github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/server"
"github.com/spf13/cobra"
)

func httpServerCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "httpServer",
Short: "run http server",
Long: "",
Run: func(cmd *cobra.Command, args []string) {
cfg := &common.ServerConfig{
Debug: debug,
Port: serverPort,
}
// 启动http server
server.HttpServer(cfg)
},
}
return cmd
}
7 changes: 7 additions & 0 deletions example/http_example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "github.com/myconcurrencytools/workpoolframework/example/http_example/cmd"

func main() {
cmd.Execute()
}
6 changes: 6 additions & 0 deletions example/http_example/pkg/common/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package common

type ServerConfig struct {
Debug bool
Port string
}
32 changes: 32 additions & 0 deletions example/http_example/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package scheduler

import (
"github.com/myconcurrencytools/workpoolframework/pkg/workerpool"
"log"
)

type Scheduler struct {
pool *workerpool.Pool
}

func NewScheduler(workerNum int) *Scheduler {
s := &Scheduler{
pool: workerpool.NewPool(workerNum),
}
return s
}

func (s *Scheduler) Start() {
go func() {
log.Printf("start scheduler...")
s.pool.RunBackground()
}()
}

func (s *Scheduler) Stop() {
s.pool.StopBackground()
}

func (s *Scheduler) AddTask(task workerpool.Task) {
s.pool.AddTask(task)
}
9 changes: 9 additions & 0 deletions example/http_example/pkg/server/model/mock_map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package model

type MockMap map[string]*MyTask

var TaskMap MockMap

func init() {
TaskMap = MockMap{}
}
49 changes: 49 additions & 0 deletions example/http_example/pkg/server/model/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package model

import "fmt"

const (
TaskRunning = "running"
TaskFail = "fail"
TaskSuccess = "success"
)

type MyTask struct {
TaskName string `json:"taskName"`
TaskType string `json:"taskType"`
Input interface{} `json:"input"`
f func(data interface{}) error
Err error
Status string
}

func (my *MyTask) ChooseTaskType() {
if my.TaskType == "string" {
my.f = func(data interface{}) error {
fmt.Println("string task...", data)
return nil
}
} else {
my.f = func(data interface{}) error {
fmt.Println("int task...", data)
return nil
}
}
}

func (my *MyTask) Execute() error {

my.Status = TaskRunning

if err := my.f(my.Input); err != nil {
my.Err = err
my.Status = TaskFail
return err
}
my.Status = TaskSuccess
return nil
}

func (my *MyTask) GetTaskName() string {
return my.TaskName
}
64 changes: 64 additions & 0 deletions example/http_example/pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package server

import (
"fmt"
"github.com/gin-gonic/gin"
"github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/common"
"github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/scheduler"
"github.com/myconcurrencytools/workpoolframework/example/http_example/pkg/server/model"
)

func HttpServer(c *common.ServerConfig) {

if !c.Debug {
gin.SetMode(gin.ReleaseMode)
}

r := gin.New()

// 启动调度器
s := scheduler.NewScheduler(6)
s.Start()

r.GET("/test", func(c *gin.Context) {
c.String(200, "测试用")
})

// 执行任务接口
/*
url: http://127.0.0.1:8080/start
body入参:
{
"taskName": "tttt",
"taskType": "int",
"input": "aaaa"
}
*/
r.POST("/start", func(c *gin.Context) {
var myTask model.MyTask
if err := c.ShouldBindJSON(&myTask); err != nil {

fmt.Errorf("do operation action error: %s", err)
c.JSON(400, gin.H{"message": "start task error"})
return
}
myTask.ChooseTaskType()
s.AddTask(&myTask)

model.TaskMap[myTask.TaskName] = &myTask
c.JSON(200, gin.H{"message": "start task success"})
})

// 查询任务状态接口
/*
http://127.0.0.1:8080/task?taskName=tttt
*/
r.GET("/task", func(c *gin.Context) {
taskName := c.Query("taskName")
my := model.TaskMap[taskName]
c.JSON(200, gin.H{"message": my.Status})
})

err := r.Run(fmt.Sprintf(":%v", c.Port))
fmt.Println(err)
}
32 changes: 32 additions & 0 deletions example/scheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package scheduler

import (
"github.com/myconcurrencytools/workpoolframework/pkg/workerpool"
"log"
)

type Scheduler struct {
pool *workerpool.Pool
}

func NewScheduler(workerNum int) *Scheduler {
s := &Scheduler{
pool: workerpool.NewPool(workerNum),
}
return s
}

func (s *Scheduler) Start() {
go func() {
log.Printf("start scheduler...")
s.pool.RunBackground()
}()
}

func (s *Scheduler) Stop() {
s.pool.StopBackground()
}

func (s *Scheduler) AddTask(task workerpool.Task) {
s.pool.AddTask(task)
}
24 changes: 24 additions & 0 deletions example/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package scheduler

import (
"fmt"
"github.com/myconcurrencytools/workpoolframework/pkg/workerpool"
"testing"
"time"
)

func TestScheduler(t *testing.T) {
s := NewScheduler(5)

s.Start()

tsk := workerpool.NewTaskInstance("task1", "aaa", func(i interface{}) error {
fmt.Println(i)
return nil
})

s.AddTask(tsk)

<-time.After(time.Second * 60)
s.Stop()
}
Loading

0 comments on commit 55f3845

Please sign in to comment.