Skip to content

Commit

Permalink
removed global user vector
Browse files Browse the repository at this point in the history
  • Loading branch information
arkanoider committed Sep 7, 2023
1 parent 460c082 commit 27404e4
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 38 deletions.
22 changes: 12 additions & 10 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,37 @@ use crate::app::cancel::cancel_action;
use crate::app::dispute::dispute_action;
use crate::app::fiat_sent::fiat_sent_action;
use crate::app::order::order_action;
use crate::app::rate_user::{send_user_rates, update_user_reputation_action};
use crate::app::rate_user::update_user_reputation_action;
use crate::app::release::release_action;
use crate::app::take_buy::take_buy_action;
use crate::app::take_sell::take_sell_action;
use crate::lightning::LndConnector;
use crate::CLEAR_USER_VEC;
// use crate::CLEAR_USER_VEC;
use anyhow::Result;
use mostro_core::{Action, Message};
use nostr_sdk::prelude::*;
use sqlx::{Pool, Sqlite};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Mutex;

pub async fn run(
my_keys: Keys,
client: Client,
ln_client: &mut LndConnector,
pool: Pool<Sqlite>,
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<()> {
loop {
let mut notifications = client.notifications();

let mut rate_list: Vec<Event> = vec![];
// let mut rate_list: Vec<Event> = vec![];

// Check if we can send user rates updates
if CLEAR_USER_VEC.load(Ordering::Relaxed) {
send_user_rates(&rate_list, &client).await?;
CLEAR_USER_VEC.store(false, Ordering::Relaxed);
rate_list.clear();
}
// if CLEAR_USER_VEC.load(Ordering::Relaxed) {
// send_user_rates(&rate_list, &client).await?;
// CLEAR_USER_VEC.store(false, Ordering::Relaxed);
// rate_list.clear();
// }

while let Ok(notification) = notifications.recv().await {
if let RelayPoolNotification::Event(_, event) = notification {
Expand Down Expand Up @@ -101,7 +103,7 @@ pub async fn run(
&my_keys,
&client,
&pool,
&mut rate_list,
rate_list.clone(),
)
.await?;
}
Expand Down
32 changes: 17 additions & 15 deletions src/app/rate_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use mostro_core::{Action, Content, Message, Rating};
use nostr_sdk::prelude::*;
use sqlx::{Pool, Sqlite};
use sqlx_crud::Crud;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::timeout;
use uuid::Uuid;

Expand Down Expand Up @@ -140,7 +142,7 @@ pub async fn update_user_reputation_action(
my_keys: &Keys,
client: &Client,
pool: &Pool<Sqlite>,
rate_list: &mut Vec<Event>,
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<()> {
let order_id = msg.order_id.unwrap();
let order = match Order::by_id(pool, order_id).await? {
Expand Down Expand Up @@ -259,17 +261,17 @@ pub async fn update_user_reputation_action(
Ok(())
}

pub async fn send_user_rates(rate_list: &[Event], client: &Client) -> Result<()> {
for ev in rate_list.iter() {
// Send event to relay
match client.send_event(ev.clone()).await {
Ok(id) => {
info!("Updated rate event with id {:?}", id)
}
Err(e) => {
info!("Error on updating rate event {:?}", e.to_string())
}
}
}
Ok(())
}
// pub async fn send_user_rates(rate_list: &[Event], client: &Client) -> Result<()> {
// for ev in rate_list.iter() {
// // Send event to relay
// match client.send_event(ev.clone()).await {
// Ok(id) => {
// info!("Updated rate event with id {:?}", id)
// }
// Err(e) => {
// info!("Error on updating rate event {:?}", e.to_string())
// }
// }
// }
// Ok(())
// }
14 changes: 10 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ use nostr_sdk::prelude::*;
use scheduler::start_scheduler;
use settings::Settings;
use settings::{init_default_dir, init_global_settings};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::{env::args, path::PathBuf, sync::OnceLock};
use tokio::sync::Mutex;

static CLEAR_USER_VEC: AtomicBool = AtomicBool::new(false);
static MOSTRO_CONFIG: OnceLock<Settings> = OnceLock::new();

#[tokio::main]
async fn main() -> Result<()> {
pretty_env_logger::init();

let rate_list: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(vec![]));

// File settings path
let mut config_path = PathBuf::new();

Expand Down Expand Up @@ -65,9 +67,13 @@ async fn main() -> Result<()> {
let mut ln_client = LndConnector::new().await;

// Start scheduler for tasks
start_scheduler().await.unwrap().start().await?;
start_scheduler(rate_list.clone())
.await
.unwrap()
.start()
.await?;

run(my_keys, client, &mut ln_client, pool).await
run(my_keys, client, &mut ln_client, pool, rate_list.clone()).await
}

#[cfg(test)]
Expand Down
38 changes: 31 additions & 7 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,35 @@ use crate::db::*;
use crate::lightning::LndConnector;
use crate::settings::Settings;
use crate::util::update_order_event;
use crate::CLEAR_USER_VEC;
use std::sync::atomic::Ordering;

use anyhow::Result;
use mostro_core::Status;
use nostr_sdk::Event;
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, warn, Level};
use tracing_subscriber::FmtSubscriber;

pub async fn start_scheduler() -> Result<JobScheduler, Box<dyn Error>> {
pub async fn start_scheduler(
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<JobScheduler, Box<dyn Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
info!("Creating scheduler");
let sched = JobScheduler::new().await?;
cron_scheduler(&sched).await?;
cron_scheduler(&sched, rate_list).await?;

Ok(sched)
}

pub async fn cron_scheduler(sched: &JobScheduler) -> Result<(), anyhow::Error> {
pub async fn cron_scheduler(
sched: &JobScheduler,
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<(), anyhow::Error> {
// This job find older Pending orders and mark them Expired
let job_expire_pending_older_orders = Job::new_async("0 * * * * *", move |uuid, mut l| {
Box::pin(async move {
Expand Down Expand Up @@ -165,11 +171,29 @@ pub async fn cron_scheduler(sched: &JobScheduler) -> Result<(), anyhow::Error> {
.unwrap();

let job_update_rate_events = Job::new_async("0 0 * * * *", move |uuid, mut l| {
// Clone for closure owning with Arc
let inner_list = rate_list.clone();

Box::pin(async move {
// Connect to relays
let client = crate::util::connect_nostr().await.unwrap();

info!("I run async every hour - update rate event of users");

// Clear list after sending
CLEAR_USER_VEC.store(true, Ordering::Relaxed);
for ev in inner_list.lock().await.iter() {
// Send event to relay
match client.send_event(ev.clone()).await {
Ok(id) => {
info!("Updated rate event with id {:?}", id)
}
Err(e) => {
info!("Error on updating rate event {:?}", e.to_string())
}
}
}

// Clear list after send events
inner_list.lock().await.clear();

let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Expand Down
6 changes: 4 additions & 2 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use nostr_sdk::prelude::*;
use sqlx::SqlitePool;
use sqlx::{Pool, Sqlite};
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use tokio::sync::mpsc::channel;
use tokio::sync::Mutex;
use tonic_openssl_lnd::lnrpc::invoice::InvoiceState;
use uuid::Uuid;

Expand Down Expand Up @@ -188,7 +190,7 @@ pub async fn update_user_rating_event(
order_id: Uuid,
keys: &Keys,
pool: &SqlitePool,
rate_list: &mut Vec<Event>,
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<()> {
// let reputation = reput
// nip33 kind and d tag
Expand All @@ -205,7 +207,7 @@ pub async fn update_user_rating_event(
}

// Add event message to global list
rate_list.push(event);
rate_list.lock().await.push(event);

Ok(())
}
Expand Down

0 comments on commit 27404e4

Please sign in to comment.