-
Notifications
You must be signed in to change notification settings - Fork 0
/
datagram_queue_test.go
118 lines (101 loc) · 3.17 KB
/
datagram_queue_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package quic
import (
"errors"
"github.com/tumi8/quic-go/noninternal/utils"
"github.com/tumi8/quic-go/noninternal/wire"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Datagram Queue", func() {
// Shared fixtures, rebuilt before every spec:
//   - queue is the datagramQueue under test.
//   - queued collects one token per invocation of the callback that is passed
//     to newDatagramQueue; the sending specs expect a token to show up once a
//     datagram has been queued.
var (
	queue  *datagramQueue
	queued chan struct{}
)

BeforeEach(func() {
	// Buffered, so the callback's send does not block for the first 100 tokens.
	queued = make(chan struct{}, 100)
	notify := func() { queued <- struct{}{} }
	queue = newDatagramQueue(notify, utils.DefaultLogger)
})
Context("sending", func() {
It("returns nil when there's no datagram to send", func() {
	// Nothing has been added in this spec, so Peek has no frame to hand out.
	next := queue.Peek()
	Expect(next).To(BeNil())
})
It("queues a datagram", func() {
	// AddAndWait runs in its own goroutine because this spec expects it to stay
	// blocked until the frame has been peeked. finished is closed once it returns.
	datagram := &wire.DatagramFrame{Data: []byte("foobar")}
	finished := make(chan struct{})
	go func() {
		defer GinkgoRecover()
		defer close(finished)
		err := queue.AddAndWait(datagram)
		Expect(err).To(Succeed())
	}()

	// One notification must arrive while the sender is still waiting.
	Eventually(queued).Should(HaveLen(1))
	Consistently(finished).ShouldNot(BeClosed())

	// Peeking exposes the queued frame and is expected to release the sender.
	head := queue.Peek()
	Expect(head.Data).To(Equal([]byte("foobar")))
	Eventually(finished).Should(BeClosed())

	// After Pop there is nothing left to peek.
	queue.Pop()
	Expect(queue.Peek()).To(BeNil())
})
It("returns the same datagram multiple times, when Pop isn't called", func() {
	// sent gets one token each time an AddAndWait call in the sender goroutine returns.
	sent := make(chan struct{}, 1)
	go func() {
		defer GinkgoRecover()
		Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("foo")})).To(Succeed())
		sent <- struct{}{}
		Expect(queue.AddAndWait(&wire.DatagramFrame{Data: []byte("bar")})).To(Succeed())
		sent <- struct{}{}
	}()

	Eventually(queued).Should(HaveLen(1))
	f := queue.Peek()
	// Fail with a readable assertion message instead of a nil-pointer panic.
	Expect(f).ToNot(BeNil())
	Expect(f.Data).To(Equal([]byte("foo")))
	Eventually(sent).Should(Receive())

	// As long as Pop isn't called, Peek must keep returning the same frame.
	Expect(queue.Peek()).To(Equal(f))
	Expect(queue.Peek()).To(Equal(f))
	queue.Pop()

	// The sender only starts queueing "bar" after its first AddAndWait has
	// returned, and nothing synchronizes that step with this goroutine. Peek may
	// therefore still return nil right after Pop, so poll until the frame shows up.
	Eventually(func() *wire.DatagramFrame {
		f = queue.Peek()
		return f
	}).ShouldNot(BeNil())
	Expect(f.Data).To(Equal([]byte("bar")))

	// Wait for the second AddAndWait to return so that the sender goroutine
	// (and its assertions) does not outlive this spec.
	Eventually(sent).Should(Receive())
})
It("closes", func() {
	// Carries the return value of the pending AddAndWait call back to the spec.
	result := make(chan error, 1)
	go func() {
		defer GinkgoRecover()
		datagram := &wire.DatagramFrame{Data: []byte("foobar")}
		err := queue.AddAndWait(datagram)
		result <- err
	}()

	// The frame is never peeked here, so the call is expected to stay blocked ...
	Consistently(result).ShouldNot(Receive())

	// ... until the queue is closed, after which it must report the close error.
	closeErr := errors.New("test error")
	queue.CloseWithError(closeErr)
	Eventually(result).Should(Receive(MatchError("test error")))
})
})
Context("receiving", func() {
It("receives DATAGRAM frames", func() {
	contents := []string{"foo", "bar"}

	// Hand both frames to the queue first ...
	for _, s := range contents {
		queue.HandleDatagramFrame(&wire.DatagramFrame{Data: []byte(s)})
	}

	// ... then read them back, expecting the payloads in the order they were handled.
	for _, s := range contents {
		payload, err := queue.Receive()
		Expect(err).ToNot(HaveOccurred())
		Expect(payload).To(Equal([]byte(s)))
	}
})
It("blocks until a frame is received", func() {
	// Receive is called from a separate goroutine; its payload is forwarded here.
	delivered := make(chan []byte, 1)
	go func() {
		defer GinkgoRecover()
		payload, err := queue.Receive()
		Expect(err).ToNot(HaveOccurred())
		delivered <- payload
	}()

	// No frame has been handled yet, so Receive must not have returned.
	Consistently(delivered).ShouldNot(Receive())

	// Handling a frame is expected to unblock the pending Receive call.
	incoming := &wire.DatagramFrame{Data: []byte("foobar")}
	queue.HandleDatagramFrame(incoming)
	Eventually(delivered).Should(Receive(Equal([]byte("foobar"))))
})
It("closes", func() {
	// Carries the error returned by the pending Receive call back to the spec.
	result := make(chan error, 1)
	go func() {
		defer GinkgoRecover()
		_, recvErr := queue.Receive()
		result <- recvErr
	}()

	// With no frame available, Receive is expected to stay blocked ...
	Consistently(result).ShouldNot(Receive())

	// ... until the queue is closed, after which it must report the close error.
	closeErr := errors.New("test error")
	queue.CloseWithError(closeErr)
	Eventually(result).Should(Receive(MatchError("test error")))
})
})
})