From 0352806839a4fe1b36a456d52a2c75b62de0cc98 Mon Sep 17 00:00:00 2001 From: taichunmin Date: Tue, 19 Nov 2024 02:36:19 +0800 Subject: [PATCH] `hf mf elog --decrypt` skip records with found keys --- CHANGELOG.md | 1 + software/script/chameleon_cli_unit.py | 90 +++++++++++---------- software/script/crypto1.py | 110 ++++++++++++++++++++++++++ software/script/tests/test_crypto1.py | 82 +++++++++++++++++++ 4 files changed, 237 insertions(+), 46 deletions(-) create mode 100644 software/script/crypto1.py create mode 100644 software/script/tests/test_crypto1.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 398b0b31..39641881 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ All notable changes to this project will be documented in this file. This project uses the changelog in accordance with [keepchangelog](http://keepachangelog.com/). Please use this to write notable changes, which is not the same as git commit log... ## [unreleased][unreleased] + - `hf mf elog --decrypt` skip records with found keys (@taichunmin) - Added command to check keys of multiple sectors at once (@taichunmin) - Fixed unused target key type parameter for nested (@petepriority) - Skip already used items `hf mf elog --decrypt` (@p-l-) diff --git a/software/script/chameleon_cli_unit.py b/software/script/chameleon_cli_unit.py index 6bcf9e9b..11d01887 100644 --- a/software/script/chameleon_cli_unit.py +++ b/software/script/chameleon_cli_unit.py @@ -25,6 +25,7 @@ from chameleon_enum import Command, Status, SlotNumber, TagSenseType, TagSpecificType from chameleon_enum import MifareClassicWriteMode, MifareClassicPrngType, MifareClassicDarksideStatus, MfcKeyType from chameleon_enum import AnimationMode, ButtonPressFunction, ButtonType, MfcValueBlockOperator +from crypto1 import Crypto1 # NXP IDs based on https://www.nxp.com/docs/en/application-note/AN10833.pdf type_id_SAK_dict = {0x00: "MIFARE Ultralight Classic/C/EV1/Nano | NTAG 2xx", @@ -1240,32 +1241,34 @@ def _run_mfkey32v2(items): class ItemGenerator: - def __init__(self, rs, i=0, j=1): - self.rs = rs + def __init__(self, rs, uid_found_keys = set()): + self.rs: list = rs + self.progress = 0 self.i = 0 self.j = 1 self.found = set() self.keys = set() + for known_key in uid_found_keys: + self.test_key(known_key) def __iter__(self): return self def __next__(self): - try: - item_i = self.rs[self.i] - except IndexError: - raise StopIteration - if self.key_from_item(item_i) in self.found: + size = len(self.rs) + if self.j >= size: self.i += 1 + if self.i >= size - 1: + raise StopIteration self.j = self.i + 1 - return next(self) - try: - item_j = self.rs[self.j] - except IndexError: + item_i, item_j = self.rs[self.i], self.rs[self.j] + self.progress += 1 + self.j += 1 + if self.key_from_item(item_i) in self.found: + self.progress += max(0, size - self.j) self.i += 1 self.j = self.i + 1 return next(self) - self.j += 1 if self.key_from_item(item_j) in self.found: return next(self) return item_i, item_j @@ -1274,17 +1277,20 @@ def __next__(self): def key_from_item(item): return "{uid}-{nt}-{nr}-{ar}".format(**item) - def key_found(self, key, items): - self.keys.add(key) - for item in items: - try: - if item == self.rs[self.i]: - self.i += 1 - self.j = self.i + 1 - except IndexError: - break - self.found.update(self.key_from_item(item) for item in items) - + def test_key(self, key, items = list()): + for item in self.rs: + item_key = self.key_from_item(item) + if item_key in self.found: + continue + if (item in items) or (Crypto1.mfkey32_is_reader_has_key( + int(item['uid'], 16), + int(item['nt'], 16), + int(item['nr'], 16), + int(item['ar'], 16), + key, + )): + self.keys.add(key) + self.found.add(item_key) @hf_mf.command('elog') class HFMFELog(DeviceRequiredUnit): @@ -1296,7 +1302,7 @@ def args_parser(self) -> ArgumentParserNoExit: parser.add_argument('--decrypt', action='store_true', help="Decrypt key from MF1 log list") return parser - def decrypt_by_list(self, rs: list): + def decrypt_by_list(self, rs: list, uid_found_keys: set = set()): """ Decrypt key from reconnaissance log list @@ -1306,16 +1312,14 @@ def decrypt_by_list(self, rs: list): msg1 = f" > {len(rs)} records => " msg2 = f"/{(len(rs)*(len(rs)-1))//2} combinations. " msg3 = " key(s) found" - n = 1 - gen = ItemGenerator(rs) + gen = ItemGenerator(rs, uid_found_keys) + print(f"{msg1}{gen.progress}{msg2}{len(gen.keys)}{msg3}\r", end="") with Pool(cpu_count()) as pool: for result in pool.imap(_run_mfkey32v2, gen): - # TODO: if some keys already recovered, test them on item before running mfkey32 on item if result is not None: - gen.key_found(*result) - print(f"{msg1}{n}{msg2}{len(gen.keys)}{msg3}\r", end="") - n += 1 - print() + gen.test_key(*result) + print(f"{msg1}{gen.progress}{msg2}{len(gen.keys)}{msg3}\r", end="") + print(f"{msg1}{gen.progress}{msg2}{len(gen.keys)}{msg3}") return gen.keys def on_exec(self, args: argparse.Namespace): @@ -1356,22 +1360,16 @@ def on_exec(self, args: argparse.Namespace): for uid in result_maps.keys(): print(f" - Detection log for uid [{uid.upper()}]") result_maps_for_uid = result_maps[uid] + uid_found_keys = set() for block in result_maps_for_uid: - print(f" > Block {block} detect log decrypting...") - if 'A' in result_maps_for_uid[block]: - # print(f" - A record: { result_maps[block]['A'] }") - records = result_maps_for_uid[block]['A'] - if len(records) > 1: - result_maps[uid][block]['A'] = self.decrypt_by_list(records) - else: - print(f" > {len(records)} record") - if 'B' in result_maps_for_uid[block]: - # print(f" - B record: { result_maps[block]['B'] }") - records = result_maps_for_uid[block]['B'] - if len(records) > 1: - result_maps[uid][block]['B'] = self.decrypt_by_list(records) - else: - print(f" > {len(records)} record") + for keyType in 'AB': + records = result_maps_for_uid[block][keyType] if keyType in result_maps_for_uid[block] else [] + if len(records) < 1: + continue + print(f" > Decrypting block {block} key {keyType} detect log...") + result_maps[uid][block][keyType] = self.decrypt_by_list(records, uid_found_keys) + uid_found_keys.update(result_maps[uid][block][keyType]) + print(" > Result ---------------------------") for block in result_maps_for_uid.keys(): if 'A' in result_maps_for_uid[block]: diff --git a/software/script/crypto1.py b/software/script/crypto1.py new file mode 100644 index 00000000..a0005a06 --- /dev/null +++ b/software/script/crypto1.py @@ -0,0 +1,110 @@ +import re + +LFSR48_FILTER_A = 0x9E98 +LFSR48_FILTER_B = 0xB48E +LFSR48_FILTER_C = 0xEC57E80A +LFSR48_POLY = 0xE882B0AD621 +U8_TO_ODD4 = [((i & 0x80) >> 4) + ((i & 0x20) >> 3) + ((i & 0x08) >> 2) + ((i & 0x02) >> 1) for i in range(256)] +EVEN_PARITY_U8 = [0 for i in range(256)] + +def u8_to_odd4(u8): + return U8_TO_ODD4[u8 & 0xFF] + +def get_bit(num, x = 0): + return (num >> x) & 1 + +for i in range(256): + tmp = i + tmp ^= tmp >> 4 + tmp ^= tmp >> 2 + EVEN_PARITY_U8[i] = (tmp ^ (tmp >> 1)) & 1 + +def even_parity_u8(u8): + return EVEN_PARITY_U8[u8 & 0xFF] + +def odd_parity_u8(u8): + return even_parity_u8(u8) ^ 1 + +def even_parity_u16(u16): + return even_parity_u8((u16 >> 8) ^ u16) + +def even_parity_u48(u48): + return even_parity_u16((u48 >> 32) ^ (u48 >> 16) ^ u48) + +def swap_endian_u16(u16): + return ((u16 & 0xFF) << 8) | ((u16 >> 8) & 0xFF) + +def swap_endian_u32(u32): + return swap_endian_u16(u32 & 0xFFFF) << 16 | swap_endian_u16((u32 >> 16) & 0xFFFF) + +""" +ref: https://web.archive.org/web/20081010065744/http://sar.informatik.hu-berlin.de/research/publications/SAR-PR-2008-21/SAR-PR-2008-21_.pdf +""" +class Crypto1: + def __init__(self, new_lfsr48: int = 0): + self.lfsr48 = new_lfsr48 + + @property + def key(self) -> bytearray: + tmp, key = self.lfsr48, bytearray(6) + for i in range(6): + key[i] = tmp & 0xFF + tmp >>= 8 + return key.hex() + + @key.setter + def key(self, key: str): + if not re.match(r"^[a-fA-F0-9]{12}$", key): + raise ValueError(f"Invalid hex format key: {key}") + tmp, self.lfsr48 = int(key, 16), 0 + for i in range(6): + self.lfsr48 = (self.lfsr48 << 8) | tmp & 0xFF + tmp >>= 8 + + def lfsr48_filter(self): + f = 0 + f |= get_bit(LFSR48_FILTER_B, u8_to_odd4(self.lfsr48 >> 8)) # fb4 + f |= get_bit(LFSR48_FILTER_A, u8_to_odd4(self.lfsr48 >> 16)) << 1 # fa4 + f |= get_bit(LFSR48_FILTER_A, u8_to_odd4(self.lfsr48 >> 24)) << 2 # fa4 + f |= get_bit(LFSR48_FILTER_B, u8_to_odd4(self.lfsr48 >> 32)) << 3 # fb4 + f |= get_bit(LFSR48_FILTER_A, u8_to_odd4(self.lfsr48 >> 40)) << 4 # fa4 + return get_bit(LFSR48_FILTER_C, f) + + def lfsr48_bit(self, bit_in: int = 0, is_encrypted: bool = False) -> int: + out_bit = self.lfsr48_filter() + bit_feedback = even_parity_u48(LFSR48_POLY & self.lfsr48) ^ (bit_in & 1) ^ (is_encrypted & out_bit) + self.lfsr48 = (bit_feedback << 47) | (self.lfsr48 >> 1) + return out_bit + + def lfsr48_u8(self, u8_in: int = 0, is_encrypted: bool = False) -> int: + out_u8 = 0 + for i in range(8): + tmp = self.lfsr48_bit(u8_in >> i, is_encrypted) << i + out_u8 |= tmp + return out_u8 + + def lfsr48_u32(self, u32_in: int = 0, is_encrypted: bool = False) -> int: + out_u32 = 0 + for i in range(3, -1, -1): + bit_offset = i << 3 + out_u32 |= self.lfsr48_u8(u32_in >> bit_offset, is_encrypted) << bit_offset + return out_u32 + + @staticmethod + def prng_next(lfsr32: int, n: int = 1) -> int: + lfsr32 = swap_endian_u32(lfsr32) + for i in range(n): + lfsr32 = even_parity_u8(0x2D & (lfsr32 >> 16)) << 31 | (lfsr32 >> 1) + return swap_endian_u32(lfsr32) + + @staticmethod + def mfkey32_is_reader_has_key(uid: int, nt: int, nrEnc: int, arEnc: int, key: str) -> bool: + state = Crypto1() + state.key = key + state.lfsr48_u32(uid ^ nt, False) # ks0 + state.lfsr48_u32(nrEnc, True) # ks1 + ks2 = state.lfsr48_u32(0, False) # ks2 + ar = arEnc ^ ks2 + result = ar == Crypto1.prng_next(nt, 64) + # print(f'uid: {hex(uid)}, nt: {hex(nt)}, nrEnc: {hex(nrEnc)}, arEnc: {hex(arEnc)}, key: {key}, result = {result}') + return result \ No newline at end of file diff --git a/software/script/tests/test_crypto1.py b/software/script/tests/test_crypto1.py new file mode 100644 index 00000000..db24cbe6 --- /dev/null +++ b/software/script/tests/test_crypto1.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +import os, sys, unittest + +CURRENT_DIR = os.path.split(os.path.abspath(__file__))[0] +config_path = CURRENT_DIR.rsplit(os.sep, 1)[0] +sys.path.append(config_path) +print(config_path) +from crypto1 import Crypto1 + +class TestCrypto1(unittest.TestCase): + + def test_key_getter_setter(self): + state = Crypto1() + state.key = 'a0a1a2a3a4a5' + self.assertEqual(state.key, 'a0a1a2a3a4a5') + + def test_prng_next(self): + self.assertEqual(Crypto1.prng_next(0x2C198BE4, 64), 0xCC14C013) + + def test_reader_three_pass_auth(self): + uid, nt, nr, atEnc = 0x65535D33, 0xBE2B7B5D, 0x0B4271BA, 0x36081500 + reader = Crypto1() + reader.key = '974C262B9278' + ks0 = reader.lfsr48_u32(uid ^ nt, False) + self.assertEqual(ks0, 0xAC93C1A4, 'ks0 assert failed') + ks1 = reader.lfsr48_u32(nr, False) + self.assertEqual(ks1, 0xBAA3C92B, 'ks1 assert failed') + nrEnc = nr ^ ks1 + self.assertEqual(nrEnc, 0xB1E1B891, 'nrEnc assert failed') + ar = Crypto1.prng_next(nt, 64) + self.assertEqual(ar, 0xF0928568, 'ar assert failed') + ks2 = reader.lfsr48_u32(0, False) + self.assertEqual(ks2, 0xDC652720, 'ks2 assert failed') + arEnc = ar ^ ks2 + self.assertEqual(arEnc, 0x2CF7A248, 'arEnc assert failed') + ks3 = reader.lfsr48_u32(0, False) + self.assertEqual(ks3, 0xC6F4A093, 'ks3 assert failed') + at = atEnc ^ ks3 + nt96 = Crypto1.prng_next(nt, 96) + self.assertEqual(at, nt96, 'at assert failed') + + def test_tag_three_pass_auth(self): + uid, nt, nrEnc, arEnc = 0x65535D33, 0xBE2B7B5D, 0xB1E1B891, 0x2CF7A248 + tag = Crypto1() + tag.key = '974C262B9278' + ks0 = tag.lfsr48_u32(uid ^ nt, False) + self.assertEqual(ks0, 0xAC93C1A4, 'ks0 assert failed') + ks1 = tag.lfsr48_u32(nrEnc, True) + self.assertEqual(ks1, 0xBAA3C92B, 'ks1 assert failed') + nr = ks1 ^ nrEnc + self.assertEqual(nr, 0x0B4271BA, 'nr assert failed') + ks2 = tag.lfsr48_u32(0, False) + self.assertEqual(ks2, 0xDC652720, 'ks2 assert failed') + ar = ks2 ^ arEnc + self.assertEqual(ar, 0xF0928568, 'ar assert failed') + at = Crypto1.prng_next(nt, 96) + self.assertEqual(at, 0xF0FCB593, 'at assert failed') + ks3 = tag.lfsr48_u32(0, False) + self.assertEqual(ks3, 0xC6F4A093, 'ks3 assert failed') + atEnc = at ^ ks3 + self.assertEqual(atEnc, 0x36081500, 'atEnc assert failed') + + def test_mfkey32_is_reader_has_key_true(self): + self.assertTrue(Crypto1.mfkey32_is_reader_has_key( + uid = 0x65535D33, + nt = 0x2C198BE4, + nrEnc = 0xFEDAC6D2, + arEnc = 0xCF0A3C7E, + key = 'A9AC67832330' + )) + + def test_mfkey32_is_reader_has_key_false(self): + self.assertFalse(Crypto1.mfkey32_is_reader_has_key( + uid = 0x65535D33, + nt = 0x2C198BE4, + nrEnc = 0xFEDAC6D2, + arEnc = 0xCF0A3C7E, + key = 'FFFFFFFFFFFF' + )) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file