forked from BlackAngel35/json_stream_parser
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjson_stream_parser.py
executable file
·235 lines (182 loc) · 5.39 KB
/
json_stream_parser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
from typing import Tuple, Union, List, Dict
# Names exported by ``from <module> import *``: the streaming entry point and
# the base exception raised for malformed input.
__all__ = ('load_iter', 'JSONDecodeError')
class JSONDecodeError(Exception):
    """Raised when the input is not valid JSON.

    Raise sites in this module follow a logging-like convention: the first
    argument is a %-style format string and any further arguments are the
    values to interpolate, e.g. ``JSONDecodeError('expect comma, got %r', ch)``.
    ``args`` keeps exactly what was passed; only the string form is
    interpolated.
    """

    def __str__(self) -> str:
        """Return the message with trailing arguments interpolated into it."""
        if len(self.args) > 1 and isinstance(self.args[0], str):
            try:
                return self.args[0] % self.args[1:]
            except (TypeError, ValueError):
                # The first argument is not a matching format string; fall
                # back to the default Exception rendering below.
                pass
        return super().__str__()
class JSONEOFError(JSONDecodeError):
    """Raised when the input ends while skipping whitespace.

    ``load_iter`` catches this between top-level values and treats it as the
    normal end of the stream.
    """
# TODO: options
def load_iter(fp, *, object_pairs_hook=dict):
    """Yield each top-level JSON value read from ``fp``, one at a time.

    Args:
        fp: Object whose ``read(n)`` returns up to ``n`` characters of text
            and an empty string at end of input.
        object_pairs_hook: Callable that receives the list of ``(key, value)``
            pairs of each JSON object and returns the decoded object.

    Yields:
        The decoded values in input order. Iteration stops when the input
        ends between two values.

    Raises:
        JSONDecodeError: If a value is malformed.
    """
    # One character of lookahead carried over from the previous value
    # ('' when nothing is pending).
    lookahead = ''
    while True:
        try:
            lookahead = _skip_ch_space(lookahead, fp)
        except JSONEOFError:
            # End of input between values: the stream is exhausted.
            break
        value, lookahead = _load_obj(
            lookahead, fp, object_pairs_hook=object_pairs_hook)
        yield value
# Type of a decoded JSON value. ``true``/``false`` decode to ``bool``, which
# is covered here by ``int`` (``bool`` subclasses ``int``). JSON objects are
# built by ``object_pairs_hook``, so with a custom hook they may not be dicts.
Value = Union[int, float, str, None, List['Value'], Dict[str, 'Value']]
def _load_obj(ch, fp, *, object_pairs_hook) -> Tuple[Value, str]:
    """Parse one JSON value whose first character ``ch`` was already read.

    Args:
        ch: First non-space character of the value.
        fp: Stream supplying the rest of the input through ``read()``.
        object_pairs_hook: Callable turning an object's list of
            ``(key, value)`` pairs into the decoded object.

    Returns:
        A ``(value, lookahead)`` tuple. ``lookahead`` is a character that was
        read past the end of the value (only numbers produce one) or ``''``.

    Raises:
        JSONDecodeError: If the input is not a valid JSON value.
    """
    # Scalars that end at a known character need no lookahead.
    if ch == '"':
        return _load_str(fp), ''
    if ch == 't':
        _expect(fp, 'rue')
        return True, ''
    if ch == 'f':
        _expect(fp, 'alse')
        return False, ''
    if ch == 'n':
        _expect(fp, 'ull')
        return None, ''

    if ch == '[':
        items = []
        pending = ''
        while True:
            pending = _skip_ch_space(pending, fp)
            if pending == ']':
                return items, ''
            if items:
                # Every element after the first must be preceded by a comma.
                if pending != ',':
                    raise JSONDecodeError('expect comma, got %r', pending)
                pending = _skip_space(fp)
            element, pending = _load_obj(
                pending, fp, object_pairs_hook=object_pairs_hook)
            items.append(element)

    if ch == '{':
        members = []
        pending = ''
        while True:
            pending = _skip_ch_space(pending, fp)
            if pending == '}':
                return object_pairs_hook(members), ''
            if members:
                # Every member after the first must be preceded by a comma.
                if pending != ',':
                    raise JSONDecodeError('expect comma, got %r', pending)
                pending = _skip_space(fp)
            if pending != '"':
                raise JSONDecodeError('expect quote, got %r', pending)
            name = _load_str(fp)
            pending = _skip_space(fp)
            if pending != ':':
                raise JSONDecodeError('expect colon, got %r', pending)
            first = _skip_space(fp)
            member, pending = _load_obj(
                first, fp, object_pairs_hook=object_pairs_hook)
            members.append((name, member))

    if ch in '0123456789-':
        # Numbers have no terminator, so the parser hands back the character
        # it had to read to find the end.
        return _load_num(ch, fp)

    raise JSONDecodeError('unknown char: %r', ch)
