forked from quic-go/quic-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
streams_map_outgoing_generic.go
145 lines (126 loc) · 3.61 KB
/
streams_map_outgoing_generic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package quic
import (
"fmt"
"sync"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/qerr"
"github.com/lucas-clemente/quic-go/internal/wire"
)
//go:generate genny -in $GOFILE -out streams_map_outgoing_bidi.go gen "item=streamI Item=BidiStream streamTypeGeneric=protocol.StreamTypeBidi"
//go:generate genny -in $GOFILE -out streams_map_outgoing_uni.go gen "item=sendStreamI Item=UniStream streamTypeGeneric=protocol.StreamTypeUni"
type outgoingItemsMap struct {
mutex sync.RWMutex
cond sync.Cond
streams map[protocol.StreamID]item
nextStream protocol.StreamID // stream ID of the stream returned by OpenStream(Sync)
maxStream protocol.StreamID // the maximum stream ID we're allowed to open
maxStreamSet bool // was maxStream set. If not, it's not possible to any stream (also works for stream 0)
blockedSent bool // was a STREAMS_BLOCKED sent for the current maxStream
newStream func(protocol.StreamID) item
queueStreamIDBlocked func(*wire.StreamsBlockedFrame)
closeErr error
}
func newOutgoingItemsMap(
nextStream protocol.StreamID,
newStream func(protocol.StreamID) item,
queueControlFrame func(wire.Frame),
) *outgoingItemsMap {
m := &outgoingItemsMap{
streams: make(map[protocol.StreamID]item),
nextStream: nextStream,
newStream: newStream,
queueStreamIDBlocked: func(f *wire.StreamsBlockedFrame) { queueControlFrame(f) },
}
m.cond.L = &m.mutex
return m
}
func (m *outgoingItemsMap) OpenStream() (item, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.closeErr != nil {
return nil, m.closeErr
}
str, err := m.openStreamImpl()
if err != nil {
return nil, streamOpenErr{err}
}
return str, nil
}
func (m *outgoingItemsMap) OpenStreamSync() (item, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
for {
if m.closeErr != nil {
return nil, m.closeErr
}
str, err := m.openStreamImpl()
if err == nil {
return str, nil
}
if err != nil && err != errTooManyOpenStreams {
return nil, streamOpenErr{err}
}
m.cond.Wait()
}
}
func (m *outgoingItemsMap) openStreamImpl() (item, error) {
if !m.maxStreamSet || m.nextStream > m.maxStream {
if !m.blockedSent {
if m.maxStreamSet {
m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{
Type: streamTypeGeneric,
StreamLimit: m.maxStream.StreamNum(),
})
} else {
m.queueStreamIDBlocked(&wire.StreamsBlockedFrame{
Type: streamTypeGeneric,
StreamLimit: 0,
})
}
m.blockedSent = true
}
return nil, errTooManyOpenStreams
}
s := m.newStream(m.nextStream)
m.streams[m.nextStream] = s
m.nextStream += 4
return s, nil
}
func (m *outgoingItemsMap) GetStream(id protocol.StreamID) (item, error) {
m.mutex.RLock()
if id >= m.nextStream {
m.mutex.RUnlock()
return nil, qerr.Error(qerr.StreamStateError, fmt.Sprintf("peer attempted to open stream %d", id))
}
s := m.streams[id]
m.mutex.RUnlock()
return s, nil
}
func (m *outgoingItemsMap) DeleteStream(id protocol.StreamID) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if _, ok := m.streams[id]; !ok {
return fmt.Errorf("Tried to delete unknown stream %d", id)
}
delete(m.streams, id)
return nil
}
func (m *outgoingItemsMap) SetMaxStream(id protocol.StreamID) {
m.mutex.Lock()
if !m.maxStreamSet || id > m.maxStream {
m.maxStream = id
m.maxStreamSet = true
m.blockedSent = false
m.cond.Broadcast()
}
m.mutex.Unlock()
}
func (m *outgoingItemsMap) CloseWithError(err error) {
m.mutex.Lock()
m.closeErr = err
for _, str := range m.streams {
str.closeForShutdown(err)
}
m.cond.Broadcast()
m.mutex.Unlock()
}