-
Notifications
You must be signed in to change notification settings - Fork 41
/
webtransportsconnection.go
77 lines (69 loc) · 1.49 KB
/
webtransportsconnection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package signalr
import (
"context"
"fmt"
"github.com/quic-go/webtransport-go"
"sync"
)
// webTransportsConnection is a connection whose Read and Write operate on a
// stream opened from a WebTransport session.
//
// The embedded ConnectionBase supplies the connection's context (see the
// w.Context() calls in Read and Write). The embedded sync.Mutex promotes
// Lock/Unlock onto this type and is used by syncStream and closeStream to
// serialize access to the stream field.
type webTransportsConnection struct {
	ConnectionBase

	// session is the WebTransport session that streams are opened from.
	session *webtransport.Session

	// stream is the current stream; it stays nil until syncStream has opened
	// one. It is guarded by the embedded mutex below.
	stream webtransport.Stream
	sync.Mutex
}
// newWebTransportsConnection returns a connection bound to session.
//
// ctx and connectionID are handed to NewConnectionBase, whose result is
// copied into the embedded ConnectionBase. No stream is opened here: the
// stream field starts out nil and is populated by syncStream on the first
// Read or Write.
func newWebTransportsConnection(ctx context.Context, connectionID string, session *webtransport.Session) *webTransportsConnection {
	return &webTransportsConnection{
		ConnectionBase: *NewConnectionBase(ctx, connectionID),
		session:        session,
	}
}
// Write sends p on the connection's stream, opening the stream first if
// none exists yet.
//
// It returns the byte count and error reported by ReadWriteWithContext. A
// failure to open the stream is returned as-is with n == 0. A failure of the
// write itself is wrapped as "<receiver type>: <cause>" (unwrappable via
// errors.Is/As) and triggers a close of the stream.
func (w *webTransportsConnection) Write(p []byte) (n int, err error) {
	if err = w.syncStream(); err != nil {
		return 0, err
	}

	send := func() (int, error) { return w.stream.Write(p) }
	// NOTE(review): the second callback is intentionally empty; presumably
	// ReadWriteWithContext invokes it to interrupt a blocked operation when
	// the context ends - confirm against its definition.
	noop := func() {}

	if n, err = ReadWriteWithContext(w.Context(), send, noop); err == nil {
		return n, nil
	}

	// Best-effort cleanup: the write error is the one worth reporting, so the
	// result of closing the stream is deliberately dropped.
	_ = w.closeStream()
	return n, fmt.Errorf("%T: %w", w, err)
}
// Read fills p from the connection's stream, opening the stream first if
// none exists yet.
//
// It returns the byte count and error reported by ReadWriteWithContext. A
// failure to open the stream is returned as-is with n == 0. A failure of the
// read itself is wrapped as "<receiver type>: <cause>" (unwrappable via
// errors.Is/As) and triggers a close of the stream.
func (w *webTransportsConnection) Read(p []byte) (n int, err error) {
	if openErr := w.syncStream(); openErr != nil {
		return 0, openErr
	}

	receive := func() (int, error) { return w.stream.Read(p) }
	// NOTE(review): the second callback is intentionally empty; presumably
	// ReadWriteWithContext invokes it to interrupt a blocked operation when
	// the context ends - confirm against its definition.
	noop := func() {}

	n, err = ReadWriteWithContext(w.Context(), receive, noop)
	if err == nil {
		return n, nil
	}

	// Best-effort cleanup: the read error is the one worth reporting, so the
	// result of closing the stream is deliberately dropped.
	_ = w.closeStream()
	return n, fmt.Errorf("%T: %w", w, err)
}
// syncStream makes sure a stream is present, opening one from the session
// when the stream field is still nil.
//
// The check and the assignment happen under the connection's mutex. When a
// stream already exists this is a no-op that returns nil; otherwise the
// error from session.OpenStream is returned unchanged.
func (w *webTransportsConnection) syncStream() (err error) {
	w.Lock()
	defer w.Unlock()

	if w.stream != nil {
		return nil
	}
	w.stream, err = w.session.OpenStream()
	return err
}
// closeStream closes the current stream, if there is one, while holding the
// connection's mutex, and returns the error from Close. With no stream it
// returns nil.
//
// The stream field is not reset afterwards, so it keeps referring to the
// closed stream and syncStream will not open a replacement.
// NOTE(review): confirm whether reusing the closed stream is intended.
func (w *webTransportsConnection) closeStream() (err error) {
	w.Lock()
	defer w.Unlock()

	if w.stream != nil {
		err = w.stream.Close()
	}
	return err
}