def _expect(fp, tok):
    """Consume ``len(tok)`` characters from ``fp`` and require them to equal ``tok``.

    Raises:
        JSONDecodeError: If the characters read differ from ``tok``
            (including when the input ends early).
    """
    found = fp.read(len(tok))
    if found == tok:
        return
    raise JSONDecodeError('expect %r', tok)
# Single-character escapes: maps the character following a backslash inside a
# JSON string to the character it stands for. The two strings are parallel:
#   "  \  /  b  f  n  r  t
_ESC_MAP = dict(zip('"\\/bfnrt', '"\\/\b\f\n\r\t'))
def _load_str(fp) -> str:
    """Parse the rest of a JSON string; the opening quote is already consumed.

    Args:
        fp: Stream supplying the input through ``read()``.

    Returns:
        The decoded string, without the surrounding quotes. A high surrogate
        escape immediately followed by a low surrogate escape (for example
        ``\\ud83d\\ude00``) is combined into the single character it encodes;
        an unpaired surrogate escape is kept as-is.

    Raises:
        JSONDecodeError: On end of input inside the string, an unescaped
            control character, an unknown escape, or a ``\\u`` escape that is
            not followed by exactly four hexadecimal digits.
    """
    rv = []
    # Code point of a high surrogate produced by the immediately preceding
    # \u escape (it is then the last item of rv), otherwise None.
    high = None
    while True:
        ch = _read_char(fp, 'got eof on string')
        if ch == '"':
            return ''.join(rv)

        if ch != '\\':
            if ord(ch) <= 0x1f:
                raise JSONDecodeError('unexpected control char: %r', ch)
            rv.append(ch)
            high = None
            continue

        ch = _read_char(fp, 'got eof on string escape')
        if ch in _ESC_MAP:
            rv.append(_ESC_MAP[ch])
            high = None
        elif ch == 'u':
            digits = fp.read(4)
            if len(digits) != 4:
                raise JSONDecodeError('expect 4-hex-digits')
            # Check the characters explicitly: int(x, 16) alone would also
            # accept forms such as '0x1f', '+1ff', ' 1f ' or '1_ff'.
            if any(d not in '0123456789abcdefABCDEF' for d in digits):
                raise JSONDecodeError('expect 4-hex-digits: got %r', digits)
            code = int(digits, 16)
            if high is not None and 0xDC00 <= code <= 0xDFFF:
                # Low surrogate right after an escaped high surrogate:
                # replace the pair with the code point it encodes.
                rv[-1] = chr(
                    0x10000 + ((high - 0xD800) << 10) + (code - 0xDC00))
                high = None
            else:
                rv.append(chr(code))
                high = code if 0xD800 <= code <= 0xDBFF else None
        else:
            raise JSONDecodeError('bad escape')
def _read_char(fp, errmsg) -> str:
    """Read exactly one character from ``fp``.

    Args:
        fp: Stream supplying the input through ``read()``.
        errmsg: Message for the error raised when the input has ended.

    Raises:
        JSONDecodeError: If ``fp`` has no more characters.
    """
    char = fp.read(1)
    if char:
        return char
    raise JSONDecodeError(errmsg)
