Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ecdsa functions to ks #10

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions examples/test-keystore.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import binascii
import hashlib
import json
import logging
import random
import time
from uuid import UUID

import ubirch
from ubirch.ubirch_protocol import UBIRCH_PROTOCOL_TYPE_BIN

logging.basicConfig(format='%(asctime)s %(name)20.20s %(levelname)-8.8s %(message)s', level=logging.DEBUG)
logger = logging.getLogger()


########################################################################
# Implement the ubirch-protocol with signing and saving the signatures
class Proto(ubirch.Protocol):

def __init__(self, key_store: ubirch.KeyStore, uuid: UUID) -> None:
super().__init__()
self.__ks = key_store

# check if the device already has keys or generate a new pair
if not keystore.exists_signing_key(uuid):
keystore.create_ecdsa_keypair(uuid)

def _sign(self, uuid: UUID, message: bytes) -> bytes:
return self.__ks.find_signing_key(uuid).sign(message)

def _verify(self, uuid: UUID, message: bytes, signature: bytes):
return self.__ks.find_verifying_key(uuid).verify(signature, message)


########################################################################

uuid = UUID(hex="c8e5026e-5aef-4ad1-a7f0-1111820bf060")

# create a keystore for the device
keystore = ubirch.KeyStore("demo-device.jks", "keystore")

# create an instance of the protocol with signature saving
protocol = Proto(keystore, uuid)

# create a message like being sent to the customer backend
# include an ID and timestamp in the data message to ensure a unique hash
message = {
"id": str(uuid),
"ts": int(time.time()),
"data": "{:d}".format(random.randint(0, 100))
}

# create a compact rendering of the message to ensure determinism when creating the hash
serialized = json.dumps(message, separators=(',', ':'), sort_keys=True, ensure_ascii=False).encode()

# hash the message
message_hash = hashlib.sha256(serialized).digest()
logger.info("message hash: {}".format(binascii.b2a_base64(message_hash).decode().rstrip("\n")))

# create a new chained protocol message with the message hash
upp = protocol.message_chained(uuid, UBIRCH_PROTOCOL_TYPE_BIN, message_hash)
logger.info("UPP: {}".format(binascii.hexlify(upp).decode()))

# verify the upp
unpacked = protocol.message_verify(upp)
print("UPP verified")
91 changes: 73 additions & 18 deletions ubirch/ubirch_ks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,38 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import hashlib
from datetime import datetime, timedelta
from logging import getLogger
from os import urandom
from uuid import UUID

import ecdsa
import ed25519
from ed25519 import SigningKey, VerifyingKey
from jks import jks, AlgorithmIdentifier, rfc5208, TrustedCertEntry
from pyasn1.codec.ber import encoder

logger = getLogger(__name__)

ECC_ENCRYPTION_OID = (1, 2, 1, 3, 101, 112)
EDDSA_OID = (1, 2, 1, 3, 101, 112)
ECDSA_OID = (1, 2, 840, 10045, 4, 3, 2)


class ED25519Certificate(TrustedCertEntry):

def __init__(self, alias: str, verifying_key: VerifyingKey, **kwargs):
def __init__(self, alias: str, verifying_key: ed25519.VerifyingKey, **kwargs):
super().__init__(**kwargs)
self.alias = alias
self.cert = verifying_key.to_bytes()
self.timestamp = int(datetime.utcnow().timestamp())

class ECDSACertificate(TrustedCertEntry):

def __init__(self, alias: str, verifying_key: ecdsa.VerifyingKey, **kwargs):
super().__init__(**kwargs)
self.alias = alias
self.cert = verifying_key.to_string()
self.timestamp = int(datetime.utcnow().timestamp())

