forked from quic-go/quic-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
streams_map_outgoing_bidi.go
147 lines (127 loc) · 3.58 KB
/
streams_map_outgoing_bidi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// This file was automatically generated by genny.
// Any changes will be lost if this file is regenerated.
// see https://github.com/cheekybits/genny
package quic
import (
"fmt"
"sync"
"github.com/lucas-clemente/quic-go/internal/protocol"
"github.com/lucas-clemente/quic-go/internal/qerr"
"github.com/lucas-clemente/quic-go/internal/wire"
)
// outgoingBidiStreamsMap keeps track of the bidirectional streams opened by
// this endpoint, together with the limit on the highest stream ID that may
// currently be opened.
type outgoingBidiStreamsMap struct {
	// mutex is held by every method that reads or writes the fields below.
	mutex sync.RWMutex
	// cond uses mutex as its Locker (set in newOutgoingBidiStreamsMap).
	// OpenStreamSync waits on it; SetMaxStream and CloseWithError broadcast.
	cond sync.Cond
	// streams holds the streams that were opened and not yet deleted, keyed by stream ID.
	streams map[protocol.StreamID]streamI
	// stream ID of the stream returned by OpenStream(Sync)
	nextStream protocol.StreamID
	// the maximum stream ID we're allowed to open
	maxStream protocol.StreamID
	// was maxStream set. If not, it's not possible to open any stream (also works for stream 0)
	maxStreamSet bool
	// was a STREAMS_BLOCKED sent for the current maxStream
	blockedSent bool
	// newStream is the constructor callback used to create the stream for a given ID.
	newStream func(protocol.StreamID) streamI
	// queueStreamIDBlocked is called with the STREAMS_BLOCKED frame to send when
	// a stream cannot be opened because of the current limit.
	queueStreamIDBlocked func(*wire.StreamsBlockedFrame)
	// closeErr is the error passed to CloseWithError; once non-nil, OpenStream
	// and OpenStreamSync return it instead of opening a stream.
	closeErr error
}
// newOutgoingBidiStreamsMap returns an empty outgoing stream map.
//
// nextStream is the ID that the first opened stream will get, newStream is
// the callback used to construct a stream for a given ID, and
// queueControlFrame receives the STREAMS_BLOCKED frames produced when a
// stream cannot be opened. No maximum stream is set initially, so no stream
// can be opened until SetMaxStream is called.
func newOutgoingBidiStreamsMap(
	nextStream protocol.StreamID,
	newStream func(protocol.StreamID) streamI,
	queueControlFrame func(wire.Frame),
) *outgoingBidiStreamsMap {
	m := new(outgoingBidiStreamsMap)
	// The condition variable shares the map's mutex, so waiters are woken
	// holding the same (write) lock that protects the map's state.
	m.cond.L = &m.mutex
	m.streams = make(map[protocol.StreamID]streamI)
	m.nextStream = nextStream
	m.newStream = newStream
	// Adapt the generic control-frame queue to the concrete frame type.
	m.queueStreamIDBlocked = func(frame *wire.StreamsBlockedFrame) {
		queueControlFrame(frame)
	}
	return m
}
// OpenStream tries to open the next outgoing stream without blocking.
//
// If the map was closed, the close error is returned as is. Any error from
// opening the stream (e.g. the stream limit being reached) is returned
// wrapped in a streamOpenErr.
func (m *outgoingBidiStreamsMap) OpenStream() (streamI, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if closeErr := m.closeErr; closeErr != nil {
		return nil, closeErr
	}
	stream, openErr := m.openStreamImpl()
	if openErr == nil {
		return stream, nil
	}
	return nil, streamOpenErr{openErr}
}
// OpenStreamSync opens the next outgoing stream, blocking for as long as
// opening fails with errTooManyOpenStreams.
//
// It waits on the map's condition variable between attempts, and so retries
// whenever SetMaxStream or CloseWithError broadcasts. If the map was closed,
// the close error is returned as is. Any other error from opening the stream
// is returned wrapped in a streamOpenErr.
func (m *outgoingBidiStreamsMap) OpenStreamSync() (streamI, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// closeErr is re-checked before every attempt, including after each wake-up.
	for m.closeErr == nil {
		stream, openErr := m.openStreamImpl()
		switch openErr {
		case nil:
			return stream, nil
		case errTooManyOpenStreams:
			// Wait releases the mutex while blocked and re-acquires it
			// before returning.
			m.cond.Wait()
		default:
			return nil, streamOpenErr{openErr}
		}
	}
	return nil, m.closeErr
}
// openStreamImpl creates and registers the stream with ID nextStream, then
// advances nextStream by 4.
//
// If no maximum stream was set yet, or nextStream exceeds it, no stream is
// created and errTooManyOpenStreams is returned. The first time that happens
// for the current limit, a STREAMS_BLOCKED frame is queued; it carries the
// stream number of maxStream, or 0 if no maximum was set.
//
// It does not lock the mutex itself: OpenStream and OpenStreamSync call it
// with the mutex held.
func (m *outgoingBidiStreamsMap) openStreamImpl() (streamI, error) {
	if m.maxStreamSet && m.nextStream <= m.maxStream {
		stream := m.newStream(m.nextStream)
		m.streams[m.nextStream] = stream
		m.nextStream += 4
		return stream, nil
	}

	// Blocked by the stream limit. Only announce it once per limit value.
	if m.blockedSent {
		return nil, errTooManyOpenStreams
	}
	blocked := &wire.StreamsBlockedFrame{Type: protocol.StreamTypeBidi}
	if m.maxStreamSet {
		// Without a limit the StreamLimit field keeps its zero value.
		blocked.StreamLimit = m.maxStream.StreamNum()
	}
	m.queueStreamIDBlocked(blocked)
	m.blockedSent = true
	return nil, errTooManyOpenStreams
}
// GetStream looks up the stream with the given ID.
//
// An ID at or beyond nextStream was never opened by this map, which is
// reported as a StreamStateError. For an ID below nextStream that is not in
// the map (e.g. a stream that was already deleted), a nil stream and a nil
// error are returned.
func (m *outgoingBidiStreamsMap) GetStream(id protocol.StreamID) (streamI, error) {
	// Take a snapshot under the read lock; the error is built after unlocking.
	m.mutex.RLock()
	opened := id < m.nextStream
	stream := m.streams[id]
	m.mutex.RUnlock()

	if !opened {
		return nil, qerr.Error(qerr.StreamStateError, fmt.Sprintf("peer attempted to open stream %d", id))
	}
	return stream, nil
}
// DeleteStream removes the stream with the given ID from the map.
// It returns an error if the map holds no stream with that ID.
func (m *outgoingBidiStreamsMap) DeleteStream(id protocol.StreamID) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	_, known := m.streams[id]
	if known {
		delete(m.streams, id)
		return nil
	}
	return fmt.Errorf("Tried to delete unknown stream %d", id)
}
// SetMaxStream raises the highest stream ID that may be opened to id.
//
// The first call always takes effect. Later calls are ignored unless id is
// larger than the current maximum. When the limit changes, the blockedSent
// flag is reset and all goroutines waiting in OpenStreamSync are woken.
func (m *outgoingBidiStreamsMap) SetMaxStream(id protocol.StreamID) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// A limit that does not exceed the current one changes nothing.
	if m.maxStreamSet && id <= m.maxStream {
		return
	}
	m.maxStream, m.maxStreamSet = id, true
	m.blockedSent = false
	m.cond.Broadcast()
}
// CloseWithError records err as the close error, calls closeForShutdown(err)
// on every stream in the map, and wakes all goroutines waiting in
// OpenStreamSync. Subsequent OpenStream and OpenStreamSync calls return err.
func (m *outgoingBidiStreamsMap) CloseWithError(err error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.closeErr = err
	for _, stream := range m.streams {
		stream.closeForShutdown(err)
	}
	m.cond.Broadcast()
}