Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalized the AES helper class to work for all AES modes with IV #30

Merged
merged 3 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/black.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ jobs:
python -m pip install --upgrade pip
pip install -r requirements.txt
- run: |
black --check --verbose .
python3 -m black -t py38 --check .
9 changes: 5 additions & 4 deletions server/fractionator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
import os
from typing import Optional

from cryptography.hazmat.primitives.ciphers import algorithms
from cryptography.hazmat.primitives.ciphers import algorithms, modes

from fraction import Fraction
import utils


class Fractionator(utils.AES_CFB_HELPER):
class Fractionator(utils.AES_WITH_IV_HELPER):
MAGIC: int = 0xDEADBEEF
CHUNK_SIZE: int = 8192
FRACTION_PATH_LEN: int = 16
algorithm = algorithms.AES256

mode = modes.CBC

def __init__(self, file_path: str, out_path: str, key: bytes) -> None:
"""Prepare a Fractionator object for reading and generating fractions."""
self.file_path: str = file_path
Expand All @@ -26,7 +27,7 @@ def __init__(self, file_path: str, out_path: str, key: bytes) -> None:

self._buf_reader: Optional[io.BufferedReader] = None

super().__init__(key, self.algorithm)
super().__init__(key, self.algorithm, self.mode)

def open_reading_stream(self) -> None:
"""Open a stream for reading the object file."""
Expand Down
44 changes: 30 additions & 14 deletions server/utils.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,52 @@
import random
import string
import secrets
from typing import Optional
from typing import Optional, Type

from cryptography.hazmat.primitives.ciphers import Cipher, modes
from cryptography.hazmat.primitives._cipheralgorithm import BlockCipherAlgorithm
from cryptography.hazmat.primitives import padding


class AES_CFB_HELPER:
class AES_WITH_IV_HELPER:
LENGTH_IV: int = 16

def __init__(self, key: bytes, algorithm) -> None:
self.algorithm = algorithm
self.mode = modes.CFB
self.key: bytes = key
def __init__(self, key: bytes, algorithm: Type[BlockCipherAlgorithm], mode: Type[modes.ModeWithInitializationVector]) -> None:
self.key = key

self.algorithm = algorithm(self.key)
self.mode = mode

self.padder_ctx = padding.PKCS7(self.algorithm.block_size)

self._iv: Optional[bytes] = None

def pad(self, data: bytes) -> bytes:
padder = self.padder_ctx.padder()
return padder.update(data) + padder.finalize()

def unpad(self, data: bytes) -> bytes:
unpadder = self.padder_ctx.unpadder()
return unpadder.update(data) + unpadder.finalize()

def get_cipher(self, iv: bytes) -> Cipher:
"""Return a cipher instance."""
return Cipher(self.algorithm(self.key), self.mode(iv))
return Cipher(self.algorithm, self.mode(iv))

def get_iv(self, new: bool = False) -> bytes:
"""Generate or reuse initialization vector (IV)."""
if not self._iv or new:
return secrets.token_bytes(self.LENGTH_IV)
self._iv = secrets.token_bytes(self.LENGTH_IV)
return self._iv

def encrypt(self, data: bytes) -> bytes:
"""Encrypt data using AES-CFB mode."""
cipher = self.get_cipher(self.get_iv(True))
operator = cipher.encryptor()
return operator.update(data) + operator.finalize()

"""Encrypt data"""
encryptor = self.get_cipher(self.get_iv(True)).encryptor()
return encryptor.update(self.pad(data)) + encryptor.finalize()

def decrypt(self, data: bytes, iv: bytes) -> bytes:
"""Decrypt data"""
decryptor = self.get_cipher(iv).decryptor()
return self.unpad(decryptor.update(data) + decryptor.finalize())

def random_string(n: int = 16, sample: str = string.ascii_lowercase + string.digits):
"""Returns a random string using the characters defined in sample"""
Expand Down
Loading