-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcrypto.py
84 lines (64 loc) · 3.4 KB
/
crypto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import base64
from binascii import hexlify
from coincurve import PrivateKey
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Hash import SHA3_256, SHA256 # type: ignore
from rotkehlchen.errors import UnableToDecryptRemoteData
from rotkehlchen.typing import BinaryEthAddress, EthAddress
# AES encrypt/decrypt taken from here: https://stackoverflow.com/a/44212550/110395
def encrypt(key: bytes, source: bytes) -> str:
    """
    Encrypts the given source data with the given key using AES in CBC mode.

    The key is first run through SHA-256 so that it has a proper AES key size.
    The source is padded up to a multiple of the AES block size, where every
    padding byte holds the padding length. A freshly generated IV is stored in
    front of the ciphertext.

    Returns the base64 encoding of IV + ciphertext as a string.
    """
    assert isinstance(key, bytes), 'key should be given in bytes'
    assert isinstance(source, bytes), 'source should be given in bytes'

    block_size = AES.block_size
    aes_key = SHA256.new(key).digest()
    init_vector = Random.new().read(block_size)

    # A full block of padding is added when the source is already aligned,
    # so the padding length is always between 1 and block_size.
    pad_length = block_size - len(source) % block_size
    padded_source = source + bytes([pad_length]) * pad_length

    cipher = AES.new(aes_key, AES.MODE_CBC, init_vector)
    payload = init_vector + cipher.encrypt(padded_source)
    return base64.b64encode(payload).decode("latin-1")
def decrypt(key: bytes, given_source: str) -> bytes:
    """
    Decrypts the given base64 encoded source data with the given key.

    The decoded source is expected to consist of the AES-CBC IV (one block)
    followed by the ciphertext, whose plaintext is padded so that every
    padding byte holds the padding length (this is what encrypt() produces).

    Returns the decrypted data with the padding removed.
    If data can't be decrypted then raises UnableToDecryptRemoteData. That
    covers data that is not valid base64, data with an impossible length and
    data with invalid padding (typically caused by a wrong key).
    """
    assert isinstance(key, bytes), 'key should be given in bytes'
    assert isinstance(given_source, str), 'source should be given in string'

    try:
        source = base64.b64decode(given_source.encode("latin-1"))
    except ValueError as e:
        # Both binascii.Error (malformed base64) and UnicodeEncodeError
        # (characters outside latin-1) are subclasses of ValueError.
        raise UnableToDecryptRemoteData(
            'The DB data we received from the server is not valid base64 encoded data.',
        ) from e

    # We need a full IV block followed by at least one full cipher block.
    # Anything else would make the AES calls below fail with a ValueError, or
    # leave us with no plaintext to read the padding byte from.
    if len(source) < 2 * AES.block_size or len(source) % AES.block_size != 0:
        raise UnableToDecryptRemoteData(
            'The DB data we received from the server has an invalid length '
            'and can not be decrypted.',
        )

    key = SHA256.new(key).digest()  # use SHA-256 over our key to get a proper-sized AES key
    iv = source[:AES.block_size]  # extract the iv from the beginning
    decryptor = AES.new(key, AES.MODE_CBC, iv)
    data = decryptor.decrypt(source[AES.block_size:])  # decrypt

    padding = data[-1]  # the last byte holds the padding length
    # The padding length can only be in [1, block size] and all padding bytes
    # must have the same value as the padding length.
    padding_is_valid = (
        1 <= padding <= AES.block_size and
        data[-padding:] == bytes([padding]) * padding
    )
    if not padding_is_valid:
        raise UnableToDecryptRemoteData(
            'Invalid padding when decrypting the DB data we received from the server. '
            'Are you using a new user and if yes have you used the same password as before? '
            'If you have then please open a bug report.',
        )

    return data[:-padding]  # remove the padding
def sha3(data: bytes) -> bytes:
    """
    Returns the raw digest of the given data as computed by SHA3_256.

    The data has to be given as bytes, so any text must be encoded by the
    caller before it is passed in.

    NOTE(review): if the Crypto package in use is pycryptodome then SHA3_256
    is the NIST FIPS-202 variant and not the Keccak-256 that ethereum address
    derivation uses -- confirm which one is intended here.
    """
    hasher = SHA3_256.new(data)
    return hasher.digest()
def ishash(data: bytes) -> bool:
    """Returns True if the given data is exactly 32 long and False otherwise."""
    expected_length = 32
    return len(data) == expected_length
def privatekey_to_publickey(private_key_bin: bytes) -> bytes:
    """
    Returns the uncompressed serialization of the public key that belongs
    to the given binary private key.

    Raises ValueError if the private key does not pass the ishash() length
    check, which usually means it was given hex encoded instead of binary.
    """
    if ishash(private_key_bin):
        public_key = PrivateKey(private_key_bin).public_key
        return public_key.format(compressed=False)

    raise ValueError('private_key_bin format mismatch. maybe hex encoded?')
def publickey_to_address(publickey: bytes) -> BinaryEthAddress:
    """
    Derives the binary address from the given public key.

    Everything after the first byte of the public key is hashed with sha3()
    and the digest bytes from index 12 onwards are returned as the address.
    Presumably the skipped first byte is the prefix of the uncompressed key
    serialization -- confirm against privatekey_to_publickey().
    """
    key_body = publickey[1:]
    digest = sha3(key_body)
    return BinaryEthAddress(digest[12:])
def privatekey_to_address(private_key_bin: bytes) -> BinaryEthAddress:
    """
    Derives the binary address for the given binary private key by first
    getting its public key and then turning that into an address.

    Raises ValueError if privatekey_to_publickey() rejects the private key.
    """
    public_key = privatekey_to_publickey(private_key_bin)
    return publickey_to_address(public_key)
def address_encoder(address: BinaryEthAddress) -> EthAddress:
assert len(address) in (20, 0)
return EthAddress('0x' + hexlify(address).decode())