Skip to content

Commit

Permalink
feat(policy): Add http protocol configuration
Browse files Browse the repository at this point in the history
This adds `"http"` and `"kubernetes.io/h2c"` as a valid values for service port `appProtocol`.

Signed-off-by: Scott Fleener <[email protected]>
  • Loading branch information
sfleen committed Feb 27, 2025
1 parent 4c471cf commit 7a18c94
Show file tree
Hide file tree
Showing 8 changed files with 487 additions and 22 deletions.
4 changes: 4 additions & 0 deletions policy-controller/core/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ pub type RouteSet<T> = HashMap<GroupKindNamespaceName, T>;

#[derive(Debug, Clone, PartialEq)]
pub enum AppProtocol {
Http1,
Http2,
Opaque,
Unknown(Arc<str>),
}
Expand All @@ -53,6 +55,8 @@ impl FromStr for AppProtocol {

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let protocol = match s.to_ascii_lowercase().as_str() {
"http" => AppProtocol::Http1,
"kubernetes.io/h2c" => AppProtocol::Http2,
"linkerd.io/tcp" | "linkerd.io/opaque" => AppProtocol::Opaque,
protocol => AppProtocol::Unknown(Arc::from(protocol)),
};
Expand Down
65 changes: 46 additions & 19 deletions policy-controller/grpc/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,37 +372,64 @@ fn to_proto(
) -> outbound::OutboundPolicy {
let backend: outbound::Backend = default_backend(&policy, original_dst);

let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual {
kind: Some(match accrual {
linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive {
max_failures,
backoff,
} => outbound::failure_accrual::Kind::ConsecutiveFailures(
outbound::failure_accrual::ConsecutiveFailures {
max_failures,
backoff: Some(outbound::ExponentialBackoff {
min_backoff: convert_duration("min_backoff", backoff.min_penalty),
max_backoff: convert_duration("max_backoff", backoff.max_penalty),
jitter_ratio: backoff.jitter,
}),
},
),
}),
});

let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();

let kind = match &policy.app_protocol {
Some(AppProtocol::Opaque) => {
outbound::proxy_protocol::Kind::Opaque(outbound::proxy_protocol::Opaque {
routes: vec![default_outbound_opaq_route(backend, &policy.parent_info)],
})
}
Some(AppProtocol::Http1) => {
http_routes.sort_by(timestamp_then_name);
http::http1_only_protocol(
backend,
http_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
}
Some(AppProtocol::Http2) => {
http_routes.sort_by(timestamp_then_name);
http::http2_only_protocol(
backend,
http_routes.into_iter(),
accrual,
policy.http_retry.clone(),
policy.timeouts.clone(),
allow_l5d_request_headers,
&policy.parent_info,
original_dst,
)
}
None | Some(AppProtocol::Unknown(_)) => {
if let Some(AppProtocol::Unknown(protocol)) = &policy.app_protocol {
tracing::debug!(resource = ?policy.parent_info, port = policy.port.get(), "Unknown appProtocol \"{protocol}\"");
}

let accrual = policy.accrual.map(|accrual| outbound::FailureAccrual {
kind: Some(match accrual {
linkerd_policy_controller_core::outbound::FailureAccrual::Consecutive {
max_failures,
backoff,
} => outbound::failure_accrual::Kind::ConsecutiveFailures(
outbound::failure_accrual::ConsecutiveFailures {
max_failures,
backoff: Some(outbound::ExponentialBackoff {
min_backoff: convert_duration("min_backoff", backoff.min_penalty),
max_backoff: convert_duration("max_backoff", backoff.max_penalty),
jitter_ratio: backoff.jitter,
}),
},
),
}),
});

let mut grpc_routes = policy.grpc_routes.clone().into_iter().collect::<Vec<_>>();
let mut http_routes = policy.http_routes.clone().into_iter().collect::<Vec<_>>();
let mut tls_routes = policy.tls_routes.clone().into_iter().collect::<Vec<_>>();
let mut tcp_routes = policy.tcp_routes.clone().into_iter().collect::<Vec<_>>();

Expand Down
97 changes: 97 additions & 0 deletions policy-controller/grpc/src/outbound/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,103 @@ pub(crate) fn protocol(
})
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn http1_only_protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
accrual: Option<outbound::FailureAccrual>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
parent_info: &ParentInfo,
original_dst: Option<SocketAddr>,
) -> outbound::proxy_protocol::Kind {
outbound::proxy_protocol::Kind::Http1(outbound::proxy_protocol::Http1 {
routes: base_http_routes(
default_backend,
routes,
service_retry,
service_timeouts,
allow_l5d_request_headers,
parent_info,
original_dst,
),
failure_accrual: accrual,
})
}

#[allow(clippy::too_many_arguments)]
pub(crate) fn http2_only_protocol(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
accrual: Option<outbound::FailureAccrual>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
parent_info: &ParentInfo,
original_dst: Option<SocketAddr>,
) -> outbound::proxy_protocol::Kind {
outbound::proxy_protocol::Kind::Http2(outbound::proxy_protocol::Http2 {
routes: base_http_routes(
default_backend,
routes,
service_retry,
service_timeouts,
allow_l5d_request_headers,
parent_info,
original_dst,
),
failure_accrual: accrual,
})
}

fn base_http_routes(
default_backend: outbound::Backend,
routes: impl Iterator<Item = (GroupKindNamespaceName, HttpRoute)>,
service_retry: Option<RouteRetry<HttpRetryCondition>>,
service_timeouts: RouteTimeouts,
allow_l5d_request_headers: bool,
parent_info: &ParentInfo,
original_dst: Option<SocketAddr>,
) -> Vec<outbound::HttpRoute> {
let mut routes = routes
.map(|(gknn, route)| {
convert_outbound_route(
gknn,
route,
default_backend.clone(),
service_retry.clone(),
service_timeouts.clone(),
allow_l5d_request_headers,
parent_info,
original_dst,
)
})
.collect::<Vec<_>>();

match parent_info {
ParentInfo::Service { .. } => {
if routes.is_empty() {
routes.push(default_outbound_service_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
));
}
}
ParentInfo::EgressNetwork { traffic_policy, .. } => {
routes.push(default_outbound_egress_route(
default_backend,
service_retry.clone(),
service_timeouts.clone(),
traffic_policy,
));
}
}

routes
}

#[allow(clippy::too_many_arguments)]
fn convert_outbound_route(
gknn: GroupKindNamespaceName,
Expand Down
40 changes: 40 additions & 0 deletions policy-test/src/outbound_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,46 @@ where
}
}

