Skip to content

Commit

Permalink
try alternate calc
Browse files Browse the repository at this point in the history
  • Loading branch information
0o-de-lally committed Dec 10, 2024
1 parent b974a58 commit 9d1717a
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 65 deletions.
86 changes: 48 additions & 38 deletions src/analytics/offline_matching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
use anyhow::{bail, Result};
use chrono::{DateTime, Duration, Utc};
use diem_types::account_address::AccountAddress;
use log::{info, trace};
use neo4rs::Graph;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -195,29 +196,56 @@ impl Matching {
Ok((*ids.first().unwrap(), *ids.get(1).unwrap()))
}

pub async fn wide_search(
/// Runs the breadth search repeatedly over a progressively larger number of
/// top funded exchange accounts: 10, 15, 20, ... up to and including 45.
///
/// Each pass calls `breadth_search_by_dates` for the `start`..`end` range, so
/// deeper passes benefit from the information gathered by the previous,
/// shallower ones (candidates already excluded as impossible matches).
///
/// Errors from an individual pass are deliberately discarded so that one
/// failing pass does not abort the remaining ones; this function therefore
/// always returns `Ok(())`.
///
/// NOTE(review): the `top_n` argument is currently not read. The schedule
/// (10 up to 50, exclusive, in steps of 5) is hard-coded below. Confirm
/// whether `top_n` is meant to be the starting size or the upper bound.
pub async fn depth_search_by_top_n_accounts(
    &mut self,
    pool: &Graph,
    top_n: u64,
    start: DateTime<Utc>,
    end: DateTime<Utc>,
    save_dir: Option<PathBuf>,
) -> Result<()> {
    for account_count in (10..50).step_by(5) {
        // best effort: a failed pass must not stop the deeper passes
        let _ = self
            .breadth_search_by_dates(pool, account_count, start, end, &save_dir)
            .await;
    }
    Ok(())
}

/// breadth search: for every day in the timeseries, check all the top funded
/// accounts against the actual deposits made on-chain.
/// don't peek into the future; only replay the information available at each day
pub async fn breadth_search_by_dates(
&mut self,
pool: &Graph,
top_n: u64,
start: DateTime<Utc>,
end: DateTime<Utc>,
save_dir: &Option<PathBuf>,
) -> Result<()> {
// expand the search
// widen the search over the top-funded users by expanding the window
// this may retry some users, but as more users are discovered
// the search space gets smaller
for d in days_in_range(start, end) {
let next_list = get_exchange_users(&pool, top_n, start, d).await?;
dbg!(&next_list.len());
info!("day: {}", d);
let next_list = get_exchange_users(pool, top_n, start, d).await?;

// TODO: pick top of deposits
let deposits = get_date_range_deposits(pool, 100, start, d).await?;

for u in next_list {
let _r = self.search(&pool, u.user_id, start, end).await;
let _r = self.search(&u, &deposits).await;

// after each loop update the file
if let Some(p) = &save_dir {
let _ = self.write_definite_to_file(&p.join("definite.json"));
let _ = self.write_cache_to_file(&p);
let _ = self.write_cache_to_file(p);
}
}
}
Expand All @@ -226,30 +254,17 @@ impl Matching {
}
pub async fn search(
&mut self,
pool: &Graph,
user_a: u32,
start: DateTime<Utc>,
end: DateTime<Utc>,
user: &MinFunding,
deposits: &[Deposit],
) -> Result<AccountAddress> {
// exit early
if let Some(a) = self.definite.get(&user_a) {
if let Some(a) = self.definite.get(&user.user_id) {
return Ok(*a);
}

// loop each day, comparing deposits made to that point
// and funding required for user accounts only to that date
for d in days_in_range(start, end) {
let deposits = get_date_range_deposits(pool, 100, start, d).await?;

if let Some(funded_a) = get_one_exchange_user(pool, user_a, start, d).await?.first() {
self.eliminate_candidates(funded_a, &deposits);
self.eliminate_candidates(user, deposits);

if let Some(a) = self.definite.get(&user_a) {
return Ok(*a);
}
}
}
if let Some(a) = self.definite.get(&user_a) {
if let Some(a) = self.definite.get(&user.user_id) {
return Ok(*a);
}

Expand All @@ -258,10 +273,7 @@ impl Matching {

pub fn eliminate_candidates(&mut self, user: &MinFunding, deposits: &[Deposit]) {
// let mut filtered_depositors = deposits.clone();
let pending = self
.pending
.entry(user.user_id)
.or_insert(Candidates::default());
let pending = self.pending.entry(user.user_id).or_default();

let mut eval: Vec<AccountAddress> = vec![];
deposits.iter().for_each(|el| {
Expand All @@ -271,13 +283,11 @@ impl Matching {
// is also not already discovered
!self.definite.values().any(|found| found == &el.account)
{
if !eval.contains(&el.account) {
eval.push(el.account)
}
} else {
if !pending.impossible.contains(&value) {
pending.impossible.push(el.account)
if !eval.contains(&el.account) {
eval.push(el.account)
}
} else if !pending.impossible.contains(&el.account) {
pending.impossible.push(el.account)
}
});

Expand All @@ -287,12 +297,12 @@ impl Matching {
} else {
// we only keep addresses we see repeatedly (inner join)
eval.retain(|x| pending.maybe.contains(x));
if eval.len() > 0 {
if !eval.is_empty() {
pending.maybe = eval;
}
}

println!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len());
info!("user: {}, maybe: {}", &user.user_id, &pending.maybe.len());

if pending.maybe.len() == 1 {
// we found a definite match, update it so the next loop doesn't include it
Expand All @@ -311,7 +321,7 @@ impl Matching {
let mut file = File::create(&path)?;
file.write_all(json_string.as_bytes())?;

println!("Cache saved: {}", path.display());
trace!("Cache saved: {}", path.display());
Ok(())
}
pub fn read_cache_from_file(dir: &Path) -> Result<Self> {
Expand All @@ -332,7 +342,7 @@ impl Matching {
let mut file = File::create(path)?;
file.write_all(json_string.as_bytes())?;

println!("Data saved to path: {}", path.display());
trace!("Data saved to path: {}", path.display());
Ok(())
}
}
Expand Down
29 changes: 2 additions & 27 deletions tests/test_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,44 +261,19 @@ async fn test_offline_analytics_matching() -> Result<()> {

let dir: PathBuf = PathBuf::from(env!("CARGO_MANIFEST_DIR"));

let mut m = Matching::read_cache_from_file(&dir).unwrap_or(Matching::new());
let mut m = Matching::read_cache_from_file(&dir).unwrap_or_default();

let _ = m
.wide_search(
.depth_search_by_top_n_accounts(
&pool,
25,
parse_date("2024-01-07"),
parse_date("2024-07-22"),
Some(dir),
)
.await;
// let initial_list =
// offline_matching::get_exchange_users(&pool, 10, start_time, parse_date("2024-01-09"))
// .await?;

// for u in initial_list {
// let r = m
// .search(&pool, u.user_id, start_time, parse_date("2024-03-13"))
// .await;
// // dbg!(&r);
// }

dbg!(&m.definite);

// // expand the search
// for d in offline_matching::days_in_range(start_time, parse_date("2024-03-13")) {
// let next_list = offline_matching::get_exchange_users(&pool, 10, start_time, d)
// .await?;

// for u in next_list {
// let r = m
// .search(&pool, u.user_id, start_time, parse_date("2024-03-13"))
// .await;
// dbg!(&r);
// }

// dbg!(&m.definite);
// }

Ok(())
}

0 comments on commit 9d1717a

Please sign in to comment.