Skip to content

Commit

Permalink
Updated tiers watcher to polling (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan authored Mar 25, 2024
1 parent 54edd66 commit 46a1a8e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 32 deletions.
11 changes: 10 additions & 1 deletion proxy/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{env, path::PathBuf};
use std::{env, path::PathBuf, time::Duration};

#[derive(Debug, Clone)]
pub struct Config {
pub proxy_addr: String,
pub proxy_namespace: String,
pub proxy_tiers_path: PathBuf,
pub proxy_tiers_poll_interval: Duration,
pub prometheus_addr: String,
pub ssl_crt_path: String,
pub ssl_key_path: String,
Expand All @@ -20,6 +21,14 @@ impl Config {
proxy_tiers_path: env::var("PROXY_TIERS_PATH")
.map(|v| v.into())
.expect("PROXY_TIERS_PATH must be set"),
proxy_tiers_poll_interval: env::var("PROXY_TIERS_POLL_INTERVAL")
.map(|v| {
Duration::from_secs(
v.parse::<u64>()
.expect("PROXY_TIERS_POLL_INTERVAL must be a number in seconds. eg: 2"),
)
})
.unwrap_or(Duration::from_secs(2)),
prometheus_addr: env::var("PROMETHEUS_ADDR").expect("PROMETHEUS_ADDR must be set"),
ssl_crt_path: env::var("SSL_CRT_PATH").expect("SSL_CRT_PATH must be set"),
ssl_key_path: env::var("SSL_KEY_PATH").expect("SSL_KEY_PATH must be set"),
Expand Down
47 changes: 16 additions & 31 deletions proxy/src/tiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ use std::error::Error;
use std::{fs, sync::Arc};

use async_trait::async_trait;
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{PollWatcher, RecursiveMode, Watcher};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use serde_json::Value;
use tokio::runtime::{Handle, Runtime};
use tracing::{error, info, warn};

use crate::{config::Config, State, Tier};
Expand Down Expand Up @@ -42,37 +41,20 @@ impl TierBackgroundService {
}
}

fn runtime_handle() -> Handle {
match Handle::try_current() {
Ok(h) => h,
Err(_) => {
let rt = Runtime::new().unwrap();
rt.handle().clone()
}
}
}

#[async_trait]
impl BackgroundService for TierBackgroundService {
async fn start(&self, mut _shutdown: ShutdownWatch) {
if let Err(err) = self.update_tiers().await {
error!(error = err.to_string(), "error to update tiers");
return;
}
let (tx, rx) = std::sync::mpsc::channel();

let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(1);
let watcher_config = notify::Config::default()
.with_compare_contents(true)
.with_poll_interval(self.config.proxy_tiers_poll_interval);

let watcher_result = RecommendedWatcher::new(
move |result: Result<Event, notify::Error>| {
let event = result.unwrap();
if event.kind.is_modify() {
runtime_handle().block_on(async {
tx.send(event).await.unwrap();
});
}
},
notify::Config::default(),
);
let watcher_result = PollWatcher::new(tx, watcher_config);
if let Err(err) = watcher_result {
error!(error = err.to_string(), "error to watcher tier");
return;
Expand All @@ -85,14 +67,17 @@ impl BackgroundService for TierBackgroundService {
return;
}

loop {
if rx.recv().await.is_some() {
if let Err(err) = self.update_tiers().await {
error!(error = err.to_string(), "error to update tiers");
continue;
}
for result in rx {
match result {
Ok(_event) => {
if let Err(err) = self.update_tiers().await {
error!(error = err.to_string(), "error to update tiers");
continue;
}

info!("tiers modified");
info!("tiers modified");
}
Err(err) => error!(error = err.to_string(), "watch error"),
}
}
}
Expand Down

0 comments on commit 46a1a8e

Please sign in to comment.