-
Notifications
You must be signed in to change notification settings - Fork 11
/
from_fn.go
61 lines (50 loc) · 1.14 KB
/
from_fn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package co
import (
"go.tempura.ink/co/ds/queue"
syncx "go.tempura.ink/co/internal/syncx"
)
// asyncFn is a function that takes no arguments and returns a value of type R
// together with an error.
type asyncFn[R any] func() (R, error)
// AsyncFns pairs an embedded asyncSequence with a queue of functions that each
// return an (R, error) pair. Values are created by FromFn and filled by AddFns.
type AsyncFns[R any] struct {
// Embedded sequence; FromFn sets it via NewAsyncSequence, passing this value.
*asyncSequence[R]
// fnQueue holds the functions added through AddFns.
fnQueue *queue.Queue[asyncFn[R]]
}
// FromFn returns a new AsyncFns[R] with a newly created function queue. The
// embedded asyncSequence is built by NewAsyncSequence over the value being
// returned.
func FromFn[R any]() *AsyncFns[R] {
	fns := new(AsyncFns[R])
	fns.fnQueue = queue.NewQueue[asyncFn[R]]()
	fns.asyncSequence = NewAsyncSequence[R](fns)
	return fns
}
// AddFns enqueues every function in fns, in argument order, and returns the
// receiver so that calls can be chained.
func (c *AsyncFns[R]) AddFns(fns ...func() (R, error)) *AsyncFns[R] {
	for _, fn := range fns {
		c.fnQueue.Enqueue(fn)
	}
	return c
}
// iterator builds an asyncFnsIterator bound to c and returns it as an
// Iterator[R]. The iterator's embedded asyncSequenceIterator is created by
// NewAsyncSequenceIterator over the iterator itself.
func (c *AsyncFns[R]) iterator() Iterator[R] {
	iter := new(asyncFnsIterator[R])
	iter.AsyncFns = c
	iter.asyncSequenceIterator = NewAsyncSequenceIterator[R](iter)
	return iter
}
// asyncFnsIterator is the iterator type produced by AsyncFns.iterator. It
// embeds both a sequence iterator and the AsyncFns it was created from, so the
// parent's fnQueue is reachable directly on the iterator.
type asyncFnsIterator[R any] struct {
*asyncSequenceIterator[R]
// The AsyncFns this iterator reads from; next dequeues from its fnQueue.
*AsyncFns[R]
}
// next runs queued functions until one produces a result to emit and returns
// that result wrapped in an Optional. It returns an empty Optional when the
// queue has no more functions, or when a function fails and the error mode
// reports shouldStop.
//
// A failing function is always passed to handleError first. If the error mode
// reports shouldSkip, the next queued function is tried. If it reports neither
// skip nor stop, the value returned alongside the error is emitted as-is.
//
// The queue length is checked before each Dequeue. The previous loop dequeued
// in the for-clause init/post statements and tested Len afterwards, which
// discarded the last queued function without running it and called Dequeue on
// an already empty queue.
func (it *asyncFnsIterator[R]) next() *Optional[R] {
	for it.fnQueue.Len() != 0 {
		fn := it.fnQueue.Dequeue()

		val, err := syncx.SafeEFn(fn)
		if err != nil {
			it.handleError(err)
			if it.errorMode.shouldSkip() {
				continue
			}
			if it.errorMode.shouldStop() {
				return NewOptionalEmpty[R]()
			}
		}
		return OptionalOf(val)
	}
	return NewOptionalEmpty[R]()
}