Skip to content

Commit

Permalink
refactor: new logger for get actor logs
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo committed Feb 5, 2024
1 parent 3c91bfe commit 39cd38d
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 97 deletions.
17 changes: 9 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.8.11"
version = "0.8.12"
edition = "2021"
license = "Apache-2.0"
repository = "https://github.com/amphitheatre-app/amphitheatre"
Expand Down Expand Up @@ -36,6 +36,7 @@ dotenv = "0.15.0"
futures = "0.3"
kube = { version = "0.88.0", default-features = false, features = ["runtime", "derive", "rustls-tls"] }
k8s-openapi = { version = "0.21.0", default-features = false, features = ["schemars", "v1_28"] }
lazy_static = "1.4.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_yaml = "0.9.31"
Expand Down
95 changes: 7 additions & 88 deletions apiserver/src/handlers/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::convert::Infallible;
use std::sync::Arc;

Expand All @@ -23,22 +22,17 @@ use axum::response::sse::{Event, KeepAlive};
use axum::response::{IntoResponse, Sse};
use axum::Json;

use futures::{AsyncBufReadExt, Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::{ContainerStatus, Pod};
use kube::api::LogParams;
use kube::Api;
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use futures::{Stream, StreamExt};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, info};
use uuid::Uuid;

use kube::runtime::{watcher, WatchStreamExt};
use tracing::info;
use uuid::Uuid;

use super::Result;
use crate::context::Context;
use crate::errors::ApiError;
use crate::services::actor::ActorService;
use crate::services::logger::Logger;

// The Actors Service Handlers.
// See [API Documentation: actor](https://docs.amphitheatre.app/api/actor)
Expand Down Expand Up @@ -99,92 +93,17 @@ pub async fn logs(
info!("Start to tail the log stream of actor {} in {}...", name, pid);
let (sender, receiver) = tokio::sync::mpsc::channel(100);

// Watch the status of the pod, if the pod is running, then create a stream for it.
// Start to watch the status of the pod.
tokio::spawn(async move {
let api: Api<Pod> = Api::namespaced(ctx.k8s.clone(), &format!("amp-{pid}"));
let config = watcher::Config::default().labels(&format!("amphitheatre.app/character={name}"));
let mut watcher = watcher(api.clone(), config).applied_objects().boxed();
let subs = Arc::new(RwLock::new(HashSet::new()));

while let Some(pod) = watcher.try_next().await.unwrap() {
if pod.status.is_none() {
continue;
}

let status = pod.status.unwrap();
let pod_name = pod.metadata.name.unwrap();

// check the init container status, if it's not running, then skip it.
if let Some(init_containers) = status.init_container_statuses {
for status in init_containers {
log(&api, &pod_name, &status, &sender, subs.clone()).await;
}
}

// check the container status, if it's not running, then skip it.
if let Some(containers) = status.container_statuses {
for status in containers {
log(&api, &pod_name, &status, &sender, subs.clone()).await;
}
}
}
Logger::new(ctx.k8s.clone(), sender.clone(), pid, name).start().await;
});

let stream = ReceiverStream::new(receiver);
let stream = stream.map(|line| Event::default().data(line)).map(Ok);
let stream = stream.map(Ok);

Sse::new(stream).keep_alive(KeepAlive::default())
}

async fn log(
api: &Api<Pod>,
pod: &str,
status: &ContainerStatus,
sender: &Sender<String>,
subs: Arc<RwLock<HashSet<String>>>,
) {
let pod = pod.to_string();
let name = status.name.clone();
let subscription_id: String = format!("{pod}-{name}", pod = pod, name = name);

debug!("container status: {:?}", status);

// If the container is not running, skip it.
if let Some(state) = &status.state {
if state.running.is_none() {
debug!("Skip log stream of container {} because it's not running.", name);
return;
}
}
// If job handle already exists in subscribe list, skip it.
if subs.read().await.contains(&subscription_id) {
debug!("Skip log stream of container {} because it's already subscribed.", name);
return;
}

let api = api.clone();
let sender = sender.clone();

tokio::spawn(async move {
let params = LogParams {
container: Some(name.clone()),
follow: true,
tail_lines: Some(100),
timestamps: true,
..Default::default()
};
let mut stream = api.log_stream(&pod, &params).await.map_err(ApiError::KubernetesError).unwrap().lines();

info!("Start to receive the log stream of container {} in {}...", name, pod);
while let Some(line) = stream.try_next().await.unwrap() {
let _ = sender.send(line).await;
}
});

// save the job handle to subscribe list.
subs.write().await.insert(subscription_id);
}

