From 7a8107eb41007fabc558f92a674c64b190d1a24e Mon Sep 17 00:00:00 2001 From: Paulo Bressan Date: Thu, 4 Apr 2024 17:08:57 -0300 Subject: [PATCH] Updated tiers watcher to use tokio channel (#48) --- proxy/src/tiers.rs | 45 ++++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/proxy/src/tiers.rs b/proxy/src/tiers.rs index 3ed1fcb..ca462ef 100644 --- a/proxy/src/tiers.rs +++ b/proxy/src/tiers.rs @@ -2,9 +2,10 @@ use std::error::Error; use std::{fs, sync::Arc}; use async_trait::async_trait; -use notify::{PollWatcher, RecursiveMode, Watcher}; +use notify::{Event, PollWatcher, RecursiveMode, Watcher}; use pingora::{server::ShutdownWatch, services::background::BackgroundService}; use serde_json::Value; +use tokio::runtime::{Handle, Runtime}; use tracing::{error, info, warn}; use crate::{config::Config, State, Tier}; @@ -48,13 +49,23 @@ impl BackgroundService for TierBackgroundService { error!(error = err.to_string(), "error to update tiers"); return; } - let (tx, rx) = std::sync::mpsc::channel(); + + let (tx, mut rx) = tokio::sync::mpsc::channel::(1); let watcher_config = notify::Config::default() .with_compare_contents(true) .with_poll_interval(self.config.proxy_tiers_poll_interval); - let watcher_result = PollWatcher::new(tx, watcher_config); + let watcher_result = PollWatcher::new( + move |res| { + if let Ok(event) = res { + runtime_handle() + .block_on(async { tx.send(event).await }) + .unwrap(); + } + }, + watcher_config, + ); if let Err(err) = watcher_result { error!(error = err.to_string(), "error to watcher tier"); return; @@ -67,18 +78,26 @@ impl BackgroundService for TierBackgroundService { return; } - for result in rx { - match result { - Ok(_event) => { - if let Err(err) = self.update_tiers().await { - error!(error = err.to_string(), "error to update tiers"); - continue; - } - - info!("tiers modified"); + loop { + let result = rx.recv().await; + if result.is_some() { + if let Err(err) = self.update_tiers().await { + error!(error = err.to_string(), "error to update tiers"); + continue; } - Err(err) => error!(error = err.to_string(), "watch error"), + + info!("tiers modified"); } } } } + +fn runtime_handle() -> Handle { + match Handle::try_current() { + Ok(h) => h, + Err(_) => { + let rt = Runtime::new().unwrap(); + rt.handle().clone() + } + } +}