forked from shettyh/threadpool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadpool.go
94 lines (79 loc) · 2.42 KB
/
threadpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package threadpool
import "fmt"
var (
ErrQueueFull = fmt.Errorf("queue is full, not able add the task")
)
//ThreadPool type for holding the workers and handle the job requests
type ThreadPool struct {
queueSize int64
noOfWorkers int
jobQueue chan interface{}
workerPool chan chan interface{}
closeHandle chan bool // Channel used to stop all the workers
}
// NewThreadPool creates thread threadpool
func NewThreadPool(noOfWorkers int, queueSize int64) *ThreadPool {
threadPool := &ThreadPool{queueSize: queueSize, noOfWorkers: noOfWorkers}
threadPool.jobQueue = make(chan interface{}, queueSize)
threadPool.workerPool = make(chan chan interface{}, noOfWorkers)
threadPool.closeHandle = make(chan bool)
threadPool.createPool()
return threadPool
}
func (t *ThreadPool) submitTask( task interface{}) error {
// Add the task to the job queue
if len(t.jobQueue) == int(t.queueSize) {
return ErrQueueFull
}
t.jobQueue <- task
return nil
}
// Execute submits the job to available worker
func (t *ThreadPool) Execute(task Runnable) error {
return t.submitTask(task)
}
// ExecuteFuture will submit the task to the threadpool and return the response handle
func (t *ThreadPool) ExecuteFuture(task Callable) (*Future, error) {
// Create future and task
handle := &Future{response: make(chan interface{})}
futureTask := callableTask{Task: task, Handle: handle}
err := t.submitTask(futureTask)
if err != nil {
return nil, err
}
return futureTask.Handle, nil
}
// Close will close the threadpool
// It sends the stop signal to all the worker that are running
//TODO: need to check the existing /running task before closing the threadpool
func (t *ThreadPool) Close() {
close(t.closeHandle) // Stops all the routines
close(t.workerPool) // Closes the Job threadpool
close(t.jobQueue) // Closes the job Queue
}
// createPool creates the workers and start listening on the jobQueue
func (t *ThreadPool) createPool() {
for i := 0; i < t.noOfWorkers; i++ {
worker := NewWorker(t.workerPool, t.closeHandle)
worker.Start()
}
go t.dispatch()
}
// dispatch listens to the jobqueue and handles the jobs to the workers
func (t *ThreadPool) dispatch() {
for {
select {
case job := <-t.jobQueue:
// Got job
func(job interface{}) {
//Find a worker for the job
jobChannel := <-t.workerPool
//Submit job to the worker
jobChannel <- job
}(job)
case <-t.closeHandle:
// Close thread threadpool
return
}
}
}