def _load_num(ch: str, fp) -> Tuple[Union[int, float], str]:
    """Parse a JSON number whose first character has already been read.

    Args:
        ch: First character of the number: a digit or ``'-'``.
        fp: Stream supplying the rest of the input through ``read()``.

    Returns:
        A ``(value, lookahead)`` tuple. ``value`` is an ``int`` unless the
        number has a fraction or an exponent, in which case it is a
        ``float``. ``lookahead`` is the character read just past the end of
        the number (``''`` at end of input); the caller must treat it as
        input that is still to be processed.

    Raises:
        JSONDecodeError: If the text is not a valid JSON number: a sign with
            no digit after it, a leading zero followed by more digits, or a
            fraction or exponent without digits.
    """
    parts = [ch]

    # Optional sign.
    if ch == '-':
        ch = _read_char(fp, 'expect number')
        parts.append(ch)

    # Integer part: one mandatory digit, then any digits that follow.
    if ch not in '0123456789':
        raise JSONDecodeError('expected 0123456789, got: %r', ch)
    leading_zero = (ch == '0')
    digits, ch = _maybe_digits(fp)  # NOTE: ch may be '' at end of input
    if leading_zero and digits:
        raise JSONDecodeError('digits follows zero')
    parts.append(digits)

    # A leading zero only rules out further integer digits. A fraction or an
    # exponent may still follow ("0.5", "0e3"), and whatever character ended
    # the number must be handed back to the caller, so do not return early.
    is_float = False

    # Fraction.
    if ch == '.':
        is_float = True
        digits, ch = _expect_digits(fp)
        parts.append('.' + digits)

    # Exponent; JSON allows both 'e' and 'E'.
    if ch in ('e', 'E'):
        is_float = True
        parts.append(ch)
        ch = _read_char(fp, 'expect exp')
        if ch in ('+', '-'):
            parts.append(ch)
            ch = _read_char(fp, 'expect exp digits')
        if ch not in '0123456789':
            raise JSONDecodeError('expected 0123456789, got: %r', ch)
        parts.append(ch)
        digits, ch = _maybe_digits(fp)
        parts.append(digits)

    text = ''.join(parts)
    if is_float:
        return float(text), ch
    return int(text), ch
def _expect_digits(fp):
    """Read one or more decimal digits from ``fp``.

    Returns:
        A ``(digits, lookahead)`` tuple: the digit run and the character read
        just past it (``''`` at end of input).

    Raises:
        JSONDecodeError: If the input has ended or the next character is not
            a digit.
    """
    first = _read_char(fp, 'expect digits')
    if first in '0123456789':
        rest, lookahead = _maybe_digits(fp)
        return first + rest, lookahead
    raise JSONDecodeError('expected 0123456789, got: %r', first)
def _maybe_digits(fp) -> Tuple[str, str]:
    """Read zero or more decimal digits from ``fp``.

    Returns:
        A ``(digits, lookahead)`` tuple: the (possibly empty) digit run and
        the first character that is not a digit, which has been consumed from
        ``fp``. ``lookahead`` is ``''`` when the input ended.
    """
    collected = []
    ch = fp.read(1)
    while ch and ch in '0123456789':
        collected.append(ch)
        ch = fp.read(1)
    return ''.join(collected), ch
def _skip_space(fp) -> str:
    """Consume whitespace from ``fp`` and return the first other character.

    Whitespace here means space, tab, line feed and carriage return.

    Raises:
        JSONEOFError: If the input ends before a non-space character.
    """
    ch = fp.read(1)
    while ch and ch in ' \t\n\r':
        ch = fp.read(1)
    if ch:
        return ch
    raise JSONEOFError()
def _skip_ch_space(ch, fp) -> str:
    """Return the next non-space character, starting with the lookahead ``ch``.

    If ``ch`` is empty or whitespace, reading continues from ``fp``;
    otherwise ``ch`` itself is returned and ``fp`` is not touched.

    Raises:
        JSONEOFError: If the input ends before a non-space character.
    """
    if not ch or ch in ' \t\n\r':
        return _skip_space(fp)
    return ch
def main():
    """Read JSON values from standard input and print each one on its own line."""
    import json
    import sys

    for value in load_iter(sys.stdin):
        # ensure_ascii=False keeps non-ASCII characters unescaped in the output.
        print(json.dumps(value, ensure_ascii=False))
# Run the stdin-to-stdout filter only when this file is executed as a script,
# not when it is imported.
if __name__ == '__main__':
    main()