Skip to content

Commit

Permalink
Merge pull request #74 from skunkteam/live-logging-websocket-interface
Browse files Browse the repository at this point in the history
live logging websocket interface
  • Loading branch information
pavadeli authored May 17, 2024
2 parents d87cc10 + 470359b commit 66aab22
Show file tree
Hide file tree
Showing 12 changed files with 757 additions and 289 deletions.
606 changes: 385 additions & 221 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 4 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ axum = "0.6.20"
clap = "4.5.4"
color-eyre = "0.6.3"
console-subscriber = "0.2.0"
ctreg = "1.0.2"
futures = "0.3.30"
hyper = "0.14"
itertools = "0.12.1"
once_cell = "1.19.0"
pin-project = "1.1.5"
prost = "0.12"
rstest = "0.18.2"
serde = "1.0.197"
serde_json = "1.0.115"
serde_with = "3.7.0"
Expand All @@ -40,17 +43,12 @@ time = "0.3.34"
tokio = "1.37.0"
tokio-stream = "0.1.15"
tonic = "0.11"
tonic-build = "0.11"
tower = "0.4.13"
tower-http = "0.4.4"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"

# [workspace.dev-dependencies]
rstest = "0.18.2"

# [workspace.build-dependencies]
tonic-build = "0.11"

[workspace.lints.rust]
unsafe_code = "forbid"
explicit_outlives_requirements = "forbid"
Expand Down
4 changes: 3 additions & 1 deletion crates/emulator-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ default = []
ui = []

[dependencies]
axum = { workspace = true, features = ["macros"] }
axum = { workspace = true, features = ["ws"] }
emulator-tracing = { path = "../emulator-tracing" }
firestore-database = { path = "../firestore-database" }
serde_json = { workspace = true }
tokio = { workspace = true }
tower-http = { workspace = true, features = ["full"] }
tracing = { workspace = true }

[lints]
workspace = true
32 changes: 7 additions & 25 deletions crates/emulator-http/src/routes.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,11 @@
use std::sync::Arc;

use axum::{
extract::State,
http::StatusCode,
response::Result,
routing::{get, put},
Router,
};
#[cfg(feature = "ui")]
use axum::{http::header, response::Html};
use emulator_tracing::{SetLogLevelsError, Tracing};
use axum::{routing::get, Router};
use emulator_tracing::Tracing;
use firestore_database::FirestoreProject;

mod emulator;
mod logging;

#[cfg(feature = "ui")]
const HTML: &str = concat!(
Expand All @@ -34,11 +27,11 @@ impl RouterBuilder {
)
}

pub fn add_dynamic_tracing(self, tracing: impl Tracing + Send + Sync + 'static) -> Self {
pub fn add_dynamic_tracing(self, tracing: &'static impl Tracing) -> Self {
let Self(router) = self;
let router = router.route(
"/emulator/v1/loglevels",
put(set_log_levels).with_state(Arc::new(tracing)),
let router = router.nest(
"/emulator/v1/logging",
logging::router().with_state(tracing),
);
Self(router)
}
Expand All @@ -65,14 +58,3 @@ impl RouterBuilder {
router
}
}

async fn set_log_levels(
State(tracing): State<Arc<impl Tracing + Send + Sync>>,
body: String,
) -> Result<()> {
tracing.set_log_levels(&body).map_err(|err| match err {
SetLogLevelsError::InvalidDirectives(_) => (StatusCode::BAD_REQUEST, err.to_string()),
SetLogLevelsError::ReloadError(_) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()),
})?;
Ok(())
}
79 changes: 79 additions & 0 deletions crates/emulator-http/src/routes/logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::time::Duration;

use axum::{
extract::{
ws::{Message, WebSocket},
State, WebSocketUpgrade,
},
response::Response,
routing::get,
BoxError, Router,
};
use emulator_tracing::{DynamicSubscriber, Tracing};
use tracing::{debug, error, trace};

pub(crate) fn router<S: Tracing>() -> Router<&'static S> {
Router::new().route("/", get(logging_route))
}

async fn logging_route(
State(tracing): State<&'static impl Tracing>,
ws: WebSocketUpgrade,
) -> Response {
ws.on_upgrade(move |mut socket| async move {
if let Err(err) = logging_handler(&mut socket, tracing).await {
error!(error = err, "error in logging socket");
let _unused = socket.send(Message::Text(format!("ERROR: {err}"))).await;
};
debug!("WebSockets connection closing");
})
}

async fn logging_handler(
socket: &mut WebSocket,
tracing: &'static impl Tracing,
) -> Result<(), BoxError> {
debug!("WebSockets connection established, waiting for directives");
let Some(cmd) = socket.recv().await else {
// Closed before receiving the first message
return Ok(());
};
let Message::Text(dirs) = cmd? else {
return Err("unexpected message".into());
};

debug!("Received directives: {dirs:?}");
let subscription = tracing.subscribe(&dirs)?;

let mut interval = tokio::time::interval(Duration::from_millis(100));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
_ = interval.tick() => {
let logs = subscription.consume();
if !logs.is_empty() {
trace!("Flushing logs");
socket.send(Message::Text(String::from_utf8_lossy(&logs).into_owned())).await?;
}
}
cmd = socket.recv() => {
let Some(cmd) = cmd else {
return Ok(());
};
let Message::Text(cmd) = cmd? else {
return Err("unexpected command".into());
};
if cmd != "STOP" {
return Err("unexpected command".into());
}
debug!("Received STOP command, flushing logs");
let logs = subscription.consume();
if !logs.is_empty() {
debug!("Flushing logs");
socket.send(Message::Text(String::from_utf8_lossy(&logs).into_owned())).await?;
}
return Ok(());
}
}
}
}
9 changes: 8 additions & 1 deletion crates/emulator-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,16 @@ console = ["dep:console-subscriber"]

[dependencies]
console-subscriber = { workspace = true, optional = true }
ctreg = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
tracing-subscriber = { workspace = true, features = ["local-time", "env-filter"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = [
"local-time",
"env-filter",
] }

[lints]
workspace = true
Loading

0 comments on commit 66aab22

Please sign in to comment.