-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_in.go
131 lines (113 loc) · 2.47 KB
/
stream_in.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package nu
import (
"context"
"fmt"
"io"
)
func newInputStreamRaw(id int) *rawStreamIn {
out := &rawStreamIn{
id: id,
buf: make(chan []byte, 10),
}
out.rdr, out.data = io.Pipe()
return out
}
type rawStreamIn struct {
id int
buf chan []byte
onAck func(ctx context.Context, id int) // plugin has consumed the latest Data msg
data io.WriteCloser
rdr io.ReadCloser
}
func (lsi *rawStreamIn) Run(ctx context.Context) {
up := make(chan struct{})
go func() {
defer lsi.data.Close()
close(up)
for {
select {
case in, ok := <-lsi.buf:
if !ok {
return
}
// todo: check for error - user closed the reader to signal to drop the stream?
lsi.data.Write(in)
lsi.onAck(ctx, lsi.id)
case <-ctx.Done():
return
}
}
}()
<-up
}
func (lsi *rawStreamIn) received(ctx context.Context, v any) error {
in, ok := v.([]byte)
if !ok {
return fmt.Errorf("raw stream input must be of type []byte, got %T", v)
}
lsi.buf <- in
return nil
}
func (lsi *rawStreamIn) endOfData() {
close(lsi.buf)
}
func newInputStreamList(id int) *listStreamIn {
in := &listStreamIn{
id: id,
data: make(chan Value),
buf: make(chan Value, 10),
}
return in
}
type listStreamIn struct {
id int
data chan Value // incoming data to be consumed by plugin
buf chan Value
// this callback is triggered to signal that the last item received
// has been processed, consumer is ready for the next one
onAck func(ctx context.Context, id int)
}
// return (readonly) chan to the command's Run handler
func (lsi *listStreamIn) InputStream() <-chan Value {
return lsi.data
}
func (lsi *listStreamIn) Run(ctx context.Context) {
// hackish way to make sure that when this func returns the
// goroutine is running. otherwise ie tests are flaky...
up := make(chan struct{})
go func() {
defer close(lsi.data)
close(up)
for {
select {
case in, ok := <-lsi.buf:
if !ok {
return
}
select {
case lsi.data <- in:
lsi.onAck(ctx, lsi.id)
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
<-up
}
// main loop calls on Data msg to given stream
func (lsi *listStreamIn) received(ctx context.Context, v any) error {
in, ok := v.(Value)
if !ok {
return fmt.Errorf("list stream input must be of type Value, got %T", v)
}
lsi.buf <- in
return nil
}
// main loop signals there will be no more data for the stream
// ctx with timeout for how long wait?
func (lsi *listStreamIn) endOfData() {
close(lsi.buf)
}