diff --git a/src/analytics/enrich_account_funding.rs b/src/analytics/enrich_account_funding.rs index 7a9d095..7de6aa4 100644 --- a/src/analytics/enrich_account_funding.rs +++ b/src/analytics/enrich_account_funding.rs @@ -9,7 +9,10 @@ use std::{ io::Read, }; -use crate::schema_exchange_orders::ExchangeOrder; +use crate::schema_exchange_orders::{ExchangeOrder, OrderType}; + +#[cfg(test)] +use crate::date_util::parse_date; #[cfg(test)] use crate::date_util::parse_date; @@ -46,8 +49,8 @@ impl BalanceTracker { pub fn process_transaction_alt(&mut self, order: &ExchangeOrder) { let date = order.filled_at; - match order.order_type.as_str() { - "Buy" => { + match order.order_type { + OrderType::Buy => { // user offered to buy coins (Buyer) // he sends USD // accepter sends coins. (Seller) @@ -55,16 +58,13 @@ impl BalanceTracker { self.update_balance_and_flows_alt(order.user, date, order.amount, true); self.update_balance_and_flows_alt(order.accepter, date, order.amount, false); } - "Sell" => { + OrderType::Sell => { // user offered to sell coins (Seller) // he sends Coins // accepter sends USD. (Buyer) self.update_balance_and_flows_alt(order.accepter, date, order.amount, true); self.update_balance_and_flows_alt(order.user, date, order.amount, false); } - _ => { - println!("ERROR: not a valid Buy/Sell order, {:?}", &order); - } } } fn update_balance_and_flows_alt( @@ -256,7 +256,7 @@ fn test_replay_transactions() { // user_1 sends USD, user_2 moves 10 coins. ExchangeOrder { user: 1, - order_type: "Buy".to_string(), + order_type: OrderType::Buy, amount: 10.0, price: 2.0, created_at: parse_date("2024-03-01"), @@ -266,13 +266,13 @@ fn test_replay_transactions() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, + ..Default::default() }, ExchangeOrder { // user 2 creates an offer to SELL, user 3 accepts. // user 3 sends USD user 2 moves amount of coins. 
user: 2, - order_type: "Sell".to_string(), + order_type: OrderType::Sell, amount: 5.0, price: 3.0, created_at: parse_date("2024-03-05"), @@ -282,13 +282,13 @@ fn test_replay_transactions() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, + ..Default::default() }, // user 3 creates an offer to BUY, user 1 accepts. // user 3 sends USD user 1 moves amount of coins. ExchangeOrder { user: 3, - order_type: "Buy".to_string(), + order_type: OrderType::Buy, amount: 15.0, price: 1.5, created_at: parse_date("2024-03-10"), @@ -298,7 +298,7 @@ fn test_replay_transactions() { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, + ..Default::default() }, ]; diff --git a/src/analytics/enrich_rms.rs b/src/analytics/enrich_rms.rs index f24b9b6..2f245e8 100644 --- a/src/analytics/enrich_rms.rs +++ b/src/analytics/enrich_rms.rs @@ -1,7 +1,7 @@ use chrono::Duration; use std::collections::VecDeque; -use crate::schema_exchange_orders::ExchangeOrder; +use crate::schema_exchange_orders::{CompetingOffers, ExchangeOrder, OrderType}; fn calculate_rms(data: &[f64]) -> f64 { let (sum, count) = data @@ -80,93 +80,75 @@ pub fn include_rms_stats(swaps: &mut [ExchangeOrder]) { } } -pub fn process_sell_order_shill(swaps: &mut [ExchangeOrder]) { - swaps.sort_by_key(|swap| swap.filled_at); // Sort by filled_at - - for i in 0..swaps.len() { - let current_swap = &swaps[i]; - // TODO: move this to a filter on the enclosing scope - if current_swap.shill_bid.is_some() { +fn get_competing_offers( + current_order: &ExchangeOrder, + all_offers: &[ExchangeOrder], +) -> CompetingOffers { + let mut competition = CompetingOffers { + offer_type: current_order.order_type.clone(), + ..Default::default() + }; + + for other in all_offers { + if competition.offer_type != other.order_type { continue; - }; - - // Filter for open trades - let open_orders = swaps - .iter() - .filter(|&other_swap| { - other_swap.filled_at > current_swap.filled_at - && 
other_swap.created_at <= current_swap.filled_at - }) - .collect::<Vec<_>>(); + } - // Determine if the current swap took the best price - let is_shill_bid = match current_swap.order_type.as_str() { - // Signs of shill trades. - // For those offering to SELL coins, as the tx.user (offerer) - // I should offer to sell near the current clearing price. - // If I'm making shill bids, I'm creating trades above the current clearing price. An honest actor wouldn't expect those to get filled immediately. - // If an accepter is buying coins at a higher price than other orders which could be filled, then they are likely colluding to increase the price. - "Sell" => open_orders.iter().any(|other_swap| - // if there are cheaper SELL offers, - // for smaller sizes, then the rational honest actor - // will pick one of those. - // So we find the list of open orders which would be - // better than the one taken how. - // if there are ANY available, then this SELL order was - // filled dishonestly. - other_swap.price <= current_swap.price && - other_swap.amount <= current_swap.amount), - _ => false, - }; + // is the other offer created in the past, and still not filled + if other.created_at < current_order.filled_at && other.filled_at > current_order.filled_at { + competition.open_same_type += 1; + if other.amount <= current_order.amount { + competition.within_amount += 1; - // Update the swap with the best price flag - swaps[i].shill_bid = Some(is_shill_bid); + if other.price <= current_order.price { + competition.within_amount_lower_price += 1; + } + } + } } -} - -pub fn process_buy_order_shill(swaps: &mut [ExchangeOrder]) { - // NEED to sort by created_at to identify shill created BUY orders - swaps.sort_by_key(|swap| swap.created_at); - - for i in 0..swaps.len() { - let current_swap = &swaps[i]; - - // TODO: move this to a filter on the enclosing scope - if current_swap.shill_bid.is_some() { - continue; - }; - - // Filter for open trades - let open_orders = swaps - .iter() - 
.filter(|&other_swap| { - other_swap.filled_at > current_swap.created_at - && other_swap.created_at <= current_swap.created_at - }) - .collect::<Vec<_>>(); - // Determine if the current swap took the best price - let is_shill_bid = match current_swap.order_type.as_str() { - // Signs of shill trades. - // For those offering to BUY coins, as the tx.user (offerer) - // An honest and rational actor would not create a buy order - // higher than other SELL offers which have not been filled. - // The shill bidder who is colluding will create a BUY order at a higher price than other SELL orders which currently exist. - "Buy" => open_orders.iter().any(|other_swap| { - if other_swap.order_type == *"Sell" { - // this is not a rational trade if there are - // SELL offers of the same amount (or smaller) - // at a price equal or lower. - return other_swap.price <= current_swap.price - && other_swap.amount <= current_swap.amount; + competition +} +pub fn process_shill(all_transactions: &mut [ExchangeOrder]) { + all_transactions.sort_by_key(|el| el.filled_at); // Sort by filled_at + + // TODO: gross, see what you make me do, borrow checker. + let temp_tx = all_transactions.to_vec(); + + for current_order in all_transactions.iter_mut() { + let comp = get_competing_offers(current_order, &temp_tx); + + // We can only evaluate if an "accepter" is engaged in shill behavior. + // the "offerer" may create unreasonable offers, but the shill trade requires someone accepting. + + match comp.offer_type { + // An accepter may be looking to dispose of coins. + // They must fill someone else's "BUY" offer. + + // Rationally would want to dispose at the highest price possible. + // so if we find that there were more HIGHER offers to buy which this accepter did not take, we must wonder why they are taking a lower price voluntarily. 
+ // it would indicate they are shilling_down + OrderType::Buy => { + if let Some(higher_priced_orders) = comp + .within_amount + .checked_sub(comp.within_amount_lower_price) + { + if higher_priced_orders > 0 { + current_order.accepter_shill_down = true + } } - false - }), - _ => false, - }; - - // Update the swap with the best price flag - swaps[i].shill_bid = Some(is_shill_bid); + // Similarly an accepter may be looking to accumulate coins. + // They rationally will do so at the lowest price available + // We want to check if they are ignoring lower priced offers + // of the same or lower amount. + // If so it means they are pushing the price up. + } + OrderType::Sell => { + if comp.within_amount_lower_price > 0 { + current_order.accepter_shill_up = true + } + } + } } } @@ -186,12 +168,12 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 100.0, - order_type: "Buy".into(), + order_type: OrderType::Buy, rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, + ..Default::default() }, // less than 12 hours later next trade 5/6/2024 8AM ExchangeOrder { @@ -205,12 +187,12 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 4.0, - order_type: "Buy".into(), + order_type: OrderType::Buy, rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, + ..Default::default() }, // less than one hour later ExchangeOrder { @@ -224,12 +206,12 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 4.0, - order_type: "Buy".into(), + order_type: OrderType::Buy, rms_hour: 0.0, rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, + ..Default::default() }, // same time as previous but different traders ExchangeOrder { @@ -243,12 +225,7 @@ fn test_rms_pipeline() { .unwrap() .with_timezone(&Utc), price: 32.0, - order_type: "Sell".into(), - rms_hour: 0.0, - rms_24hour: 0.0, - price_vs_rms_hour: 0.0, - price_vs_rms_24hour: 0.0, - 
shill_bid: None, + ..Default::default() }, ]; @@ -267,6 +244,6 @@ fn test_rms_pipeline() { assert!(s3.rms_hour == 4.0); assert!((s3.rms_24hour > 57.0) && (s3.rms_24hour < 58.0)); - process_sell_order_shill(&mut swaps); + process_shill(&mut swaps); dbg!(&swaps); } diff --git a/src/load_exchange_orders.rs b/src/load_exchange_orders.rs index 963a4c4..e17045d 100644 --- a/src/load_exchange_orders.rs +++ b/src/load_exchange_orders.rs @@ -10,7 +10,7 @@ use crate::{ schema_exchange_orders::ExchangeOrder, }; -pub async fn swap_batch( +pub async fn exchange_txs_batch( txs: &[ExchangeOrder], pool: &Graph, batch_size: usize, @@ -93,14 +93,13 @@ pub async fn load_from_json(path: &Path, pool: &Graph, batch_size: usize) -> Res info!("completed rms statistics"); // find likely shill bids - enrich_rms::process_sell_order_shill(&mut orders); - enrich_rms::process_buy_order_shill(&mut orders); + enrich_rms::process_shill(&mut orders); info!("completed shill bid calculation"); let mut balances = BalanceTracker::new(); balances.replay_transactions(&mut orders)?; let ledger_inserts = balances.submit_ledger(pool).await?; - info!("exchange ledger relations inserted: {}", ledger_inserts); + info!("exchange UserLedger state inserted: {}", ledger_inserts); - swap_batch(&orders, pool, batch_size).await + exchange_txs_batch(&orders, pool, batch_size).await } diff --git a/src/neo4j_init.rs b/src/neo4j_init.rs index f2eb51d..31620bb 100644 --- a/src/neo4j_init.rs +++ b/src/neo4j_init.rs @@ -29,6 +29,9 @@ pub static INDEX_TX_HASH: &str = pub static INDEX_TX_FUNCTION: &str = "CREATE INDEX tx_function IF NOT EXISTS FOR ()-[r:Tx]-() ON (r.function)"; +pub static INDEX_TX_RELATION: &str = + "CREATE INDEX tx_relation IF NOT EXISTS FOR ()-[r:Tx]-() ON (r.relation)"; + pub static INDEX_SWAP_ID: &str = "CREATE INDEX swap_account_id IF NOT EXISTS FOR (n:SwapAccount) ON (n.swap_id)"; @@ -74,6 +77,7 @@ pub async fn maybe_create_indexes(graph: &Graph) -> Result<()> { INDEX_TX_TIMESTAMP, INDEX_TX_HASH, 
INDEX_TX_FUNCTION, + INDEX_TX_RELATION, INDEX_SWAP_ID, INDEX_EXCHANGE_LEDGER, INDEX_EXCHANGE_LINK_LEDGER, diff --git a/src/schema_exchange_orders.rs b/src/schema_exchange_orders.rs index 47868dc..8a98feb 100644 --- a/src/schema_exchange_orders.rs +++ b/src/schema_exchange_orders.rs @@ -1,14 +1,32 @@ +use std::fmt; + use anyhow::Result; use chrono::{DateTime, Utc}; -use serde::{Deserialize, Deserializer}; +use serde::{Deserialize, Deserializer, Serialize}; + +#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq)] +pub enum OrderType { + Buy, + #[default] + Sell, +} + +impl fmt::Display for OrderType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match &self { + OrderType::Buy => write!(f, "Buy"), + OrderType::Sell => write!(f, "Sell"), + } + } +} -#[derive(Clone, Debug, Deserialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] #[allow(dead_code)] pub struct ExchangeOrder { pub user: u32, #[serde(rename = "orderType")] - pub order_type: String, + pub order_type: OrderType, #[serde(deserialize_with = "deserialize_amount")] pub amount: f64, #[serde(deserialize_with = "deserialize_amount")] @@ -25,14 +43,25 @@ pub struct ExchangeOrder { #[serde(skip_deserializing)] pub price_vs_rms_24hour: f64, #[serde(skip_deserializing)] - pub shill_bid: Option<bool>, // New field to indicate if it took the best price + pub accepter_shill_down: bool, // an accepter pushing price down + #[serde(skip_deserializing)] + pub accepter_shill_up: bool, // an accepter pushing price up + #[serde(skip_deserializing)] + pub competing_offers: Option<CompetingOffers>, // New field to indicate if it took the best price +} +#[derive(Clone, Debug, Default, Deserialize, Serialize)] +pub struct CompetingOffers { + pub offer_type: OrderType, + pub open_same_type: u64, + pub within_amount: u64, + pub within_amount_lower_price: u64, } impl Default for ExchangeOrder { fn default() -> Self { Self { user: 0, - order_type: "Sell".to_string(), + order_type: OrderType::Sell, amount: 1.0, price: 1.0, 
created_at: DateTime::<Utc>::from_timestamp_nanos(0), @@ -42,7 +71,9 @@ impl Default for ExchangeOrder { rms_24hour: 0.0, price_vs_rms_hour: 0.0, price_vs_rms_24hour: 0.0, - shill_bid: None, + accepter_shill_down: false, + accepter_shill_up: false, + competing_offers: None, } } } @@ -52,7 +83,7 @@ impl ExchangeOrder { /// Note original data was in an RFC rfc3339 with Z for UTC, Cypher seems to prefer with offsets +00000 pub fn to_cypher_object_template(&self) -> String { format!( - r#"{{user: {}, accepter: {}, order_type: "{}", amount: {}, price:{}, created_at: datetime("{}"), created_at_ts: {}, filled_at: datetime("{}"), filled_at_ts: {}, shill_bid: {}, rms_hour: {}, rms_24hour: {}, price_vs_rms_hour: {}, price_vs_rms_24hour: {} }}"#, + r#"{{user: {}, accepter: {}, order_type: "{}", amount: {}, price:{}, created_at: datetime("{}"), created_at_ts: {}, filled_at: datetime("{}"), filled_at_ts: {}, accepter_shill_down: {}, accepter_shill_up: {}, rms_hour: {}, rms_24hour: {}, price_vs_rms_hour: {}, price_vs_rms_24hour: {} }}"#, self.user, self.accepter, self.order_type, @@ -62,7 +93,8 @@ impl ExchangeOrder { self.created_at.timestamp_micros(), self.filled_at.to_rfc3339(), self.filled_at.timestamp_micros(), - self.shill_bid.unwrap_or(false), + self.accepter_shill_down, + self.accepter_shill_up, self.rms_hour, self.rms_24hour, self.price_vs_rms_hour, @@ -97,7 +129,8 @@ impl ExchangeOrder { created_at_ts: tx.created_at_ts, filled_at: tx.filled_at, filled_at_ts: tx.filled_at_ts, - shill_bid: tx.shill_bid, + accepter_shill_up: tx.accepter_shill_up, + accepter_shill_down: tx.accepter_shill_down, rms_hour: tx.rms_hour, rms_24hour: tx.rms_24hour, price_vs_rms_hour: tx.price_vs_rms_hour, @@ -147,7 +180,7 @@ fn test_deserialize_orders() { // Check that the result matches the expected values assert_eq!(orders.len(), 4); assert_eq!(orders[0].user, 1); - assert_eq!(orders[0].order_type, "Sell"); + assert_eq!(orders[0].order_type, OrderType::Sell); assert_eq!(orders[0].amount, 
40000.000); assert_eq!(orders[0].accepter, 3768); } diff --git a/src/warehouse_cli.rs b/src/warehouse_cli.rs index 14cce96..0d75e7f 100644 --- a/src/warehouse_cli.rs +++ b/src/warehouse_cli.rs @@ -179,12 +179,16 @@ impl WarehouseCli { let pool = try_db_connection_pool(self).await?; neo4j_init::maybe_create_indexes(&pool).await?; - load_exchange_orders::load_from_json( + let (merged, ignored) = load_exchange_orders::load_from_json( swap_record_json, &pool, batch_size.unwrap_or(250), ) .await?; + info!( + "SUCCESS: exchange transactions merged: {}, ignored: {}", + merged, ignored + ); } Sub::EnrichExchangeOnramp { onboarding_json } => { info!("exchange onramp"); diff --git a/tests/test_analytics.rs b/tests/test_analytics.rs index 1875bd6..8076e37 100644 --- a/tests/test_analytics.rs +++ b/tests/test_analytics.rs @@ -30,7 +30,7 @@ async fn test_rms_single() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders - load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; + load_exchange_orders::exchange_txs_batch(&orders[..1000], &graph, 1000).await?; // get just one analytics result, never more than one (but can be empty) let list = analytics::exchange_stats::query_rms_analytics_chunk(&graph, 900, 1, false).await?; @@ -60,7 +60,7 @@ async fn test_rms_single_persist() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders - load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; + load_exchange_orders::exchange_txs_batch(&orders[..1000], &graph, 1000).await?; // get just one analytics result, never more than one (but can be empty) let list = analytics::exchange_stats::query_rms_analytics_chunk(&graph, 900, 1, true).await?; @@ -90,7 +90,7 @@ async fn test_rms_batch() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders - load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; + load_exchange_orders::exchange_txs_batch(&orders[..1000], &graph, 1000).await?; let list = 
analytics::exchange_stats::query_rms_analytics_concurrent(&graph, None, None, false) .await?; diff --git a/tests/test_enrich_exchange.rs b/tests/test_enrich_exchange.rs index a6ca3d9..1ba5397 100644 --- a/tests/test_enrich_exchange.rs +++ b/tests/test_enrich_exchange.rs @@ -43,26 +43,17 @@ fn test_enrich_rms() { } #[test] -fn test_sell_order_shill() { +fn test_sell_shill_up() { let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); assert!(orders.len() == 25450); - enrich_rms::process_sell_order_shill(&mut orders); + enrich_rms::process_shill(&mut orders); - let count_shill = orders.iter().fold(0, |mut acc, el| { - if let Some(is_shill) = el.shill_bid { - if is_shill { - acc += 1 - } - } - acc - }); - - dbg!(&count_shill); + let count_shill: Vec<_> = orders.iter().filter(|el| el.accepter_shill_up).collect(); - // assert!(count_shill == 13723); + assert!(count_shill.len() == 6039); assert!(orders.len() == 25450); } @@ -79,26 +70,17 @@ fn test_enrich_account_funding() { } #[test] -fn test_enrich_buy_shill() { +fn test_enrich_shill_down() { let path = env!("CARGO_MANIFEST_DIR"); let buf = PathBuf::from(path).join("tests/fixtures/savedOlOrders2.json"); let mut orders = extract_exchange_orders::read_orders_from_file(buf).unwrap(); assert!(orders.len() == 25450); - enrich_rms::process_buy_order_shill(&mut orders); - - let count_shill = orders.iter().fold(0, |mut acc, el| { - if let Some(is_shill) = el.shill_bid { - if is_shill { - acc += 1 - } - } - acc - }); + enrich_rms::process_shill(&mut orders); - dbg!(&count_shill); + let count_shill_down: Vec<_> = orders.iter().filter(|el| el.accepter_shill_down).collect(); - // assert!(count_shill == 13723); + assert!(count_shill_down.len() == 2319); assert!(orders.len() == 25450); } @@ -159,7 +141,7 @@ async fn e2e_swap_data() -> Result<()> { assert!(orders.len() == 25450); // load 1000 orders 
- load_exchange_orders::swap_batch(&orders[..1000], &graph, 1000).await?; + load_exchange_orders::exchange_txs_batch(&orders[..1000], &graph, 1000).await?; // now check data was loaded let mut result = graph