forked from bcicen/jstream
-
Notifications
You must be signed in to change notification settings - Fork 0
/
scanner.go
114 lines (95 loc) · 2.49 KB
/
scanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package jstream
import (
"io"
"sync/atomic"
)
const (
chunk = 4095 // ~4k
maxUint = ^uint(0)
maxInt = int64(maxUint >> 1)
nullByte = byte(0)
)
type scanner struct {
pos int64 // position in reader
ipos int64 // internal buffer position
ifill int64 // internal buffer fill
end int64
buf [chunk + 1]byte // internal buffer (with a lookback size of 1)
nbuf [chunk]byte // next internal buffer
fillReq chan struct{}
fillReady chan int64
readerErr error // underlying reader error, if any
}
func newScanner(r io.Reader) *scanner {
sr := &scanner{
end: maxInt,
fillReq: make(chan struct{}),
fillReady: make(chan int64),
}
go func() {
var rpos int64 // total bytes read into buffer
defer func() {
atomic.StoreInt64(&sr.end, rpos)
close(sr.fillReady)
}()
for range sr.fillReq {
scan:
n, err := r.Read(sr.nbuf[:])
if n == 0 {
switch err {
case io.EOF: // reader is exhausted
return
case nil: // no data and no error, retry fill
goto scan
default: // unexpected reader error
sr.readerErr = err
return
}
}
rpos += int64(n)
sr.fillReady <- int64(n)
}
}()
sr.fillReq <- struct{}{} // initial fill
return sr
}
// remaining returns the number of unread bytes
// if EOF for the underlying reader has not yet been found,
// maximum possible integer value will be returned
func (s *scanner) remaining() int64 {
if atomic.LoadInt64(&s.end) == maxInt {
return maxInt
}
return atomic.LoadInt64(&s.end) - s.pos
}
// read byte at current position (without advancing)
func (s *scanner) cur() byte { return s.buf[s.ipos] }
// read next byte
func (s *scanner) next() byte {
if s.pos >= atomic.LoadInt64(&s.end) {
return nullByte
}
s.ipos++
if s.ipos > s.ifill { // internal buffer is exhausted
s.ifill = <-s.fillReady
s.buf[0] = s.buf[len(s.buf)-1] // copy current last item to guarantee lookback
copy(s.buf[1:], s.nbuf[:]) // copy contents of pre-filled next buffer
s.ipos = 1 // move to beginning of internal buffer
// request next fill to be prepared
if s.end == maxInt {
s.fillReq <- struct{}{}
}
}
s.pos++
return s.buf[s.ipos]
}
// back undoes a previous call to next(), moving backward one byte in the internal buffer.
// as we only guarantee a lookback buffer size of one, any subsequent calls to back()
// before calling next() may panic
func (s *scanner) back() {
if s.ipos <= 0 {
panic("back buffer exhausted")
}
s.ipos--
s.pos--
}