From 46a1a8e325e932fcc844344e6d693c796e9d5f04 Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Mon, 25 Mar 2024 15:07:04 -0300 Subject: [PATCH] Updated tiers watcher to polling (#44) --- proxy/src/config.rs | 11 ++++++++++- proxy/src/tiers.rs | 47 +++++++++++++++------------------------------ 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 41f649a..0525022 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -1,10 +1,11 @@ -use std::{env, path::PathBuf}; +use std::{env, path::PathBuf, time::Duration}; #[derive(Debug, Clone)] pub struct Config { pub proxy_addr: String, pub proxy_namespace: String, pub proxy_tiers_path: PathBuf, + pub proxy_tiers_poll_interval: Duration, pub prometheus_addr: String, pub ssl_crt_path: String, pub ssl_key_path: String, @@ -20,6 +21,14 @@ impl Config { proxy_tiers_path: env::var("PROXY_TIERS_PATH") .map(|v| v.into()) .expect("PROXY_TIERS_PATH must be set"), + proxy_tiers_poll_interval: env::var("PROXY_TIERS_POLL_INTERVAL") + .map(|v| { + Duration::from_secs( + v.parse::() + .expect("PROXY_TIERS_POLL_INTERVAL must be a number in seconds. eg: 2"), + ) + }) + .unwrap_or(Duration::from_secs(2)), prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"), ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"), ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"), diff --git a/proxy/src/tiers.rs b/proxy/src/tiers.rs index 23d865b..3ed1fcb 100644 --- a/proxy/src/tiers.rs +++ b/proxy/src/tiers.rs @@ -2,10 +2,9 @@ use std::error::Error; use std::{fs, sync::Arc}; use async_trait::async_trait; -use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher}; +use notify::{PollWatcher, RecursiveMode, Watcher}; use pingora::{server::ShutdownWatch, services::background::BackgroundService}; use serde_json::Value; -use tokio::runtime::{Handle, Runtime}; use tracing::{error, info, warn}; use crate::{config::Config, State, Tier}; @@ -42,16 +41,6 @@ impl TierBackgroundService { } } -fn runtime_handle() -> Handle { - match Handle::try_current() { - Ok(h) => h, - Err(_) => { - let rt = Runtime::new().unwrap(); - rt.handle().clone() - } - } -} - #[async_trait] impl BackgroundService for TierBackgroundService { async fn start(&self, mut _shutdown: ShutdownWatch) { @@ -59,20 +48,13 @@ impl BackgroundService for TierBackgroundService { error!(error = err.to_string(), "error to update tiers"); return; } + let (tx, rx) = std::sync::mpsc::channel(); - let (tx, mut rx) = tokio::sync::mpsc::channel::(1); + let watcher_config = notify::Config::default() + .with_compare_contents(true) + .with_poll_interval(self.config.proxy_tiers_poll_interval); - let watcher_result = RecommendedWatcher::new( - move |result: Result| { - let event = result.unwrap(); - if event.kind.is_modify() { - runtime_handle().block_on(async { - tx.send(event).await.unwrap(); - }); - } - }, - notify::Config::default(), - ); + let watcher_result = PollWatcher::new(tx, watcher_config); if let Err(err) = watcher_result { error!(error = err.to_string(), "error to watcher tier"); return; @@ -85,14 +67,17 @@ impl BackgroundService for TierBackgroundService { return; } - loop { - if rx.recv().await.is_some() { - if let Err(err) = self.update_tiers().await { - error!(error = err.to_string(), "error to update tiers"); - continue; - } + for result in rx { + match result { + Ok(_event) => { + if let Err(err) = self.update_tiers().await { + error!(error = err.to_string(), "error to update tiers"); + continue; + } - info!("tiers modified"); + info!("tiers modified"); + } + Err(err) => error!(error = err.to_string(), "watch error"), } } }