-
Notifications
You must be signed in to change notification settings - Fork 0
/
blocking_batch.go
84 lines (77 loc) · 1.65 KB
/
blocking_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package batcher
import (
"context"
"github.com/garugaru/go-batcher/cond"
"sync"
)
// BlockingBatch is a fixed-capacity batch of items. Appending blocks once the
// capacity is reached, which bounds memory use and avoids OOMs when producers
// outpace the consumer.
type BlockingBatch[I any] struct {
	// items is the backing buffer, allocated once with maxSize elements.
	items []I
	// index is the next free slot in items (and therefore the number of
	// stored items); maxSize is the capacity at which Push starts blocking.
	index, maxSize int
	// full is waited on by Push while the buffer is at capacity and signalled
	// by PopAll after it drains the buffer. empty is signalled by Push when
	// the buffer reaches capacity and waited on by waitFull.
	full, empty *cond.Cond
	// fl is held by Push and PopAll while they touch items and index.
	// el is held by waitFull while it waits for the buffer to fill.
	fl, el *sync.Mutex
}
// NewBlockingQueue returns an empty BlockingBatch whose backing buffer holds
// exactly size items. size is used both as the buffer length and as the
// threshold at which Push blocks.
func NewBlockingQueue[I any](size int) *BlockingBatch[I] {
	q := new(BlockingBatch[I])
	q.items = make([]I, size)
	q.maxSize = size
	q.full, q.empty = cond.New(), cond.New()
	q.fl, q.el = new(sync.Mutex), new(sync.Mutex)
	return q
}
// Push stores the given items in the batch, in order. Whenever the batch is
// already at capacity the call waits on q.full (with a non-cancellable
// context) before storing the next item, so a single Push may block several
// times. Each time the batch reaches capacity, q.empty is signalled.
func (q *BlockingBatch[I]) Push(items ...I) {
	q.fl.Lock()
	for i := range items {
		// At capacity: release the lock while waiting so PopAll can drain
		// the buffer, then re-check the condition under the lock.
		for q.index == q.maxSize {
			q.fl.Unlock()
			q.full.Wait(context.TODO())
			q.fl.Lock()
		}

		q.items[q.index] = items[i]
		q.index++

		if q.index < q.maxSize {
			continue
		}
		// The buffer just became full: notify the waiter in waitFull.
		q.empty.Signal()
	}
	q.fl.Unlock()
}
// PopAll waits until the batch is full or ctx is done, then removes and
// returns every item currently stored.
//
// If ctx is done before the batch fills, the items accumulated so far are
// returned, so the result may be shorter than the capacity or empty. The
// returned slice is a fresh copy that does not alias the internal buffer.
// After the buffer is drained, q.full is signalled so a blocked Push can
// proceed.
func (q *BlockingBatch[I]) PopAll(ctx context.Context) []I {
	q.waitFull(ctx)

	q.fl.Lock()
	defer q.fl.Unlock()

	out := make([]I, q.index)
	copy(out, q.items)

	// Zero the popped slots so the long-lived backing buffer does not keep
	// the returned elements (and anything they reference) reachable until
	// the slots happen to be overwritten by later pushes.
	var zero I
	for i := 0; i < q.index; i++ {
		q.items[i] = zero
	}

	q.index = 0
	q.full.Signal()
	return out
}
// waitFull blocks until the batch has reached maxSize or ctx is done,
// whichever comes first. It reports nothing; the caller inspects the batch
// state itself after it returns.
//
// el is held for the whole call except while waiting on q.empty. The fill
// level is read under fl, the same mutex Push and PopAll hold when they
// modify index, so the check does not race with concurrent writers.
func (q *BlockingBatch[I]) waitFull(ctx context.Context) {
	q.el.Lock()
	defer q.el.Unlock()

	for {
		// Non-blocking cancellation check before each look at the buffer.
		select {
		case <-ctx.Done():
			return
		default:
		}

		q.fl.Lock()
		full := q.index >= q.maxSize
		q.fl.Unlock()
		if full {
			return
		}

		// Release el while waiting and re-acquire it before looping so the
		// deferred Unlock stays balanced.
		q.el.Unlock()
		q.empty.Wait(ctx)
		q.el.Lock()
	}
}
// Size returns the number of items currently stored, taken from the internal
// write index. No locking is performed, so under concurrent use the result
// should be treated as an estimate.
func (q *BlockingBatch[I]) Size() int {
	n := q.index
	return n
}