From 923600557acf666e82b49ccbdd112c4cf9f8a3ce Mon Sep 17 00:00:00 2001 From: Castedo Ellerman Date: Fri, 22 Nov 2024 14:28:16 +0000 Subject: [PATCH] check_signature + SignatureCriterion interface * check_signature method for Commit/Tag as alternative to verify * SignatureCriterion interface to decouple crypto from git serialization * single simple InvalidSignature exception generic to whatever criterion * DRY re-impl for Commit/Tag verify using GpgSignatureCriterion * ssh-keygen based SignatureCriterion classes in contrib --- dulwich/contrib/ssh_keygen_criterion.py | 115 ++++++++++++++++++++++++ dulwich/objects.py | 89 ++++++++++-------- 2 files changed, 168 insertions(+), 36 deletions(-) create mode 100644 dulwich/contrib/ssh_keygen_criterion.py diff --git a/dulwich/contrib/ssh_keygen_criterion.py b/dulwich/contrib/ssh_keygen_criterion.py new file mode 100644 index 000000000..f530e68b2 --- /dev/null +++ b/dulwich/contrib/ssh_keygen_criterion.py @@ -0,0 +1,115 @@ +# Copyright (c) 2024 E. Castedo Ellerman + +# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU +# General Public License as public by the Free Software Foundation; version 2.0 +# or (at your option) any later version. You can redistribute it and/or +# modify it under the terms of either of these two licenses. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# You should have received a copy of the licenses; if not, see +# for a copy of the GNU General Public License +# and for a copy of the Apache +# License, Version 2.0. +# fmt: off + +import subprocess +import tempfile +from datetime import datetime, timezone +from pathlib import Path + +from ..objects import InvalidSignature, SignatureCriterion + +# See the following C git implementation code for more details: +# https://archive.softwareheritage.org/swh:1:cnt:07335987a6b9ceaf6edc2da71c2e636b0513372f;origin=https://github.com/git/git;visit=swh:1:snp:e72051ba1b2437b7bf3ed0346d04b289f1393982;anchor=swh:1:rev:6a11438f43469f3815f2f0fc997bd45792ff04c0;path=/gpg-interface.c;lines=450 + +### WARNING! +### verify_time might or might not be in UTC. +### The following code might not be handling timezone correctly. + + +class SshKeygenCheckCriterion(SignatureCriterion): + """Checks signature using ssh-keygen -Y check-novalidate.""" + + def __init__(self, capture_output: bool = True): + self.capture_output = capture_output + + def _ssh_keygen_check( + self, subcmdline: list[str], crypto_msg: bytes, verify_time: int + ) -> None: + verify_dt = datetime.fromtimestamp(verify_time, tz=timezone.utc) + cmdline = [ + *subcmdline, + "-n", "git", + "-O", "verify-time=" + verify_dt.strftime("%Y%m%d%H%M%SZ"), + ] + result = subprocess.run( + cmdline, input=crypto_msg, capture_output=self.capture_output + ) + if 0 != result.returncode: + raise InvalidSignature + + def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None: + with tempfile.NamedTemporaryFile() as sig_file: + sig_file.write(signature) + sig_file.flush() + subcmdline = ["ssh-keygen", "-Y", "check-novalidate", "-s", sig_file.name] + self._ssh_keygen_check(subcmdline, crypto_msg, verify_time) + + +class SshKeygenVerifyCriterion(SshKeygenCheckCriterion): + """Verifies signature using ssh-keygen -Y verify.""" + + def __init__(self, allowed_signers: Path, capture_output: bool = True): + super().__init__(capture_output) + self.allowed_signers = str(allowed_signers) + + def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None: + with tempfile.NamedTemporaryFile() as sig_file: + sig_file.write(signature) + sig_file.flush() + cmdline = [ + "ssh-keygen", "-Y", "find-principals", + "-s", sig_file.name, + "-f", self.allowed_signers, + ] + result = subprocess.run(cmdline, capture_output=True) + for principal in result.stdout.splitlines(): + subcmdline = [ + "ssh-keygen", "-Y", "verify", + "-f", self.allowed_signers, + "-I", str(principal), + "-s", sig_file.name, + ] + self._ssh_keygen_check(subcmdline, crypto_msg, verify_time) + +#ruff: noqa: I001 + +if __name__ == "__main__": + import argparse + import dulwich.repo + + parser = argparse.ArgumentParser() + parser.add_argument("git_object", default="HEAD", nargs="?") + parser.add_argument("--allow", type=Path, help="ssh-keygen allowed signers file") + args = parser.parse_args() + + if args.allow is None: + criterion = SshKeygenCheckCriterion(capture_output=False) + else: + criterion = SshKeygenVerifyCriterion(args.allow, capture_output=False) + + repo = dulwich.repo.Repo(".") + commit = repo[args.git_object.encode()] + print("commit", commit.id.decode()) + try: + commit.check_signature(criterion) + # signature good or not signed + except InvalidSignature: + pass + print("Author:", commit.author.decode()) + print("\n ", commit.message.decode()) diff --git a/dulwich/objects.py b/dulwich/objects.py index 31ffe1e2a..47ccfd339 100644 --- a/dulwich/objects.py +++ b/dulwich/objects.py @@ -85,6 +85,19 @@ class EmptyFileException(FileFormatException): """An unexpectedly empty file was encountered.""" +class InvalidSignature(Exception): + """A signature was rejected by a signature criterion.""" + + +class SignatureCriterion: + def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None: + """Check/verify signature for a cryptographic message. + + Raises: + InvalidSignature + """ + + def S_ISGITLINK(m): """Check if a mode indicates a submodule. @@ -927,6 +940,10 @@ def raw_without_sig(self) -> bytes: ret = ret[: -len(self._signature)] return ret + def check_signature(self, criterion: SignatureCriterion) -> None: + if self.signature: + criterion.check(self.raw_without_sig(), self.signature, self.tag_time) + def verify(self, keyids: Optional[Iterable[str]] = None) -> None: """Verify GPG signature for this tag (if it is signed). @@ -941,24 +958,10 @@ def verify(self, keyids: Optional[Iterable[str]] = None) -> None: gpg.errors.MissingSignatures: if tag was not signed by a key specified in keyids """ - if self._signature is None: - return - - import gpg - - with gpg.Context() as ctx: - data, result = ctx.verify( - self.raw_without_sig(), - signature=self._signature, - ) - if keyids: - keys = [ctx.get_key(key) for key in keyids] - for key in keys: - for subkey in keys: - for sig in result.signatures: - if subkey.can_sign and subkey.fpr == sig.fpr: - return - raise gpg.errors.MissingSignatures(result, keys, results=(data, result)) + try: + self.check_signature(GpgSignatureCriterion(keyids)) + except InvalidSignature as ex: + raise ex.__cause__ from None # type: ignore[misc] class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])): @@ -1531,6 +1534,10 @@ def raw_without_sig(self) -> bytes: tmp.gpgsig = None return tmp.as_raw_string() + def check_signature(self, criterion: SignatureCriterion) -> None: + if self.gpgsig: + criterion.check(self.raw_without_sig(), self.gpgsig, self.commit_time) + def verify(self, keyids: Optional[Iterable[str]] = None) -> None: """Verify GPG signature for this commit (if it is signed). @@ -1545,24 +1552,10 @@ def verify(self, keyids: Optional[Iterable[str]] = None) -> None: gpg.errors.MissingSignatures: if commit was not signed by a key specified in keyids """ - if self._gpgsig is None: - return - - import gpg - - with gpg.Context() as ctx: - data, result = ctx.verify( - self.raw_without_sig(), - signature=self._gpgsig, - ) - if keyids: - keys = [ctx.get_key(key) for key in keyids] - for key in keys: - for subkey in keys: - for sig in result.signatures: - if subkey.can_sign and subkey.fpr == sig.fpr: - return - raise gpg.errors.MissingSignatures(result, keys, results=(data, result)) + try: + self.check_signature(GpgSignatureCriterion(keyids)) + except InvalidSignature as ex: + raise ex.__cause__ from None # type: ignore[misc] def _serialize(self): headers = [] @@ -1681,6 +1674,30 @@ def _get_extra(self): _TYPE_MAP[cls.type_num] = cls +class GpgSignatureCriterion(SignatureCriterion): + """Verifies GPG signature.""" + + def __init__(self, keyids: Optional[Iterable[str]] = None): + self.keyids = keyids + + def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None: + import gpg + + with gpg.Context() as ctx: + try: + data, result = ctx.verify(crypto_msg, signature=signature) + except gpg.errors.BadSignatures as ex: + raise InvalidSignature from ex + if self.keyids is not None: + keys = [ctx.get_key(keyid) for keyid in self.keyids] + for key in keys: + for sig in result.signatures: + if key.can_sign and key.fpr == sig.fpr: + return + ex2 = gpg.errors.MissingSignatures(result, keys, results=(data, result)) + raise InvalidSignature from ex2 + + # Hold on to the pure-python implementations for testing _parse_tree_py = parse_tree _sorted_tree_items_py = sorted_tree_items