From 2d082073ab3539dc82eb81eb036aba41ee1fa921 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=9F=83=E6=8B=89?= Date: Wed, 6 Nov 2024 19:52:51 +0800 Subject: [PATCH] wip: portforward --- Cargo.toml | 3 ++- src/cluster/mod.rs | 33 ++++++++++++++++++++++++++------- src/web/router/proxy/mod.rs | 31 ++++++------------------------- 3 files changed, 34 insertions(+), 33 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2aa1a04d..2b4dff6c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ description = "The CdsCTF project is an open-source, high-performance, Jeopardy- # Async async-trait = { version = "0.1" } tokio = { version = "1.41", features = ["full"] } +tokio-util = { version = "0.7.12" } futures = { version = "^0.3" } futures-util = { version = "^0.3" } tower = { version = "0.5" } @@ -90,7 +91,7 @@ fred = { version = "9.3", features = [ ] } # Containerization & Orchestration -kube = { version = "0.96", features = ["runtime", "derive"] } +kube = { version = "0.96", features = ["runtime", "derive", "rustls-tls", "ws"] } k8s-openapi = { version = "0.23", features = ["latest"] } # Miscellaneous diff --git a/src/cluster/mod.rs b/src/cluster/mod.rs index ac216af8..ab99e453 100644 --- a/src/cluster/mod.rs +++ b/src/cluster/mod.rs @@ -1,5 +1,5 @@ use std::{collections::BTreeMap, process, sync::OnceLock, time::Duration}; - +use axum::extract::ws::WebSocket; use k8s_openapi::api::core::v1::{ Container as K8sContainer, ContainerPort, EnvVar, Pod, PodSpec, Service, ServicePort, ServiceSpec, @@ -11,6 +11,7 @@ use kube::{ Client as K8sClient, Config, }; use tracing::{error, info}; +use tokio_util::codec::Framed; use crate::config; @@ -48,7 +49,7 @@ pub async fn create( injected_flag: crate::model::challenge::Flag, ) -> Result, anyhow::Error> { let client = get_k8s_client().clone(); - let pods: Api = Api::namespaced( + let api: Api = Api::namespaced( client.clone(), config::get_config().cluster.namespace.as_str(), ); @@ -93,7 +94,10 @@ pub async fn create( name: name.clone(), image: challenge.image_name.clone(), env: Some(env_vars), - ports: Some(container_ports), + ports: Some(match config::get_config().cluster.proxy.enabled { + true => vec![], + false => container_ports, + }), ..Default::default() }], ..Default::default() @@ -101,9 +105,9 @@ pub async fn create( ..Default::default() }; - pods.create(&PostParams::default(), &pod).await?; + api.create(&PostParams::default(), &pod).await?; - kube::runtime::wait::await_condition(pods.clone(), &name, conditions::is_pod_running()).await?; + kube::runtime::wait::await_condition(api.clone(), &name, conditions::is_pod_running()).await?; // let pod = pods.get(&name).await?; @@ -172,9 +176,24 @@ pub async fn create( } pub async fn delete(name: String) { - let pods: Api = Api::namespaced( + let api: Api = Api::namespaced( get_k8s_client().clone(), config::get_config().cluster.namespace.as_str(), ); - let _ = pods.delete(&name, &DeleteParams::default()).await; + let _ = api.delete(&name, &DeleteParams::default()).await; } + +pub async fn wsrx(name: String, port: u16, ws: WebSocket) -> Result<(), anyhow::Error> { + let api: Api = Api::namespaced( + get_k8s_client().clone(), + config::get_config().cluster.namespace.as_str(), + ); + let mut pf = api.portforward(&name, &[port]).await?; + let pfw = pf.take_stream(port); + if let Some(pfw) = pfw { + let stream = Framed::new(pfw, wsrx::proxy::MessageCodec::new()); + let ws: wsrx::WrappedWsStream = ws.into(); + wsrx::proxy::proxy_stream(stream, ws).await?; + } + return Ok(()); +} \ No newline at end of file diff --git a/src/web/router/proxy/mod.rs b/src/web/router/proxy/mod.rs index d5d50c36..11aee78f 100644 --- a/src/web/router/proxy/mod.rs +++ b/src/web/router/proxy/mod.rs @@ -5,7 +5,7 @@ use axum::{ }; use sea_orm::{ColumnTrait, EntityTrait, QueryFilter}; use serde::Deserialize; - +use tracing::debug; use crate::{config, database::get_db, web::traits::WebError}; pub fn router() -> Router { @@ -25,31 +25,12 @@ pub async fn link( } let ws = ws.unwrap(); - - let pod = crate::model::pod::Entity::find() - .filter(crate::model::pod::Column::Name.eq(token)) - .one(&get_db()) - .await - .unwrap(); - - if pod.is_none() { - return Err(WebError::NotFound(String::from(""))); - } - - let pod = pod.unwrap(); - - let target_nat = pod.nats.iter().find(|p| p.src == query.port.to_string()); - - if target_nat.is_none() { - return Err(WebError::NotFound(String::from(""))); - } - - let target_nat = target_nat.unwrap(); - let target_port = target_nat.dst.clone().unwrap(); - let target_url = format!("{}:{}", config::get_config().cluster.entry, target_port); + let port = query.port; return Ok(ws.on_upgrade(move |socket| async move { - let tcp = tokio::net::TcpStream::connect(target_url).await.unwrap(); - let _ = wsrx::proxy(socket.into(), tcp).await; + let result = crate::cluster::wsrx(token, port as u16, socket).await; + if let Err(e) = result { + debug!("Failed to link pods: {:?}", e); + } })); }