diff --git a/src/app.rs b/src/app.rs index 219b2c8..6215541 100644 --- a/src/app.rs +++ b/src/app.rs @@ -17,35 +17,37 @@ use crate::app::cancel::cancel_action; use crate::app::dispute::dispute_action; use crate::app::fiat_sent::fiat_sent_action; use crate::app::order::order_action; -use crate::app::rate_user::{send_user_rates, update_user_reputation_action}; +use crate::app::rate_user::update_user_reputation_action; use crate::app::release::release_action; use crate::app::take_buy::take_buy_action; use crate::app::take_sell::take_sell_action; use crate::lightning::LndConnector; -use crate::CLEAR_USER_VEC; +// use crate::CLEAR_USER_VEC; use anyhow::Result; use mostro_core::{Action, Message}; use nostr_sdk::prelude::*; use sqlx::{Pool, Sqlite}; -use std::sync::atomic::Ordering; +use std::sync::Arc; +use tokio::sync::Mutex; pub async fn run( my_keys: Keys, client: Client, ln_client: &mut LndConnector, pool: Pool, + rate_list: Arc>>, ) -> Result<()> { loop { let mut notifications = client.notifications(); - let mut rate_list: Vec = vec![]; + // let mut rate_list: Vec = vec![]; // Check if we can send user rates updates - if CLEAR_USER_VEC.load(Ordering::Relaxed) { - send_user_rates(&rate_list, &client).await?; - CLEAR_USER_VEC.store(false, Ordering::Relaxed); - rate_list.clear(); - } + // if CLEAR_USER_VEC.load(Ordering::Relaxed) { + // send_user_rates(&rate_list, &client).await?; + // CLEAR_USER_VEC.store(false, Ordering::Relaxed); + // rate_list.clear(); + // } while let Ok(notification) = notifications.recv().await { if let RelayPoolNotification::Event(_, event) = notification { @@ -101,7 +103,7 @@ pub async fn run( &my_keys, &client, &pool, - &mut rate_list, + rate_list.clone(), ) .await?; } diff --git a/src/app/rate_user.rs b/src/app/rate_user.rs index accd195..d0d115d 100644 --- a/src/app/rate_user.rs +++ b/src/app/rate_user.rs @@ -7,7 +7,9 @@ use mostro_core::{Action, Content, Message, Rating}; use nostr_sdk::prelude::*; use sqlx::{Pool, Sqlite}; use sqlx_crud::Crud; +use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; use tokio::time::timeout; use uuid::Uuid; @@ -140,7 +142,7 @@ pub async fn update_user_reputation_action( my_keys: &Keys, client: &Client, pool: &Pool, - rate_list: &mut Vec, + rate_list: Arc>>, ) -> Result<()> { let order_id = msg.order_id.unwrap(); let order = match Order::by_id(pool, order_id).await? { @@ -259,17 +261,17 @@ pub async fn update_user_reputation_action( Ok(()) } -pub async fn send_user_rates(rate_list: &[Event], client: &Client) -> Result<()> { - for ev in rate_list.iter() { - // Send event to relay - match client.send_event(ev.clone()).await { - Ok(id) => { - info!("Updated rate event with id {:?}", id) - } - Err(e) => { - info!("Error on updating rate event {:?}", e.to_string()) - } - } - } - Ok(()) -} +// pub async fn send_user_rates(rate_list: &[Event], client: &Client) -> Result<()> { +// for ev in rate_list.iter() { +// // Send event to relay +// match client.send_event(ev.clone()).await { +// Ok(id) => { +// info!("Updated rate event with id {:?}", id) +// } +// Err(e) => { +// info!("Error on updating rate event {:?}", e.to_string()) +// } +// } +// } +// Ok(()) +// } diff --git a/src/main.rs b/src/main.rs index 00d8979..597149c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,16 +16,18 @@ use nostr_sdk::prelude::*; use scheduler::start_scheduler; use settings::Settings; use settings::{init_default_dir, init_global_settings}; -use std::sync::atomic::AtomicBool; +use std::sync::Arc; use std::{env::args, path::PathBuf, sync::OnceLock}; +use tokio::sync::Mutex; -static CLEAR_USER_VEC: AtomicBool = AtomicBool::new(false); static MOSTRO_CONFIG: OnceLock = OnceLock::new(); #[tokio::main] async fn main() -> Result<()> { pretty_env_logger::init(); + let rate_list: Arc>> = Arc::new(Mutex::new(vec![])); + // File settings path let mut config_path = PathBuf::new(); @@ -65,9 +67,13 @@ async fn main() -> Result<()> { let mut ln_client = LndConnector::new().await; // Start scheduler for tasks - start_scheduler().await.unwrap().start().await?; + start_scheduler(rate_list.clone()) + .await + .unwrap() + .start() + .await?; - run(my_keys, client, &mut ln_client, pool).await + run(my_keys, client, &mut ln_client, pool, rate_list.clone()).await } #[cfg(test)] diff --git a/src/scheduler.rs b/src/scheduler.rs index 257a776..3da360e 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -2,29 +2,35 @@ use crate::db::*; use crate::lightning::LndConnector; use crate::settings::Settings; use crate::util::update_order_event; -use crate::CLEAR_USER_VEC; -use std::sync::atomic::Ordering; use anyhow::Result; use mostro_core::Status; +use nostr_sdk::Event; use std::error::Error; +use std::sync::Arc; +use tokio::sync::Mutex; use tokio_cron_scheduler::{Job, JobScheduler}; use tracing::{info, warn, Level}; use tracing_subscriber::FmtSubscriber; -pub async fn start_scheduler() -> Result> { +pub async fn start_scheduler( + rate_list: Arc>>, +) -> Result> { let subscriber = FmtSubscriber::builder() .with_max_level(Level::INFO) .finish(); tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed"); info!("Creating scheduler"); let sched = JobScheduler::new().await?; - cron_scheduler(&sched).await?; + cron_scheduler(&sched, rate_list).await?; Ok(sched) } -pub async fn cron_scheduler(sched: &JobScheduler) -> Result<(), anyhow::Error> { +pub async fn cron_scheduler( + sched: &JobScheduler, + rate_list: Arc>>, +) -> Result<(), anyhow::Error> { // This job find older Pending orders and mark them Expired let job_expire_pending_older_orders = Job::new_async("0 * * * * *", move |uuid, mut l| { Box::pin(async move { @@ -165,11 +171,29 @@ pub async fn cron_scheduler(sched: &JobScheduler) -> Result<(), anyhow::Error> { .unwrap(); let job_update_rate_events = Job::new_async("0 0 * * * *", move |uuid, mut l| { + // Clone for closure owning with Arc + let inner_list = rate_list.clone(); + Box::pin(async move { + // Connect to relays + let client = crate::util::connect_nostr().await.unwrap(); + info!("I run async every hour - update rate event of users"); - // Clear list after sending - CLEAR_USER_VEC.store(true, Ordering::Relaxed); + for ev in inner_list.lock().await.iter() { + // Send event to relay + match client.send_event(ev.clone()).await { + Ok(id) => { + info!("Updated rate event with id {:?}", id) + } + Err(e) => { + info!("Error on updating rate event {:?}", e.to_string()) + } + } + } + + // Clear list after send events + inner_list.lock().await.clear(); let next_tick = l.next_tick_for_job(uuid).await; match next_tick { diff --git a/src/util.rs b/src/util.rs index 096db5e..f138e6d 100644 --- a/src/util.rs +++ b/src/util.rs @@ -14,8 +14,10 @@ use nostr_sdk::prelude::*; use sqlx::SqlitePool; use sqlx::{Pool, Sqlite}; use std::str::FromStr; +use std::sync::Arc; use std::thread; use tokio::sync::mpsc::channel; +use tokio::sync::Mutex; use tonic_openssl_lnd::lnrpc::invoice::InvoiceState; use uuid::Uuid; @@ -188,7 +190,7 @@ pub async fn update_user_rating_event( order_id: Uuid, keys: &Keys, pool: &SqlitePool, - rate_list: &mut Vec, + rate_list: Arc>>, ) -> Result<()> { // let reputation = reput // nip33 kind and d tag @@ -205,7 +207,7 @@ pub async fn update_user_rating_event( } // Add event message to global list - rate_list.push(event); + rate_list.lock().await.push(event); Ok(()) }