Skip to content

Commit

Permalink
Add a dev_scripts/registry.py script able to verify Github attestat…
Browse files Browse the repository at this point in the history
…ions
  • Loading branch information
almet committed Jan 22, 2025
1 parent 9aa84a2 commit ec34d60
Show file tree
Hide file tree
Showing 3 changed files with 260 additions and 0 deletions.
238 changes: 238 additions & 0 deletions dev_scripts/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
#!/usr/bin/python

import hashlib
import re
import shutil
import subprocess
from tempfile import NamedTemporaryFile

import click
import requests

DEFAULT_REPO = "freedomofpress/dangerzone"
SIGSTORE_BUNDLE = "application/vnd.dev.sigstore.bundle.v0.3+json"
DOCKER_MANIFEST_DISTRIBUTION = "application/vnd.docker.distribution.manifest.v2+json"
DOCKER_MANIFEST_INDEX = "application/vnd.oci.image.index.v1+json"
OCI_IMAGE_MANIFEST = "application/vnd.oci.image.manifest.v1+json"


class RegistryClient:
def __init__(self, registry, org, image):
self._registry = registry
self._org = org
self._image = image
self._auth_token = None
self._base_url = f"https://{registry}"
self._image_url = f"{self._base_url}/v2/{self._org}/{self._image}"

@property
def image(self):
return f"{self._registry}/{self._org}/{self._image}"

def get_auth_token(self):
if not self._auth_token:
auth_url = f"{self._base_url}/token"
response = requests.get(
auth_url,
params={
"service": f"{self._registry}",
"scope": f"repository:{self._org}/{self._image}:pull",
},
)
response.raise_for_status()
self._auth_token = response.json()["token"]
return self._auth_token

def get_auth_header(self):
return {"Authorization": f"Bearer {self.get_auth_token()}"}

def list_tags(self):
url = f"{self._image_url}/tags/list"
response = requests.get(url, headers=self.get_auth_header())
response.raise_for_status()
tags = response.json().get("tags", [])
return tags

def get_manifest(self, tag, extra_headers=None):
"""Get manifest information for a specific tag"""
manifest_url = f"{self._image_url}/manifests/{tag}"
headers = {
"Accept": DOCKER_MANIFEST_DISTRIBUTION,
"Authorization": f"Bearer {self.get_auth_token()}",
}
if extra_headers:
headers.update(extra_headers)

response = requests.get(manifest_url, headers=headers)
response.raise_for_status()
return response

def list_manifests(self, tag):
return (
self.get_manifest(
tag,
{
"Accept": DOCKER_MANIFEST_INDEX,
},
)
.json()
.get("manifests")
)

def get_blob(self, hash):
url = f"{self._image_url}/blobs/{hash}"
response = requests.get(
url,
headers={
"Authorization": f"Bearer {self.get_auth_token()}",
},
)
response.raise_for_status()
return response

def get_attestation(self, tag):
"""
Retrieve an attestation from a given tag.
The attestation needs to be attached using the Cosign Bundle
Specification defined at:
https://github.com/sigstore/cosign/blob/main/specs/BUNDLE_SPEC.md
"""

def _find_sigstore_bundle_manifest(manifests):
for manifest in manifests:
if manifest["artifactType"] == SIGSTORE_BUNDLE:
return manifest["mediaType"], manifest["digest"]

def _get_bundle_blob_digest(layers):
for layer in layers:
if layer.get("mediaType") == SIGSTORE_BUNDLE:
return layer["digest"]

tag_manifest_content = self.get_manifest(tag).content

# The attestation is available on the same container registry, with a
# specific tag named "sha256-{sha256(manifest)}"
tag_manifest_hash = hashlib.sha256(tag_manifest_content).hexdigest()

# This will get us a "list" of manifests...
manifests = self.list_manifests(f"sha256-{tag_manifest_hash}")

# ... from which we want the sigstore bundle
bundle_manifest_mediatype, bundle_manifest_digest = (
_find_sigstore_bundle_manifest(manifests)
)
if not bundle_manifest_digest:
raise Error("Not able to find sigstore bundle manifest info")