#[track_caller]
pub fn http1_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::HttpRoute] {
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Http1(grpc::outbound::proxy_protocol::Http1 {
routes,
failure_accrual: _,
}) = kind
{
routes
} else {
panic!("proxy protocol must be Grpc; actually got:\n{kind:#?}")
}
}

#[track_caller]
pub fn http2_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::HttpRoute] {
let kind = config
.protocol
.as_ref()
.expect("must have proxy protocol")
.kind
.as_ref()
.expect("must have kind");
if let grpc::outbound::proxy_protocol::Kind::Http2(grpc::outbound::proxy_protocol::Http2 {
routes,
failure_accrual: _,
}) = kind
{
routes
} else {
panic!("proxy protocol must be Grpc; actually got:\n{kind:#?}")
}
}

#[track_caller]
pub fn grpc_routes(config: &grpc::outbound::OutboundPolicy) -> &[grpc::outbound::GrpcRoute] {
let kind = config
Expand Down
75 changes: 74 additions & 1 deletion policy-test/tests/outbound_api_app_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use linkerd_policy_controller_k8s_api::gateway;
use linkerd_policy_test::{
assert_resource_meta, create,
outbound_api::{
assert_route_is_default, assert_singleton, retry_watch_outbound_policy, tcp_routes,
assert_route_is_default, assert_singleton, http1_routes, http2_routes,
retry_watch_outbound_policy, tcp_routes,
},
test_route::TestParent,
with_temp_ns,
Expand Down Expand Up @@ -45,3 +46,75 @@ async fn opaque_parent() {

test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http1_parent() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with no routes.
// let parent = P::create_parent(&client.clone(), &ns).await;
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("http".to_string())),
)
.await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

let routes = http1_routes(&config);
let route = assert_singleton(routes);
assert_route_is_default::<gateway::HTTPRoute>(route, &parent.obj_ref(), port);
})
.await;
}

test::<k8s::Service>().await;
}

#[tokio::test(flavor = "current_thread")]
async fn http2_parent() {
async fn test<P: TestParent>() {
tracing::debug!(
parent = %P::kind(&P::DynamicType::default()),
);
with_temp_ns(|client, ns| async move {
let port = 4191;
// Create a parent with no routes.
// let parent = P::create_parent(&client.clone(), &ns).await;
let parent = create(
&client,
P::make_parent_with_protocol(&ns, Some("kubernetes.io/h2c".to_string())),
)
.await;

let mut rx = retry_watch_outbound_policy(&client, &ns, parent.ip(), port).await;
let config = rx
.next()
.await
.expect("watch must not fail")
.expect("watch must return an initial config");
tracing::trace!(?config);

assert_resource_meta(&config.metadata, parent.obj_ref(), port);

let routes = http2_routes(&config);
let route = assert_singleton(routes);
assert_route_is_default::<gateway::HTTPRoute>(route, &parent.obj_ref(), port);
})
.await;
}

test::<k8s::Service>().await;
}
Loading

0 comments on commit 7a18c94

Please sign in to comment.