Skip to content

Commit

Permalink
check_signature + SignatureCriterion interface
Browse files Browse the repository at this point in the history
* check_signature method for Commit/Tag as alternative to verify
* SignatureCriterion interface to decouple crypto from git serialization
* single simple InvalidSignature exception generic to whatever criterion
* DRY re-impl for Commit/Tag verify using GpgSignatureCriterion
* ssh-keygen based SignatureCriterion classes in contrib
  • Loading branch information
castedo committed Nov 22, 2024
1 parent d35b4e5 commit 9236005
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 36 deletions.
115 changes: 115 additions & 0 deletions dulwich/contrib/ssh_keygen_criterion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# Copyright (c) 2024 E. Castedo Ellerman <[email protected]>

# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU
# General Public License as public by the Free Software Foundation; version 2.0
# or (at your option) any later version. You can redistribute it and/or
# modify it under the terms of either of these two licenses.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# You should have received a copy of the licenses; if not, see
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache
# License, Version 2.0.
# fmt: off

import subprocess
import tempfile
from datetime import datetime, timezone
from pathlib import Path

from ..objects import InvalidSignature, SignatureCriterion

# See the following C git implementation code for more details:
# https://archive.softwareheritage.org/swh:1:cnt:07335987a6b9ceaf6edc2da71c2e636b0513372f;origin=https://github.com/git/git;visit=swh:1:snp:e72051ba1b2437b7bf3ed0346d04b289f1393982;anchor=swh:1:rev:6a11438f43469f3815f2f0fc997bd45792ff04c0;path=/gpg-interface.c;lines=450

### WARNING!
### verify_time might or might not be in UTC.
### The following code might not be handling timezone correctly.


class SshKeygenCheckCriterion(SignatureCriterion):
"""Checks signature using ssh-keygen -Y check-novalidate."""

def __init__(self, capture_output: bool = True):
self.capture_output = capture_output

def _ssh_keygen_check(
self, subcmdline: list[str], crypto_msg: bytes, verify_time: int
) -> None:
verify_dt = datetime.fromtimestamp(verify_time, tz=timezone.utc)
cmdline = [
*subcmdline,
"-n", "git",
"-O", "verify-time=" + verify_dt.strftime("%Y%m%d%H%M%SZ"),
]
result = subprocess.run(
cmdline, input=crypto_msg, capture_output=self.capture_output
)
if 0 != result.returncode:
raise InvalidSignature

def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None:
with tempfile.NamedTemporaryFile() as sig_file:
sig_file.write(signature)
sig_file.flush()
subcmdline = ["ssh-keygen", "-Y", "check-novalidate", "-s", sig_file.name]
self._ssh_keygen_check(subcmdline, crypto_msg, verify_time)


class SshKeygenVerifyCriterion(SshKeygenCheckCriterion):
"""Verifies signature using ssh-keygen -Y verify."""

def __init__(self, allowed_signers: Path, capture_output: bool = True):
super().__init__(capture_output)
self.allowed_signers = str(allowed_signers)

def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None:
with tempfile.NamedTemporaryFile() as sig_file:
sig_file.write(signature)
sig_file.flush()
cmdline = [
"ssh-keygen", "-Y", "find-principals",
"-s", sig_file.name,
"-f", self.allowed_signers,
]
result = subprocess.run(cmdline, capture_output=True)
for principal in result.stdout.splitlines():
subcmdline = [
"ssh-keygen", "-Y", "verify",
"-f", self.allowed_signers,
"-I", str(principal),
"-s", sig_file.name,
]
self._ssh_keygen_check(subcmdline, crypto_msg, verify_time)

#ruff: noqa: I001

if __name__ == "__main__":
import argparse
import dulwich.repo

parser = argparse.ArgumentParser()
parser.add_argument("git_object", default="HEAD", nargs="?")
parser.add_argument("--allow", type=Path, help="ssh-keygen allowed signers file")
args = parser.parse_args()

if args.allow is None:
criterion = SshKeygenCheckCriterion(capture_output=False)
else:
criterion = SshKeygenVerifyCriterion(args.allow, capture_output=False)