bundle_manifest = self.get_manifest(
bundle_manifest_digest, extra_headers={"Accept": bundle_manifest_mediatype}
).json()

# From there, we will get the attestation in a blob.
# It will be the first layer listed at this manifest hash location
layers = bundle_manifest.get("layers", [])

blob_digest = _get_bundle_blob_digest(layers)
bundle = self.get_blob(blob_digest)
return tag_manifest_content, bundle.content

def verify_attestation(self, image_tag: str, expected_repo: str):
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
manifest, bundle = self.get_attestation(image_tag)

def _write(file, content):
file.write(content)
file.flush()

# Put the value in files and verify with cosign
with (
NamedTemporaryFile(mode="wb") as manifest_json,
NamedTemporaryFile(mode="wb") as bundle_json,
):
_write(manifest_json, manifest)
_write(bundle_json, bundle)

# Call cosign with the temporary file paths
cmd = [
"cosign",
"verify-blob-attestation",
"--bundle",
bundle_json.name,
"--new-bundle-format",
"--certificate-oidc-issuer",
"https://token.actions.githubusercontent.com",
"--certificate-identity-regexp",
f"^https://github.com/{expected_repo}/.github/workflows/release-container-image.yml@refs/heads/test/image-publication-cosign",
manifest_json.name,
]

result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
raise Exception(f"Attestation cannot be verified. {result.stderr}")
return True


def parse_image_location(input_string):
"""Parses container image location into (registry, namespace, repository, tag)"""
pattern = (
r"^"
r"(?P<registry>[a-zA-Z0-9.-]+)/"
r"(?P<namespace>[a-zA-Z0-9-]+)/"
r"(?P<repository>[^:]+)"
r"(?::(?P<tag>[a-zA-Z0-9.-]+))?"
r"$"
)
match = re.match(pattern, input_string)
if not match:
raise ValueError("Malformed image location")
return match.group("registry", "namespace", "repository", "tag")


@click.group()
def main():
pass


@main.command()
@click.argument("image")
def list_tags(image):
registry, org, package, _ = parse_image_location(image)
client = RegistryClient(registry, org, package)
tags = client.list_tags()
click.echo(f"Existing tags for {client.image}")
for tag in tags:
click.echo(tag)


@main.command()
@click.argument("image")
@click.option(
"--repo",
default=DEFAULT_REPO,
help="The github repository to check the attestation for",
)
def attest(image: str, repo: str):
"""
Look up the image attestation to see if the image has been built
on Github runners, and from a given repository.
"""
if shutil.which("cosign") is None:
click.echo("The cosign binary is needed but not installed.")
raise click.Abort()

registry, org, package, tag = parse_image_location(image)
tag = tag or "latest"

client = RegistryClient(registry, org, package)
verified = client.verify_attestation(tag, repo)
if verified:
click.echo(f"🎉 The {client.image}:{tag} has been built by {repo}")


if __name__ == "__main__":
main()
20 changes: 20 additions & 0 deletions docs/developer/independent-container-updates.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Independent Container Updates

Since version 0.9.0, Dangerzone is able to ship container images independently
from issuing a new release of the software.

This change has been introduced to avoid doing time-costly releases only to
update the container images, which should happen regularly to keep them updated
with the latest security patches.

## Nightly images and attestations

Each night, new images are built and pushed to our container registry, alongside
a provenance attestation, making it possible to verify that the image has been
originally built by the Github CI runners.

In order to verify that the attestations match our expectations, it is possible to use the `dev_scripts/registry.py` script:

```bash
poetry run ./dev_scripts/registry.py attest ghcr.io/freedomofpress/dangerzone/dangerzone:latest --repo freedomofpress/dangerzone
```
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pymupdf = "1.24.11" # Last version to support python 3.8 (needed for Ubuntu Foca

[tool.poetry.group.dev.dependencies]
httpx = "^0.27.2"
requests = "*"
click = "*"

[tool.doit]
verbosity = 3
Expand Down

0 comments on commit ec34d60

Please sign in to comment.