Skip to content

Commit

Permalink
Updated tiers watcher to use tokio channel (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan authored Apr 4, 2024
1 parent 9a23528 commit 7a8107e
Showing 1 changed file with 32 additions and 13 deletions.
45 changes: 32 additions & 13 deletions proxy/src/tiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::error::Error;
use std::{fs, sync::Arc};

use async_trait::async_trait;
use notify::{PollWatcher, RecursiveMode, Watcher};
use notify::{Event, PollWatcher, RecursiveMode, Watcher};
use pingora::{server::ShutdownWatch, services::background::BackgroundService};
use serde_json::Value;
use tokio::runtime::{Handle, Runtime};
use tracing::{error, info, warn};

use crate::{config::Config, State, Tier};
Expand Down Expand Up @@ -48,13 +49,23 @@ impl BackgroundService for TierBackgroundService {
error!(error = err.to_string(), "error to update tiers");
return;
}
let (tx, rx) = std::sync::mpsc::channel();

let (tx, mut rx) = tokio::sync::mpsc::channel::<Event>(1);

let watcher_config = notify::Config::default()
.with_compare_contents(true)
.with_poll_interval(self.config.proxy_tiers_poll_interval);

let watcher_result = PollWatcher::new(tx, watcher_config);
let watcher_result = PollWatcher::new(
move |res| {
if let Ok(event) = res {
runtime_handle()
.block_on(async { tx.send(event).await })
.unwrap();
}
},
watcher_config,
);
if let Err(err) = watcher_result {
error!(error = err.to_string(), "error to watcher tier");
return;
Expand All @@ -67,18 +78,26 @@ impl BackgroundService for TierBackgroundService {
return;
}

for result in rx {
match result {
Ok(_event) => {
if let Err(err) = self.update_tiers().await {
error!(error = err.to_string(), "error to update tiers");
continue;
}

info!("tiers modified");
loop {
let result = rx.recv().await;
if result.is_some() {
if let Err(err) = self.update_tiers().await {
error!(error = err.to_string(), "error to update tiers");
continue;
}
Err(err) => error!(error = err.to_string(), "watch error"),

info!("tiers modified");
}
}
}
}

fn runtime_handle() -> Handle {
match Handle::try_current() {
Ok(h) => h,
Err(_) => {
let rt = Runtime::new().unwrap();
rt.handle().clone()
}
}
}

0 comments on commit 7a8107e

Please sign in to comment.