-
Notifications
You must be signed in to change notification settings - Fork 0
/
sn_rpc.py
113 lines (92 loc) · 2.22 KB
/
sn_rpc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import os
import json
import hashlib
import time
import base64
import requests
import Crypto.Cipher.AES
SN_RPC_URL = "https://www.scoutnet.de/jsonrpc/server.php"
SN_RPC_IV = b'1234567890123456'
class RPCError(Exception):
pass
def rpc(url, method, *params):
"""
JSON-RPC 1.0 über HTTP(S) POST
"""
call_id = os.urandom(16).hex()
json_data = {
"method": method,
"params": params,
"id": call_id
}
got = requests.post(
url,
json=json_data
).json()
e = got["error"]
if e is not None:
raise RPCError(e)
return got["result"]
def get_data_by_global_id(global_id, query={}):
return rpc(
SN_RPC_URL,
"get_data_by_global_id",
global_id,
query
)
def checkPermission(type, global_id, username, auth):
return rpc(
SN_RPC_URL,
"checkPermission",
type,
global_id,
username,
auth
)
def requestPermission(type, global_id, username, auth):
return rpc(
SN_RPC_URL,
"requestPermission",
type,
global_id,
username,
auth
)
def setData(type, id, object, username, auth):
return rpc(
SN_RPC_URL,
"requestPermission",
type,
id,
object,
username,
auth
)
def deleteObject(type, global_id, id, username, auth):
return rpc(
SN_RPC_URL,
"requestPermission",
type,
global_id,
id,
username,
auth
)
def generate_auth(api_key, check_value):
auth = {
"sha1": hashlib.sha1(check_value).hexdigest(),
"md5": hashlib.md5(check_value).hexdigest(),
"time": time.time()
}
auth = json.dumps(auth).encode("ascii")
first_block = os.urandom(16)
aes = Crypto.Cipher.AES.new(api_key, Crypto.Cipher.AES.MODE_CBC, SN_RPC_IV)
blocks = first_block + auth
blocks = blocks + bytes(16 - len(blocks) % 16) # padding
out = aes.encrypt(blocks)
out = base64.b64encode(out)
for a, b in ((b'+', b'-'), (b'/', b'_'), (b'=', b'~')):
out = out.replace(a, b)
return out
def generate_check_value(*args):
return ("".join(str(p) for p in args)).encode("ascii")