Skip to content

Commit

Permalink
Support SSH CA generation
Browse files Browse the repository at this point in the history
Fixes #491.

Usage example:

```bash
trezor-agent -v 'SSH Certificate Authority' > /path/to/trezor-ca.pub

echo 'TrustedUserCAKeys /etc/ssh/trezor-ca.pub' | sudo tee -a /etc/ssh/sshd_config

ssh-keygen -t ed25519 -f user-key

trezor-agent -v 'SSH Certificate Authority' -- \
  ssh-keygen -Us trezor-ca.pub -V '+10m' -I user-id -n user user-key.pub

ssh user@localhost -o CertificateFile=user-key-cert.pub -i user-key
```
  • Loading branch information
romanz committed Nov 23, 2024
1 parent 87f7117 commit ed67b9e
Showing 1 changed file with 49 additions and 21 deletions.
70 changes: 49 additions & 21 deletions libagent/ssh/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@

log = logging.getLogger(__name__)

SUPPORTED_CERT_TYPES = {
formats.SSH_ED25519_CERT_TYPE,
formats.SSH_NIST256_CERT_TYPE,
}


class Client:
"""Client wrapper for SSH authentication device."""
Expand All @@ -31,22 +36,17 @@ def export_public_keys(self, identities):

def sign_ssh_challenge(self, blob, identity):
"""Sign given blob using a private key on the device."""
log.debug('blob: %r', blob)
log.debug('blob (%d bytes): %r', len(blob), blob)
msg = parse_ssh_blob(blob)
log.debug('parsed: %r', msg)

identity_str = identity.to_string()
if msg['sshsig']:
log.info('please confirm "%s" signature for "%s" using %s...',
msg['namespace'], identity.to_string(), self.device)
msg['namespace'], identity_str, self.device)
else:
log.debug('%s: user %r via %r (%r)',
msg['conn'], msg['user'], msg['auth'], msg['key_type'])
log.debug('nonce: %r', msg['nonce'])
fp = msg['public_key']['fingerprint']
log.debug('fingerprint: %s', fp)
log.debug('hidden challenge size: %d bytes', len(blob))

log.info('please confirm user "%s" login to "%s" using %s...',
msg['user'].decode('ascii'), identity.to_string(),
self.device)
log.info('please confirm "%s" signature for "%s" using %s...',
msg['key_type'].decode('ascii'), identity_str, self.device)

with self.device:
return self.device.sign(blob=blob, identity=identity)
Expand All @@ -66,17 +66,45 @@ def parse_ssh_blob(data):
else:
i = io.BytesIO(data)
res['sshsig'] = False
res['nonce'] = util.read_frame(i)
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
res['user'] = util.read_frame(i)
res['conn'] = util.read_frame(i)
res['auth'] = util.read_frame(i)
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
res['key_type'] = util.read_frame(i)
public_key = util.read_frame(i)
res['public_key'] = formats.parse_pubkey(public_key)
first_frame = util.read_frame(i)
# https://github.com/openssh/openssh-portable/blob/master/PROTOCOL.certkeys
is_cert = first_frame in SUPPORTED_CERT_TYPES
if is_cert:
# see `sshkey_certify_custom()` for details:
# https://github.com/openssh/openssh-portable/blob/master/sshkey.c
res['key_type'] = first_frame
res['nonce'] = util.read_frame(i)
if first_frame == formats.SSH_NIST256_CERT_TYPE:
res['curve'] = util.read_frame(i)
res['pubkey'] = util.read_frame(i)
res['serial_number'] = util.recv(i, '>Q')
res['type'] = util.recv(i, '>L')
res['key_id'] = util.read_frame(i)
res['valid_principals'] = tuple(_iter_parse_list(util.read_frame(i)))
res['valid_after'] = util.recv(i, '>Q')
res['valid_before'] = util.recv(i, '>Q')
res['critical_options'] = tuple(_iter_parse_list(util.read_frame(i)))
res['extensions'] = tuple(_iter_parse_list(util.read_frame(i)))
res['reserved'] = util.read_frame(i)
res['signature_key'] = util.read_frame(i)
else:
res['nonce'] = first_frame
i.read(1) # SSH2_MSG_USERAUTH_REQUEST == 50 (from ssh2.h, line 108)
res['user'] = util.read_frame(i)
res['conn'] = util.read_frame(i)
res['auth'] = util.read_frame(i)
i.read(1) # have_sig == 1 (from sshconnect2.c, line 1056)
res['key_type'] = util.read_frame(i)
public_key = util.read_frame(i)
res['public_key'] = formats.parse_pubkey(public_key)

unparsed = i.read()
if unparsed:
log.warning('unparsed blob: %r', unparsed)
return res


def _iter_parse_list(blob):
i = io.BytesIO(blob)
while i.tell() < len(blob):
yield util.read_frame(i)

0 comments on commit ed67b9e

Please sign in to comment.