Skip to content

Commit

Permalink
ducks adds backports and deprecated features (#1107)
Browse files Browse the repository at this point in the history
### What does this PR do?

I'm attempting to upgrade hyper et al in our project. This is a delicate
business and I want to go slowly. I'm following the migration guide
one crate at a time.
  • Loading branch information
blt authored Nov 12, 2024
1 parent f64a74d commit c38e915
Show file tree
Hide file tree
Showing 11 changed files with 30 additions and 25 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ serde_json = { version = "1.0", features = ["std"] }
thiserror = { version = "1.0" }
tokio = { version = "1.41" }
tracing = { version = "0.1" }
uuid = { version = "1.11", default-features = false, features = ["v4", "serde"] }
uuid = { version = "1.11", default-features = false, features = [
"v4",
"serde",
] }
once_cell = { version = "1.20" }
hyper = { version = "0.14", default-features = false }

[profile.release]
lto = true # Optimize our binary at link stage.
Expand Down
2 changes: 1 addition & 1 deletion integration/ducks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ publish = false
anyhow = "1.0"
bytes = { workspace = true }
entropy = "0.4"
hyper = { version = "0.14", features = ["server"] }
hyper = { workspace = true, features = ["server", "backports", "deprecated"] }
once_cell = { workspace = true }
shared = { path = "../shared" }
sketches-ddsketch = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions integration/ducks/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
use anyhow::Context;
use bytes::BytesMut;
use hyper::{
body,
body::HttpBody,
server::conn::{AddrIncoming, AddrStream},
service::{make_service_fn, service_fn},
Body, Method, Request, StatusCode,
Expand Down Expand Up @@ -127,7 +127,7 @@ impl From<&SocketCounters> for SocketMetrics {
#[tracing::instrument(level = "trace")]
async fn http_req_handler(req: Request<Body>) -> Result<hyper::Response<Body>, hyper::Error> {
let (parts, body) = req.into_parts();
let body = body::to_bytes(body).await?;
let body = body.collect().await?.to_bytes();

{
let metric = HTTP_COUNTERS.get().expect("HTTP_COUNTERS not initialized");
Expand Down
2 changes: 1 addition & 1 deletion lading/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ futures = "0.3.31"
fuser = { version = "0.15", optional = true }
http = "0.2"
http-serde = "1.1"
hyper = { version = "0.14", features = ["client"] }
hyper = { workspace = true, features = ["client", "backports", "deprecated"] }
is_executable = "1.0.4"
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
Expand Down
5 changes: 3 additions & 2 deletions lading/src/blackhole/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use std::{net::SocketAddr, time::Duration};

use http::{header::InvalidHeaderValue, status::InvalidStatusCode, HeaderMap};
use hyper::{
body, header,
body::HttpBody,
header,
server::conn::{AddrIncoming, AddrStream},
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
Expand Down Expand Up @@ -137,7 +138,7 @@ async fn srv(

let (parts, body) = req.into_parts();

let bytes = body::to_bytes(body).await?;
let bytes = body.collect().await?.to_bytes();

match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) {
Err(response) => Ok(response),
Expand Down
5 changes: 3 additions & 2 deletions lading/src/blackhole/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ use std::{
};

use hyper::{
body, header,
body::HttpBody,
header,
server::conn::{AddrIncoming, AddrStream},
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server, StatusCode,
Expand Down Expand Up @@ -93,7 +94,7 @@ async fn srv(
metrics::counter!("requests_received", &*labels).increment(1);

let (parts, body) = req.into_parts();
let bytes = body::to_bytes(body).await?;
let bytes = body.collect().await?.to_bytes();

match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) {
Err(response) => Ok(response),
Expand Down
4 changes: 2 additions & 2 deletions lading/src/blackhole/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
use std::{fmt::Write, net::SocketAddr};

use hyper::{
body,
body::HttpBody,
server::conn::{AddrIncoming, AddrStream},
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
Expand Down Expand Up @@ -243,7 +243,7 @@ async fn srv(
) -> Result<Response<Body>, Error> {
requests_received.increment(1);

let bytes = body::to_bytes(req).await?;
let bytes = req.collect().await?.to_bytes();
bytes_received.increment(bytes.len() as u64);

let action: Action = serde_qs::from_bytes(&bytes)?;
Expand Down
19 changes: 9 additions & 10 deletions lading/src/generator/splunk_hec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

mod acknowledgements;

use std::{num::NonZeroU32, thread, time::Duration};
use std::{future::ready, num::NonZeroU32, thread, time::Duration};

use acknowledgements::Channels;
use byte_unit::ByteError;
use http::{
header::{AUTHORIZATION, CONTENT_LENGTH},
Method, Request, Uri,
};
use hyper::{client::HttpConnector, Body, Client};
use hyper::{body::HttpBody, client::HttpConnector, Body, Client};
use lading_throttle::Throttle;
use metrics::{counter, gauge};
use once_cell::sync::OnceCell;
Expand Down Expand Up @@ -120,6 +120,9 @@ pub enum Error {
/// Wrapper around [`acknowledgements::Error`]
#[error(transparent)]
Acknowledge(#[from] acknowledgements::Error),
/// Wrapper around [`hyper::Error`].
#[error("HTTP error: {0}")]
Hyper(#[from] hyper::Error),
}

/// Defines a task that emits variant lines to a Splunk HEC server controlling
Expand Down Expand Up @@ -337,14 +340,10 @@ async fn send_hec_request(
let mut status_labels = labels.clone();
status_labels.push(("status_code".to_string(), status.as_u16().to_string()));
counter!("request_ok", &status_labels).increment(1);
channel
.send(async {
let body_bytes = hyper::body::to_bytes(body).await.expect("unable to convert response body to bytes");
let hec_ack_response =
serde_json::from_slice::<HecAckResponse>(&body_bytes).expect("unable to parse response body");
hec_ack_response.ack_id
})
.await?;
let body_bytes = body.collect().await?.to_bytes();
let hec_ack_response =
serde_json::from_slice::<HecAckResponse>(&body_bytes).expect("unable to parse response body");
channel.send(ready(hec_ack_response.ack_id)).await?;
}
Err(err) => {
let mut error_labels = labels.clone();
Expand Down
4 changes: 2 additions & 2 deletions lading/src/generator/splunk_hec/acknowledgements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;

use futures::Future;
use http::{header::AUTHORIZATION, Method, Request, StatusCode, Uri};
use hyper::{client::HttpConnector, Body, Client};
use hyper::{body::HttpBody, client::HttpConnector, Body, Client};
use metrics::counter;
use rustc_hash::FxHashMap;
use serde::Deserialize;
Expand Down Expand Up @@ -202,7 +202,7 @@ async fn ack_request(
let status = parts.status;
counter!("ack_status_request_ok", "channel_id" => channel_id.clone(), "status" => status.to_string()).increment(1);
if status == StatusCode::OK {
let body = hyper::body::to_bytes(body).await?;
let body = body.collect().await?.to_bytes();
let ack_status = serde_json::from_slice::<HecAckStatusResponse>(&body)?;

let mut ack_ids_acked: u32 = 0;
Expand Down
2 changes: 1 addition & 1 deletion lading_signal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ loom = { version = "0.7", features = ["futures", "checkpoint"] }
doctest = false

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }
2 changes: 1 addition & 1 deletion lading_throttle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ tokio = { workspace = true, features = ["time"] }
doctest = false

[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(kani)'] }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(kani)'] }

0 comments on commit c38e915

Please sign in to comment.