diff --git a/.github/workflows/pr-tests.yaml b/.github/workflows/pr-tests.yaml index 2ed59347..05f209be 100644 --- a/.github/workflows/pr-tests.yaml +++ b/.github/workflows/pr-tests.yaml @@ -99,7 +99,7 @@ jobs: OCI_NAMESPACE: oci-conformance/distribution-test OCI_TEST_PULL: 1 OCI_TEST_PUSH: 1 - OCI_TEST_CONTENT_DISCOVERY: 0 + OCI_TEST_CONTENT_DISCOVERY: 1 OCI_TEST_CONTENT_MANAGEMENT: 1 OCI_HIDE_SKIPPED_WORKFLOWS: 0 OCI_DEBUG: 0 diff --git a/Cargo.toml b/Cargo.toml index 070ef31c..1989e2e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ async-trait = "0.1.74" walkdir = "2.0" rand = "0.8" humansize = "2.1" -sqlx = { version = "0.8", features = ["runtime-tokio", "migrate", "sqlite"] } +sqlx = { version = "0.8", features = ["runtime-tokio", "migrate", "sqlite", "json"] } oci-spec = "0.7.0" oci-client = "0.14.0" diff --git a/migrations/01_initial.sql b/migrations/01_initial.sql index 0fdc8a1d..ac484438 100644 --- a/migrations/01_initial.sql +++ b/migrations/01_initial.sql @@ -1,40 +1,28 @@ CREATE TABLE "blob" ( - "digest" varchar NOT NULL PRIMARY KEY, - "size" integer NOT NULL, - "is_manifest" boolean NOT NULL, - "last_accessed" integer NOT NULL DEFAULT (unixepoch()) -); -CREATE TABLE "blob_blob_association" ( - "parent_digest" varchar NOT NULL, - "child_digest" varchar NOT NULL, - PRIMARY KEY ("parent_digest", "child_digest"), - FOREIGN KEY ("parent_digest") - REFERENCES "blob" ("digest") - ON DELETE CASCADE - ON UPDATE CASCADE, - FOREIGN KEY ("child_digest") - REFERENCES "blob" ("digest") -); + "digest" TEXT NOT NULL PRIMARY KEY, + "size" INTEGER NOT NULL, + "last_accessed" INTEGER NOT NULL DEFAULT (unixepoch()) +) STRICT; +CREATE TABLE "manifest" ( + "digest" TEXT NOT NULL PRIMARY KEY, + "last_accessed" INTEGER NOT NULL DEFAULT (unixepoch()), + "json" BLOB NOT NULL, + "blob" BLOB NOT NULL +) STRICT; CREATE TABLE "blob_upload" ( "uuid" TEXT NOT NULL PRIMARY KEY, - "offset" integer NOT NULL, - "updated_at" timestamp_text NOT NULL DEFAULT CURRENT_TIMESTAMP, - "repo" varchar NOT NULL -); + "offset" INTEGER NOT NULL, + "updated_at" TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, + "repo" TEXT NOT NULL +) STRICT; CREATE TABLE "repo_blob_association" ( - "repo_name" varchar NOT NULL, - "blob_digest" varchar NOT NULL, - PRIMARY KEY ("repo_name", "blob_digest"), - FOREIGN KEY ("blob_digest") - REFERENCES "blob" ("digest") - ON DELETE CASCADE -); + "repo_name" TEXT NOT NULL, + "blob_digest" TEXT NOT NULL, + PRIMARY KEY ("repo_name", "blob_digest") +) STRICT; CREATE TABLE "tag" ( - "tag" varchar NOT NULL, - "repo" varchar NOT NULL, - "manifest_digest" varchar NOT NULL, - CONSTRAINT "IDX_repo_tag" PRIMARY KEY ("repo", "tag"), - FOREIGN KEY ("repo", "manifest_digest") - REFERENCES "repo_blob_association" ("repo_name", "blob_digest") - ON DELETE CASCADE -); + "tag" TEXT NOT NULL, + "repo" TEXT NOT NULL, + "manifest_digest" TEXT NOT NULL, + CONSTRAINT "IDX_repo_tag" PRIMARY KEY ("repo", "tag") +) STRICT; diff --git a/src/registry/digest.rs b/src/registry/digest.rs index 7d12b87c..e3c3c811 100644 --- a/src/registry/digest.rs +++ b/src/registry/digest.rs @@ -1,4 +1,3 @@ -use std::io::Read; use std::{fmt, io}; use lazy_static::lazy_static; @@ -7,6 +6,7 @@ use serde::{Deserialize, Serialize}; use sha2::digest::OutputSizeUser; use sha2::{Digest as ShaDigest, Sha256}; use thiserror::Error; +use tokio::io::{AsyncRead, AsyncReadExt}; // Buffer size for SHA2 hashing const BUFFER_SIZE: usize = 1024 * 1024; @@ -72,17 +72,24 @@ impl Digest { &self.0[7..] } - pub fn digest_sha256(mut reader: R) -> io::Result { - Self::digest::(&mut reader) + pub async fn digest_sha256(mut reader: R) -> io::Result { + Self::digest::(&mut reader).await + } + + pub fn digest_sha256_slice(slice: &[u8]) -> Digest { + let hash = hex::encode(Sha256::digest(slice)); + Self(format!("sha256:{hash}")) } #[allow(clippy::self_named_constructors)] - pub fn digest(reader: &mut R) -> io::Result { + pub async fn digest( + reader: &mut R, + ) -> io::Result { let mut digest = D::default(); - let mut buffer = [0u8; BUFFER_SIZE]; + let mut buffer = vec![0u8; BUFFER_SIZE]; let mut n = BUFFER_SIZE; while n == BUFFER_SIZE { - n = reader.read(&mut buffer)?; + n = reader.read(&mut buffer).await?; digest.update(&buffer[..n]); } let hash = hex::encode(digest.finalize()); @@ -100,17 +107,20 @@ impl Digest { pub fn as_str(&self) -> &str { &self.0 } + + pub fn into_string(self) -> String { + self.0 + } } #[cfg(test)] mod test { - use std::io::BufReader; use crate::registry::digest::Digest; #[test] fn sha256_digest_test() { - let result = Digest::digest_sha256(BufReader::new("hello world".as_bytes())).unwrap(); + let result = Digest::digest_sha256_slice("hello world".as_bytes()); assert_eq!( &result.0, "sha256:b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9" @@ -119,7 +129,7 @@ mod test { #[test] fn sha256_digest_empty_test() { - let result = Digest::digest_sha256(BufReader::new("".as_bytes())).unwrap(); + let result = Digest::digest_sha256_slice("".as_bytes()); assert_eq!( result.0, "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_string() @@ -128,10 +138,8 @@ mod test { #[test] fn sha256_digest_brown_fox_test() { - let result = Digest::digest_sha256(BufReader::new( - "the quick brown fox jumps over the lazy dog".as_bytes(), - )) - .unwrap(); + let result = + Digest::digest_sha256_slice("the quick brown fox jumps over the lazy dog".as_bytes()); assert_eq!( result.0, "sha256:05c6e08f1d9fdafa03147fcb8f82f124c76d2f70e3d989dc8aadb5e7d7450bec".to_string() diff --git a/src/registry/manifest.rs b/src/registry/manifest.rs index c2585f18..50288d64 100644 --- a/src/registry/manifest.rs +++ b/src/registry/manifest.rs @@ -1,8 +1,9 @@ use std::borrow::Cow; +use std::collections::HashMap; use anyhow::Result; use lazy_static::lazy_static; -use oci_spec::image::{ImageIndex, ImageManifest, MediaType}; +use oci_spec::image::{Descriptor, ImageIndex, ImageManifest, MediaType}; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -101,6 +102,30 @@ pub mod manifest_media_type { } impl OCIManifest { + #[inline] + pub fn subject(&self) -> Option { + match self { + OCIManifest::V2(m2) => m2.subject(), + OCIManifest::List(list) => list.subject(), + } + .clone() + } + #[inline] + pub fn artifact_type(&self) -> Option { + match self { + OCIManifest::V2(m2) => m2.artifact_type(), + OCIManifest::List(list) => list.artifact_type(), + } + .clone() + } + #[inline] + pub fn annotations(&self) -> &Option> { + match self { + OCIManifest::V2(m2) => m2.annotations(), + OCIManifest::List(list) => list.annotations(), + } + } + /// Returns a Vector of the digests of all assets referenced in the Manifest /// With the exception of digests for "foreign blobs" pub fn get_local_asset_digests(&self) -> Vec { @@ -109,13 +134,7 @@ impl OCIManifest { let mut digests: Vec = m2 .layers() .iter() - .filter(|l| { - l.media_type() - != &MediaType::Other( - "application/vnd.docker.image.rootfs.foreign.diff.tar.gzip" - .to_string(), - ) - }) + .filter(|l| layer_is_distributable(l.media_type())) .map(|x| x.digest().to_string()) .collect(); digests.push(m2.config().digest().to_string()); @@ -141,6 +160,17 @@ impl OCIManifest { } } +pub fn layer_is_distributable(layer: &MediaType) -> bool { + let non_distributable = [ + MediaType::ImageLayerNonDistributable, + MediaType::ImageLayerNonDistributableGzip, + MediaType::ImageLayerNonDistributableZstd, + MediaType::Other("application/vnd.docker.image.rootfs.foreign.diff.tar.gzip".to_string()), + ]; + + !non_distributable.contains(layer) +} + #[cfg(test)] mod test { diff --git a/src/registry/mod.rs b/src/registry/mod.rs index a847c2d5..6cebe3f0 100644 --- a/src/registry/mod.rs +++ b/src/registry/mod.rs @@ -59,7 +59,6 @@ impl From for RegistryError { } StorageBackendError::InvalidContentRange => Self::InvalidContentRange, StorageBackendError::InvalidDigest => Self::InvalidDigest, - StorageBackendError::InvalidManifest(_msg) => Self::InvalidManifest, StorageBackendError::InvalidName(name) => Self::InvalidName(name), StorageBackendError::Io(e) => { tracing::error!("Internal IO error: {e:?}"); diff --git a/src/registry/proxy/proxy_config.rs b/src/registry/proxy/proxy_config.rs index d9831e74..562e41cd 100644 --- a/src/registry/proxy/proxy_config.rs +++ b/src/registry/proxy/proxy_config.rs @@ -4,7 +4,6 @@ use aws_sdk_ecr::config::http::HttpResponse; use aws_sdk_ecr::error::SdkError; use aws_sdk_ecr::operation::get_authorization_token::GetAuthorizationTokenError; use base64::Engine; -use bytes::Bytes; use futures::future::try_join_all; use oci_client::client::ClientProtocol; use oci_client::secrets::RegistryAuth; @@ -79,6 +78,14 @@ pub enum DownloadRemoteImageError { InvalidDigest(#[from] DigestError), #[error("Failed to download image")] DownloadAttemptsFailed, + #[error("Manifest JSON is not canonicalized")] + ManifestNotCanonicalized, + #[error("OCI client error: {0}")] + OciClientError(#[from] oci_client::errors::OciDistributionError), + #[error("Storage backend error: {0}")] + StorageError(#[from] crate::registry::storage::StorageBackendError), + #[error("Could not deserialize manifest: {0}")] + ManifestDeserializationError(#[from] serde_json::Error), } impl SingleRegistryProxyConfig { @@ -108,7 +115,7 @@ impl SingleRegistryProxyConfig { image: &RemoteImage, registry: &TrowServer, db: &SqlitePool, - ) -> Result { + ) -> Result { // Replace eg f/docker/alpine by f/docker/library/alpine let repo_name = format!("f/{}/{}", self.alias, image.get_repo()); tracing::debug!("Downloading proxied image {}", repo_name); @@ -152,20 +159,14 @@ impl SingleRegistryProxyConfig { for mani_digest in digests.into_iter() { let mani_digest_str = mani_digest.as_str(); - let have_manifest = sqlx::query_scalar!( - r#" - SELECT EXISTS( - SELECT 1 FROM blob - WHERE digest = $1 AND is_manifest = 1 - ) - "#, + let has_manifest = sqlx::query_scalar!( + r#"SELECT EXISTS(SELECT 1 FROM manifest WHERE digest = $1)"#, mani_digest_str ) .fetch_one(&mut *db.acquire().await?) - .await? - == 1; - if have_manifest { - return Ok(mani_digest); + .await?; + if has_manifest == 1 { + return Ok(mani_digest.to_string()); } if let Some((cl, auth)) = &try_cl { let ref_to_dl = image_ref.clone_with_digest(mani_digest.to_string()); @@ -180,24 +181,23 @@ impl SingleRegistryProxyConfig { ) .await; - if let Err(e) = manifest_download { - tracing::warn!("Failed to download proxied image: {}", e) - } else { - if let Some(tag) = image_ref.tag() { - sqlx::query!( - r#" - INSERT INTO tag (repo, tag, manifest_digest) - VALUES ($1, $2, $3) - ON CONFLICT (repo, tag) DO UPDATE SET manifest_digest = $3 - "#, - repo_name, - tag, - mani_digest_str - ) - .execute(&mut *db.acquire().await?) - .await?; + match manifest_download { + Err(e) => tracing::warn!("Failed to download proxied image: {}", e), + Ok(()) => { + if let Some(tag) = image_ref.tag() { + sqlx::query!( + r#"INSERT INTO tag (repo, tag, manifest_digest) + VALUES ($1, $2, $3) + ON CONFLICT (repo, tag) DO UPDATE SET manifest_digest = $3"#, + repo_name, + tag, + mani_digest_str + ) + .execute(&mut *db.acquire().await?) + .await?; + } + return Ok(mani_digest.to_string()); } - return Ok(mani_digest); } } } @@ -252,8 +252,7 @@ async fn get_aws_ecr_password_from_env(ecr_host: &str) -> Result Result<()> { +) -> Result<(), DownloadRemoteImageError> { async fn download_blob( cl: &oci_client::Client, db: SqlitePool, @@ -269,7 +268,7 @@ async fn download_manifest_and_layers( ref_: &Reference, layer_digest: &str, local_repo_name: &str, - ) -> Result<()> { + ) -> Result<(), DownloadRemoteImageError> { tracing::trace!("Downloading blob {}", layer_digest); let already_has_blob = sqlx::query_scalar!( "SELECT EXISTS(SELECT 1 FROM blob WHERE digest = $1);", @@ -286,7 +285,7 @@ async fn download_manifest_and_layers( .await?; let size = path.metadata().unwrap().len() as i64; sqlx::query!( - "INSERT INTO blob (digest, is_manifest, size) VALUES ($1, FALSE, $2) ON CONFLICT DO NOTHING;", + "INSERT INTO blob (digest, size) VALUES ($1, $2) ON CONFLICT DO NOTHING;", layer_digest, size ) @@ -300,14 +299,6 @@ async fn download_manifest_and_layers( ) .execute(&mut *db.acquire().await?) .await?; - let parent_digest = ref_.digest().unwrap(); - sqlx::query!( - "INSERT INTO blob_blob_association (parent_digest, child_digest) VALUES ($1, $2) ON CONFLICT DO NOTHING;", - parent_digest, - layer_digest - ) - .execute(&mut *db.acquire().await?) - .await?; Ok(()) } @@ -328,19 +319,13 @@ async fn download_manifest_and_layers( oci_client::errors::OciDistributionError::ManifestParsingError(e.to_string()) })?; - let manifest_size = raw_manifest.len() as i64; - let already_has_manifest = match sqlx::query!( - "INSERT INTO blob (digest, is_manifest, size) VALUES ($1, TRUE, $2);", + sqlx::query!( + "INSERT INTO manifest (digest, json, blob) VALUES ($1, jsonb($2), $2) ON CONFLICT DO NOTHING", digest, - manifest_size + raw_manifest ) .execute(&mut *db.acquire().await?) - .await - { - Err(sqlx::Error::Database(e)) if e.is_unique_violation() => true, - Err(e) => return Err(e.into()), - Ok(_) => false, - }; + .await?; sqlx::query!( "INSERT INTO repo_blob_association (repo_name, blob_digest) VALUES ($1, $2) ON CONFLICT DO NOTHING;", @@ -350,17 +335,7 @@ async fn download_manifest_and_layers( .execute(&mut *db.acquire().await?) .await?; - if !already_has_manifest { - storage - .write_image_manifest( - Bytes::from(raw_manifest), - local_repo_name, - &Digest::try_from_raw(&digest).unwrap(), - ) - .await?; - } - - match manifest { + match &manifest { OCIManifest::List(m) => { let images_to_dl = m .manifests() diff --git a/src/registry/storage.rs b/src/registry/storage.rs index d72be72a..9bca6e7d 100644 --- a/src/registry/storage.rs +++ b/src/registry/storage.rs @@ -1,7 +1,6 @@ use std::borrow::Cow; use std::ops::RangeInclusive; use std::path::{Path, PathBuf}; -use std::pin::pin; use std::{io, str}; use bytes::Bytes; @@ -11,7 +10,6 @@ use tokio::io::AsyncWriteExt; use tokio::time::Duration; use tokio_util::compat::TokioAsyncReadCompatExt; -use super::manifest::ManifestError; use crate::registry::blob_storage::Stored; use crate::registry::temporary_file::FileWrapper; use crate::registry::Digest; @@ -22,8 +20,6 @@ use crate::types::BoundedStream; pub enum StorageBackendError { #[error("the name `{0}` is not valid")] InvalidName(String), - #[error("Invalid manifest: {0:?}")] - InvalidManifest(#[from] ManifestError), #[error("Blob not found:{0}")] BlobNotFound(PathBuf), #[error("Digest did not match content")] @@ -38,29 +34,17 @@ pub enum StorageBackendError { Io(#[from] io::Error), } -static BLOBS_DIR: &str = "blobs"; -static UPLOADS_DIR: &str = "scratch"; - -/// Current storage structure: -/// - /blobs/sha256/: is a blob (manifests are treated as blobs) -/// - /manifests//: file containing a list of manifest digests -/// - /scratch/: is a blob being uploaded -/// -/// TODO future structure: -/// - /blobs/sha256/: contains blobs -/// - /uploads/: is a blob being uploaded -/// - /repositories//tags/: is a file with a manifest digest -/// - /repositories//revisions/sha256/: is a manifest #[derive(Clone, Debug)] pub struct TrowStorageBackend { - path: PathBuf, + blobs_dir: PathBuf, + uploads_dir: PathBuf, } impl TrowStorageBackend { - fn init_create_path(root: &Path, dir: &str) -> Result<(), StorageBackendError> { + fn init_create_path(root: &Path, dir: &str) -> Result { let path = root.join(dir); match std::fs::create_dir_all(&path) { - Ok(_) => Ok(()), + Ok(_) => Ok(path), Err(e) => { tracing::error!( r#" @@ -76,24 +60,13 @@ impl TrowStorageBackend { } pub fn new(path: PathBuf) -> Result { - Self::init_create_path(&path, BLOBS_DIR)?; - Self::init_create_path(&path, UPLOADS_DIR)?; - - Ok(Self { path }) - } + let blobs_dir = Self::init_create_path(&path, "blobs")?; + let uploads_dir = Self::init_create_path(&path, "uploads")?; - pub async fn get_manifest( - &self, - repo: &str, - digest: &Digest, - ) -> Result { - tracing::debug!("Get manifest {repo}:{digest}"); - let path = self.path.join(BLOBS_DIR).join(digest.as_str()); - if !path.exists() { - return Err(StorageBackendError::BlobNotFound(path)); - } - - Ok(tokio::fs::read(&path).await?.into()) + Ok(Self { + blobs_dir, + uploads_dir, + }) } pub async fn get_blob_stream( @@ -102,7 +75,7 @@ impl TrowStorageBackend { digest: &Digest, ) -> Result, StorageBackendError> { tracing::debug!("Get blob {repo_name}:{digest}"); - let path = self.path.join(BLOBS_DIR).join(digest.to_string()); + let path = self.blobs_dir.join(digest.to_string()); let file = tokio::fs::File::open(&path).await.map_err(|e| { tracing::error!("Could not open blob: {}", e); StorageBackendError::BlobNotFound(path) @@ -122,13 +95,12 @@ impl TrowStorageBackend { E: std::error::Error + Send + Sync + 'static, { tracing::debug!("Write blob {digest}"); - let tmp_location = self.path.join(UPLOADS_DIR).join(digest.to_string()); - let location = self.path.join(BLOBS_DIR).join(digest.to_string()); + let tmp_location = self.uploads_dir.join(digest.as_str()); + let location = self.blobs_dir.join(digest.as_str()); if location.exists() { - tracing::info!("Blob already exists"); + tracing::info!(digest = digest.as_str(), "Blob already exists"); return Ok(location); } - tokio::fs::create_dir_all(location.parent().unwrap()).await?; let mut tmp_file = match FileWrapper::new_tmp(tmp_location.clone()).await { // All good Ok(tmpf) => tmpf, @@ -139,19 +111,20 @@ impl TrowStorageBackend { // wait for download to be done (temp file to be moved) tokio::time::sleep(Duration::from_millis(200)).await; } - // TODO - return Ok(location); + if location.exists() { + return Ok(location); + } else { + return Err(StorageBackendError::BlobNotFound(location)); + } } Err(e) => { - return Err(e.into()); + tracing::error!("Could not open {}", tmp_location.display()); + return Err(StorageBackendError::Io(e)); } }; tmp_file.write_stream(stream).await?; if verify { - let reader = std::fs::File::open(tmp_file.path()).map_err(|e| { - StorageBackendError::Internal(Cow::Owned(format!("Could not open tmp file: {e}"))) - })?; - let tmp_digest = Digest::digest_sha256(reader).map_err(|e| { + let tmp_digest = tmp_file.digest().await.map_err(|e| { StorageBackendError::Internal(Cow::Owned(format!( "Could not calculate digest of blob: {e}" ))) @@ -160,7 +133,6 @@ impl TrowStorageBackend { return Err(StorageBackendError::InvalidDigest); } } - tmp_file.rename(&location).await?; Ok(location) } @@ -178,11 +150,11 @@ impl TrowStorageBackend { E: std::error::Error + Send + Sync + 'static, { tracing::debug!("Write blob part {upload_id} ({range:?})"); - let tmp_location = self.path.join(UPLOADS_DIR).join(upload_id.to_string()); + let tmp_location = self.uploads_dir.join(upload_id.to_string()); let mut tmp_file = FileWrapper::append(tmp_location.clone()) .await .map_err(|e| { - tracing::error!("Could not open tmp file: {}", e); + tracing::error!("Could not open tmp file {}: {}", tmp_location.display(), e); match e.kind() { io::ErrorKind::NotFound => StorageBackendError::BlobNotFound(tmp_location), io::ErrorKind::AlreadyExists => StorageBackendError::InvalidContentRange, @@ -228,8 +200,8 @@ impl TrowStorageBackend { user_digest: &Digest, ) -> Result<(), StorageBackendError> { tracing::debug!("Complete blob write {upload_id}"); - let tmp_location = self.path.join(UPLOADS_DIR).join(upload_id.to_string()); - let final_location = self.path.join(BLOBS_DIR).join(user_digest.to_string()); + let tmp_location = self.uploads_dir.join(upload_id.to_string()); + let final_location = self.blobs_dir.join(user_digest.as_str()); // Should we even do this ? It breaks OCI tests: // let f = std::fs::File::open(&tmp_location)?; // let calculated_digest = Digest::digest_sha256(f)?; @@ -250,62 +222,19 @@ impl TrowStorageBackend { Ok(()) } - pub async fn write_image_manifest( - &self, - manifest: Bytes, - repo_name: &str, - digest: &Digest, - ) -> Result { - tracing::debug!("Write image manifest {repo_name}:{digest}"); - let manifest_stream = bytes_to_stream(manifest); - let location = self - .write_blob_stream(digest, pin!(manifest_stream), true) - .await?; - - Ok(location) - } - pub async fn delete_blob( &self, repo: &str, digest: &Digest, ) -> Result<(), StorageBackendError> { tracing::debug!("Delete blob {repo}:{digest}"); - let blob_path = self.path.join(BLOBS_DIR).join(digest.as_str()); + let blob_path = self.blobs_dir.join(digest.as_str()); tokio::fs::remove_file(blob_path).await?; Ok(()) } - // pub async fn delete_manifest( - // &self, - // repo_name: &str, - // digest: &Digest, - // ) -> Result<(), StorageBackendError> { - // let path = self.path.join(BLOBS_DIR).join(digest.to_string()); - // if let Err(e) = tokio::fs::remove_file(path).await { - // event!(Level::WARN, "Could not delete manifest file: {}", e); - // } - // let tags: [String; 0] = []; - // // let tags = self.list_repo_tags(repo_name).await?; - // for t in tags { - // let manifest_history_loc = self.path.join(BLOBS_DIR).join(repo_name).join(t); - // let history_raw = tokio::fs::read(&manifest_history_loc).await?; - // let old_history = String::from_utf8(history_raw).unwrap(); - // let new_history: String = old_history - // .lines() - // .filter(|l| !l.contains(&digest.to_string())) - // .collect(); - // if new_history.is_empty() { - // tokio::fs::remove_file(&manifest_history_loc).await?; - // } else if new_history.len() != old_history.len() { - // tokio::fs::write(&manifest_history_loc, new_history).await?; - // } - // } - // Ok(()) - // } - pub async fn is_ready(&self) -> Result<(), StorageBackendError> { - let path = self.path.join("fs-ready"); + let path = self.uploads_dir.join("fs-ready"); let mut file = tokio::fs::File::create(path).await?; let size = file.write(b"Hello World").await?; if size != 11 { @@ -327,26 +256,27 @@ impl TrowStorageBackend { } } -fn bytes_to_stream(bytes: Bytes) -> impl Stream> { - futures::stream::once(async move { Ok(bytes) }) -} - // fn is_proxy_repo(repo_name: &str) -> bool { // repo_name.starts_with(PROXY_DIR) // } #[cfg(test)] mod tests { - use bytes::Buf; + + use std::pin::pin; use super::*; - use crate::registry::manifest; + + fn bytes_to_stream(bytes: Bytes) -> impl Stream> { + futures::stream::once(async move { Ok(bytes) }) + } #[test] fn trow_storage_backend_new() { let dir = test_temp_dir::test_temp_dir!(); let store = TrowStorageBackend::new(dir.as_path_untracked().to_owned()).unwrap(); - assert!(store.path.join("blobs").exists()); + assert!(store.blobs_dir.exists()); + assert!(store.uploads_dir.exists()); drop(dir); } @@ -364,53 +294,9 @@ mod tests { assert_eq!( location, store - .path - .join("blobs") + .blobs_dir .join("sha256:123456789101112131415161718192021") ); drop(dir); } - - const IMAGE_MANIFEST: &str = r#"{ - "schemaVersion": 2, - "mediaType": "application/vnd.docker.distribution.manifest.v2+json", - "config": { - "mediaType": "application/vnd.docker.container.image.v1+json", - "size": 7027, - "digest": "sha256:3b4e5a3b4e5a3b4e5a3b4e5a3b4e5a3b4e5a3b4e5a3b4e5a3b4e5a3b4e5a3b4e" - }, - "layers": [] - }"#; - - #[tokio::test] - #[should_panic] - async fn trow_storage_backend_write_image_manifest_bad_digest() { - let dir = test_temp_dir::test_temp_dir!(); - let store = TrowStorageBackend::new(dir.as_path_untracked().to_owned()).unwrap(); - let manifest = manifest::OCIManifest::V2(serde_json::from_str(IMAGE_MANIFEST).unwrap()); - let manifest_bytes = serde_json::to_vec(&manifest).unwrap(); - let _location = store - .write_image_manifest( - Bytes::from(manifest_bytes), - "zozo/image", - &Digest::try_from_raw("sha256:none").unwrap(), - ) - .await - .unwrap(); - drop(dir); - } - - #[tokio::test] - async fn trow_storage_backend_write_image_manifest() { - let dir = test_temp_dir::test_temp_dir!(); - let store = TrowStorageBackend::new(dir.as_path_untracked().to_owned()).unwrap(); - let manifest = manifest::OCIManifest::V2(serde_json::from_str(IMAGE_MANIFEST).unwrap()); - let manifest_bytes = serde_json::to_vec(&manifest).unwrap(); - let digest = Digest::digest_sha256(manifest_bytes.clone().reader()).unwrap(); - let _location = store - .write_image_manifest(Bytes::from(manifest_bytes), "zozo/image", &digest) - .await - .unwrap(); - drop(dir); - } } diff --git a/src/registry/temporary_file.rs b/src/registry/temporary_file.rs index 5393c1e6..1eb0cb17 100644 --- a/src/registry/temporary_file.rs +++ b/src/registry/temporary_file.rs @@ -5,7 +5,9 @@ use bytes::Bytes; use futures::stream::Stream; use futures::StreamExt; use tokio::fs::{self, File}; -use tokio::io::{self, AsyncWriteExt}; +use tokio::io::{self, AsyncSeekExt, AsyncWriteExt}; + +use super::Digest; /// Designed for downloading files. The [`Drop`] implementation makes sure that /// the underlying file is deleted in case of an error. @@ -20,7 +22,12 @@ pub struct FileWrapper { impl FileWrapper { pub async fn new_tmp(path: PathBuf) -> io::Result { let mut open_opt = fs::OpenOptions::new(); - let file = open_opt.create_new(true).write(true).open(&path).await?; + let file = open_opt + .create_new(true) + .write(true) + .read(true) + .open(&path) + .await?; Ok(FileWrapper { file, @@ -62,6 +69,12 @@ impl FileWrapper { Ok(len) } + pub async fn digest(&mut self) -> io::Result { + self.file.seek(std::io::SeekFrom::Start(0)).await?; + Digest::digest_sha256(&mut self.file).await + } + + #[allow(unused)] pub fn path(&self) -> &Path { &self.path } diff --git a/src/routes/blob_upload.rs b/src/routes/blob_upload.rs index a9c00e82..3eea517b 100644 --- a/src/routes/blob_upload.rs +++ b/src/routes/blob_upload.rs @@ -70,8 +70,8 @@ mod utils { let size_i64 = size.total_stored as i64; sqlx::query!( r#" - INSERT INTO blob (digest, size, is_manifest) - VALUES ($1, $2, false) ON CONFLICT (digest) DO NOTHING + INSERT INTO blob (digest, size) + VALUES ($1, $2) ON CONFLICT (digest) DO NOTHING "#, digest_str, size_i64 @@ -347,8 +347,6 @@ pub fn route(mut app: Router>) -> Router/manifests/ # Parameters -name - The name of the image +name - The namespace of the repository reference - either a tag or a digest # Client Headers @@ -64,14 +65,13 @@ async fn get_manifest( } }; - let new_digest = proxy_cfg + proxy_cfg .download_remote_image(&image, &state.registry, &state.db) .await .map_err(|e| { tracing::error!("Error downloading image: {e}"); Error::InternalError - })?; - new_digest.to_string() + })? } else { let digest = match &reference { ManifestReference::Tag(_) => { @@ -93,13 +93,12 @@ async fn get_manifest( } ManifestReference::Digest(_) => raw_reference, }; - let maybe_manifest = sqlx::query!( + let maybe_digest = sqlx::query_scalar!( r#" - SELECT * FROM blob b - INNER JOIN repo_blob_association rba ON rba.blob_digest = b.digest - WHERE b.digest = $2 - AND b.is_manifest is true - AND rba.repo_name = $1 + SELECT m.digest + FROM manifest m + INNER JOIN repo_blob_association rba ON rba.blob_digest = m.digest + WHERE m.digest = $2 AND rba.repo_name = $1 "#, repo, digest @@ -107,29 +106,28 @@ async fn get_manifest( .fetch_optional(&mut *state.db.acquire().await?) .await?; - if maybe_manifest.is_none() { - return Err(Error::ManifestUnknown(format!("Unknown digest {digest}"))); + match maybe_digest { + Some(d) => d, + None => { + return Err(Error::ManifestUnknown(format!("Unknown digest {digest}"))); + } } - digest }; - let digest_parsed = Digest::try_from_raw(&digest).unwrap(); - let manifest_raw = state - .registry - .storage - .get_manifest(&repo, &digest_parsed) - .await?; + let res = sqlx::query!( + r#" + SELECT m.json ->> 'mediaType' as "media_type: String", m.blob + FROM manifest m + WHERE m.digest = $1 + "#, + digest + ) + .fetch_one(&mut *state.db.acquire().await?) + .await?; - let manifest_parsed: OCIManifest = serde_json::from_slice(&manifest_raw) - .map_err(|e| Error::ManifestInvalid(format!("serialization error: {e}")))?; - let content_type = manifest_parsed - .media_type() - .as_ref() - .map(|mt| mt.to_string()) - .unwrap_or("application/json".to_string()); - Ok(OciJson::new_raw(manifest_raw) - .set_digest(&digest_parsed) - .set_content_type(&content_type)) + Ok(OciJson::new_raw(res.blob.into()) + .set_digest(digest) + .set_content_type(&res.media_type.unwrap_or("application/json".to_owned()))) } endpoint_fn_7_levels!( @@ -167,48 +165,75 @@ async fn put_image_manifest( Error::ManifestInvalid(format!( "Manifest is bigger than limit of {MANIFEST_BODY_SIZE_LIMIT_MB}MiB" )) - })?; + })? + .to_vec(); let manifest_parsed = serde_json::from_slice::<'_, OCIManifest>(&manifest_bytes) .map_err(|e| Error::ManifestInvalid(format!("{e}")))?; - let assets = manifest_parsed.get_local_asset_digests(); - for digest in assets { - tracing::debug!("Checking asset: {repo_name} {digest}"); - let res = sqlx::query!( - r#" - SELECT b.digest FROM blob b - INNER JOIN repo_blob_association rba ON rba.blob_digest = b.digest - WHERE b.digest = $1 AND rba.repo_name = $2"#, - digest, - repo_name - ) - .fetch_optional(&mut *state.db.acquire().await?) - .await?; - if res.is_none() { - return Err(Error::ManifestInvalid(format!("Asset not found: {digest}"))); + match &manifest_parsed { + OCIManifest::List(m) => { + let assets = m + .manifests() + .iter() + .filter(|l| layer_is_distributable(l.media_type())) + .map(|m| m.digest().as_ref()); + for digest in assets { + let res = sqlx::query!( + r"SELECT m.digest FROM manifest m + INNER JOIN repo_blob_association rba ON rba.blob_digest = m.digest + WHERE m.digest = $1 AND rba.repo_name = $2", + digest, + repo_name + ) + .fetch_optional(&mut *state.db.acquire().await?) + .await?; + if res.is_none() { + return Err(Error::ManifestInvalid(format!( + "Manifest asset not found: {digest}" + ))); + } + } + } + OCIManifest::V2(m) => { + let assets = m + .layers() + .iter() + .filter(|l| layer_is_distributable(l.media_type())) + .map(|l| l.digest().as_ref()); + for digest in assets { + let res = sqlx::query!( + r"SELECT b.digest FROM blob b + INNER JOIN repo_blob_association rba ON rba.blob_digest = b.digest + WHERE b.digest = $1 AND rba.repo_name = $2", + digest, + repo_name + ) + .fetch_optional(&mut *state.db.acquire().await?) + .await?; + if res.is_none() { + return Err(Error::ManifestInvalid(format!( + "Blob asset not found: {digest}" + ))); + } + } } } - let size = manifest_bytes.len() as i64; - let computed_digest = Digest::digest_sha256(manifest_bytes.clone().reader()).unwrap(); + let computed_digest = Digest::digest_sha256_slice(&manifest_bytes); let computed_digest_str = computed_digest.as_str(); if !is_tag && computed_digest_str != reference { - return Err(Error::ManifestInvalid("Digest does not match".to_string())); + return Err(Error::ManifestInvalid( + "Given digest does not match".to_string(), + )); } - state - .registry - .storage - .write_image_manifest(manifest_bytes, &repo_name, &computed_digest) - .await?; - sqlx::query!( r#" - INSERT INTO blob (digest, size, is_manifest) - VALUES ($1, $2, true) + INSERT INTO manifest (digest, json, blob) + VALUES ($1, jsonb($2), $2) ON CONFLICT (digest) DO NOTHING "#, computed_digest_str, - size + manifest_bytes ) .execute(&mut *state.db.acquire().await?) .await?; @@ -240,11 +265,15 @@ async fn put_image_manifest( .await?; } + // check if the manifest has a Subject field, if so return the header OCI-Subject + let subject = manifest_parsed.subject().map(|s| s.digest().to_string()); + Ok(VerifiedManifest::new( Some(host), repo_name, computed_digest, reference, + subject, )) } endpoint_fn_7_levels!( @@ -281,10 +310,11 @@ async fn delete_image_manifest( .await?; } else { let digest = Digest::try_from_raw(&reference)?; + let digest_str = digest.as_str(); let res = sqlx::query!( "DELETE FROM repo_blob_association WHERE repo_name = $1 AND blob_digest = $2", repo, - reference + digest_str ) .execute(&mut *state.db.acquire().await?) .await?; @@ -292,13 +322,15 @@ async fn delete_image_manifest( if res.rows_affected() > 0 { let remaining_assoc = sqlx::query_scalar!( "SELECT COUNT(*) FROM repo_blob_association WHERE blob_digest = $1", - reference + digest_str ) .fetch_one(&mut *state.db.acquire().await?) .await?; if remaining_assoc == 0 { - state.registry.storage.delete_blob(&repo, &digest).await?; + sqlx::query!("DELETE FROM manifest where digest = $1", digest_str) + .execute(&mut *state.db.acquire().await?) + .await?; } } } diff --git a/src/routes/manifest_referrers.rs b/src/routes/manifest_referrers.rs new file mode 100644 index 00000000..ec60fddc --- /dev/null +++ b/src/routes/manifest_referrers.rs @@ -0,0 +1,207 @@ +use std::str::FromStr; +use std::sync::Arc; + +use axum::extract::{Path, State}; +use axum::routing::get; +use axum::Router; +use digest::Digest; +use oci_spec::image::{Descriptor, ImageIndex, MediaType}; +use sqlx::types::Json; + +use super::macros::endpoint_fn_7_levels; +use super::response::OciJson; +use crate::registry::digest; +use crate::registry::manifest::OCIManifest; +use crate::registry::server::PROXY_DIR; +use crate::routes::macros::route_7_levels; +use crate::routes::response::errors::Error; +use crate::routes::response::trow_token::TrowToken; +use crate::TrowServerState; + +/* +--- +Listing Referrers +GET /v2//referrers/ + +# Parameters +name - The namespace of the repository +digest - The digest of the manifest specified in the subject field. + +# Query Parameters +(TODO) artifactType: The type of artifact to list referrers for. + + */ +async fn get_referrers( + _auth_user: TrowToken, + State(state): State>, + Path((repo, digest)): Path<(String, String)>, +) -> Result, Error> { + if repo.starts_with(PROXY_DIR) { + return Err(Error::UnsupportedForProxiedRepo); + } + let _ = Digest::try_from_raw(&digest)?; + let referrers = sqlx::query!( + r#" + SELECT json(m.json) as "content!: Json", + m.digest, + length(m.blob) as "size!: i64" + FROM manifest m + INNER JOIN repo_blob_association rba ON rba.blob_digest = m.digest + WHERE rba.repo_name = $1 + AND (m.json -> 'subject' ->> 'digest') = $2 + "#, + repo, + digest + ) + .fetch_all(&mut *state.db.acquire().await?) + .await?; + + let mut descriptors = vec![]; + for row in referrers { + let parsed_manifest = row.content.0; + + let mediatype = parsed_manifest + .media_type() + .clone() + .unwrap_or(MediaType::ImageConfig); + + let mut descriptor = Descriptor::new( + mediatype, + row.size as u64, + oci_spec::image::Digest::from_str(&row.digest).unwrap(), + ); + descriptor.set_artifact_type(parsed_manifest.artifact_type()); + descriptor.set_annotations(parsed_manifest.annotations().clone()); + descriptors.push(descriptor); + } + + let mut response_manifest = ImageIndex::default(); + response_manifest.set_manifests(descriptors); + response_manifest.set_media_type(Some(MediaType::ImageIndex)); + let content_type = response_manifest.media_type().as_ref().unwrap().as_ref(); + + Ok(OciJson::new(&response_manifest).set_content_type(content_type)) +} + +endpoint_fn_7_levels!( + get_referrers( + auth_user: TrowToken, + state: State>; + path: [image_name, reference: String] + ) -> Result, Error> +); + +pub fn route(mut app: Router>) -> Router> { + #[rustfmt::skip] + route_7_levels!( + app, + "/v2" "/referrers/{digest}", + get(get_referrers, get_referrers_2level, get_referrers_3level, get_referrers_4level, get_referrers_5level, get_referrers_6level, get_referrers_7level) + ); + app +} + +#[cfg(test)] +mod tests { + use std::str::FromStr; + + use axum::body::Body; + use hyper::{Request, StatusCode}; + use oci_spec::image::{Descriptor, ImageIndex, ImageManifestBuilder, MediaType}; + use test_temp_dir::test_temp_dir; + use tower::ServiceExt; + + use super::*; + use crate::test_utilities; + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_get_referrers() { + let tmp_dir = test_temp_dir!(); + let (state, router) = test_utilities::trow_router(|_| {}, &tmp_dir).await; + + let man_referred = serde_json::to_vec(&ImageIndex::default()).unwrap(); + let man_referred_digest = Digest::digest_sha256_slice(&man_referred); + let subj = serde_json::to_vec( + &ImageManifestBuilder::default() + .schema_version(2u32) + .layers([]) + .media_type(MediaType::ImageManifest) + .config(Descriptor::new( + MediaType::EmptyJSON, + 2, + oci_spec::image::Digest::from_str( + "sha256:44136fa355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", + ) + .unwrap(), + )) + .subject(Descriptor::new( + MediaType::ImageIndex, + 2, + oci_spec::image::Digest::from_str(man_referred_digest.as_str()).unwrap(), + )) + .build() + .unwrap(), + ) + .unwrap(); + let subj_digest = Digest::digest_sha256_slice(&subj); + let nosubj = serde_json::to_vec( + &ImageManifestBuilder::default() + .schema_version(2u32) + .layers([]) + .media_type(MediaType::ImageManifest) + .config(Descriptor::new( + MediaType::EmptyJSON, + 2, + oci_spec::image::Digest::from_str( + "sha256:1111111355b3678a1146ad16f7e8649e94fb4fc21fe77e8310c060f61caaff8a", + ) + .unwrap(), + )) + .build() + .unwrap(), + ) + .unwrap(); + + for man in [man_referred, subj, nosubj] { + let digest = Digest::digest_sha256_slice(&man).to_string(); + sqlx::query!( + "INSERT INTO manifest (digest, json, blob) VALUES ($1, jsonb($2), $2)", + digest, + man, + ) + .execute(&mut *state.db.acquire().await.unwrap()) + .await + .unwrap(); + sqlx::query!( + r#"INSERT INTO repo_blob_association (repo_name, blob_digest) VALUES ("test", $1)"#, + digest + ) + .execute(&mut *state.db.acquire().await.unwrap()) + .await + .unwrap(); + } + + let resp = router + .oneshot( + Request::get(format!("/v2/test/referrers/{man_referred_digest}")) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::OK, + "unexpected status code: {resp:?}" + ); + let val: ImageIndex = test_utilities::response_body_json(resp).await; + let descriptors = val.manifests(); + assert_eq!(descriptors.len(), 1); + assert_eq!( + descriptors[0].media_type().clone(), + MediaType::ImageManifest + ); + assert_eq!(descriptors[0].digest().as_ref(), subj_digest.as_str()); + } +} diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 24ae4c85..793cffbb 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -5,6 +5,7 @@ mod blob_upload; mod catalog; mod health; mod manifest; +mod manifest_referrers; mod readiness; // helpers @@ -131,6 +132,7 @@ pub fn create_app(state: Arc) -> Router { app = blob_upload::route(app); app = catalog::route(app); app = manifest::route(app); + app = manifest_referrers::route(app); app = admission::route(app); app = add_router_layers(app, &state.config.cors); diff --git a/src/routes/response/mod.rs b/src/routes/response/mod.rs index 2e90d3b6..9128d272 100644 --- a/src/routes/response/mod.rs +++ b/src/routes/response/mod.rs @@ -23,8 +23,6 @@ use axum::http::{header, HeaderValue}; use axum::response::{IntoResponse, Response}; use bytes::Bytes; -use crate::registry::Digest; - #[derive(Debug, Default)] pub struct OciJson { response: Response, @@ -60,10 +58,10 @@ where } } - pub fn set_digest(mut self, digest: &Digest) -> Self { + pub fn set_digest>(mut self, digest: D) -> Self { self.response.headers_mut().insert( "Docker-Content-Digest", - HeaderValue::from_str(digest.as_str()).unwrap(), + HeaderValue::from_str(digest.as_ref()).unwrap(), ); self } diff --git a/src/routes/response/verified_manifest.rs b/src/routes/response/verified_manifest.rs index e66efd0b..165844d3 100644 --- a/src/routes/response/verified_manifest.rs +++ b/src/routes/response/verified_manifest.rs @@ -13,13 +13,14 @@ impl IntoResponse for VerifiedManifest { self.repo_name(), self.tag() ); - Response::builder() + let mut rbuilder = Response::builder() .header("Location", location) .header("Docker-Content-Digest", self.digest().to_string()) - .status(StatusCode::CREATED) - .body(body::Body::empty()) - .unwrap() - .into_response() + .status(StatusCode::CREATED); + if let Some(subject) = self.subject() { + rbuilder = rbuilder.header("OCI-Subject", subject); + } + rbuilder.body(body::Body::empty()).unwrap().into_response() } } @@ -41,6 +42,26 @@ mod test { ) .unwrap(), "ref".to_string(), + None, + ) + .into_response(); + assert_eq!(response.status(), StatusCode::CREATED); + } + + #[test] + fn with_subject() { + let response = VerifiedManifest::new( + Some("https://extrality.ai".to_string()), + "repo_name".to_string(), + Digest::try_from_raw( + "sha256:05c6e08f1d9fdafa03147fcb8f82f124c76d2f70e3d989dc8aadb5e7d7450bec", + ) + .unwrap(), + "ref".to_string(), + Some( + "sha256:05c6e08f1d9fdafa03147fcb8f82f124c76d2f70e3d989dc8aadb5e7d7450bec" + .to_string(), + ), ) .into_response(); assert_eq!(response.status(), StatusCode::CREATED); diff --git a/src/test_utilities.rs b/src/test_utilities.rs index 7de6d77f..324a97fb 100644 --- a/src/test_utilities.rs +++ b/src/test_utilities.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use axum::body::Body; use axum::Router; use http_body_util::BodyExt; -use hyper::body::Buf; use hyper::Response; use serde::de::DeserializeOwned; use test_temp_dir::TestTempDir; @@ -28,14 +27,20 @@ pub async fn trow_router( } pub async fn response_body_json(resp: Response) -> T { - let reader = resp + let buf = resp .into_body() .collect() .await .unwrap() - .aggregate() - .reader(); - serde_json::from_reader(reader).unwrap() + .to_bytes() + .to_vec(); + match serde_json::from_slice(&buf) { + Ok(val) => val, + Err(err) => { + let text = String::from_utf8_lossy(&buf); + panic!("unable to deserialize response body: {err:?} {text}"); + } + } } /// test_temp_dir if thread name != module path, which is the case in parametrized tests diff --git a/src/types.rs b/src/types.rs index 0e8e3d25..5fe94803 100644 --- a/src/types.rs +++ b/src/types.rs @@ -123,18 +123,30 @@ pub struct VerifiedManifest { repo_name: String, digest: Digest, tag: String, + subject: Option, } impl VerifiedManifest { - pub fn new(base_url: Option, repo_name: String, digest: Digest, tag: String) -> Self { + pub fn new( + base_url: Option, + repo_name: String, + digest: Digest, + tag: String, + subject: Option, + ) -> Self { Self { base_url, repo_name, digest, tag, + subject, } } + pub fn subject(&self) -> Option<&String> { + self.subject.as_ref() + } + pub fn digest(&self) -> &str { self.digest.as_str() } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 9d91b0f6..eececbca 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -2,7 +2,7 @@ #![allow(dead_code)] // Rustup thinks everything in here is dead code use std::fs::File; -use std::io::{BufReader, Read, Write}; +use std::io::{Read, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -110,7 +110,7 @@ pub async fn upload_fake_image(cl: &Router, name: &str, tag: &str) -> (Digest, D let range = resp.headers().get(RANGE_HEADER).unwrap().to_str().unwrap(); assert_eq!(range, format!("0-{}", (blob.len() - 1))); //note first byte is 0, hence len - 1 - let blob_digest = Digest::digest_sha256(BufReader::new(blob.as_slice())).unwrap(); + let blob_digest = Digest::digest_sha256_slice(blob.as_slice()); let resp = cl .clone() .oneshot( @@ -165,7 +165,7 @@ pub async fn upload_fake_image(cl: &Router, name: &str, tag: &str) -> (Digest, D }}] }}"# ); - let manifest_digest = Digest::digest_sha256(BufReader::new(raw_manifest.as_bytes())).unwrap(); + let manifest_digest = Digest::digest_sha256_slice(raw_manifest.as_bytes()); let _: manifest::OCIManifest = serde_json::from_str(&raw_manifest).unwrap(); let manifest_addr = format!("/v2/{}/manifests/{}", name, tag); diff --git a/tests/proxy_registry.rs b/tests/proxy_registry.rs index 75375c59..0a8ae7fc 100644 --- a/tests/proxy_registry.rs +++ b/tests/proxy_registry.rs @@ -170,12 +170,10 @@ mod proxy_registry { // Special case: docker/library // check that it works and that manifests are written in the correct location get_manifest(&trow, "f/docker/alpine", "3.13.4").await; - let digest = sqlx::query_scalar!("SELECT manifest_digest FROM tag WHERE repo = 'f/docker/library/alpine' AND tag = '3.13.4'") + sqlx::query_scalar!("SELECT manifest_digest FROM tag WHERE repo = 'f/docker/library/alpine' AND tag = '3.13.4'") .fetch_one(&mut *state.db.acquire().await.unwrap()) .await .expect("Tag not found in database"); - let file = data_dir.join(format!("./blobs/{digest}")); - assert!(file.exists()); } #[tokio::test] diff --git a/tests/registry_interface.rs b/tests/registry_interface.rs index 219a1d82..08a328b9 100644 --- a/tests/registry_interface.rs +++ b/tests/registry_interface.rs @@ -2,7 +2,6 @@ mod common; mod registry_interface { - use std::io::BufReader; use axum::body::Body; use axum::http::HeaderValue; @@ -187,7 +186,7 @@ mod registry_interface { //used by oci_manifest_test let config = "{}\n".as_bytes(); - let digest = digest::Digest::digest_sha256(BufReader::new(config)).unwrap(); + let digest = digest::Digest::digest_sha256_slice(config); let loc = &format!("/v2/{}/blobs/uploads/{}?digest={}", name, uuid, digest); let resp = cl @@ -207,7 +206,7 @@ mod registry_interface { async fn upload_blob_with_post(cl: &Router, repo_name: &str) { let blob_content = "{ }\n".as_bytes(); - let digest = digest::Digest::digest_sha256(BufReader::new(blob_content)).unwrap(); + let digest = digest::Digest::digest_sha256_slice(blob_content); let resp = cl .clone() @@ -234,7 +233,7 @@ mod registry_interface { async fn push_oci_manifest(cl: &Router, name: &str, tag: &str) -> String { //Note config was uploaded as blob in earlier test let config = "{ }\n".as_bytes(); - let config_digest = digest::Digest::digest_sha256(BufReader::new(config)).unwrap(); + let config_digest = digest::Digest::digest_sha256_slice(config); let manifest = format!( r#"{{ "mediaType": "application/vnd.oci.image.manifest.v1+json", @@ -258,7 +257,7 @@ mod registry_interface { assert_eq!(resp.status(), StatusCode::CREATED); // Try pulling by digest - let digest = digest::Digest::digest_sha256(BufReader::new(manifest.as_bytes())).unwrap(); + let digest = digest::Digest::digest_sha256_slice(manifest.as_bytes()); digest.to_string() } @@ -295,7 +294,7 @@ mod registry_interface { assert_eq!(resp.status(), StatusCode::CREATED); // Try pulling by digest - let digest = digest::Digest::digest_sha256(BufReader::new(manifest.as_bytes())).unwrap(); + let digest = digest::Digest::digest_sha256_slice(manifest.as_bytes()); digest.to_string() } @@ -306,7 +305,7 @@ mod registry_interface { ) -> String { //Note config was uploaded as blob in earlier test let config = "{ }\n".as_bytes(); - let config_digest = digest::Digest::digest_sha256(BufReader::new(config)).unwrap(); + let config_digest = digest::Digest::digest_sha256_slice(config); upload_blob_with_post(cl, repo_name).await; @@ -341,7 +340,7 @@ mod registry_interface { assert_eq!(resp.status(), StatusCode::CREATED); // Try pulling by digest - let digest = digest::Digest::digest_sha256(BufReader::new(manifest.as_bytes())).unwrap(); + let digest = digest::Digest::digest_sha256_slice(manifest.as_bytes()); digest.to_string() }