From 5d50663deee996654e68f9f0a46b7c13226f4afb Mon Sep 17 00:00:00 2001 From: "Matthias L. Jugel" Date: Sun, 5 May 2019 19:03:35 +0200 Subject: [PATCH 1/3] add ecdsa functions to ks --- ubirch/ubirch_ks.py | 76 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 63 insertions(+), 13 deletions(-) diff --git a/ubirch/ubirch_ks.py b/ubirch/ubirch_ks.py index 998ce6a..a19fbaa 100644 --- a/ubirch/ubirch_ks.py +++ b/ubirch/ubirch_ks.py @@ -15,30 +15,38 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import hashlib from datetime import datetime, timedelta from logging import getLogger from os import urandom from uuid import UUID +import ecdsa import ed25519 -from ed25519 import SigningKey, VerifyingKey from jks import jks, AlgorithmIdentifier, rfc5208, TrustedCertEntry from pyasn1.codec.ber import encoder logger = getLogger(__name__) -ECC_ENCRYPTION_OID = (1, 2, 1, 3, 101, 112) +EDDSA_OID = (1, 2, 1, 3, 101, 112) +ECDSA_OID = (1, 2, 840, 10045, 4, 3, 2) class ED25519Certificate(TrustedCertEntry): - def __init__(self, alias: str, verifying_key: VerifyingKey, **kwargs): + def __init__(self, alias: str, verifying_key: ed25519.VerifyingKey, **kwargs): super().__init__(**kwargs) self.alias = alias self.cert = verifying_key.to_bytes() self.timestamp = int(datetime.utcnow().timestamp()) +class ECDSACertificate(TrustedCertEntry): + + def __init__(self, alias: str, verifying_key: ecdsa.VerifyingKey, **kwargs): + super().__init__(**kwargs) + self.alias = alias + self.cert = verifying_key.to_string() + self.timestamp = int(datetime.utcnow().timestamp()) class KeyStore(object): """ @@ -60,24 +68,24 @@ def _load_keys(self) -> None: logger.warning("creating new key store: {}".format(self._ks_file)) self._ks = jks.KeyStore.new("jks", []) - def insert_ed25519_signing_key(self, uuid, sk: SigningKey): + def insert_ed25519_signing_key(self, uuid, sk: ed25519.SigningKey): """Insert an existing ED25519 signing key.""" # encode the ED25519 private key as PKCS#8 private_key_info = rfc5208.PrivateKeyInfo() private_key_info.setComponentByName('version', 'v1') a = AlgorithmIdentifier() - a.setComponentByName('algorithm', ECC_ENCRYPTION_OID) + a.setComponentByName('algorithm', EDDSA_OID) private_key_info.setComponentByName('privateKeyAlgorithm', a) private_key_info.setComponentByName('privateKey', sk.to_bytes()) pkey_pkcs8 = encoder.encode(private_key_info) pke = jks.PrivateKeyEntry.new(alias=str(uuid.hex), certs=[], key=pkey_pkcs8) self._ks.entries['pke_' + uuid.hex] = pke - def insert_ed25519_verifying_key(self, uuid, vk: VerifyingKey): + def insert_ed25519_verifying_key(self, uuid, vk: ed25519.VerifyingKey): # store verifying key in certificate store self._ks.entries[uuid.hex] = ED25519Certificate(uuid.hex, vk) - def insert_ed25519_keypair(self, uuid: UUID, vk: VerifyingKey, sk: SigningKey) -> (VerifyingKey, SigningKey): + def insert_ed25519_keypair(self, uuid: UUID, vk: ed25519.VerifyingKey, sk: ed25519.SigningKey) -> (ed25519.VerifyingKey, ed25519.SigningKey): """Insert an existing ED25519 key pair into the key store.""" if uuid.hex in self._ks.entries or uuid.hex in self._ks.certs: raise Exception("uuid '{}' already exists in keystore".format(uuid.hex)) @@ -88,11 +96,46 @@ def insert_ed25519_keypair(self, uuid: UUID, vk: VerifyingKey, sk: SigningKey) - logger.info("inserted new key pair for {}: {}".format(uuid.hex, bytes.decode(vk.to_ascii(encoding='hex')))) return (vk, sk) - def create_ed25519_keypair(self, uuid: UUID) -> (VerifyingKey, SigningKey): + def create_ed25519_keypair(self, uuid: UUID) -> (ed25519.VerifyingKey, ed25519.SigningKey): """Create a new ED25519 key pair and store in key store.""" sk, vk = ed25519.create_keypair(entropy=urandom) return self.insert_ed25519_keypair(uuid, vk, sk) + def insert_ecdsa_signing_key(self, uuid, sk: ecdsa.SigningKey): + """Insert an existing ECDSA signing key.""" + # encode the ECDSA private key as PKCS#8 + private_key_info = rfc5208.PrivateKeyInfo() + private_key_info.setComponentByName('version', 'v1') + a = AlgorithmIdentifier() + a.setComponentByName('algorithm', ECDSA_OID) + private_key_info.setComponentByName('privateKeyAlgorithm', a) + private_key_info.setComponentByName('privateKey', sk.to_string()) + pkey_pkcs8 = encoder.encode(private_key_info) + pke = jks.PrivateKeyEntry.new(alias=str(uuid.hex), certs=[], key=pkey_pkcs8) + self._ks.entries['pke_' + uuid.hex] = pke + + def insert_ecdsa_verifying_key(self, uuid, vk: ecdsa.VerifyingKey): + # store verifying key in certificate store + self._ks.entries[uuid.hex] = ED25519Certificate(uuid.hex, vk) + + def insert_ecdsa_keypair(self, uuid: UUID, vk: ecdsa.VerifyingKey, sk: ecdsa.SigningKey) -> (ecdsa.VerifyingKey, ecdsa.SigningKey): + """Insert an existing ECDSA key pair into the key store.""" + if uuid.hex in self._ks.entries or uuid.hex in self._ks.certs: + raise Exception("uuid '{}' already exists in keystore".format(uuid.hex)) + + self.insert_ed25519_verifying_key(uuid, vk) + self.insert_ed25519_signing_key(uuid, sk) + self._ks.save(self._ks_file, self._ks_password) + logger.info("inserted new key pair for {}: {}".format(uuid.hex, bytes.decode(vk.to_string()))) + return (vk, sk) + + def create_ecdsa_keypair(self, uuid: UUID, curve: ecdsa.curves.Curve = ecdsa.NIST256p, hashfunc=hashlib.sha256) -> (ecdsa.VerifyingKey, ecdsa.SigningKey): + """Create new ECDSA key pair and store in key store""" + + sk = ecdsa.SigningKey.generate(curve, entropy=urandom, hashfunc=hashfunc) + vk = sk.get_verifying_key() + return self.insert_ecdsa_keypair(uuid, vk, sk) + def exists_signing_key(self, uuid: UUID): """Check whether this UUID has a signing key in the key store.""" return 'pke_' + uuid.hex in self._ks.private_keys @@ -101,14 +144,21 @@ def exists_verifying_key(self, uuid: UUID): """Check whether this UUID has a verifying key in the key store.""" return uuid.hex in self._ks.certs - def find_signing_key(self, uuid: UUID) -> SigningKey: + def find_signing_key(self, uuid: UUID) -> ed25519.SigningKey or ecdsa.SigningKey: """Find the signing key for this UUID.""" sk = self._ks.private_keys['pke_' + uuid.hex] - return SigningKey(sk.pkey) - - def find_verifying_key(self, uuid: UUID) -> VerifyingKey: + if sk._algorithm_oid == EDDSA_OID: + return ed25519.SigningKey(sk.pkey) + elif sk._algorithm_oid == ECDSA_OID: + return ecdsa.SigningKey(sk.pkey) + else: + raise Exception("stored key with unknown algorithm OID: '{}'".format(sk._algorithm_oid)) + + def find_verifying_key(self, uuid: UUID) -> ed25519.VerifyingKey or ecdsa.VerifyingKey: """Find the verifying key for this UUID.""" cert = self._ks.certs[uuid.hex] + + if cert. return VerifyingKey(cert.cert) def get_certificate(self, uuid: UUID) -> dict or None: From 8dc2bcdc3e69d327d37afa4a85743114378b463a Mon Sep 17 00:00:00 2001 From: Roxana Meixner Date: Wed, 20 Jan 2021 14:51:38 +0100 Subject: [PATCH 2/3] [WIP] fixed insert_ecdsa_keypair and insert_ecdsa_verifying_key methods to insert correct key types and added todos --- ubirch/ubirch_ks.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/ubirch/ubirch_ks.py b/ubirch/ubirch_ks.py index a19fbaa..5a99bf2 100644 --- a/ubirch/ubirch_ks.py +++ b/ubirch/ubirch_ks.py @@ -68,8 +68,8 @@ def _load_keys(self) -> None: logger.warning("creating new key store: {}".format(self._ks_file)) self._ks = jks.KeyStore.new("jks", []) - def insert_ed25519_signing_key(self, uuid, sk: ed25519.SigningKey): - """Insert an existing ED25519 signing key.""" + def insert_ed25519_signing_key(self, uuid: UUID, sk: ed25519.SigningKey): + """Store an existing ED25519 signing key in the key store.""" # encode the ED25519 private key as PKCS#8 private_key_info = rfc5208.PrivateKeyInfo() private_key_info.setComponentByName('version', 'v1') @@ -81,12 +81,13 @@ def insert_ed25519_signing_key(self, uuid, sk: ed25519.SigningKey): pke = jks.PrivateKeyEntry.new(alias=str(uuid.hex), certs=[], key=pkey_pkcs8) self._ks.entries['pke_' + uuid.hex] = pke - def insert_ed25519_verifying_key(self, uuid, vk: ed25519.VerifyingKey): - # store verifying key in certificate store + def insert_ed25519_verifying_key(self, uuid: UUID, vk: ed25519.VerifyingKey): + """Store an existing ED25519 verifying key in the key store.""" self._ks.entries[uuid.hex] = ED25519Certificate(uuid.hex, vk) - def insert_ed25519_keypair(self, uuid: UUID, vk: ed25519.VerifyingKey, sk: ed25519.SigningKey) -> (ed25519.VerifyingKey, ed25519.SigningKey): - """Insert an existing ED25519 key pair into the key store.""" + def insert_ed25519_keypair(self, uuid: UUID, vk: ed25519.VerifyingKey, sk: ed25519.SigningKey) -> ( + ed25519.VerifyingKey, ed25519.SigningKey): + """Store an existing ED25519 key pair in the key store.""" if uuid.hex in self._ks.entries or uuid.hex in self._ks.certs: raise Exception("uuid '{}' already exists in keystore".format(uuid.hex)) @@ -116,17 +117,17 @@ def insert_ecdsa_signing_key(self, uuid, sk: ecdsa.SigningKey): def insert_ecdsa_verifying_key(self, uuid, vk: ecdsa.VerifyingKey): # store verifying key in certificate store - self._ks.entries[uuid.hex] = ED25519Certificate(uuid.hex, vk) + self._ks.entries[uuid.hex] = ECDSACertificate(uuid.hex, vk) def insert_ecdsa_keypair(self, uuid: UUID, vk: ecdsa.VerifyingKey, sk: ecdsa.SigningKey) -> (ecdsa.VerifyingKey, ecdsa.SigningKey): """Insert an existing ECDSA key pair into the key store.""" if uuid.hex in self._ks.entries or uuid.hex in self._ks.certs: raise Exception("uuid '{}' already exists in keystore".format(uuid.hex)) - self.insert_ed25519_verifying_key(uuid, vk) - self.insert_ed25519_signing_key(uuid, sk) + self.insert_ecdsa_verifying_key(uuid, vk) + self.insert_ecdsa_signing_key(uuid, sk) self._ks.save(self._ks_file, self._ks_password) - logger.info("inserted new key pair for {}: {}".format(uuid.hex, bytes.decode(vk.to_string()))) + logger.info("inserted new key pair for {}: {}".format(uuid.hex, vk.to_string().decode())) return (vk, sk) def create_ecdsa_keypair(self, uuid: UUID, curve: ecdsa.curves.Curve = ecdsa.NIST256p, hashfunc=hashlib.sha256) -> (ecdsa.VerifyingKey, ecdsa.SigningKey): @@ -147,7 +148,7 @@ def exists_verifying_key(self, uuid: UUID): def find_signing_key(self, uuid: UUID) -> ed25519.SigningKey or ecdsa.SigningKey: """Find the signing key for this UUID.""" sk = self._ks.private_keys['pke_' + uuid.hex] - if sk._algorithm_oid == EDDSA_OID: + if sk._algorithm_oid == EDDSA_OID: # FIXME sk._algorithm_oid is None return ed25519.SigningKey(sk.pkey) elif sk._algorithm_oid == ECDSA_OID: return ecdsa.SigningKey(sk.pkey) @@ -158,10 +159,14 @@ def find_verifying_key(self, uuid: UUID) -> ed25519.VerifyingKey or ecdsa.Verify """Find the verifying key for this UUID.""" cert = self._ks.certs[uuid.hex] - if cert. - return VerifyingKey(cert.cert) + # try: + # vk = ed25519.VerifyingKey(cert.cert) + # except + + return ed25519.VerifyingKey(cert.cert) def get_certificate(self, uuid: UUID) -> dict or None: + """Get the public key info for key registration""" if not uuid.hex in self._ks.certs: return None @@ -172,7 +177,7 @@ def get_certificate(self, uuid: UUID) -> dict or None: # TODO fix handling of key validity not_after = created + timedelta(days=365) return { - "algorithm": 'ECC_ED25519', + "algorithm": 'ECC_ED25519', # TODO if ECDSA 'ecdsa-p256v1' "created": int(created.timestamp()), "hwDeviceId": uuid.bytes, "pubKey": vk.to_bytes(), From e59110eb5570fbcb1a3112a3fd137b68478a6319 Mon Sep 17 00:00:00 2001 From: Roxana Meixner Date: Wed, 20 Jan 2021 14:58:57 +0100 Subject: [PATCH 3/3] script to test keystore --- examples/test-keystore.py | 66 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 examples/test-keystore.py diff --git a/examples/test-keystore.py b/examples/test-keystore.py new file mode 100644 index 0000000..a4ffa41 --- /dev/null +++ b/examples/test-keystore.py @@ -0,0 +1,66 @@ +import binascii +import hashlib +import json +import logging +import random +import time +from uuid import UUID + +import ubirch +from ubirch.ubirch_protocol import UBIRCH_PROTOCOL_TYPE_BIN + +logging.basicConfig(format='%(asctime)s %(name)20.20s %(levelname)-8.8s %(message)s', level=logging.DEBUG) +logger = logging.getLogger() + + +######################################################################## +# Implement the ubirch-protocol with signing and saving the signatures +class Proto(ubirch.Protocol): + + def __init__(self, key_store: ubirch.KeyStore, uuid: UUID) -> None: + super().__init__() + self.__ks = key_store + + # check if the device already has keys or generate a new pair + if not keystore.exists_signing_key(uuid): + keystore.create_ecdsa_keypair(uuid) + + def _sign(self, uuid: UUID, message: bytes) -> bytes: + return self.__ks.find_signing_key(uuid).sign(message) + + def _verify(self, uuid: UUID, message: bytes, signature: bytes): + return self.__ks.find_verifying_key(uuid).verify(signature, message) + + +######################################################################## + +uuid = UUID(hex="c8e5026e-5aef-4ad1-a7f0-1111820bf060") + +# create a keystore for the device +keystore = ubirch.KeyStore("demo-device.jks", "keystore") + +# create an instance of the protocol with signature saving +protocol = Proto(keystore, uuid) + +# create a message like being sent to the customer backend +# include an ID and timestamp in the data message to ensure a unique hash +message = { + "id": str(uuid), + "ts": int(time.time()), + "data": "{:d}".format(random.randint(0, 100)) +} + +# create a compact rendering of the message to ensure determinism when creating the hash +serialized = json.dumps(message, separators=(',', ':'), sort_keys=True, ensure_ascii=False).encode() + +# hash the message +message_hash = hashlib.sha256(serialized).digest() +logger.info("message hash: {}".format(binascii.b2a_base64(message_hash).decode().rstrip("\n"))) + +# create a new chained protocol message with the message hash +upp = protocol.message_chained(uuid, UBIRCH_PROTOCOL_TYPE_BIN, message_hash) +logger.info("UPP: {}".format(binascii.hexlify(upp).decode())) + +# verify the upp +unpacked = protocol.message_verify(upp) +print("UPP verified")