Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removed global user vector #107

Merged
merged 2 commits into from
Sep 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 6 additions & 13 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,29 @@ use crate::app::cancel::cancel_action;
use crate::app::dispute::dispute_action;
use crate::app::fiat_sent::fiat_sent_action;
use crate::app::order::order_action;
use crate::app::rate_user::{send_user_rates, update_user_reputation_action};
use crate::app::rate_user::update_user_reputation_action;
use crate::app::release::release_action;
use crate::app::take_buy::take_buy_action;
use crate::app::take_sell::take_sell_action;
use crate::lightning::LndConnector;
use crate::CLEAR_USER_VEC;

use anyhow::Result;
use mostro_core::{Action, Message};
use nostr_sdk::prelude::*;
use sqlx::{Pool, Sqlite};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Mutex;

pub async fn run(
my_keys: Keys,
client: Client,
ln_client: &mut LndConnector,
pool: Pool<Sqlite>,
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<()> {
loop {
let mut notifications = client.notifications();

let mut rate_list: Vec<Event> = vec![];

// Check if we can send user rates updates
if CLEAR_USER_VEC.load(Ordering::Relaxed) {
send_user_rates(&rate_list, &client).await?;
CLEAR_USER_VEC.store(false, Ordering::Relaxed);
rate_list.clear();
}

while let Ok(notification) = notifications.recv().await {
if let RelayPoolNotification::Event(_, event) = notification {
if let Kind::EncryptedDirectMessage = event.kind {
Expand Down Expand Up @@ -101,7 +94,7 @@ pub async fn run(
&my_keys,
&client,
&pool,
&mut rate_list,
rate_list.clone(),
)
.await?;
}
Expand Down
19 changes: 3 additions & 16 deletions src/app/rate_user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use mostro_core::{Action, Content, Message, Rating};
use nostr_sdk::prelude::*;
use sqlx::{Pool, Sqlite};
use sqlx_crud::Crud;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::timeout;
use uuid::Uuid;

Expand Down Expand Up @@ -140,7 +142,7 @@ pub async fn update_user_reputation_action(
my_keys: &Keys,
client: &Client,
pool: &Pool<Sqlite>,
rate_list: &mut Vec<Event>,
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<()> {
let order_id = msg.order_id.unwrap();
let order = match Order::by_id(pool, order_id).await? {
Expand Down Expand Up @@ -258,18 +260,3 @@ pub async fn update_user_reputation_action(

Ok(())
}

pub async fn send_user_rates(rate_list: &[Event], client: &Client) -> Result<()> {
for ev in rate_list.iter() {
// Send event to relay
match client.send_event(ev.clone()).await {
Ok(id) => {
info!("Updated rate event with id {:?}", id)
}
Err(e) => {
info!("Error on updating rate event {:?}", e.to_string())
}
}
}
Ok(())
}
14 changes: 10 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,18 @@ use nostr_sdk::prelude::*;
use scheduler::start_scheduler;
use settings::Settings;
use settings::{init_default_dir, init_global_settings};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::{env::args, path::PathBuf, sync::OnceLock};
use tokio::sync::Mutex;

static CLEAR_USER_VEC: AtomicBool = AtomicBool::new(false);
static MOSTRO_CONFIG: OnceLock<Settings> = OnceLock::new();

#[tokio::main]
async fn main() -> Result<()> {
pretty_env_logger::init();

let rate_list: Arc<Mutex<Vec<Event>>> = Arc::new(Mutex::new(vec![]));

// File settings path
let mut config_path = PathBuf::new();

Expand Down Expand Up @@ -65,9 +67,13 @@ async fn main() -> Result<()> {
let mut ln_client = LndConnector::new().await;

// Start scheduler for tasks
start_scheduler().await.unwrap().start().await?;
start_scheduler(rate_list.clone())
.await
.unwrap()
.start()
.await?;

run(my_keys, client, &mut ln_client, pool).await
run(my_keys, client, &mut ln_client, pool, rate_list.clone()).await
}

#[cfg(test)]
Expand Down
38 changes: 31 additions & 7 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,35 @@ use crate::db::*;
use crate::lightning::LndConnector;
use crate::settings::Settings;
use crate::util::update_order_event;
use crate::CLEAR_USER_VEC;
use std::sync::atomic::Ordering;

use anyhow::Result;
use mostro_core::Status;
use nostr_sdk::Event;
use std::error::Error;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::{info, warn, Level};
use tracing_subscriber::FmtSubscriber;

pub async fn start_scheduler() -> Result<JobScheduler, Box<dyn Error>> {
pub async fn start_scheduler(
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<JobScheduler, Box<dyn Error>> {
let subscriber = FmtSubscriber::builder()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber).expect("Setting default subscriber failed");
info!("Creating scheduler");
let sched = JobScheduler::new().await?;
cron_scheduler(&sched).await?;
cron_scheduler(&sched, rate_list).await?;

Ok(sched)
}

pub async fn cron_scheduler(sched: &JobScheduler) -> Result<(), anyhow::Error> {
pub async fn cron_scheduler(
sched: &JobScheduler,
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<(), anyhow::Error> {
// This job find older Pending orders and mark them Expired
let job_expire_pending_older_orders = Job::new_async("0 * * * * *", move |uuid, mut l| {
Box::pin(async move {
Expand Down Expand Up @@ -165,11 +171,29 @@ pub async fn cron_scheduler(sched: &JobScheduler) -> Result<(), anyhow::Error> {
.unwrap();

let job_update_rate_events = Job::new_async("0 0 * * * *", move |uuid, mut l| {
// Clone for closure owning with Arc
let inner_list = rate_list.clone();

Box::pin(async move {
// Connect to relays
let client = crate::util::connect_nostr().await.unwrap();

info!("I run async every hour - update rate event of users");

// Clear list after sending
CLEAR_USER_VEC.store(true, Ordering::Relaxed);
for ev in inner_list.lock().await.iter() {
// Send event to relay
match client.send_event(ev.clone()).await {
Ok(id) => {
info!("Updated rate event with id {:?}", id)
}
Err(e) => {
info!("Error on updating rate event {:?}", e.to_string())
}
}
}

// Clear list after send events
inner_list.lock().await.clear();

let next_tick = l.next_tick_for_job(uuid).await;
match next_tick {
Expand Down
6 changes: 4 additions & 2 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ use nostr_sdk::prelude::*;
use sqlx::SqlitePool;
use sqlx::{Pool, Sqlite};
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use tokio::sync::mpsc::channel;
use tokio::sync::Mutex;
use tonic_openssl_lnd::lnrpc::invoice::InvoiceState;
use uuid::Uuid;

Expand Down Expand Up @@ -188,7 +190,7 @@ pub async fn update_user_rating_event(
order_id: Uuid,
keys: &Keys,
pool: &SqlitePool,
rate_list: &mut Vec<Event>,
rate_list: Arc<Mutex<Vec<Event>>>,
) -> Result<()> {
// let reputation = reput
// nip33 kind and d tag
Expand All @@ -205,7 +207,7 @@ pub async fn update_user_rating_event(
}

// Add event message to global list
rate_list.push(event);
rate_list.lock().await.push(event);

Ok(())
}
Expand Down