Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refac: simplify logger shutdown system #752

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ pub mod utils;
use std::{env, path::PathBuf};

use cli::CliArgs;
use error::{Error, Result};
use once_cell::sync::Lazy;
use utils::{QuestionAction, QuestionPolicy};

use crate::utils::logger::spawn_logger_thread;
use self::{
error::{Error, Result},
utils::{
logger::{shutdown_logger_and_wait, spawn_logger_thread},
QuestionAction, QuestionPolicy,
},
};

// Used in BufReader and BufWriter to perform less syscalls
const BUFFER_CAPACITY: usize = 1024 * 32;
Expand All @@ -27,9 +31,9 @@ static CURRENT_DIRECTORY: Lazy<PathBuf> = Lazy::new(|| env::current_dir().unwrap
pub const EXIT_FAILURE: i32 = libc::EXIT_FAILURE;

fn main() {
let handler = spawn_logger_thread();
spawn_logger_thread();
let result = run();
handler.shutdown_and_wait();
shutdown_logger_and_wait();

if let Err(err) = result {
eprintln!("{err}");
Expand Down
84 changes: 31 additions & 53 deletions src/utils/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,16 @@ pub use logger_thread::spawn_logger_thread;
use super::colors::{ORANGE, RESET, YELLOW};
use crate::accessible::is_running_in_accessible_mode;

/// Asks logger to shutdown and waits till it flushes all pending messages.
#[track_caller]
pub fn shutdown_logger_and_wait() {
logger_thread::send_shutdown_command_and_wait();
}

/// Asks logger to flush all messages, useful before starting STDIN interaction.
#[track_caller]
pub fn flush_messages() {
logger_thread::send_flush_message_and_wait();
logger_thread::send_flush_command_and_wait();
}

/// An `[INFO]` log to be displayed if we're not running accessibility mode.
Expand Down Expand Up @@ -36,7 +42,7 @@ pub fn info_accessible(contents: String) {

#[track_caller]
fn info_with_accessibility(contents: String, accessible: bool) {
logger_thread::send_log_message(PrintMessage {
logger_thread::send_print_command(PrintMessage {
contents,
accessible,
level: MessageLevel::Info,
Expand All @@ -45,7 +51,7 @@ fn info_with_accessibility(contents: String, accessible: bool) {

#[track_caller]
pub fn warning(contents: String) {
logger_thread::send_log_message(PrintMessage {
logger_thread::send_print_command(PrintMessage {
contents,
// Warnings are important and unlikely to flood, so they should be displayed
accessible: true,
Expand All @@ -57,7 +63,7 @@ pub fn warning(contents: String) {
enum LoggerCommand {
Print(PrintMessage),
Flush { finished_barrier: Arc<Barrier> },
FlushAndShutdown,
FlushAndShutdown { finished_barrier: Arc<Barrier> },
}

/// Message object used for sending logs from worker threads to a logging thread via channels.
Expand All @@ -70,7 +76,7 @@ struct PrintMessage {
}

impl PrintMessage {
fn to_processed_message(&self) -> Option<String> {
fn to_formatted_message(&self) -> Option<String> {
match self.level {
MessageLevel::Info => {
if self.accessible {
Expand Down Expand Up @@ -128,71 +134,44 @@ mod logger_thread {
}

#[track_caller]
pub(super) fn send_log_message(msg: PrintMessage) {
pub(super) fn send_print_command(msg: PrintMessage) {
get_sender()
.send(LoggerCommand::Print(msg))
.expect("Failed to send print message");
}

#[track_caller]
fn send_shutdown_message() {
get_sender()
.send(LoggerCommand::FlushAndShutdown)
.expect("Failed to send shutdown message");
.expect("Failed to send print command");
}

#[track_caller]
pub(super) fn send_flush_message_and_wait() {
pub(super) fn send_flush_command_and_wait() {
let barrier = Arc::new(Barrier::new(2));

get_sender()
.send(LoggerCommand::Flush {
finished_barrier: barrier.clone(),
})
.expect("Failed to send shutdown message");
.expect("Failed to send flush command");

barrier.wait();
}

pub struct LoggerThreadHandle {
shutdown_barrier: Arc<Barrier>,
}
#[track_caller]
pub(super) fn send_shutdown_command_and_wait() {
let barrier = Arc::new(Barrier::new(2));

impl LoggerThreadHandle {
/// Tell logger to shutdown and waits till it does.
pub fn shutdown_and_wait(self) {
// Signal the shutdown
send_shutdown_message();
// Wait for confirmation
self.shutdown_barrier.wait();
}
}
get_sender()
.send(LoggerCommand::FlushAndShutdown {
finished_barrier: barrier.clone(),
})
.expect("Failed to send shutdown command");

#[cfg(test)]
// shutdown_and_wait must be called manually, but to keep 'em clean, in
// case of tests just do it on drop
impl Drop for LoggerThreadHandle {
fn drop(&mut self) {
send_shutdown_message();
self.shutdown_barrier.wait();
}
barrier.wait();
}

pub fn spawn_logger_thread() -> LoggerThreadHandle {
pub fn spawn_logger_thread() {
let log_receiver = setup_channel();

let shutdown_barrier = Arc::new(Barrier::new(2));

let handle = LoggerThreadHandle {
shutdown_barrier: shutdown_barrier.clone(),
};

rayon::spawn(move || run_logger(log_receiver, shutdown_barrier));

handle
rayon::spawn(move || run_logger(log_receiver));
}

fn run_logger(log_receiver: LogReceiver, shutdown_barrier: Arc<Barrier>) {
fn run_logger(log_receiver: LogReceiver) {
const FLUSH_TIMEOUT: Duration = Duration::from_millis(200);

let mut buffer = Vec::<String>::with_capacity(16);
Expand All @@ -210,7 +189,7 @@ mod logger_thread {
match msg {
LoggerCommand::Print(msg) => {
// Append message to buffer
if let Some(msg) = msg.to_processed_message() {
if let Some(msg) = msg.to_formatted_message() {
buffer.push(msg);
}

Expand All @@ -222,14 +201,13 @@ mod logger_thread {
flush_logs_to_stderr(&mut buffer);
finished_barrier.wait();
}
LoggerCommand::FlushAndShutdown => {
LoggerCommand::FlushAndShutdown { finished_barrier } => {
flush_logs_to_stderr(&mut buffer);
break;
finished_barrier.wait();
return;
}
}
}

shutdown_barrier.wait();
}

fn flush_logs_to_stderr(buffer: &mut Vec<String>) {
Expand Down