-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.go
108 lines (101 loc) · 3.01 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package goneric
import (
"sync"
)
// WorkerPool spawns `concurrency` goroutines eating from input channel and sending it to output channel
// caller should take care of closing input channel after it finished sending requests
// output channel will be closed after input is processed and closed
// optionally setting last option to true will make it close output channel
func WorkerPool[T1, T2 any](input chan T1, output chan T2, worker func(T1) T2, concurrency int, closeOutputChan ...bool) {
if concurrency < 1 {
panic("RTFM")
}
wg := sync.WaitGroup{}
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for w := range input {
output <- worker(w)
}
}()
}
wg.Wait()
if len(closeOutputChan) > 0 && closeOutputChan[0] {
close(output)
}
}
// WorkerPoolBackground spawns `concurrency` goroutines eating from input channel and returns output channel with results
// optionally setting last option to true will make it close output channel
func WorkerPoolBackground[T1, T2 any](input chan T1, worker func(T1) T2, concurrency int, closeOutputChan ...bool) (output chan T2) {
if concurrency < 1 {
panic("RTFM")
}
output = make(chan T2, concurrency/2+1)
wg := sync.WaitGroup{}
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for w := range input {
output <- worker(w)
}
}()
}
if len(closeOutputChan) > 0 && closeOutputChan[0] {
go func() {
wg.Wait()
close(output)
}()
}
return output
}
// WorkerPoolFinisher runs WorkerPool in the background and
// returns channel that returns `true` then closes when workers finish
func WorkerPoolFinisher[T1, T2 any](input chan T1, output chan T2, worker func(T1) T2, concurrency int) chan bool {
finisher := make(chan bool, 1)
go func() {
WorkerPool(input, output, worker, concurrency, true)
finisher <- true
close(finisher)
}()
return finisher
}
// WorkerPoolDrain runs function per input without returning anything. Goroutines close on channel close.
// returns finish channel that returns single boolean true after goroutines finish
func WorkerPoolDrain[T1 any](worker func(T1), concurrency int, input chan T1) (finish chan bool) {
finish = make(chan bool, 1)
go func() {
wg := sync.WaitGroup{}
wg.Add(concurrency)
for i := 0; i < concurrency; i++ {
go func() {
defer wg.Done()
for w := range input {
worker(w)
}
}()
}
wg.Wait()
finish <- true
}()
return finish
}
// WorkerPoolAsync returns a function that adds new job to queue and returns a channel with result, and function to stop worker
func WorkerPoolAsync[T1, T2 any](worker func(T1) T2, concurrency int) (async func(T1) chan T2, stop func()) {
inCh := make(chan Response[T1, T2], concurrency/2+1)
finish := WorkerPoolDrain(func(in Response[T1, T2]) {
in.ReturnCh <- worker(in.Data)
}, concurrency, inCh)
return func(in T1) (out chan T2) {
ch := make(chan T2, 1)
inCh <- Response[T1, T2]{
ReturnCh: ch,
Data: in,
}
return ch
}, func() {
close(inCh)
<-finish
}
}