From ba2e896a450ebdff8183b8f63a4ba1369c695bbf Mon Sep 17 00:00:00 2001
From: zakstucke <44890343+zakstucke@users.noreply.github.com>
Date: Thu, 13 Jun 2024 21:16:58 +0200
Subject: [PATCH] More random tools moved (#47)

* Misc work while trying to generalise the redis temp list and the flexi logging trait

* More random tools moved
---
 .zetch.lock                                 |   3 +-
 rust/Cargo.lock                             |   1 +
 rust/Cargo.toml                             |   9 +-
 rust/bitbazaar/misc/mod.rs                  |   2 +
 rust/bitbazaar/misc/periodic_updater.rs     |  53 ++++++
 rust/bitbazaar/threads/mod.rs               |   4 +
 rust/bitbazaar/threads/run_cpu_intensive.rs | 177 ++++++++++++++++++++
 rust/bitbazaar/utils/mod.rs                 |   1 -
 8 files changed, 246 insertions(+), 4 deletions(-)
 create mode 100644 rust/bitbazaar/misc/periodic_updater.rs
 create mode 100644 rust/bitbazaar/threads/run_cpu_intensive.rs
 delete mode 100644 rust/bitbazaar/utils/mod.rs

diff --git a/.zetch.lock b/.zetch.lock
index 70944b2e..016f62d1 100644
--- a/.zetch.lock
+++ b/.zetch.lock
@@ -18,6 +18,7 @@
     "py_rust/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b",
     "py_rust/README.zetch.md": "31c5059b153f58fb2e105a4a5dd36ac97d93c56ead9206cdea14daf665659429",
     "rust/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b",
-    "rust/README.zetch.md": "31c5059b153f58fb2e105a4a5dd36ac97d93c56ead9206cdea14daf665659429"
+    "rust/README.zetch.md": "31c5059b153f58fb2e105a4a5dd36ac97d93c56ead9206cdea14daf665659429",
+    "rust/pkg/LICENSE.zetch.md": "d2c12e539d357957b950a54a5477c3a9f87bd2b3ee707be7a4db7adaf5aacc2b"
   }
 }
\ No newline at end of file
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
index f29c3403..8bf89e8c 100644
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -193,6 +193,7 @@ dependencies = [
  "parking_lot",
  "portpicker",
  "rand",
+ "rayon",
  "redis",
  "redis-macros",
  "regex",
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 111e6d95..3d25a879 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -80,13 +80,17 @@ opentelemetry-semantic-conventions = { version = "0.13.0", optional = true }
 # FEAT: system:
 sysinfo = { version = "0.30", optional = true }
 
+# FEAT: rayon:
+rayon = { version = "1", optional = true }
+
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 tracing-subscriber-wasm = "0.1.0"
 
 [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
-tracing-appender = '0.2' # This includes threading (non-blocking stuff that can't be used in wasm)
+# This includes threading (non-blocking stuff that can't be used in wasm)
+tracing-appender = '0.2'
 hostname = "0.3.1"
-tokio = { version = '1', features = ["time"] }
+tokio = { version = '1', features = ["time", "sync"] }
 
 [dev-dependencies]
 rstest = "0.18"
@@ -143,6 +147,7 @@ opentelemetry-http = [ # In general there's no point with this currently, made f
     'opentelemetry-otlp/http-proto',
     'opentelemetry-otlp/reqwest-client',
 ]
+rayon = ['dep:rayon']
 
 [profile.release]
 strip = "debuginfo" # Note: true or "symbols" seems to break static c linking e.g. with ffmpeg.
diff --git a/rust/bitbazaar/misc/mod.rs b/rust/bitbazaar/misc/mod.rs
index 84166bfb..6ccc5ac1 100644
--- a/rust/bitbazaar/misc/mod.rs
+++ b/rust/bitbazaar/misc/mod.rs
@@ -5,8 +5,10 @@ mod binary_search;
 mod flexi_logger;
 mod in_ci;
 mod is_tcp_port_listening;
+mod periodic_updater;
 
 pub use binary_search::*;
 pub use flexi_logger::*;
 pub use in_ci::in_ci;
 pub use is_tcp_port_listening::is_tcp_port_listening;
+pub use periodic_updater::*;
diff --git a/rust/bitbazaar/misc/periodic_updater.rs b/rust/bitbazaar/misc/periodic_updater.rs
new file mode 100644
index 00000000..17d2f8a4
--- /dev/null
+++ b/rust/bitbazaar/misc/periodic_updater.rs
@@ -0,0 +1,53 @@
+use std::{
+    sync::atomic::AtomicU64,
+    time::{SystemTime, UNIX_EPOCH},
+};
+
+/// A periodic updater. Runs a callback at a specified time interval. Synchronous; requires polling.
+/// Useful in long-running loops to do something at a set interval. [`PeriodicUpdater::maybe_update`] should be called on each iteration of the loop.
+pub struct PeriodicUpdater<A, F: Fn(std::time::Duration, A)> {
+    last_timestamp_ms: AtomicU64,
+    update_every: std::time::Duration,
+    on_progress: F,
+    _a: std::marker::PhantomData<A>,
+}
+
+impl<A, F: Fn(std::time::Duration, A)> PeriodicUpdater<A, F> {
+    /// Create a new [`PeriodicUpdater`].
+    /// Make sure to call [`PeriodicUpdater::maybe_update`] frequently.
+    ///
+    /// Arguments:
+    /// - `update_every`: the time interval at which to run the callback.
+    /// - `on_progress`: the callback to run. It is passed the exact time since the last call, plus any externally supplied params.
+    pub fn new(update_every: std::time::Duration, on_progress: F) -> Self {
+        Self {
+            on_progress,
+            last_timestamp_ms: AtomicU64::new(0),
+            update_every,
+            _a: std::marker::PhantomData,
+        }
+    }
+
+    /// Call this function frequently to check if the callback should be run.
+    pub fn maybe_update(&self, ext_params: A) {
+        let epoch_ms = get_epoch_ms();
+        let elapsed = std::time::Duration::from_millis(
+            epoch_ms
+                - self
+                    .last_timestamp_ms
+                    .load(std::sync::atomic::Ordering::Relaxed),
+        );
+        if elapsed >= self.update_every {
+            (self.on_progress)(elapsed, ext_params);
+            self.last_timestamp_ms
+                .store(epoch_ms, std::sync::atomic::Ordering::Relaxed);
+        }
+    }
+}
+
+fn get_epoch_ms() -> u64 {
+    SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .unwrap()
+        .as_millis() as u64
+}
diff --git a/rust/bitbazaar/threads/mod.rs b/rust/bitbazaar/threads/mod.rs
index 25bda0b1..00c94f12 100644
--- a/rust/bitbazaar/threads/mod.rs
+++ b/rust/bitbazaar/threads/mod.rs
@@ -1,3 +1,7 @@
 mod batch_futures;
+#[cfg(feature = "rayon")]
+mod run_cpu_intensive;
 
 pub use batch_futures::*;
+#[cfg(feature = "rayon")]
+pub use run_cpu_intensive::*;
diff --git a/rust/bitbazaar/threads/run_cpu_intensive.rs b/rust/bitbazaar/threads/run_cpu_intensive.rs
new file mode 100644
index 00000000..9a33b972
--- /dev/null
+++ b/rust/bitbazaar/threads/run_cpu_intensive.rs
@@ -0,0 +1,177 @@
+use crate::prelude::*;
+
+/// The following link explains the distinction between normal tokio, [`tokio::task::spawn_blocking`], and rayon.
+/// In most cases where you'd reach for [`tokio::task::spawn_blocking`], but the work isn't actually blocking IO (like non-async file read/write) and is instead CPU bound, rayon should be used.
+/// It goes without saying that if you ever want to use anything from rayon, e.g. [`rayon::iter::ParallelIterator`], it should be inside [`run_cpu_intensive`].
+/// <https://ryhl.io/blog/async-what-is-blocking/#the-rayon-crate>
+///
+/// This also makes sure to maintain the active tracing span context across into the rayon block.
+pub async fn run_cpu_intensive<R: Send + 'static>(
+    cb: impl FnOnce() -> RResult<R, AnyErr> + Send + 'static,
+) -> RResult<R, AnyErr> {
+    // Rayon runs completely separately to this thread/span.
+    // By creating a span outside, then entering it inside, we can connect up the two worlds for tracing:
+    // (this one is still in tokio/the same thread, so will automatically connect to the parent; we can then move this created span into the task to actually start there instead)
+    let connector_span = tracing::span!(tracing::Level::INFO, "run_cpu_intensive");
+
+    // The way we'll get the result back from rayon:
+    let (send, recv) = tokio::sync::oneshot::channel();
+
+    // Spawn a task on rayon.
+    rayon::spawn(move || {
+        // Run inside the original span to connect them up:
+        connector_span.in_scope(move || {
+            // Run the expensive function and send the result back to tokio:
+            let _ = send.send(cb());
+        })
+    });
+
+    // Wait for the rayon task's result:
+    recv.await.change_context(AnyErr)?
+}
+
+// #[cfg(test)]
+// mod tests {
+//     use bitbazaar::log::GlobalLog;
+//     use once_cell::sync::Lazy;
+//     use parking_lot::Mutex;
+
+//     use crate::testing::prelude::*;
+
+//     use super::*;
+
+//     /// Need to confirm span contexts are maintained when transitioning into rayon from tokio:
+//     #[rstest]
+//     #[tokio::test]
+//     async fn test_run_cpu_intensive_spans_maintained() -> RResult<(), AnyErr> {
+//         // TODO this is silly to do like this, Arc<Mutex<..>> would work if we change the callback signature, we can do better upstream now.
+//         static LOGS: Lazy<Mutex<Vec<String>>> = Lazy::new(Mutex::default);
+//         GlobalLog::builder()
+//             .custom(false, false, false, false, |log| {
+//                 LOGS.lock()
+//                     .push(String::from_utf8_lossy(log).trim().to_string());
+//             })
+//             .level_from(tracing::Level::DEBUG)?
+//             .build()?
+//             .register_global()?;
+
+//         #[tracing::instrument(level = "INFO")]
+//         fn inner_span() {
+//             tracing::info!("Nested span inside rayon.");
+//         }
+//         #[tracing::instrument(level = "INFO")]
+//         async fn outer_span() -> RResult<(), AnyErr> {
+//             tracing::info!("Parent span outside rayon.");
+//             run_cpu_intensive(|| {
+//                 tracing::info!("Inside rayon no nested span.");
+//                 inner_span();
+//                 Ok(())
+//             })
+//             .await
+//         }
+//         outer_span().await?;
+
+//         assert_eq!(
+//             LOGS.try_lock_for(std::time::Duration::from_secs(1))
+//                 .unwrap()
+//                 .clone(),
+//             vec![
+//                 "INFO outer_span: Parent span outside rayon.",
+//                 "INFO outer_span:run_cpu_intensive: Inside rayon no nested span.",
+//                 "INFO outer_span:run_cpu_intensive:inner_span: Nested span inside rayon."
+//             ]
+//         );
+//         Ok(())
+//     }
+// }
+
+// Haven't been able to get something like this to work yet.
+// use std::sync::{atomic::AtomicBool, Arc};
+
+// use bitbazaar::log::record_exception;
+// use parking_lot::Mutex;
+// pub fn run_external_multithreaded<R>(
+//     max_threads: u16,
+//     cb: impl FnOnce(u16) -> RResult<R, AnyErr> + Send + 'static,
+// ) -> RResult<R, AnyErr> {
+//     let max_threads = max_threads
+//         .min(rayon::current_num_threads() as u16) // Never use more threads than rayon is configured for.
+//         .max(1); // Always make sure at least one thread is allowed.
+
+//     if max_threads == 1 {
+//         // This is a no-op, just run the callback and return:
+//         cb(1)
+//     } else {
+//         // We need the Thread objects from each of the "locked" threads, so we can "unlock" them at the end:
+//         let (parked_tx, parked_rx) = std::sync::mpsc::channel::<std::thread::Thread>();
+
+//         let total_locked = Arc::new(Mutex::new(1));
+//         let locking_complete = Arc::new(AtomicBool::new(false));
+
+//         // Quoting from: https://docs.rs/rayon/latest/rayon/struct.ThreadPool.html#method.broadcast
+//         // Broadcasts are executed on each thread after they have exhausted their local work queue, before they attempt work-stealing from other threads.
+//         // The goal of that strategy is to run everywhere in a timely manner without being too disruptive to current work.
+//         // There may be alternative broadcast styles added in the future for more or less aggressive injection, if the need arises.
+//         let main_thread_id = std::thread::current().id();
+//         let total_locked_clone = total_locked.clone();
+//         let locking_complete_clone = locking_complete.clone();
+//         rayon::spawn(move || {
+//             let spawn_thread_id = std::thread::current().id();
+//             rayon::broadcast(|_ctx| {
+//                 let cur_thread = std::thread::current();
+//                 // This one is intrinsically locked and shouldn't be parked.
+//                 if cur_thread.id() == main_thread_id || cur_thread.id() == spawn_thread_id {
+//                     return;
+//                 }
+
+//                 // Don't keep locking after the locking period has finished, these threads were busy:
+//                 if locking_complete_clone.load(std::sync::atomic::Ordering::Relaxed) {
+//                     return;
+//                 }
+
+//                 // Update the locked threads, or return if we've reached the max:
+//                 // (in a block to drop the mutex as fast as possible)
+//                 {
+//                     let mut tl = total_locked_clone.lock();
+//                     if *tl >= max_threads {
+//                         // Mark locking complete given we've reached the max:
+//                         locking_complete_clone.store(true, std::sync::atomic::Ordering::Relaxed);
+//                         return;
+//                     }
+//                     *tl += 1;
+//                 }
+//                 // We want to "remove" this thread from rayon until the callback completes,
+//                 // we'll do that by parking it, then unparking it at the end.
+//                 let thread = std::thread::current();
+//                 match parked_tx.send(thread) {
+//                     Ok(()) => std::thread::park(),
+//                     Err(e) => {
+//                         record_exception("Failed to send thread to parked_tx", format!("{:?}", e))
+//                     }
+//                 }
+//             });
+//         });
+
+//         // Wait up to 10ms, or until max_threads reached, for the locking period:
+//         let start = std::time::Instant::now();
+//         while !locking_complete.load(std::sync::atomic::Ordering::Relaxed)
+//             && start.elapsed() < std::time::Duration::from_millis(10)
+//         {
+//             tracing::info!("WAITING FOR LOCKING TO COMPLETE...");
+//             rayon::yield_local();
+//         }
+//         locking_complete.store(true, std::sync::atomic::Ordering::Relaxed);
+
+//         let threads_to_allow = (*total_locked.lock()).max(1);
+
+//         // Run the callback whilst the threads are "locked".
+//         let result = cb(threads_to_allow);
+
+//         // Unpark/"unlock" all parked/"locked" threads:
+//         while let Ok(thread) = parked_rx.try_recv() {
+//             thread.unpark();
+//         }
+
+//         result
+//     }
+// }
diff --git a/rust/bitbazaar/utils/mod.rs b/rust/bitbazaar/utils/mod.rs
deleted file mode 100644
index 8b137891..00000000
--- a/rust/bitbazaar/utils/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-
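---

Usage sketch (outside the diff): a minimal example of driving the new `PeriodicUpdater` from a long-running loop. The `process_items` function and its body are hypothetical, and the `bitbazaar::misc` import path is assumed from the `pub use periodic_updater::*;` re-export above; the generic bounds follow the definition in the patch.

    use std::time::Duration;

    use bitbazaar::misc::PeriodicUpdater; // Assumed re-export path, per misc/mod.rs above.

    fn process_items(items: &[u64]) {
        // Fire the progress callback at most once per second, however hot the loop is.
        // Note: last_timestamp_ms starts at 0, so the callback also fires on the first poll.
        let updater = PeriodicUpdater::new(Duration::from_secs(1), |elapsed, done: usize| {
            println!("{done} items processed ({elapsed:?} since last report)");
        });
        for (i, _item) in items.iter().enumerate() {
            // ... per-item work ...
            updater.maybe_update(i + 1);
        }
    }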
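Similarly, a hedged sketch of calling `run_cpu_intensive` from async code, assuming the reconstructed `RResult<R, AnyErr>` signature and the `rayon` cargo feature enabled. `checksum`, its body, and the public import paths are illustrative assumptions, not part of the patch.

    use bitbazaar::prelude::*; // Assumed public source of RResult/AnyErr, mirroring `use crate::prelude::*;` above.
    use bitbazaar::threads::run_cpu_intensive; // Assumed path, per threads/mod.rs above.

    async fn checksum(data: Vec<u8>) -> RResult<u64, AnyErr> {
        // Hand the CPU-bound work to rayon's pool; the tokio worker stays free for IO,
        // and the tracing span context carries across into the closure.
        run_cpu_intensive(move || {
            // A plain loop here, but rayon's ParallelIterator would be equally at home inside this closure.
            Ok(data.iter().map(|&b| b as u64).sum())
        })
        .await
    }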