/// Returns a actor's info, including environments, volumes...
#[utoipa::path(
get, path = "/v1/actors/{pid}/{name}/info",
Expand Down
165 changes: 165 additions & 0 deletions apiserver/src/services/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright (c) The Amphitheatre Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use axum::response::sse::Event;
use futures::AsyncBufReadExt;
use futures::StreamExt;
use futures::TryStreamExt;
use k8s_openapi::api::core::v1::ContainerStatus;
use k8s_openapi::api::core::v1::Pod;
use kube::api::LogParams;
use kube::runtime::watcher::Config;
use kube::runtime::{watcher, WatchStreamExt};
use kube::Api;
use kube::ResourceExt;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

pub struct Logger {
api: Api<Pod>, // The Kubernetes API client.
sender: Sender<Event>, // The sender of the log stream.
config: Config, // The configuration of watcher.
watches: HashMap<String, JoinHandle<()>>, // The map of watching containers.
}

impl Logger {
/// Creates a new logger.
pub fn new(client: kube::Client, sender: Sender<Event>, playbook: Uuid, actor: String) -> Self {
let api: Api<Pod> = Api::namespaced(client, &format!("amp-{playbook}"));
let label_selector = format!("amphitheatre.app/character={actor}");
let config = Config::default().labels(&label_selector);

Self { api, sender, config, watches: HashMap::new() }
}

/// Starts the logger.
pub async fn start(&mut self) {
let watcher = watcher(self.api.clone(), self.config.clone());
let mut watcher = watcher.touched_objects().boxed();

while let Some(pod) = watcher.try_next().await.unwrap() {
let pod_name = pod.name_any();
if let Some(status) = pod.status {
// Unsubscribe all the watches of the pod if pod is terminating.
// and then break the loop to exit the function.
if status.phase == Some("Terminating".into()) {
self.unsubscribe_all(&pod_name);
return;
}

self.watches(&pod_name, status.init_container_statuses).await;
self.watches(&pod_name, status.container_statuses).await;
}
}
}

/// Watches the containers of the pod.
async fn watches(&mut self, pod: &str, containers: Option<Vec<ContainerStatus>>) {
if containers.is_none() {
warn!("No container statuses found in pod {}.", pod);
return;
}

// Iterate the containers of the pod.
for container in containers.unwrap() {
if container.state.is_none() {
warn!("No state found in container {} of {}.", container.name, pod);
continue;
}
let state = container.state.unwrap();

// If the container is running, then subscribe the log stream.
if state.running.is_some_and(|s| s.started_at.is_some()) {
let key = &format!("{}-{}", pod, container.name);
if self.watches.contains_key(key) {
debug!("Skip container {} of {} because it's watching.", &container.name, pod);
continue;
}
self.subscribe(pod, &container.name).await;
}

// If the container is terminated, then unsubscribe the log stream.
if state.terminated.is_some_and(|s| s.finished_at.is_some()) {
debug!("Container {} in {} has been terminated.", container.name, pod);
self.unsubscribe(pod, &container.name);
}
}
}

/// Subscribes the log stream of the container.
async fn subscribe(&mut self, pod: &str, container: &str) {
let key = format!("{}-{}", pod, container);

let api = self.api.clone();
let sender = self.sender.clone();
let container = container.to_string();
let pod = pod.to_string();

let task = tokio::spawn(async move {
Self::tail(api, sender, pod, container).await;
});

self.watches.insert(key, task);
}

/// Tails the log stream of the container.
async fn tail(api: Api<Pod>, sender: Sender<Event>, pod: String, container: String) {
let params = LogParams {
container: Some(container.to_string()),
follow: true,
tail_lines: Some(100),
timestamps: true,
..Default::default()
};

match api.log_stream(&pod, &params).await {
Ok(stream) => {
info!("Start to receive the log stream of container {} in {}...", container, pod);
let mut lines = stream.lines();
while let Ok(Some(line)) = lines.try_next().await {
_ = sender.send(Event::default().data(line)).await;
}
}
Err(err) => {
let message =
format!("Some error occurred while log stream for container {} in {}: {}.", container, pod, err);
error!("{}", message);
_ = sender.send(Event::default().data(message)).await;
}
}
}

/// Unsubscribes all the log streams.
fn unsubscribe_all(&mut self, pod: &str) {
for (key, task) in self.watches.drain() {
if key.starts_with(pod) {
info!("Unsubscribe the log stream of container {} in {}.", key, pod);
task.abort();
}
}
}

/// Unsubscribes the log stream of the container.
fn unsubscribe(&mut self, pod: &str, container: &str) {
let key = &format!("{}-{}", pod, container);
if let Some(task) = self.watches.remove(key) {
info!("Unsubscribe the log stream of container {} in {}.", container, pod);
task.abort();
}
}
}
1 change: 1 addition & 0 deletions apiserver/src/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

pub mod actor;
pub mod logger;
pub mod playbook;

pub type Result<T, E = crate::errors::ApiError> = std::result::Result<T, E>;

0 comments on commit 39cd38d

Please sign in to comment.