-
Notifications
You must be signed in to change notification settings - Fork 41
/
streamer.go
40 lines (36 loc) · 831 Bytes
/
streamer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package signalr
import (
"reflect"
"sync"
)
type streamer struct {
cancels sync.Map
conn hubConnection
}
func (s *streamer) Start(invocationID string, reflectedChannel reflect.Value) {
go func() {
loop:
for {
// Waits for channel, so might hang
if chanResult, ok := reflectedChannel.Recv(); ok {
if _, ok := s.cancels.Load(invocationID); ok {
s.cancels.Delete(invocationID)
_ = s.conn.Completion(invocationID, nil, "")
break loop
}
if s.conn.Context().Err() != nil {
break loop
}
_ = s.conn.StreamItem(invocationID, chanResult.Interface())
} else {
if s.conn.Context().Err() == nil {
_ = s.conn.Completion(invocationID, nil, "")
}
break loop
}
}
}()
}
func (s *streamer) Stop(invocationID string) {
s.cancels.Store(invocationID, struct{}{})
}