repo = dulwich.repo.Repo(".")
commit = repo[args.git_object.encode()]
print("commit", commit.id.decode())
try:
commit.check_signature(criterion)
# signature good or not signed
except InvalidSignature:
pass
print("Author:", commit.author.decode())
print("\n ", commit.message.decode())
89 changes: 53 additions & 36 deletions dulwich/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ class EmptyFileException(FileFormatException):
"""An unexpectedly empty file was encountered."""


class InvalidSignature(Exception):
"""A signature was rejected by a signature criterion."""


class SignatureCriterion:
def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None:
"""Check/verify signature for a cryptographic message.
Raises:
InvalidSignature
"""


def S_ISGITLINK(m):
"""Check if a mode indicates a submodule.
Expand Down Expand Up @@ -927,6 +940,10 @@ def raw_without_sig(self) -> bytes:
ret = ret[: -len(self._signature)]
return ret

def check_signature(self, criterion: SignatureCriterion) -> None:
if self.signature:
criterion.check(self.raw_without_sig(), self.signature, self.tag_time)

def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
"""Verify GPG signature for this tag (if it is signed).
Expand All @@ -941,24 +958,10 @@ def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
gpg.errors.MissingSignatures: if tag was not signed by a key
specified in keyids
"""
if self._signature is None:
return

import gpg

with gpg.Context() as ctx:
data, result = ctx.verify(
self.raw_without_sig(),
signature=self._signature,
)
if keyids:
keys = [ctx.get_key(key) for key in keyids]
for key in keys:
for subkey in keys:
for sig in result.signatures:
if subkey.can_sign and subkey.fpr == sig.fpr:
return
raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
try:
self.check_signature(GpgSignatureCriterion(keyids))
except InvalidSignature as ex:
raise ex.__cause__ from None # type: ignore[misc]


class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])):
Expand Down Expand Up @@ -1531,6 +1534,10 @@ def raw_without_sig(self) -> bytes:
tmp.gpgsig = None
return tmp.as_raw_string()

def check_signature(self, criterion: SignatureCriterion) -> None:
if self.gpgsig:
criterion.check(self.raw_without_sig(), self.gpgsig, self.commit_time)

def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
"""Verify GPG signature for this commit (if it is signed).
Expand All @@ -1545,24 +1552,10 @@ def verify(self, keyids: Optional[Iterable[str]] = None) -> None:
gpg.errors.MissingSignatures: if commit was not signed by a key
specified in keyids
"""
if self._gpgsig is None:
return

import gpg

with gpg.Context() as ctx:
data, result = ctx.verify(
self.raw_without_sig(),
signature=self._gpgsig,
)
if keyids:
keys = [ctx.get_key(key) for key in keyids]
for key in keys:
for subkey in keys:
for sig in result.signatures:
if subkey.can_sign and subkey.fpr == sig.fpr:
return
raise gpg.errors.MissingSignatures(result, keys, results=(data, result))
try:
self.check_signature(GpgSignatureCriterion(keyids))
except InvalidSignature as ex:
raise ex.__cause__ from None # type: ignore[misc]

def _serialize(self):
headers = []
Expand Down Expand Up @@ -1681,6 +1674,30 @@ def _get_extra(self):
_TYPE_MAP[cls.type_num] = cls


class GpgSignatureCriterion(SignatureCriterion):
"""Verifies GPG signature."""

def __init__(self, keyids: Optional[Iterable[str]] = None):
self.keyids = keyids

def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None:
import gpg

with gpg.Context() as ctx:
try:
data, result = ctx.verify(crypto_msg, signature=signature)
except gpg.errors.BadSignatures as ex:
raise InvalidSignature from ex
if self.keyids is not None:
keys = [ctx.get_key(keyid) for keyid in self.keyids]
for key in keys:
for sig in result.signatures:
if key.can_sign and key.fpr == sig.fpr:
return
ex2 = gpg.errors.MissingSignatures(result, keys, results=(data, result))
raise InvalidSignature from ex2


# Hold on to the pure-python implementations for testing
_parse_tree_py = parse_tree
_sorted_tree_items_py = sorted_tree_items
Expand Down

0 comments on commit 9236005

Please sign in to comment.