diff --git a/Cargo.toml b/Cargo.toml index 9c80efcde..c3294e6a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,8 +26,12 @@ serde_json = { version = "1.0", features = ["std"] } thiserror = { version = "1.0" } tokio = { version = "1.41" } tracing = { version = "0.1" } -uuid = { version = "1.11", default-features = false, features = ["v4", "serde"] } +uuid = { version = "1.11", default-features = false, features = [ + "v4", + "serde", +] } once_cell = { version = "1.20" } +hyper = { version = "0.14", default-features = false } [profile.release] lto = true # Optimize our binary at link stage. diff --git a/integration/ducks/Cargo.toml b/integration/ducks/Cargo.toml index ab7a081fd..0ac552e66 100644 --- a/integration/ducks/Cargo.toml +++ b/integration/ducks/Cargo.toml @@ -11,7 +11,7 @@ publish = false anyhow = "1.0" bytes = { workspace = true } entropy = "0.4" -hyper = { version = "0.14", features = ["server"] } +hyper = { workspace = true, features = ["server", "backports", "deprecated"] } once_cell = { workspace = true } shared = { path = "../shared" } sketches-ddsketch = "0.3" diff --git a/integration/ducks/src/main.rs b/integration/ducks/src/main.rs index 849bb5bf7..a93063525 100644 --- a/integration/ducks/src/main.rs +++ b/integration/ducks/src/main.rs @@ -16,7 +16,7 @@ use anyhow::Context; use bytes::BytesMut; use hyper::{ - body, + body::HttpBody, server::conn::{AddrIncoming, AddrStream}, service::{make_service_fn, service_fn}, Body, Method, Request, StatusCode, @@ -127,7 +127,7 @@ impl From<&SocketCounters> for SocketMetrics { #[tracing::instrument(level = "trace")] async fn http_req_handler(req: Request) -> Result, hyper::Error> { let (parts, body) = req.into_parts(); - let body = body::to_bytes(body).await?; + let body = body.collect().await?.to_bytes(); { let metric = HTTP_COUNTERS.get().expect("HTTP_COUNTERS not initialized"); diff --git a/lading/Cargo.toml b/lading/Cargo.toml index 5c2360931..662364e3a 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -37,7 +37,7 @@ futures = "0.3.31" fuser = { version = "0.15", optional = true } http = "0.2" http-serde = "1.1" -hyper = { version = "0.14", features = ["client"] } +hyper = { workspace = true, features = ["client", "backports", "deprecated"] } is_executable = "1.0.4" metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } diff --git a/lading/src/blackhole/http.rs b/lading/src/blackhole/http.rs index dff1e3ccd..61a817d0f 100644 --- a/lading/src/blackhole/http.rs +++ b/lading/src/blackhole/http.rs @@ -10,7 +10,8 @@ use std::{net::SocketAddr, time::Duration}; use http::{header::InvalidHeaderValue, status::InvalidStatusCode, HeaderMap}; use hyper::{ - body, header, + body::HttpBody, + header, server::conn::{AddrIncoming, AddrStream}, service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, @@ -137,7 +138,7 @@ async fn srv( let (parts, body) = req.into_parts(); - let bytes = body::to_bytes(body).await?; + let bytes = body.collect().await?.to_bytes(); match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) { Err(response) => Ok(response), diff --git a/lading/src/blackhole/splunk_hec.rs b/lading/src/blackhole/splunk_hec.rs index 02cbae751..d799c2246 100644 --- a/lading/src/blackhole/splunk_hec.rs +++ b/lading/src/blackhole/splunk_hec.rs @@ -16,7 +16,8 @@ use std::{ }; use hyper::{ - body, header, + body::HttpBody, + header, server::conn::{AddrIncoming, AddrStream}, service::{make_service_fn, service_fn}, Body, Method, Request, Response, Server, StatusCode, @@ -93,7 +94,7 @@ async fn srv( metrics::counter!("requests_received", &*labels).increment(1); let (parts, body) = req.into_parts(); - let bytes = body::to_bytes(body).await?; + let bytes = body.collect().await?.to_bytes(); match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) { Err(response) => Ok(response), diff --git a/lading/src/blackhole/sqs.rs b/lading/src/blackhole/sqs.rs index 39ab608a3..dbf15936d 100644 --- a/lading/src/blackhole/sqs.rs +++ b/lading/src/blackhole/sqs.rs @@ -9,7 +9,7 @@ use std::{fmt::Write, net::SocketAddr}; use hyper::{ - body, + body::HttpBody, server::conn::{AddrIncoming, AddrStream}, service::{make_service_fn, service_fn}, Body, Request, Response, Server, StatusCode, @@ -243,7 +243,7 @@ async fn srv( ) -> Result, Error> { requests_received.increment(1); - let bytes = body::to_bytes(req).await?; + let bytes = req.collect().await?.to_bytes(); bytes_received.increment(bytes.len() as u64); let action: Action = serde_qs::from_bytes(&bytes)?; diff --git a/lading/src/generator/splunk_hec.rs b/lading/src/generator/splunk_hec.rs index 698713e78..062982605 100644 --- a/lading/src/generator/splunk_hec.rs +++ b/lading/src/generator/splunk_hec.rs @@ -16,7 +16,7 @@ mod acknowledgements; -use std::{num::NonZeroU32, thread, time::Duration}; +use std::{future::ready, num::NonZeroU32, thread, time::Duration}; use acknowledgements::Channels; use byte_unit::ByteError; @@ -24,7 +24,7 @@ use http::{ header::{AUTHORIZATION, CONTENT_LENGTH}, Method, Request, Uri, }; -use hyper::{client::HttpConnector, Body, Client}; +use hyper::{body::HttpBody, client::HttpConnector, Body, Client}; use lading_throttle::Throttle; use metrics::{counter, gauge}; use once_cell::sync::OnceCell; @@ -120,6 +120,9 @@ pub enum Error { /// Wrapper around [`acknowledgements::Error`] #[error(transparent)] Acknowledge(#[from] acknowledgements::Error), + /// Wrapper around [`hyper::Error`]. + #[error("HTTP error: {0}")] + Hyper(#[from] hyper::Error), } /// Defines a task that emits variant lines to a Splunk HEC server controlling @@ -337,14 +340,10 @@ async fn send_hec_request( let mut status_labels = labels.clone(); status_labels.push(("status_code".to_string(), status.as_u16().to_string())); counter!("request_ok", &status_labels).increment(1); - channel - .send(async { - let body_bytes = hyper::body::to_bytes(body).await.expect("unable to convert response body to bytes"); - let hec_ack_response = - serde_json::from_slice::(&body_bytes).expect("unable to parse response body"); - hec_ack_response.ack_id - }) - .await?; + let body_bytes = body.collect().await?.to_bytes(); + let hec_ack_response = + serde_json::from_slice::(&body_bytes).expect("unable to parse response body"); + channel.send(ready(hec_ack_response.ack_id)).await?; } Err(err) => { let mut error_labels = labels.clone(); diff --git a/lading/src/generator/splunk_hec/acknowledgements.rs b/lading/src/generator/splunk_hec/acknowledgements.rs index a0dd4fc08..ab13629d7 100644 --- a/lading/src/generator/splunk_hec/acknowledgements.rs +++ b/lading/src/generator/splunk_hec/acknowledgements.rs @@ -3,7 +3,7 @@ use std::time::Duration; use futures::Future; use http::{header::AUTHORIZATION, Method, Request, StatusCode, Uri}; -use hyper::{client::HttpConnector, Body, Client}; +use hyper::{body::HttpBody, client::HttpConnector, Body, Client}; use metrics::counter; use rustc_hash::FxHashMap; use serde::Deserialize; @@ -202,7 +202,7 @@ async fn ack_request( let status = parts.status; counter!("ack_status_request_ok", "channel_id" => channel_id.clone(), "status" => status.to_string()).increment(1); if status == StatusCode::OK { - let body = hyper::body::to_bytes(body).await?; + let body = body.collect().await?.to_bytes(); let ack_status = serde_json::from_slice::(&body)?; let mut ack_ids_acked: u32 = 0; diff --git a/lading_signal/Cargo.toml b/lading_signal/Cargo.toml index d22631bfe..663d03732 100644 --- a/lading_signal/Cargo.toml +++ b/lading_signal/Cargo.toml @@ -24,4 +24,4 @@ loom = { version = "0.7", features = ["futures", "checkpoint"] } doctest = false [lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } \ No newline at end of file +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } diff --git a/lading_throttle/Cargo.toml b/lading_throttle/Cargo.toml index dcb0fdd6e..f84a47001 100644 --- a/lading_throttle/Cargo.toml +++ b/lading_throttle/Cargo.toml @@ -22,4 +22,4 @@ tokio = { workspace = true, features = ["time"] } doctest = false [lints.rust] -unexpected_cfgs = { level = "warn", check-cfg = ['cfg(kani)'] } \ No newline at end of file +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(kani)'] }