-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathencryption.py
159 lines (126 loc) · 4.87 KB
/
encryption.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
Usage: Python code to demonstrate the implementation on Envelope Encryption using AWS KMS
Before running this code, configure AWS Credentials in your environment:
$ export AWS_ACCESS_KEY_ID=<Your Access KEY ID>
$ export AWS_SECRET_ACCESS_KEY=<Your Secret Access Key>
Written on Dec/28/2022
"""
# Importing required libraries
import boto3
from botocore.exceptions import ClientError
from logging import getLogger
from os import urandom
import secrets
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
import base64
import json
# Setting the region
region = "us-west-1"
getLogger().setLevel("INFO")
getLogger("botocore").setLevel("CRITICAL")
getLogger("boto3").setLevel("CRITICAL")
# Create the KMS client
kms_client = boto3.client("kms", region_name=region)
# AWS CMK Key ARN
KEY_ID = "<Your Key ARN>"
TOPIC = "envelope_encryption_demo"
DATA_ENCRYPTION_KEY_GENERATOR = "OPENSSL"
def encrypt_data(plaintext, key_id):
"""
Method to encrypt the data using envelope encryption
The method returns encrypted data, encrypted data key, and also respective base64 encoded values.
"""
# Generate a random IV
iv = urandom(16)
plaintext_data_key: bytes
try:
if DATA_ENCRYPTION_KEY_GENERATOR == "KMS":
# Generate a data key using AWS KMS
data_key = kms_client.generate_data_key(KeyId=key_id, KeySpec="AES_256")
plaintext_data_key = data_key["Plaintext"]
elif DATA_ENCRYPTION_KEY_GENERATOR == "OPENSSL":
# Generate a data key using OpenSSL
plaintext_data_key = secrets.token_bytes(32)
# encrypt the data key with AWS KMS
data_key: dict = kms_client.encrypt(
KeyId=key_id, Plaintext=plaintext_data_key
)
except ClientError as e:
getLogger().error(e)
exit()
# Encrypt the data with AES CBC mode
encrypted_data_key = data_key["CiphertextBlob"]
cipher = Cipher(algorithms.AES(plaintext_data_key), modes.GCM(iv))
encryptor = cipher.encryptor()
padder = padding.PKCS7(128).padder()
padded_data = padder.update(plaintext)
padded_data += padder.finalize()
ciphertext_blob = encryptor.update(padded_data) + encryptor.finalize()
# Encode the encrypted data and data key with base64
encoded_ciphertext_blob = base64.b64encode(ciphertext_blob)
encoded_encrypted_data_key = base64.b64encode(encrypted_data_key)
encoded_gcm_tag = base64.b64encode(encryptor.tag)
# Return the encrypted data and the encrypted data key
return (
ciphertext_blob,
encoded_ciphertext_blob,
encrypted_data_key,
encoded_encrypted_data_key,
iv,
encoded_gcm_tag,
)
def decrypt_data(encoded_ciphertext_blob: bytes, encoded_encrypted_data_key: bytes, iv: bytes, gcm_tag: bytes) -> bytes:
"""
Method to decrypt the data
The method returns the decrypted data
"""
# Base64 decode of encrypted data and encrypted data key
decoded_ciphertext_blob: bytes = base64.b64decode(encoded_ciphertext_blob)
decoded_encrypted_data_key: bytes = base64.b64decode(encoded_encrypted_data_key)
decoded_gcmtag: bytes = base64.b64decode(gcm_tag)
# Decrypt the data key
data_key: dict = kms_client.decrypt(CiphertextBlob=decoded_encrypted_data_key)
plaintext_data_key: bytes = data_key["Plaintext"]
# Decrypt the data
cipher = Cipher(algorithms.AES(plaintext_data_key), modes.GCM(iv))
decryptor = cipher.decryptor()
decrypted_padded_data: bytes = decryptor.update(decoded_ciphertext_blob) + decryptor.finalize_with_tag(decoded_gcmtag)
unpadder = padding.PKCS7(128).unpadder()
plaintext: bytes = unpadder.update(decrypted_padded_data)
plaintext += unpadder.finalize()
# Return the decrypted data
return plaintext
def main():
"""
Main method
"""
# Setting the plain text, which needs to be encrypted
# input_plaintext = b"This is a confidential message which needs to be encrypted."
input_plaintext = bytes(
json.dumps(["foo", {"bar": ("baz", None, 1.0, 2)}]), "utf-8"
)
# Encrypting the data
(
ciphertext_blob,
encoded_ciphertext_blob,
encrypted_data_key,
encoded_encrypted_data_key,
iv,
gcm_tag
) = encrypt_data(input_plaintext, KEY_ID)
print(encoded_ciphertext_blob)
# Decrypt the data
plaintext = decrypt_data(encoded_ciphertext_blob, encoded_encrypted_data_key, iv, gcm_tag)
print(json.loads(str(plaintext, "utf-8")))
# Store the encrypted string and the encrypted data key in a file (optional)
with open("datastore.csv", "w") as f:
f.writelines(
[
str(encoded_ciphertext_blob) + ",",
str(encoded_encrypted_data_key) + ",",
str(iv),
]
)
if __name__ == "__main__":
main()