Skip to content

Commit

Permalink
Refactor actor log stream api
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Oct 25, 2023
1 parent 80ba024 commit aaf731f
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 25 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.6.5"
version = "0.6.6"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/amphitheatre-app/amphitheatre"
Expand Down
70 changes: 52 additions & 18 deletions apiserver/src/handlers/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::convert::Infallible;
use std::sync::Arc;
use std::time::Duration;

use amp_common::sync::Synchronization;
use axum::extract::{Path, State};
Expand All @@ -23,13 +22,12 @@ use axum::response::sse::{Event, KeepAlive};
use axum::response::{IntoResponse, Sse};
use axum::Json;

use futures::AsyncBufReadExt;
use futures::Stream;
use tokio_stream::StreamExt;

use futures::{AsyncBufReadExt, Stream};
use k8s_openapi::api::core::v1::Pod;
use kube::api::LogParams;
use kube::Api;
use kube::api::{ListParams, LogParams};
use kube::{Api, ResourceExt};
use tokio_stream::StreamExt;
use tracing::{debug, info};
use uuid::Uuid;

use super::Result;
Expand Down Expand Up @@ -93,22 +91,58 @@ pub async fn logs(
State(ctx): State<Arc<Context>>,
Path((pid, name)): Path<(Uuid, String)>,
) -> Sse<impl Stream<Item = axum::response::Result<Event, Infallible>>> {
let api: Api<Pod> = Api::namespaced(ctx.k8s.clone(), &pid.to_string());
let params = LogParams::default();

let stream = api
.log_stream(&name, &params)
.await
.unwrap()
.lines()
info!("Get logs of actor {}/{}", pid, name);

let api: Api<Pod> = Api::namespaced(ctx.k8s.clone(), &format!("amp-{pid}"));
let param = ListParams {
label_selector: Some(format!("app.kubernetes.io/managed-by=Amphitheatre, app.kubernetes.io/name={name}")),
..Default::default()
};
let pods = api.list(&param).await.unwrap();

// Create a stream that combines logs from all specified containers
let mut streams = Vec::new();

for pod in pods {
if pod.spec.is_none() {
continue;
}

let pod_name = pod.name_any();
let spec = pod.spec.unwrap();

if let Some(init_containers) = spec.init_containers {
for container in init_containers {
debug!("init container: {}", container.name);
streams.push(stream(&api, &pod_name, &container.name).await);
}
}
for container in spec.containers {
debug!("container: {}", container.name);
streams.push(stream(&api, &pod_name, &container.name).await);
}
}

let combined_stream = futures::stream::select_all(streams);
Sse::new(combined_stream).keep_alive(KeepAlive::default())
}

/// Get the log stream of a container
async fn stream(
api: &Api<Pod>,
name: &str,
container: &str,
) -> impl Stream<Item = axum::response::Result<Event, Infallible>> {
let params = LogParams { container: Some(container.into()), follow: true, timestamps: true, ..Default::default() };
let stream = api.log_stream(name, &params).await.unwrap().lines();

stream
.map(|result| match result {
Ok(line) => Event::default().data(line),
Err(err) => Event::default().event("error").data(err.to_string()),
})
.map(Ok)
.throttle(Duration::from_secs(1));

Sse::new(stream).keep_alive(KeepAlive::default())
// .throttle(Duration::from_secs(1))
}

/// Returns a actor's info, including environments, volumes...
Expand Down

0 comments on commit aaf731f

Please sign in to comment.