-
-
Notifications
You must be signed in to change notification settings - Fork 402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
check_signature + SignatureCriterion interface #1452
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,115 @@ | ||
# Copyright (c) 2024 E. Castedo Ellerman <[email protected]> | ||
|
||
# Dulwich is dual-licensed under the Apache License, Version 2.0 and the GNU | ||
# General Public License as public by the Free Software Foundation; version 2.0 | ||
# or (at your option) any later version. You can redistribute it and/or | ||
# modify it under the terms of either of these two licenses. | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# | ||
# You should have received a copy of the licenses; if not, see | ||
# <http://www.gnu.org/licenses/> for a copy of the GNU General Public License | ||
# and <http://www.apache.org/licenses/LICENSE-2.0> for a copy of the Apache | ||
# License, Version 2.0. | ||
# fmt: off | ||
|
||
import subprocess | ||
import tempfile | ||
from datetime import datetime, timezone | ||
from pathlib import Path | ||
|
||
from ..objects import InvalidSignature, SignatureCriterion | ||
|
||
# See the following C git implementation code for more details: | ||
# https://archive.softwareheritage.org/swh:1:cnt:07335987a6b9ceaf6edc2da71c2e636b0513372f;origin=https://github.com/git/git;visit=swh:1:snp:e72051ba1b2437b7bf3ed0346d04b289f1393982;anchor=swh:1:rev:6a11438f43469f3815f2f0fc997bd45792ff04c0;path=/gpg-interface.c;lines=450 | ||
|
||
### WARNING! | ||
### verify_time might or might not be in UTC. | ||
### The following code might not be handling timezone correctly. | ||
|
||
|
||
class SshKeygenCheckCriterion(SignatureCriterion): | ||
"""Checks signature using ssh-keygen -Y check-novalidate.""" | ||
|
||
def __init__(self, capture_output: bool = True): | ||
self.capture_output = capture_output | ||
|
||
def _ssh_keygen_check( | ||
self, subcmdline: list[str], crypto_msg: bytes, verify_time: int | ||
) -> None: | ||
verify_dt = datetime.fromtimestamp(verify_time, tz=timezone.utc) | ||
cmdline = [ | ||
*subcmdline, | ||
"-n", "git", | ||
"-O", "verify-time=" + verify_dt.strftime("%Y%m%d%H%M%SZ"), | ||
] | ||
result = subprocess.run( | ||
cmdline, input=crypto_msg, capture_output=self.capture_output | ||
) | ||
if 0 != result.returncode: | ||
raise InvalidSignature | ||
|
||
def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None: | ||
with tempfile.NamedTemporaryFile() as sig_file: | ||
sig_file.write(signature) | ||
sig_file.flush() | ||
subcmdline = ["ssh-keygen", "-Y", "check-novalidate", "-s", sig_file.name] | ||
self._ssh_keygen_check(subcmdline, crypto_msg, verify_time) | ||
|
||
|
||
class SshKeygenVerifyCriterion(SshKeygenCheckCriterion): | ||
"""Verifies signature using ssh-keygen -Y verify.""" | ||
|
||
def __init__(self, allowed_signers: Path, capture_output: bool = True): | ||
super().__init__(capture_output) | ||
self.allowed_signers = str(allowed_signers) | ||
|
||
def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None: | ||
with tempfile.NamedTemporaryFile() as sig_file: | ||
sig_file.write(signature) | ||
sig_file.flush() | ||
cmdline = [ | ||
"ssh-keygen", "-Y", "find-principals", | ||
"-s", sig_file.name, | ||
"-f", self.allowed_signers, | ||
] | ||
result = subprocess.run(cmdline, capture_output=True) | ||
for principal in result.stdout.splitlines(): | ||
subcmdline = [ | ||
"ssh-keygen", "-Y", "verify", | ||
"-f", self.allowed_signers, | ||
"-I", str(principal), | ||
"-s", sig_file.name, | ||
] | ||
self._ssh_keygen_check(subcmdline, crypto_msg, verify_time) | ||
|
||
#ruff: noqa: I001 | ||
|
||
if __name__ == "__main__": | ||
import argparse | ||
import dulwich.repo | ||
|
||
parser = argparse.ArgumentParser() | ||
parser.add_argument("git_object", default="HEAD", nargs="?") | ||
parser.add_argument("--allow", type=Path, help="ssh-keygen allowed signers file") | ||
args = parser.parse_args() | ||
|
||
if args.allow is None: | ||
criterion = SshKeygenCheckCriterion(capture_output=False) | ||
else: | ||
criterion = SshKeygenVerifyCriterion(args.allow, capture_output=False) | ||
|
||
repo = dulwich.repo.Repo(".") | ||
commit = repo[args.git_object.encode()] | ||
print("commit", commit.id.decode()) | ||
try: | ||
commit.check_signature(criterion) | ||
# signature good or not signed | ||
except InvalidSignature: | ||
pass | ||
print("Author:", commit.author.decode()) | ||
print("\n ", commit.message.decode()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,6 +85,19 @@ class EmptyFileException(FileFormatException): | |
"""An unexpectedly empty file was encountered.""" | ||
|
||
|
||
class InvalidSignature(Exception): | ||
"""A signature was rejected by a signature criterion.""" | ||
|
||
|
||
class SignatureCriterion: | ||
def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None: | ||
"""Check/verify signature for a cryptographic message. | ||
|
||
Raises: | ||
InvalidSignature | ||
""" | ||
|
||
|
||
def S_ISGITLINK(m): | ||
"""Check if a mode indicates a submodule. | ||
|
||
|
@@ -927,6 +940,10 @@ def raw_without_sig(self) -> bytes: | |
ret = ret[: -len(self._signature)] | ||
return ret | ||
|
||
def check_signature(self, criterion: SignatureCriterion) -> None: | ||
if self.signature: | ||
criterion.check(self.raw_without_sig(), self.signature, self.tag_time) | ||
|
||
def verify(self, keyids: Optional[Iterable[str]] = None) -> None: | ||
"""Verify GPG signature for this tag (if it is signed). | ||
|
||
|
@@ -941,24 +958,10 @@ def verify(self, keyids: Optional[Iterable[str]] = None) -> None: | |
gpg.errors.MissingSignatures: if tag was not signed by a key | ||
specified in keyids | ||
""" | ||
if self._signature is None: | ||
return | ||
|
||
import gpg | ||
|
||
with gpg.Context() as ctx: | ||
data, result = ctx.verify( | ||
self.raw_without_sig(), | ||
signature=self._signature, | ||
) | ||
if keyids: | ||
keys = [ctx.get_key(key) for key in keyids] | ||
for key in keys: | ||
for subkey in keys: | ||
for sig in result.signatures: | ||
if subkey.can_sign and subkey.fpr == sig.fpr: | ||
return | ||
raise gpg.errors.MissingSignatures(result, keys, results=(data, result)) | ||
try: | ||
self.check_signature(GpgSignatureCriterion(keyids)) | ||
except InvalidSignature as ex: | ||
raise ex.__cause__ from None # type: ignore[misc] | ||
|
||
|
||
class TreeEntry(namedtuple("TreeEntry", ["path", "mode", "sha"])): | ||
|
@@ -1531,6 +1534,10 @@ def raw_without_sig(self) -> bytes: | |
tmp.gpgsig = None | ||
return tmp.as_raw_string() | ||
|
||
def check_signature(self, criterion: SignatureCriterion) -> None: | ||
if self.gpgsig: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like a bit of an odd interface; the commit stores whether the signature is PGP or SSH, so why does the caller need to care? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The caller doesn't need to care about PGP vs GPG. They can use a yet-to-be-implemented criterion implementation that checks either kind. The PGP vs SSH info is in the signature which a criterion implementation can check when SignatureCriterion.check is called. The caller needs to care at a high level what criterion they want for a signature to be "valid". And depending on that high level desire, they then can pick a SignatureCriterion implementation. For instance, in my situation no GPG is signature is valid, only RSA and ed25519 public keys, and there is no restriction on commit date. Other application will have other criterion for what make a signature "valid". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably makes sense to make this parameter optional and have a built-in minimal criterion that gets used. A built-in minimal criterion would do what is currently done for PGP sigs when there is no trusted keys list and then for SSH it does what There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be a useful to have an example somewhere of a helper |
||
criterion.check(self.raw_without_sig(), self.gpgsig, self.commit_time) | ||
|
||
def verify(self, keyids: Optional[Iterable[str]] = None) -> None: | ||
"""Verify GPG signature for this commit (if it is signed). | ||
|
||
|
@@ -1545,24 +1552,10 @@ def verify(self, keyids: Optional[Iterable[str]] = None) -> None: | |
gpg.errors.MissingSignatures: if commit was not signed by a key | ||
specified in keyids | ||
""" | ||
if self._gpgsig is None: | ||
return | ||
|
||
import gpg | ||
|
||
with gpg.Context() as ctx: | ||
data, result = ctx.verify( | ||
self.raw_without_sig(), | ||
signature=self._gpgsig, | ||
) | ||
if keyids: | ||
keys = [ctx.get_key(key) for key in keyids] | ||
for key in keys: | ||
for subkey in keys: | ||
for sig in result.signatures: | ||
if subkey.can_sign and subkey.fpr == sig.fpr: | ||
return | ||
raise gpg.errors.MissingSignatures(result, keys, results=(data, result)) | ||
try: | ||
self.check_signature(GpgSignatureCriterion(keyids)) | ||
except InvalidSignature as ex: | ||
raise ex.__cause__ from None # type: ignore[misc] | ||
|
||
def _serialize(self): | ||
headers = [] | ||
|
@@ -1681,6 +1674,30 @@ def _get_extra(self): | |
_TYPE_MAP[cls.type_num] = cls | ||
|
||
|
||
class GpgSignatureCriterion(SignatureCriterion): | ||
"""Verifies GPG signature.""" | ||
|
||
def __init__(self, keyids: Optional[Iterable[str]] = None): | ||
self.keyids = keyids | ||
|
||
def check(self, crypto_msg: bytes, signature: bytes, verify_time: int) -> None: | ||
import gpg | ||
|
||
with gpg.Context() as ctx: | ||
try: | ||
data, result = ctx.verify(crypto_msg, signature=signature) | ||
except gpg.errors.BadSignatures as ex: | ||
raise InvalidSignature from ex | ||
if self.keyids is not None: | ||
keys = [ctx.get_key(keyid) for keyid in self.keyids] | ||
for key in keys: | ||
for sig in result.signatures: | ||
if key.can_sign and key.fpr == sig.fpr: | ||
return | ||
ex2 = gpg.errors.MissingSignatures(result, keys, results=(data, result)) | ||
raise InvalidSignature from ex2 | ||
|
||
|
||
# Hold on to the pure-python implementations for testing | ||
_parse_tree_py = parse_tree | ||
_sorted_tree_items_py = sorted_tree_items | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why turn off fmt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few reasons:
Why do you block CI integration tests on ruff formatting? Why do you care?