binaryReader.go (forked from karrick/goavro)
package goavro

import (
	"fmt"
	"io"
	"math"
)
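
// NOTE: intMask, intFlag, MaxBlockSize, and MaxBlockCount are defined
// elsewhere in the goavro package. Presumably intMask is the low-seven-bit
// payload mask (0x7F) and intFlag the continuation bit (0x80) of Avro's
// variable-length zig-zag integer encoding; the two Max* limits guard against
// pathological sizes in untrusted input.
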
// bytesBinaryReader reads bytes from io.Reader and returns a byte slice of the
// specified size, or the error encountered while trying to read those bytes.
func bytesBinaryReader(ior io.Reader) ([]byte, error) {
	size, err := longBinaryReader(ior)
	if err != nil {
		return nil, fmt.Errorf("cannot read bytes: cannot read size: %s", err)
	}
	if size < 0 {
		return nil, fmt.Errorf("cannot read bytes: size is negative: %d", size)
	}
	if size > MaxBlockSize {
		return nil, fmt.Errorf("cannot read bytes: size exceeds MaxBlockSize: %d > %d", size, MaxBlockSize)
	}
	buf := make([]byte, size)
	_, err = io.ReadAtLeast(ior, buf, int(size))
	if err != nil {
		return nil, fmt.Errorf("cannot read bytes: %s", err)
	}
	return buf, nil
}
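
// The following is an illustrative sketch, not part of the decoder: the wire
// form consumed by bytesBinaryReader is a zig-zag varint length followed by
// that many raw bytes. The length 3 zig-zag encodes to 6, which fits in the
// single varint byte 0x06, so the Avro bytes value "abc" is laid out as shown
// below (assuming the conventional 0x80 continuation-bit varint encoding).
var exampleBytesEncoding = []byte{
	0x06,             // length 3, zig-zag encoded (3 -> 6), single varint byte
	0x61, 0x62, 0x63, // the payload "abc"
}
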
// longBinaryReader reads bytes from io.Reader until it has a complete long
// value, or until it encounters a read error.
func longBinaryReader(ior io.Reader) (int64, error) {
	var value uint64
	var shift uint
	var err error
	var b byte

	// NOTE: While benchmarks show it's more performant to invoke ReadByte when
	// available, testing whether a variable's data type implements a particular
	// method is also quite slow. So perform the test once, and branch to the
	// appropriate loop based on the result.
	if byteReader, ok := ior.(io.ByteReader); ok {
		for {
			if b, err = byteReader.ReadByte(); err != nil {
				return 0, err // NOTE: must send back unaltered error to detect io.EOF
			}
			value |= uint64(b&intMask) << shift
			if b&intFlag == 0 {
				return (int64(value>>1) ^ -int64(value&1)), nil
			}
			shift += 7
		}
	}

	// NOTE: ior does not also implement io.ByteReader, so we must allocate a
	// single-byte slice and read each byte into it.
	buf := make([]byte, 1)
	for {
		if _, err = ior.Read(buf); err != nil {
			return 0, err // NOTE: must send back unaltered error to detect io.EOF
		}
		b = buf[0]
		value |= uint64(b&intMask) << shift
		if b&intFlag == 0 {
			return (int64(value>>1) ^ -int64(value&1)), nil
		}
		shift += 7
	}
}
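
// A minimal worked example of the decoding loop above, assuming the
// conventional Avro varint constants intMask == 0x7F and intFlag == 0x80.
// This helper is illustrative only and is not called by the decoder.
func exampleDecodeZigZagVarint() {
	// 0xAC has the continuation bit set and contributes its low seven bits
	// (0x2C = 44); 0x02 contributes 2<<7 = 256, for an unsigned value of 300.
	encoded := []byte{0xAC, 0x02}
	var value uint64
	var shift uint
	for _, b := range encoded {
		value |= uint64(b&0x7F) << shift
		if b&0x80 == 0 {
			break
		}
		shift += 7
	}
	// Zig-zag decoding maps the unsigned 300 back to the signed long 150;
	// a lone 0x01 byte would decode to -1 by the same formula.
	fmt.Println(int64(value>>1) ^ -int64(value&1)) // prints 150
}
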
// metadataBinaryReader reads bytes from io.Reader until it has the entire map
// value, or until it encounters a read error.
func metadataBinaryReader(ior io.Reader) (map[string][]byte, error) {
	var err error
	var value interface{}

	// Read the first block count (and block size, when present).
	if value, err = longBinaryReader(ior); err != nil {
		return nil, fmt.Errorf("cannot read map block count: %s", err)
	}
	blockCount := value.(int64)
	if blockCount < 0 {
		if blockCount == math.MinInt64 {
			// The minimum value of any signed integer type can never be
			// negated into a positive value.
			return nil, fmt.Errorf("cannot read map with block count: %d", math.MinInt64)
		}
		// NOTE: A negative block count implies there is a long encoded block
		// size following the negative block count. We have no use for the block
		// size in this decoder, so we read and discard the value.
		blockCount = -blockCount // convert to its positive equivalent
		if _, err = longBinaryReader(ior); err != nil {
			return nil, fmt.Errorf("cannot read map block size: %s", err)
		}
	}
	// Ensure block count does not exceed some sane value.
	if blockCount > MaxBlockCount {
		return nil, fmt.Errorf("cannot read map when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
	}
	// NOTE: Pre-sizing the map is not strictly necessary, but many encoders
	// write all items in a single block, so sizing the map for that block
	// count reduces the amount of memory the runtime must allocate for it.
	mapValues := make(map[string][]byte, blockCount)
	for blockCount != 0 {
		// Decode `blockCount` key-value pairs from the stream.
		for i := int64(0); i < blockCount; i++ {
			// first decode the key string
			keyBytes, err := bytesBinaryReader(ior)
			if err != nil {
				return nil, fmt.Errorf("cannot read map key: %s", err)
			}
			key := string(keyBytes)
			if _, ok := mapValues[key]; ok {
				return nil, fmt.Errorf("cannot read map: duplicate key: %q", key)
			}
			// metadata values are always bytes
			buf, err := bytesBinaryReader(ior)
			if err != nil {
				return nil, fmt.Errorf("cannot read map value for key %q: %s", key, err)
			}
			mapValues[key] = buf
		}
		// Decode the next block count, because there may be more blocks.
		if value, err = longBinaryReader(ior); err != nil {
			return nil, fmt.Errorf("cannot read map block count: %s", err)
		}
		blockCount = value.(int64)
		if blockCount < 0 {
			if blockCount == math.MinInt64 {
				// The minimum value of any signed integer type can never be
				// negated into a positive value.
				return nil, fmt.Errorf("cannot read map with block count: %d", math.MinInt64)
			}
			// NOTE: A negative block count implies there is a long encoded
			// block size following the negative block count. We have no use for
			// the block size in this decoder, so we read and discard the value.
			blockCount = -blockCount // convert to its positive equivalent
			if _, err = longBinaryReader(ior); err != nil {
				return nil, fmt.Errorf("cannot read map block size: %s", err)
			}
		}
		// Ensure block count does not exceed some sane value.
		if blockCount > MaxBlockCount {
			return nil, fmt.Errorf("cannot read map when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
		}
	}
	return mapValues, nil
}
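
// An illustrative sketch of the map layout metadataBinaryReader consumes (not
// part of the decoder): a sequence of blocks, each a block count followed by
// that many key/value pairs with both key and value encoded as Avro bytes,
// terminated by a block count of zero. A one-entry map {"k": "v"} could be
// laid out as:
var exampleMetadataEncoding = []byte{
	0x02,       // block count 1, zig-zag encoded
	0x02, 0x6B, // key: length 1, then 'k'
	0x02, 0x76, // value: length 1, then 'v'
	0x00,       // block count 0 terminates the map
}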