From ed67b9e9bb2720791ef8cadacf2a77762bd3d9e0 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Wed, 13 Nov 2024 17:12:21 +0200 Subject: [PATCH] Support SSH CA generation Fixes https://github.com/romanz/trezor-agent/issues/491. Usage example: ```bash trezor-agent -v 'SSH Certificate Authority' > /path/to/trezor-ca.pub echo 'TrustedUserCAKeys /etc/ssh/trezor-ca.pub' | sudo tee -a /etc/ssh/sshd_config ssh-keygen -t ed25519 -f user-key trezor-agent -v 'SSH Certificate Authority' -- \ ssh-keygen -Us trezor-ca.pub -V '+10m' -I user-id -n user user-key.pub ssh user@localhost -o CertificateFile=user-key-cert.pub -i user-key ``` --- libagent/ssh/client.py | 70 +++++++++++++++++++++++++++++------------- 1 file changed, 49 insertions(+), 21 deletions(-) diff --git a/libagent/ssh/client.py b/libagent/ssh/client.py index aa3b47cc..e2b2684b 100644 --- a/libagent/ssh/client.py +++ b/libagent/ssh/client.py @@ -10,6 +10,11 @@ log = logging.getLogger(__name__) +SUPPORTED_CERT_TYPES = { + formats.SSH_ED25519_CERT_TYPE, + formats.SSH_NIST256_CERT_TYPE, +} + class Client: """Client wrapper for SSH authentication device.""" @@ -31,22 +36,17 @@ def export_public_keys(self, identities): def sign_ssh_challenge(self, blob, identity): """Sign given blob using a private key on the device.""" - log.debug('blob: %r', blob) + log.debug('blob (%d bytes): %r', len(blob), blob) msg = parse_ssh_blob(blob) + log.debug('parsed: %r', msg) + + identity_str = identity.to_string() if msg['sshsig']: log.info('please confirm "%s" signature for "%s" using %s...', - msg['namespace'], identity.to_string(), self.device) + msg['namespace'], identity_str, self.device) else: - log.debug('%s: user %r via %r (%r)', - msg['conn'], msg['user'], msg['auth'], msg['key_type']) - log.debug('nonce: %r', msg['nonce']) - fp = msg['public_key']['fingerprint'] - log.debug('fingerprint: %s', fp) - log.debug('hidden challenge size: %d bytes', len(blob)) - - log.info('please confirm user "%s" login to "%s" using %s...', - msg['user'].decode('ascii'), identity.to_string(), - self.device) + log.info('please confirm "%s" signature for "%s" using %s...', + msg['key_type'].decode('ascii'), identity_str, self.device) with self.device: return self.device.sign(blob=blob, identity=identity) @@ -66,17 +66,45 @@ def parse_ssh_blob(data): else: i = io.BytesIO(data) res['sshsig'] = False - res['nonce'] = util.read_frame(i) - i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108) - res['user'] = util.read_frame(i) - res['conn'] = util.read_frame(i) - res['auth'] = util.read_frame(i) - i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056) - res['key_type'] = util.read_frame(i) - public_key = util.read_frame(i) - res['public_key'] = formats.parse_pubkey(public_key) + first_frame = util.read_frame(i) + # https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys + is_cert = first_frame in SUPPORTED_CERT_TYPES + if is_cert: + # see `sshkey_certify_custom()` for details: + # https://github.com/openssh/openssh-portable/blob/master/sshkey.c + res['key_type'] = first_frame + res['nonce'] = util.read_frame(i) + if first_frame == formats.SSH_NIST256_CERT_TYPE: + res['curve'] = util.read_frame(i) + res['pubkey'] = util.read_frame(i) + res['serial_number'] = util.recv(i, '>Q') + res['type'] = util.recv(i, '>L') + res['key_id'] = util.read_frame(i) + res['valid_principals'] = tuple(_iter_parse_list(util.read_frame(i))) + res['valid_after'] = util.recv(i, '>Q') + res['valid_before'] = util.recv(i, '>Q') + res['critical_options'] = tuple(_iter_parse_list(util.read_frame(i))) + res['extensions'] = tuple(_iter_parse_list(util.read_frame(i))) + res['reserved'] = util.read_frame(i) + res['signature_key'] = util.read_frame(i) + else: + res['nonce'] = first_frame + i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108) + res['user'] = util.read_frame(i) + res['conn'] = util.read_frame(i) + res['auth'] = util.read_frame(i) + i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056) + res['key_type'] = util.read_frame(i) + public_key = util.read_frame(i) + res['public_key'] = formats.parse_pubkey(public_key) unparsed = i.read() if unparsed: log.warning('unparsed blob: %r', unparsed) return res + + +def _iter_parse_list(blob): + i = io.BytesIO(blob) + while i.tell() < len(blob): + yield util.read_frame(i)