diff --git a/.github/workflows/black.yml b/.github/workflows/black.yml index af06272..cc31d1b 100644 --- a/.github/workflows/black.yml +++ b/.github/workflows/black.yml @@ -12,4 +12,4 @@ jobs: python -m pip install --upgrade pip pip install -r requirements.txt - run: | - black --check --verbose . + python3 -m black -t py38 --check . diff --git a/server/fractionator.py b/server/fractionator.py index 1985c9c..0e923f8 100644 --- a/server/fractionator.py +++ b/server/fractionator.py @@ -3,18 +3,19 @@ import os from typing import Optional -from cryptography.hazmat.primitives.ciphers import algorithms +from cryptography.hazmat.primitives.ciphers import algorithms, modes from fraction import Fraction import utils -class Fractionator(utils.AES_CFB_HELPER): +class Fractionator(utils.AES_WITH_IV_HELPER): MAGIC: int = 0xDEADBEEF CHUNK_SIZE: int = 8192 FRACTION_PATH_LEN: int = 16 algorithm = algorithms.AES256 - + mode = modes.CBC + def __init__(self, file_path: str, out_path: str, key: bytes) -> None: """Prepare a Fractionator object for reading and generating fractions.""" self.file_path: str = file_path @@ -26,7 +27,7 @@ def __init__(self, file_path: str, out_path: str, key: bytes) -> None: self._buf_reader: Optional[io.BufferedReader] = None - super().__init__(key, self.algorithm) + super().__init__(key, self.algorithm, self.mode) def open_reading_stream(self) -> None: """Open a stream for reading the object file.""" diff --git a/server/utils.py b/server/utils.py index 58654d5..ed0000c 100644 --- a/server/utils.py +++ b/server/utils.py @@ -1,36 +1,52 @@ import random import string import secrets -from typing import Optional +from typing import Optional, Type from cryptography.hazmat.primitives.ciphers import Cipher, modes +from cryptography.hazmat.primitives._cipheralgorithm import BlockCipherAlgorithm +from cryptography.hazmat.primitives import padding - -class AES_CFB_HELPER: +class AES_WITH_IV_HELPER: LENGTH_IV: int = 16 - def __init__(self, key: bytes, algorithm) -> None: - self.algorithm = algorithm - self.mode = modes.CFB - self.key: bytes = key + def __init__(self, key: bytes, algorithm: Type[BlockCipherAlgorithm], mode: Type[modes.ModeWithInitializationVector]) -> None: + self.key = key + + self.algorithm = algorithm(self.key) + self.mode = mode + + self.padder_ctx = padding.PKCS7(self.algorithm.block_size) + self._iv: Optional[bytes] = None + + def pad(self, data: bytes) -> bytes: + padder = self.padder_ctx.padder() + return padder.update(data) + padder.finalize() + + def unpad(self, data: bytes) -> bytes: + unpadder = self.padder_ctx.unpadder() + return unpadder.update(data) + unpadder.finalize() def get_cipher(self, iv: bytes) -> Cipher: """Return a cipher instance.""" - return Cipher(self.algorithm(self.key), self.mode(iv)) + return Cipher(self.algorithm, self.mode(iv)) def get_iv(self, new: bool = False) -> bytes: """Generate or reuse initialization vector (IV).""" if not self._iv or new: - return secrets.token_bytes(self.LENGTH_IV) + self._iv = secrets.token_bytes(self.LENGTH_IV) return self._iv def encrypt(self, data: bytes) -> bytes: - """Encrypt data using AES-CFB mode.""" - cipher = self.get_cipher(self.get_iv(True)) - operator = cipher.encryptor() - return operator.update(data) + operator.finalize() - + """Encrypt data""" + encryptor = self.get_cipher(self.get_iv(True)).encryptor() + return encryptor.update(self.pad(data)) + encryptor.finalize() + + def decrypt(self, data: bytes, iv: bytes) -> bytes: + """Decrypt data""" + decryptor = self.get_cipher(iv).decryptor() + return self.unpad(decryptor.update(data) + decryptor.finalize()) def random_string(n: int = 16, sample: str = string.ascii_lowercase + string.digits): """Returns a random string using the characters defined in sample"""