diff --git a/Cargo.lock b/Cargo.lock index e3d6989..83e25b8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,12 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" +[[package]] +name = "base64" +version = "0.21.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" + [[package]] name = "bitflags" version = "1.3.2" @@ -352,13 +358,29 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -367,28 +389,62 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.46", +] + [[package]] name = "futures-sink" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.21" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -775,10 +831,12 @@ dependencies = [ "async-recursion", "async-stream", "async-trait", + "base64 0.21.7", "bytes", "clap", "console", "flate2", + "futures", "futures-core", "http", "hyper", @@ -846,7 +904,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9" dependencies = [ - "base64", + "base64 0.13.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 86fc5ff..6a8da8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,10 +25,12 @@ anyhow = "1.0.79" async-recursion = "1.0.5" async-stream = "0.3.5" async-trait = "0.1.77" +base64 = "0.21.7" bytes = "1.4.0" clap = { version = "4.4.18", features = ["derive", "env"] } console = "0.15.8" flate2 = "1.0.28" +futures = "0.3.30" futures-core = "0.3.30" http = "0.2.11" hyper = { version = "0.14.27", features = ["full"] } diff --git a/src/registry/http/blob.rs b/src/registry/http/blob.rs index 5c503c4..e9fb65d 100644 --- a/src/registry/http/blob.rs +++ b/src/registry/http/blob.rs @@ -5,10 +5,8 @@ use crate::hash::sha256_value::Sha256Value; use crate::registry::ops::BYTES_IN_MB; use crate::registry::BlobStore; use anyhow::{bail, Context, Error}; +use http::StatusCode; use http::Uri; -use http::{Response, StatusCode}; - -use hyper::Body; use indicatif::ProgressBar; use sha2::Digest; @@ -18,19 +16,17 @@ use tokio::sync::Mutex; use tokio_stream::StreamExt; use tokio_util::io::ReaderStream; -use super::util::{dump_body_to_string, redirect_uri_fetch}; +use super::util::dump_body_to_string; #[async_trait::async_trait] impl BlobStore for super::HttpRegistry { async fn blob_exists(&self, digest: &str) -> Result { let uri = self.repository_uri_from_path(format!("/blobs/{}", digest))?; - let mut r = redirect_uri_fetch( - &self.http_client, - |req| req.method(http::Method::HEAD), - &uri, - ) - .await?; + let mut r = self + .http_client + .request_simple(&uri, http::Method::HEAD, 3) + .await?; if r.status() == StatusCode::NOT_FOUND { Ok(false) @@ -56,9 +52,10 @@ impl BlobStore for super::HttpRegistry { let target_file = target_file.to_path_buf(); let uri = self.repository_uri_from_path(format!("/blobs/{}", digest))?; - let mut response = - redirect_uri_fetch(&self.http_client, |req| req.method(http::Method::GET), &uri) - .await?; + let mut response = self + .http_client + .request_simple(&uri, http::Method::GET, 3) + .await?; if response.status() != StatusCode::OK { bail!( @@ -123,11 +120,12 @@ impl BlobStore for super::HttpRegistry { ) -> Result<(), Error> { let post_target_uri = self.repository_uri_from_path("/blobs/uploads/")?; // We expect our POST request to get a location header of where to perform the real upload to. - let req_builder = http::request::Builder::default() - .method(http::Method::POST) - .uri(post_target_uri.clone()); - let request = req_builder.body(Body::from(""))?; - let mut r: Response = self.http_client.request(request).await?; + + let mut r = self + .http_client + .request_simple(&post_target_uri, http::Method::POST, 0) + .await?; + if r.status() != StatusCode::ACCEPTED { bail!( "Expected to get a ACCEPTED/202 for upload post request to {:?}, but got {:?}", @@ -145,41 +143,62 @@ impl BlobStore for super::HttpRegistry { bail!("Was a redirection response code, but missing Location header, invalid response from server, body:\n{:#?}", body); }; - let f = tokio::fs::File::open(local_path).await?; - - let mut file_reader_stream = ReaderStream::new(f); - let total_uploaded_bytes = Arc::new(Mutex::new(0)); - let stream_byte_ref = Arc::clone(&total_uploaded_bytes); - - let stream = async_stream::try_stream! { - - while let Some(chunk) = file_reader_stream.next().await { - let chunk = chunk?; - let mut cntr = stream_byte_ref.lock().await; - *cntr += chunk.len(); - - if let Some(progress_bar) = &progress_bar { - progress_bar.set_position(*cntr as u64 / BYTES_IN_MB); - } - yield chunk - } - }; - - let body = - hyper::Body::wrap_stream::<_, bytes::Bytes, Box>( - stream, - ); - let req_builder = http::request::Builder::default() - .method(http::Method::PUT) - .uri(location_uri.clone()) - .header("Content-Length", length.to_string()) - .header("Content-Type", "application/octet-stream"); - - let request = req_builder.body(body)?; - - let mut r: Response = self.http_client.request(request).await?; + struct Context { + progress_bar: Option, + length: u64, + local_path: std::path::PathBuf, + } + let mut r = self + .http_client + .request( + &location_uri, + Arc::new(Context { + progress_bar, + length, + local_path: local_path.to_path_buf(), + }), + |context, builder| async move { + let f = tokio::fs::File::open(context.local_path.clone()).await?; + + let stream = futures::stream::unfold( + (context.progress_bar.clone(), ReaderStream::new(f), 0), + |(progress_bar_cp, mut reader_stream, read_bytes)| async move { + let nxt_chunk = reader_stream.next().await?; + + match nxt_chunk { + Ok(chunk) => { + let read_bytes: usize = read_bytes + chunk.len(); + if let Some(progress_bar) = &progress_bar_cp { + progress_bar.set_position(read_bytes as u64 / BYTES_IN_MB); + } + Some((Ok(chunk), (progress_bar_cp, reader_stream, read_bytes))) + } + Err(ex) => { + let e: Box = Box::new(ex); + Some((Err(e), (progress_bar_cp, reader_stream, read_bytes))) + } + } + }, + ); + + let body = hyper::Body::wrap_stream::< + _, + bytes::Bytes, + Box, + >(stream); + + builder + .method(http::Method::PUT) + .header("Content-Length", context.length) + .header("Content-Type", "application/octet-stream") + .body(body) + .map_err(|e| e.into()) + }, + 0, + ) + .await?; let total_uploaded_bytes: usize = { let m = total_uploaded_bytes.lock().await; diff --git a/src/registry/http/copy_operations.rs b/src/registry/http/copy_operations.rs index c1d39a3..6a67075 100644 --- a/src/registry/http/copy_operations.rs +++ b/src/registry/http/copy_operations.rs @@ -3,8 +3,6 @@ use anyhow::{bail, Error}; use http::StatusCode; -use super::util::redirect_uri_fetch; - #[async_trait::async_trait] impl CopyOperations for super::HttpRegistry { fn registry_name(&self) -> RegistryName { @@ -21,12 +19,19 @@ impl CopyOperations for super::HttpRegistry { digest, source_registry_name ))?; - let r = redirect_uri_fetch( - &self.http_client, - |req| req.method(http::Method::POST), - &uri, - ) - .await?; + let r = self + .http_client + .request( + &uri, + (), + |_, c| async { + c.method(http::Method::POST) + .body(hyper::Body::from("")) + .map_err(|e| e.into()) + }, + 3, + ) + .await?; if r.status() == StatusCode::CREATED { Ok(()) diff --git a/src/registry/http/http_cli/authentication_flow.rs b/src/registry/http/http_cli/authentication_flow.rs new file mode 100644 index 0000000..e3963e3 --- /dev/null +++ b/src/registry/http/http_cli/authentication_flow.rs @@ -0,0 +1,100 @@ +use crate::registry::http::util::dump_body_to_string; + +use anyhow::Context; + +use http::Uri; + +use hyper::{Body, Client}; + +use serde::{Deserialize, Serialize}; + +use super::private_impl::{run_single_request, BearerConfig}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AuthResponse { + pub token: Option, + pub access_token: Option, + pub expires_in: Option, + pub issued_at: Option, +} + +pub async fn authenticate_request( + auth_fail: &BearerConfig, + inner_client: &Client>, +) -> anyhow::Result { + let mut parts = auth_fail.realm.clone().into_parts(); + let new_query_items = if let Some(scope) = &auth_fail.scope { + format!("service={}&scope={}", auth_fail.service, scope) + } else { + format!("service={}", auth_fail.service) + }; + let existing_path_and_query = parts + .path_and_query + .as_ref() + .map(|e| e.as_str()) + .unwrap_or(""); + let new_path_q = if existing_path_and_query.contains("?") { + format!("{}&{}", existing_path_and_query, new_query_items) + } else { + format!("{}?{}", existing_path_and_query, new_query_items) + }; + parts.path_and_query = Some( + new_path_q + .as_str() + .try_into() + .with_context(|| format!("Failed to parse path and query from {:?}", new_path_q))?, + ); + let new_uri = Uri::from_parts(parts).with_context(|| { + format!( + "Failed to parse uri from installing new path and query of {}", + new_path_q + ) + })?; + // The wiring to supply basic_auth_info is TBD next + // right now it can never be set. Need to use the docker credentials helpers to supply this. + let basic_auth_info: Option = None; + let mut response = run_single_request( + Default::default(), + &new_uri, + basic_auth_info, + |basic_auth_info, builder| async { + use base64::prelude::*; + + let b2 = builder.method(http::Method::GET); + let b3 = if let Some(ai) = basic_auth_info { + b2.header( + "Authorization", + format!("Basic {}", BASE64_STANDARD.encode(ai)), + ) + } else { + b2 + }; + + b3.body(Body::empty()).map_err(|e| e.into()) + }, + &inner_client, + ) + .await + .with_context(|| { + format!( + "Failed to run new request to try authenticate to {:?}", + new_uri + ) + })?; + + if response.status().is_success() { + let response_body = dump_body_to_string(&mut response).await?; + let response_auth_info: AuthResponse = serde_json::from_str(&response_body)?; + return Ok(response_auth_info); + } else { + let try_response_body = dump_body_to_string(&mut response) + .await + .unwrap_or("".to_string()); + anyhow::bail!( + "Failed to authenticate to {:?}, got status code: {:?}, body:\n{}", + new_uri, + response.status(), + try_response_body + ); + } +} diff --git a/src/registry/http/http_cli/mod.rs b/src/registry/http/http_cli/mod.rs new file mode 100644 index 0000000..cb25a84 --- /dev/null +++ b/src/registry/http/http_cli/mod.rs @@ -0,0 +1,102 @@ +mod authentication_flow; +mod private_impl; + +use std::sync::Arc; + +use anyhow::Context; + +use http::Response; +use http::Uri; + +use hyper::{Body, Client}; +use tokio::sync::Mutex; + +use self::authentication_flow::AuthResponse; +use self::private_impl::{run_single_request, RequestFailType}; + +// https://raw.githubusercontent.com/google/go-containerregistry/main/images/credhelper-basic.svg +pub struct HttpCli { + pub inner_client: Client>, + pub credentials: Arc>>, + pub auth_info: Arc>>, +} + +impl HttpCli { + pub async fn request_simple( + &self, + uri: &Uri, + method: http::Method, + retries: usize, + ) -> Result, anyhow::Error> { + self.request( + uri, + method, + |method, c| async { c.method(method).body(Body::from("")).map_err(|e| e.into()) }, + retries, + ) + .await + } + + pub async fn request( + &self, + uri: &Uri, + context: B, + complete_request: F, + retries: usize, + ) -> Result, anyhow::Error> + where + F: Fn(B, http::request::Builder) -> Fut, + Fut: std::future::Future>>, + B: Send + 'static + Sync + Clone, + { + let mut uri = uri.clone(); + let mut attempt = 0; + let mut last_error: Option = None; + while attempt < retries + 1 { + attempt += 1; + match run_single_request( + self.auth_info.clone(), + &uri, + context.clone(), + &complete_request, + &self.inner_client, + ) + .await + { + Ok(o) => return Ok(o), + Err(err) => { + last_error = Some(err); + // Unwrap safe because we set the line right before this. + match &last_error.as_ref().unwrap() { + RequestFailType::Redirection(new_url) => { + uri = new_url.parse::().with_context(|| { + format!("Failed to parse new url {:?}", new_url) + })?; + continue; + } + RequestFailType::ConnectError(_) => continue, + RequestFailType::HyperError(_) => break, // terminal. + RequestFailType::AnyhowError(_) => break, // terminal. + RequestFailType::AuthFailure(auth_fail) => { + let auth_info = authentication_flow::authenticate_request( + auth_fail, + &self.inner_client, + ) + .await?; + let mut ai = self.auth_info.lock().await; + *ai = Some(auth_info); + drop(ai); + attempt -= 1; + continue; + } + } + } + } + } + match last_error { + None => anyhow::bail!("We failed in trying to issue http requests, but we have no last error. Unexpected state. Attempting to query: {:?}", uri), + Some(ex) => + Err(ex).with_context(|| format!("Exhausted attempts, or ran into terminal error issuing http requests to URI: {:?}", uri)) + } + } +} diff --git a/src/registry/http/http_cli/private_impl.rs b/src/registry/http/http_cli/private_impl.rs new file mode 100644 index 0000000..b87b2e5 --- /dev/null +++ b/src/registry/http/http_cli/private_impl.rs @@ -0,0 +1,171 @@ +use std::sync::Arc; + +use anyhow::Context; +use http::Uri; +use http::{Response, StatusCode}; + +use hyper::{Body, Client}; + +use super::authentication_flow::AuthResponse; + +#[derive(Debug, Clone)] +pub struct BearerConfig { + pub realm: Uri, + pub service: String, + pub scope: Option, +} +impl std::fmt::Display for BearerConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{{ realm: {}, service: {}, scope: {} }}", + self.realm, + self.service, + self.scope.as_ref().map(|e| e.as_str()).unwrap_or("") + ) + } +} + +impl BearerConfig { + pub fn from_auth_header(auth_header: &str) -> anyhow::Result { + let mut realm = None; + let mut scope = None; + let mut service = None; + + let mut auth_header = auth_header + .strip_prefix("Bearer") + .ok_or_else(|| anyhow::anyhow!("Invalid auth header"))?; + auth_header = auth_header.trim_start_matches(' '); + for part in auth_header.split(',') { + let mut part = part.split('='); + let key = part + .next() + .ok_or_else(|| anyhow::anyhow!("Invalid auth header"))? + .trim(); + let value = part + .next() + .ok_or_else(|| anyhow::anyhow!("Invalid auth header"))? + .trim() + .trim_matches('"'); + match key { + "realm" => { + realm = Some( + value + .parse() + .with_context(|| format!("Failed to parse realm from {:?}", value))?, + ) + } + "service" => service = Some(value.to_string()), + "scope" => scope = Some(value.to_string()), + _ => (), + } + } + + match (realm, service) { + (Some(realm), Some(service)) => Ok(Self { + realm, + service, + scope, + }), + _ => Err(anyhow::anyhow!("Invalid auth header")), + } + } +} + +#[derive(thiserror::Error, Debug)] +pub enum RequestFailType { + #[error("Failed to connect: '{0}'")] + ConnectError(hyper::Error), + #[error("Generic hyper error: '{0}'")] + HyperError(hyper::Error), + #[error("Internal error: '{0}'")] + AnyhowError(anyhow::Error), + #[error("Auth failed: '{0}'")] + AuthFailure(BearerConfig), + #[error("Got a redirection code: '{0}'")] + Redirection(String), +} +impl From for RequestFailType { + fn from(e: anyhow::Error) -> Self { + RequestFailType::AnyhowError(e) + } +} +pub async fn run_single_request( + auth_info: Arc>>, + uri: &Uri, + context: B, + complete_uri: F, + inner_client: &Client>, +) -> Result, RequestFailType> +where + F: Fn(B, http::request::Builder) -> Fut, + Fut: std::future::Future>>, + B: Send + 'static + Sync, +{ + let req_builder = http::request::Builder::default().uri(uri); + + let li = auth_info.lock().await; + let auth_token = li.as_ref().and_then(|e| e.token.clone()); + drop(li); + let req_builder = if let Some(token) = auth_token { + req_builder.header(http::header::AUTHORIZATION, format!("Bearer {}", token)) + } else { + req_builder + }; + let request = complete_uri(context, req_builder).await?; + + let r: Response = match inner_client.request(request).await { + Err(e) => { + if e.is_connect() { + return Err(RequestFailType::ConnectError(e)); + } else { + return Err(RequestFailType::HyperError(e)); + } + } + Ok(r) => { + if r.status() == StatusCode::UNAUTHORIZED { + if let Some(auth_header) = r + .headers() + .get("WWW-Authenticate") + .map(|e| e.to_str().ok()) + .flatten() + { + let b = BearerConfig::from_auth_header(auth_header).with_context(|| { + format!( + "unable to parse auth header when issuing request, got header {:?}", + auth_header + ) + })?; + return Err(RequestFailType::AuthFailure(b)); + } + } + if r.status().is_redirection() { + if let Some(location_header) = r.headers().get(http::header::LOCATION) { + let location_str = location_header.to_str().with_context(|| { + format!("Unable to parse redirection header {:?}", location_header) + })?; + return Err(RequestFailType::Redirection(location_str.to_string())); + } + } + r + } + }; + Ok(r) +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_decode_auth_header() { + let header = "Bearer realm=\"https://auth.docker.io/token\",service=\"registry.docker.io\""; + + let hdr = BearerConfig::from_auth_header(&header).expect("Should be able to decode header"); + assert_eq!( + hdr.realm, + "https://auth.docker.io/token".parse::().unwrap() + ); + assert_eq!(hdr.service, "registry.docker.io"); + assert_eq!(hdr.service, "registry.docker.io"); + } +} diff --git a/src/registry/http/mod.rs b/src/registry/http/mod.rs index ced983f..e56750f 100644 --- a/src/registry/http/mod.rs +++ b/src/registry/http/mod.rs @@ -1,10 +1,13 @@ mod blob; mod copy_operations; +mod http_cli; mod util; +use bytes::Bytes; +use http_cli::HttpCli; use std::time::Duration; use crate::container_specs::manifest::Manifest; -use crate::registry::http::util::{dump_body_to_string, redirect_uri_fetch}; +use crate::registry::http::util::dump_body_to_string; use anyhow::{bail, Context, Error}; use http::Uri; @@ -19,8 +22,6 @@ use self::util::request_path_in_repository_as_string; use super::ContentAndContentType; -type HttpCli = Client>; - pub struct HttpRegistry { registry_uri: Uri, name: String, @@ -44,7 +45,7 @@ impl super::RegistryCore for HttpRegistry { manifest: &Manifest, tag: &str, ) -> Result, Error> { - let manifest_bytes = manifest.to_bytes()?; + let manifest_bytes = Bytes::from(manifest.to_bytes()?); if let Ok(content_and_type) = self.fetch_manifest_as_string(tag).await { if manifest_bytes == content_and_type.content.as_bytes() { @@ -53,13 +54,27 @@ impl super::RegistryCore for HttpRegistry { } let post_target_uri = self.repository_uri_from_path(format!("/manifests/{}", tag))?; - let req_builder = http::request::Builder::default() - .method(http::Method::PUT) - .uri(post_target_uri.clone()) - .header("Content-Type", manifest.media_type()); - let request = req_builder.body(Body::from(manifest_bytes))?; - let mut r: Response = self.http_client.request(request).await?; + let response = self + .http_client + .request( + &post_target_uri, + (), + |_, builder| async { + builder + .method(http::Method::PUT) + .header("Content-Type", manifest.media_type()) + .body(Body::from(manifest_bytes.clone())) + .map_err(|e| e.into()) + }, + 0, + ) + .await; + + eprintln!("Got response: {:?}", response); + + let mut r: Response = response?; + eprintln!("Got response code: {:?}", r.status()); if r.status() != StatusCode::CREATED { bail!("Expected to get status code CREATED, but got {:#?}, hitting url: {:#?},\nUploading {:#?}\nResponse:{}", r.status(), post_target_uri, manifest, dump_body_to_string(&mut r).await? ) @@ -106,15 +121,18 @@ impl HttpRegistry { let reg = HttpRegistry { registry_uri: registry_uri.clone(), name: name.as_ref().to_string(), - http_client, + http_client: HttpCli { + inner_client: http_client, + credentials: Default::default(), + auth_info: Default::default(), + }, }; let req_uri = reg.v2_from_path("/")?; - let req_future = redirect_uri_fetch( - ®.http_client, - |req| req.method(http::Method::HEAD), - &req_uri, - ); + + let req_future = reg + .http_client + .request_simple(&req_uri, http::Method::HEAD, 3); let mut resp = match timeout(Duration::from_millis(4000), req_future).await { Err(_) => bail!( diff --git a/src/registry/http/util.rs b/src/registry/http/util.rs index b5904a9..4323773 100644 --- a/src/registry/http/util.rs +++ b/src/registry/http/util.rs @@ -1,7 +1,5 @@ -use std::time::Duration; - use crate::registry::ContentAndContentType; -use anyhow::{bail, Context, Error}; +use anyhow::{bail, Error}; use http::Uri; use http::{Response, StatusCode}; use hyper::body::HttpBody as _; @@ -12,73 +10,6 @@ use tokio::io::AsyncWriteExt; use super::HttpCli; -#[async_recursion::async_recursion] -async fn inner_redirect_uri_fetch( - client: &HttpCli, - configure_request_builder: F, - uri: &Uri, - retries: usize, -) -> Result, Error> -where - F: Fn(http::request::Builder) -> http::request::Builder + Send + Sync, -{ - let req_builder = http::request::Builder::default().uri(uri); - let req_builder = configure_request_builder(req_builder); - - let request = req_builder.body(Body::from(""))?; - - let r: Response = match client.request(request).await { - Err(e) => { - if e.is_connect() && retries < 10 { - tokio::time::sleep(Duration::from_millis((retries * 2) as u64)).await; - return inner_redirect_uri_fetch( - client, - configure_request_builder, - uri, - retries + 1, - ) - .await; - } else { - return Err(e.into()); - } - } - Ok(r) => r, - }; - - let status = r.status(); - if status.is_redirection() { - if let Some(location_header) = r.headers().get(http::header::LOCATION) { - let location_str = location_header.to_str()?; - return inner_redirect_uri_fetch( - client, - configure_request_builder, - &location_str.parse::()?, - retries, - ) - .await - .with_context(|| { - format!( - "Failure when attempting to query, url we were redirected to {}", - location_str - ) - }); - } - } - - Ok(r) -} - -pub(super) async fn redirect_uri_fetch( - client: &HttpCli, - configure_request_builder: F, - uri: &Uri, -) -> Result, Error> -where - F: Fn(http::request::Builder) -> http::request::Builder + Send + Sync, -{ - inner_redirect_uri_fetch(client, configure_request_builder, uri, 0).await -} - pub(super) async fn dump_body_to_string(response: &mut Response) -> Result { let mut buffer = Vec::default(); while let Some(chunk) = response.body_mut().data().await { @@ -92,17 +23,23 @@ pub(super) async fn request_path_in_repository_as_string( client: &HttpCli, uri: &Uri, ) -> Result { - let mut r = redirect_uri_fetch( - client, - |req| { - req.header( - "Accept", - "application/vnd.docker.distribution.manifest.v2+json", - ) - }, - uri, - ) - .await?; + let mut r = client + .request( + &uri, + (), + |_, c| async { + c.method(http::Method::GET) + .header( + "Accept", + "application/vnd.docker.distribution.manifest.v2+json", + ) + .body(Body::from("")) + .map_err(|e| e.into()) + }, + 3, + ) + .await?; + let metadata = dump_body_to_string(&mut r).await?; let status = r.status();