-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
cb7c343
commit be36981
Showing
1 changed file
with
54 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -127,16 +127,6 @@ def test_manifest_validity_with_assertions( | |
|
||
## TAMPER FUNCTIONS | ||
|
||
|
||
def break_binding(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
# base64 decode policy from manifest.encryptionInformation.policy | ||
p = manifest.encryptionInformation.policy_object | ||
p.body.dataAttributes = [] | ||
p.body.dissem = ["[email protected]"] | ||
manifest.encryptionInformation.policy_object = p | ||
return manifest | ||
|
||
|
||
def change_last_three(byt: bytes) -> bytes: | ||
new_three = "".join( | ||
random.choices(string.ascii_lowercase + string.digits, k=3) | ||
|
@@ -146,15 +136,28 @@ def change_last_three(byt: bytes) -> bytes: | |
return change_last_three(byt) | ||
return byt[:-3] + new_three | ||
|
||
def change_policy(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
# base64 decode policy from manifest.encryptionInformation.policy | ||
p = manifest.encryptionInformation.policy_object | ||
p.body.dataAttributes = [] | ||
p.body.dissem = ["[email protected]"] | ||
manifest.encryptionInformation.policy_object = p | ||
return manifest | ||
|
||
def change_policy_binding(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
pb = manifest.encryptionInformation.keyAccess[0].policyBinding | ||
altered_pb = base64.b64encode(change_last_three(base64.b64decode(pb))) | ||
manifest.encryptionInformation.keyAccess[0].policyBinding = altered_pb | ||
return manifest | ||
|
||
def break_root_signature(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
def change_root_signature(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
root_sig = manifest.encryptionInformation.integrityInformation.rootSignature.sig | ||
altered_sig = base64.b64encode(change_last_three(base64.b64decode(root_sig))) | ||
manifest.encryptionInformation.integrityInformation.rootSignature.sig = altered_sig | ||
return manifest | ||
|
||
|
||
def break_segment_signature(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
def change_segment_hash(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
assert manifest.encryptionInformation.integrityInformation.segments | ||
segments = manifest.encryptionInformation.integrityInformation.segments | ||
# choose a random segment | ||
|
@@ -165,17 +168,36 @@ def break_segment_signature(manifest: tdfs.Manifest) -> tdfs.Manifest: | |
manifest.encryptionInformation.integrityInformation.segments[index] = segment | ||
return manifest | ||
|
||
def change_segment_size(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
assert manifest.encryptionInformation.integrityInformation.segments | ||
segments = manifest.encryptionInformation.integrityInformation.segments | ||
# choose a random segment | ||
index = random.randrange(len(segments)) | ||
segment = segments[index] | ||
segment.encryptedSegmentSize = segment.segmentSize + 1 | ||
manifest.encryptionInformation.integrityInformation.segments[index] = segment | ||
return manifest | ||
|
||
def change_encrypted_segment_size(manifest: tdfs.Manifest) -> tdfs.Manifest: | ||
assert manifest.encryptionInformation.integrityInformation.segments | ||
segments = manifest.encryptionInformation.integrityInformation.segments | ||
# choose a random segment | ||
index = random.randrange(len(segments)) | ||
segment = segments[index] | ||
segment.encryptedSegmentSize = segment.encryptedSegmentSize + 1 | ||
manifest.encryptionInformation.integrityInformation.segments[index] = segment | ||
return manifest | ||
|
||
## TAMPER TESTS | ||
|
||
## TAMPER TESTS | ||
|
||
def test_tdf_with_unbound_policy( | ||
encrypt_sdk: tdfs.sdk_type, decrypt_sdk: tdfs.sdk_type, pt_file: str, tmp_dir: str | ||
) -> None: | ||
skip_hexless_skew(encrypt_sdk, decrypt_sdk) | ||
ct_file = do_encrypt_with(pt_file, encrypt_sdk, "ztdf", tmp_dir) | ||
assert os.path.isfile(ct_file) | ||
b_file = tdfs.update_manifest("unbound_policy", ct_file, break_binding) | ||
b_file = tdfs.update_manifest("unbound_policy", ct_file, change_policy) | ||
fname = os.path.basename(b_file).split(".")[0] | ||
rt_file = f"{tmp_dir}test-{fname}.untdf" | ||
try: | ||
|
@@ -192,7 +214,7 @@ def test_tdf_with_altered_root_sig( | |
skip_hexless_skew(encrypt_sdk, decrypt_sdk) | ||
ct_file = do_encrypt_with(pt_file, encrypt_sdk, "ztdf", tmp_dir) | ||
assert os.path.isfile(ct_file) | ||
b_file = tdfs.update_manifest("broken_root_sig", ct_file, break_root_signature) | ||
b_file = tdfs.update_manifest("broken_root_sig", ct_file, change_root_signature) | ||
fname = os.path.basename(b_file).split(".")[0] | ||
rt_file = f"{tmp_dir}test-{fname}.untdf" | ||
try: | ||
|
@@ -209,7 +231,7 @@ def test_tdf_with_altered_seg_sig( | |
skip_hexless_skew(encrypt_sdk, decrypt_sdk) | ||
ct_file = do_encrypt_with(pt_file, encrypt_sdk, "ztdf", tmp_dir) | ||
assert os.path.isfile(ct_file) | ||
b_file = tdfs.update_manifest("broken_seg_sig", ct_file, break_segment_signature) | ||
b_file = tdfs.update_manifest("broken_seg_sig", ct_file, change_segment_hash) | ||
fname = os.path.basename(b_file).split(".")[0] | ||
rt_file = f"{tmp_dir}test-{fname}.untdf" | ||
try: | ||
|
@@ -219,6 +241,22 @@ def test_tdf_with_altered_seg_sig( | |
assert b"signature" in exc.output | ||
assert b"tamper" in exc.output or b"IntegrityError" in exc.output | ||
|
||
def test_tdf_with_altered_seg_size( | ||
encrypt_sdk: tdfs.sdk_type, decrypt_sdk: tdfs.sdk_type, pt_file: str, tmp_dir: str | ||
): | ||
skip_hexless_skew(encrypt_sdk, decrypt_sdk) | ||
ct_file = do_encrypt_with(pt_file, encrypt_sdk, "ztdf", tmp_dir) | ||
assert os.path.isfile(ct_file) | ||
b_file = tdfs.update_manifest("broken_seg_size", ct_file, change_segment_size) | ||
fname = os.path.basename(b_file).split(".")[0] | ||
rt_file = f"{tmp_dir}test-{fname}.untdf" | ||
try: | ||
tdfs.decrypt(decrypt_sdk, b_file, rt_file, "ztdf") | ||
assert False, "decrypt succeeded unexpectedly" | ||
except subprocess.CalledProcessError as exc: | ||
assert b"segment" in exc.output | ||
assert b"tamper" in exc.output or b"IntegrityError" in exc.output | ||
|
||
|
||
## ASSERTION TESTS | ||
|
||
|