-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathr3.py
47 lines (38 loc) · 1.32 KB
/
r3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import urllib3
import urllib.parse
import json as _json
import re
headers = {'Accept-Encoding': 'gzip'}
def url(url, **kwargs):
return url + '?' + urllib.parse.urlencode(kwargs)
def r(method, url, json=None):
"""Returns HTTPResponse object (including res.reason, .status, .headers) and also .json."""
_headers = headers.copy()
if json:
body = _json.dumps(json, separators=(',', ':')).encode()
_headers['Content-Type'] = 'application/json'
else:
body = None
res = urllib3.request(method, url, headers=_headers, body=body)
res.json = _json.loads(data) if (data := res.data.decode()) else None
return res
def get_objects(url):
while url:
res = get(url)
for o in res.json:
yield o
url = res.next_url
def get(url):
"""Returns HTTPResponse object (including res.reason, .status, .headers) and also .json, .next_url."""
res = r('GET', url)
links = [link for link in res.headers.get_all('link', []) if 'rel="next"' in link]
res.next_url = re.search('<(.*)>', links[0]).group(1) if links else None
return res
def post(url, json=None):
return r('POST', url, json)
def put(url, json=None):
return r('PUT', url, json)
def patch(url, json=None):
return r('PATCH', url, json)
def delete(url):
return r('DELETE', url)