class KeyStore(object):
"""
Expand All @@ -60,25 +68,26 @@ def _load_keys(self) -> None:
logger.warning("creating new key store: {}".format(self._ks_file))
self._ks = jks.KeyStore.new("jks", [])

def insert_ed25519_signing_key(self, uuid, sk: SigningKey):
"""Insert an existing ED25519 signing key."""
def insert_ed25519_signing_key(self, uuid: UUID, sk: ed25519.SigningKey):
"""Store an existing ED25519 signing key in the key store."""
# encode the ED25519 private key as PKCS#8
private_key_info = rfc5208.PrivateKeyInfo()
private_key_info.setComponentByName('version', 'v1')
a = AlgorithmIdentifier()
a.setComponentByName('algorithm', ECC_ENCRYPTION_OID)
a.setComponentByName('algorithm', EDDSA_OID)
private_key_info.setComponentByName('privateKeyAlgorithm', a)
private_key_info.setComponentByName('privateKey', sk.to_bytes())
pkey_pkcs8 = encoder.encode(private_key_info)
pke = jks.PrivateKeyEntry.new(alias=str(uuid.hex), certs=[], key=pkey_pkcs8)
self._ks.entries['pke_' + uuid.hex] = pke

def insert_ed25519_verifying_key(self, uuid, vk: VerifyingKey):
# store verifying key in certificate store
def insert_ed25519_verifying_key(self, uuid: UUID, vk: ed25519.VerifyingKey):
"""Store an existing ED25519 verifying key in the key store."""
self._ks.entries[uuid.hex] = ED25519Certificate(uuid.hex, vk)

def insert_ed25519_keypair(self, uuid: UUID, vk: VerifyingKey, sk: SigningKey) -> (VerifyingKey, SigningKey):
"""Insert an existing ED25519 key pair into the key store."""
def insert_ed25519_keypair(self, uuid: UUID, vk: ed25519.VerifyingKey, sk: ed25519.SigningKey) -> (
ed25519.VerifyingKey, ed25519.SigningKey):
"""Store an existing ED25519 key pair in the key store."""
if uuid.hex in self._ks.entries or uuid.hex in self._ks.certs:
raise Exception("uuid '{}' already exists in keystore".format(uuid.hex))

Expand All @@ -88,11 +97,46 @@ def insert_ed25519_keypair(self, uuid: UUID, vk: VerifyingKey, sk: SigningKey) -
logger.info("inserted new key pair for {}: {}".format(uuid.hex, bytes.decode(vk.to_ascii(encoding='hex'))))
return (vk, sk)

def create_ed25519_keypair(self, uuid: UUID) -> (VerifyingKey, SigningKey):
def create_ed25519_keypair(self, uuid: UUID) -> (ed25519.VerifyingKey, ed25519.SigningKey):
"""Create a new ED25519 key pair and store in key store."""
sk, vk = ed25519.create_keypair(entropy=urandom)
return self.insert_ed25519_keypair(uuid, vk, sk)

def insert_ecdsa_signing_key(self, uuid, sk: ecdsa.SigningKey):
"""Insert an existing ECDSA signing key."""
# encode the ECDSA private key as PKCS#8
private_key_info = rfc5208.PrivateKeyInfo()
private_key_info.setComponentByName('version', 'v1')
a = AlgorithmIdentifier()
a.setComponentByName('algorithm', ECDSA_OID)
private_key_info.setComponentByName('privateKeyAlgorithm', a)
private_key_info.setComponentByName('privateKey', sk.to_string())
pkey_pkcs8 = encoder.encode(private_key_info)
pke = jks.PrivateKeyEntry.new(alias=str(uuid.hex), certs=[], key=pkey_pkcs8)
self._ks.entries['pke_' + uuid.hex] = pke

def insert_ecdsa_verifying_key(self, uuid, vk: ecdsa.VerifyingKey):
# store verifying key in certificate store
self._ks.entries[uuid.hex] = ECDSACertificate(uuid.hex, vk)

def insert_ecdsa_keypair(self, uuid: UUID, vk: ecdsa.VerifyingKey, sk: ecdsa.SigningKey) -> (ecdsa.VerifyingKey, ecdsa.SigningKey):
"""Insert an existing ECDSA key pair into the key store."""
if uuid.hex in self._ks.entries or uuid.hex in self._ks.certs:
raise Exception("uuid '{}' already exists in keystore".format(uuid.hex))

self.insert_ecdsa_verifying_key(uuid, vk)
self.insert_ecdsa_signing_key(uuid, sk)
self._ks.save(self._ks_file, self._ks_password)
logger.info("inserted new key pair for {}: {}".format(uuid.hex, vk.to_string().decode()))
return (vk, sk)

def create_ecdsa_keypair(self, uuid: UUID, curve: ecdsa.curves.Curve = ecdsa.NIST256p, hashfunc=hashlib.sha256) -> (ecdsa.VerifyingKey, ecdsa.SigningKey):
"""Create new ECDSA key pair and store in key store"""

sk = ecdsa.SigningKey.generate(curve, entropy=urandom, hashfunc=hashfunc)
vk = sk.get_verifying_key()
return self.insert_ecdsa_keypair(uuid, vk, sk)

def exists_signing_key(self, uuid: UUID):
"""Check whether this UUID has a signing key in the key store."""
return 'pke_' + uuid.hex in self._ks.private_keys
Expand All @@ -101,17 +145,28 @@ def exists_verifying_key(self, uuid: UUID):
"""Check whether this UUID has a verifying key in the key store."""
return uuid.hex in self._ks.certs

def find_signing_key(self, uuid: UUID) -> SigningKey:
def find_signing_key(self, uuid: UUID) -> ed25519.SigningKey or ecdsa.SigningKey:
"""Find the signing key for this UUID."""
sk = self._ks.private_keys['pke_' + uuid.hex]
return SigningKey(sk.pkey)

def find_verifying_key(self, uuid: UUID) -> VerifyingKey:
if sk._algorithm_oid == EDDSA_OID: # FIXME sk._algorithm_oid is None
return ed25519.SigningKey(sk.pkey)
elif sk._algorithm_oid == ECDSA_OID:
return ecdsa.SigningKey(sk.pkey)
else:
raise Exception("stored key with unknown algorithm OID: '{}'".format(sk._algorithm_oid))

def find_verifying_key(self, uuid: UUID) -> ed25519.VerifyingKey or ecdsa.VerifyingKey:
"""Find the verifying key for this UUID."""
cert = self._ks.certs[uuid.hex]
return VerifyingKey(cert.cert)

# try:
# vk = ed25519.VerifyingKey(cert.cert)
# except

return ed25519.VerifyingKey(cert.cert)

def get_certificate(self, uuid: UUID) -> dict or None:
"""Get the public key info for key registration"""
if not uuid.hex in self._ks.certs:
return None

Expand All @@ -122,7 +177,7 @@ def get_certificate(self, uuid: UUID) -> dict or None:
# TODO fix handling of key validity
not_after = created + timedelta(days=365)
return {
"algorithm": 'ECC_ED25519',
"algorithm": 'ECC_ED25519', # TODO if ECDSA 'ecdsa-p256v1'
"created": int(created.timestamp()),
"hwDeviceId": uuid.bytes,
"pubKey": vk.to_bytes(),
Expand Down