Commit

add caching
0o-de-lally committed Dec 9, 2024
1 parent ca9bf67 commit ceb4bf6
Showing 2 changed files with 54 additions and 16 deletions.
46 changes: 37 additions & 9 deletions src/analytics/offline_matching.rs
@@ -1,4 +1,9 @@
use std::{collections::BTreeMap, fs::File, io::Write, path::{Path, PathBuf}};
use std::{
collections::BTreeMap,
fs::{self, File},
io::Write,
path::{Path, PathBuf},
};

use anyhow::{bail, Result};
use chrono::{DateTime, Duration, Utc};
@@ -139,12 +144,13 @@ pub async fn get_one_exchange_user(
Ok(min_funding)
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Matching {
pub definite: BTreeMap<u32, AccountAddress>,
pub pending: BTreeMap<u32, Candidates>,
}

#[derive(Clone, Default, Debug)]
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
pub struct Candidates {
pub maybe: Vec<AccountAddress>,
pub impossible: Vec<AccountAddress>,
@@ -195,23 +201,24 @@ impl Matching {
top_n: u64,
start: DateTime<Utc>,
end: DateTime<Utc>,
save_file: Option<PathBuf>,
save_dir: Option<PathBuf>,
) -> Result<()> {
// expand the search
// increase the search of top users by funding by expanding the window
// this may retry a number of users, but with more users discovered
// the search space gets smaller
for d in days_in_range(start, end) {
let next_list = get_exchange_users(&pool, top_n, start, d).await?;
dbg!(&next_list.len());

for u in next_list {
let _r = self.search(&pool, u.user_id, start, end).await;

// after each loop update the file
if let Some(p) = &save_file {
self.write_definite_to_file(&p)?;
if let Some(p) = &save_dir {
let _ = self.write_definite_to_file(&p.join("definite.json"));
let _ = self.write_cache_to_file(&p);
}

}
}

@@ -281,21 +288,42 @@ impl Matching {
}
}

println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len());
println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len());

if pending.maybe.len() == 1 {
// we found a definite match, update it so the next loop doesn't include it
self.definite
.insert(user.user_id, *pending.maybe.first().unwrap());
}


// candidates
}

pub fn write_cache_to_file(&self, dir: &Path) -> Result<()> {
let json_string =
serde_json::to_string(&self).expect("Failed to serialize");

// Save the JSON string to a file
let path = dir.join("cache.json");
let mut file = File::create(&path)?;
file.write_all(json_string.as_bytes())?;

println!("Cache saved: {}", path.display());
Ok(())
}
pub fn read_cache_from_file(dir: &Path) -> Result<Self> {
// Read the file content into a string
let file_path = dir.join("cache.json");
let json_string = fs::read_to_string(file_path)?;

// Deserialize the JSON string back into a Matching
Ok(serde_json::from_str(&json_string)?)
}

pub fn write_definite_to_file(&self, path: &Path) -> Result<()> {
// Serialize the BTreeMap to a JSON string
let json_string = serde_json::to_string_pretty(&self.definite).expect("Failed to serialize");
let json_string =
serde_json::to_string_pretty(&self.definite).expect("Failed to serialize");

// Save the JSON string to a file
let mut file = File::create(path)?;
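The new write_cache_to_file / read_cache_from_file pair round-trips the entire Matching struct through <dir>/cache.json, and wide_search now writes both cache.json and definite.json into its save_dir after each user it processes, so an interrupted run can pick up where it left off. A minimal sketch of that round-trip, assuming the crate path used in the test below and any writable directory (resume_and_persist is a hypothetical name, not part of the commit):

use std::path::Path;

use anyhow::Result;
use libra_forensic_db::analytics::offline_matching::Matching;

// Sketch: resume from <dir>/cache.json if present, otherwise start fresh,
// then persist the state again for the next run.
fn resume_and_persist(dir: &Path) -> Result<()> {
    // Falls back to an empty Matching when no cache has been written yet.
    let m = Matching::read_cache_from_file(dir).unwrap_or(Matching::new());

    // ... searches would run here, filling m.definite and m.pending
    // (declare m as mut for that) ...

    // The full struct goes to <dir>/cache.json for the next run.
    m.write_cache_to_file(dir)?;
    // Only the definite matches, as wide_search does after each user.
    m.write_definite_to_file(&dir.join("definite.json"))?;
    Ok(())
}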
24 changes: 17 additions & 7 deletions tests/test_analytics.rs
@@ -3,7 +3,11 @@ use anyhow::Result;
use std::path::PathBuf;

use libra_forensic_db::{
analytics::{self, enrich_account_funding::BalanceTracker, offline_matching},
analytics::{
self,
enrich_account_funding::BalanceTracker,
offline_matching::{self, Matching},
},
date_util::parse_date,
extract_exchange_orders, load_exchange_orders,
neo4j_init::{self, get_neo4j_localhost_pool, maybe_create_indexes},
@@ -255,13 +259,19 @@ async fn test_offline_analytics_matching() -> Result<()> {
let (uri, user, pass) = neo4j_init::get_credentials_from_env()?;
let pool = neo4j_init::get_neo4j_remote_pool(&uri, &user, &pass).await?;

let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"));

let mut m = offline_matching::Matching::new();
let mut m = Matching::read_cache_from_file(&dir).unwrap_or(Matching::new());

let p = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let p = p.join("definite.json");

let _ = m.wide_search(&pool, 10, parse_date("2024-01-07"), parse_date("2024-03-13"), Some(p)).await;
let _ = m
.wide_search(
&pool,
25,
parse_date("2024-01-07"),
parse_date("2024-07-22"),
Some(dir),
)
.await;
// let initial_list =
// offline_matching::get_exchange_users(&pool, 10, start_time, parse_date("2024-01-09"))
// .await?;
@@ -288,7 +298,7 @@ async fn test_offline_analytics_matching() -> Result<()> {
// }

// dbg!(&m.definite);
// }

Ok(())
}
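The definite.json that wide_search drops into the save directory is just the serialized self.definite map, keyed by the exchange user id. A hypothetical reader for that file, with the value type left as serde_json::Value so the sketch does not have to assume which crate AccountAddress is imported from:

use std::{collections::BTreeMap, fs, path::Path};

use anyhow::Result;

// Hypothetical helper: load the definite matches written by write_definite_to_file.
fn load_definite(dir: &Path) -> Result<BTreeMap<u32, serde_json::Value>> {
    let json = fs::read_to_string(dir.join("definite.json"))?;
    Ok(serde_json::from_str(&json)?)
}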
